Skip to main content

reifydb_engine/bulk_insert/
validation.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Row validation and column mapping for bulk inserts with batch coercion.
5
6use reifydb_core::{
7	interface::catalog::{column::ColumnDef, ringbuffer::RingBufferDef, table::TableDef},
8	value::column::data::ColumnData,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::{coerce::coerce_columns, error::BulkInsertError};
13
14/// Validate and coerce all rows for a table in columnar batch mode.
15///
16/// Processes all rows at once by:
17/// 1. Collecting params into columnar format
18/// 2. Coercing each column's data in one batch using `cast_column_data`
19/// 3. Extracting coerced values back to row format
20///
21/// Returns `Vec<Vec<Value>>` where outer vec is rows, inner is column values in table column order.
22pub fn validate_and_coerce_rows(rows: &[Params], table: &TableDef) -> crate::Result<Vec<Vec<Value>>> {
23	if rows.is_empty() {
24		return Ok(Vec::new());
25	}
26
27	let num_cols = table.columns.len();
28	let num_rows = rows.len();
29
30	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
31	let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
32
33	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
34}
35
36/// Validate and coerce all rows for a ring buffer in columnar batch mode.
37pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> crate::Result<Vec<Vec<Value>>> {
38	if rows.is_empty() {
39		return Ok(Vec::new());
40	}
41
42	let num_cols = ringbuffer.columns.len();
43	let num_rows = rows.len();
44
45	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
46	let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
47
48	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
49}
50
51/// Reorder all rows for a table without coercion (trusted mode).
52///
53/// Used when validation is skipped for pre-validated internal data.
54pub fn reorder_rows_trusted(rows: &[Params], table: &TableDef) -> crate::Result<Vec<Vec<Value>>> {
55	if rows.is_empty() {
56		return Ok(Vec::new());
57	}
58
59	let num_cols = table.columns.len();
60	let num_rows = rows.len();
61
62	// Build columnar data from params (no coercion)
63	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
64
65	// Convert directly to row format without coercion
66	Ok(columns_to_rows(&column_data, num_rows, num_cols))
67}
68
69/// Reorder all rows for a ring buffer without coercion (trusted mode).
70pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> crate::Result<Vec<Vec<Value>>> {
71	if rows.is_empty() {
72		return Ok(Vec::new());
73	}
74
75	let num_cols = ringbuffer.columns.len();
76	let num_rows = rows.len();
77
78	// Build columnar data from params (no coercion)
79	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
80
81	// Convert directly to row format without coercion
82	Ok(columns_to_rows(&column_data, num_rows, num_cols))
83}
84
85/// Collect rows (params) into columnar format.
86///
87/// Returns `Vec<ColumnData>` where each entry contains all values for that column.
88fn collect_rows_to_columns(
89	rows: &[Params],
90	columns: &[ColumnDef],
91	source_name: &str,
92) -> crate::Result<Vec<ColumnData>> {
93	let num_cols = columns.len();
94	let mut column_data: Vec<ColumnData> =
95		columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
96
97	for params in rows {
98		match params {
99			Params::Named(map) => {
100				// For each column, look up value in map or use Undefined
101				for (col_idx, col) in columns.iter().enumerate() {
102					let value = map.get(&col.name).cloned().unwrap_or(Value::none());
103					column_data[col_idx].push_value(value);
104				}
105			}
106			Params::Positional(vals) => {
107				if vals.len() > num_cols {
108					return Err(BulkInsertError::too_many_values(
109						Fragment::None,
110						num_cols,
111						vals.len(),
112					)
113					.into());
114				}
115				for col_idx in 0..num_cols {
116					let value = vals.get(col_idx).cloned().unwrap_or(Value::none());
117					column_data[col_idx].push_value(value);
118				}
119			}
120			Params::None => {
121				for col_idx in 0..num_cols {
122					column_data[col_idx].push_none();
123				}
124			}
125		}
126	}
127
128	for params in rows {
129		if let Params::Named(map) = params {
130			for name in map.keys() {
131				if !columns.iter().any(|c| &c.name == name) {
132					return Err(BulkInsertError::column_not_found(
133						Fragment::None,
134						source_name,
135						name,
136					)
137					.into());
138				}
139			}
140		}
141	}
142
143	Ok(column_data)
144}
145
146/// Convert columnar data back to row format.
147fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
148	let mut result = Vec::with_capacity(num_rows);
149
150	for row_idx in 0..num_rows {
151		let mut row_values = Vec::with_capacity(num_cols);
152		for col_idx in 0..num_cols {
153			row_values.push(columns[col_idx].get_value(row_idx));
154		}
155		result.push(row_values);
156	}
157
158	result
159}