Skip to main content

reifydb_engine/bulk_insert/
validation.rs

1// SPDX-License-Identifier: AGPL-3.0-or-later
2// Copyright (c) 2025 ReifyDB
3
4//! Row validation and column mapping for bulk inserts with batch coercion.
5
6use reifydb_core::{
7	interface::catalog::{column::ColumnDef, ringbuffer::RingBufferDef, table::TableDef},
8	value::column::data::ColumnData,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15/// Validate and coerce all rows for a table in columnar batch mode.
16///
17/// Processes all rows at once by:
18/// 1. Collecting params into columnar format
19/// 2. Coercing each column's data in one batch using `cast_column_data`
20/// 3. Extracting coerced values back to row format
21///
22/// Returns `Vec<Vec<Value>>` where outer vec is rows, inner is column values in table column order.
23pub fn validate_and_coerce_rows(rows: &[Params], table: &TableDef) -> Result<Vec<Vec<Value>>> {
24	if rows.is_empty() {
25		return Ok(Vec::new());
26	}
27
28	let num_cols = table.columns.len();
29	let num_rows = rows.len();
30
31	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
32	let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
33
34	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
35}
36
37/// Validate and coerce all rows for a ring buffer in columnar batch mode.
38pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> Result<Vec<Vec<Value>>> {
39	if rows.is_empty() {
40		return Ok(Vec::new());
41	}
42
43	let num_cols = ringbuffer.columns.len();
44	let num_rows = rows.len();
45
46	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
47	let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
48
49	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
50}
51
52/// Reorder all rows for a table without coercion (trusted mode).
53///
54/// Used when validation is skipped for pre-validated internal data.
55pub fn reorder_rows_trusted(rows: &[Params], table: &TableDef) -> Result<Vec<Vec<Value>>> {
56	if rows.is_empty() {
57		return Ok(Vec::new());
58	}
59
60	let num_cols = table.columns.len();
61	let num_rows = rows.len();
62
63	// Build columnar data from params (no coercion)
64	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
65
66	// Convert directly to row format without coercion
67	Ok(columns_to_rows(&column_data, num_rows, num_cols))
68}
69
70/// Reorder all rows for a ring buffer without coercion (trusted mode).
71pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> Result<Vec<Vec<Value>>> {
72	if rows.is_empty() {
73		return Ok(Vec::new());
74	}
75
76	let num_cols = ringbuffer.columns.len();
77	let num_rows = rows.len();
78
79	// Build columnar data from params (no coercion)
80	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
81
82	// Convert directly to row format without coercion
83	Ok(columns_to_rows(&column_data, num_rows, num_cols))
84}
85
86/// Collect rows (params) into columnar format.
87///
88/// Returns `Vec<ColumnData>` where each entry contains all values for that column.
89fn collect_rows_to_columns(rows: &[Params], columns: &[ColumnDef], source_name: &str) -> Result<Vec<ColumnData>> {
90	let num_cols = columns.len();
91	let mut column_data: Vec<ColumnData> =
92		columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
93
94	for params in rows {
95		match params {
96			Params::Named(map) => {
97				// For each column, look up value in map or use Undefined
98				for (col_idx, col) in columns.iter().enumerate() {
99					let value = map.get(&col.name).cloned().unwrap_or(Value::none());
100					column_data[col_idx].push_value(value);
101				}
102			}
103			Params::Positional(vals) => {
104				if vals.len() > num_cols {
105					return Err(EngineError::BulkInsertTooManyValues {
106						fragment: Fragment::None,
107						expected: num_cols,
108						actual: vals.len(),
109					}
110					.into());
111				}
112				for col_idx in 0..num_cols {
113					let value = vals.get(col_idx).cloned().unwrap_or(Value::none());
114					column_data[col_idx].push_value(value);
115				}
116			}
117			Params::None => {
118				for col_idx in 0..num_cols {
119					column_data[col_idx].push_none();
120				}
121			}
122		}
123	}
124
125	for params in rows {
126		if let Params::Named(map) = params {
127			for name in map.keys() {
128				if !columns.iter().any(|c| &c.name == name) {
129					return Err(EngineError::BulkInsertColumnNotFound {
130						fragment: Fragment::None,
131						table_name: source_name.to_string(),
132						column: name.to_string(),
133					}
134					.into());
135				}
136			}
137		}
138	}
139
140	Ok(column_data)
141}
142
143/// Convert columnar data back to row format.
144fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
145	let mut result = Vec::with_capacity(num_rows);
146
147	for row_idx in 0..num_rows {
148		let mut row_values = Vec::with_capacity(num_cols);
149		for col_idx in 0..num_cols {
150			row_values.push(columns[col_idx].get_value(row_idx));
151		}
152		result.push(row_values);
153	}
154
155	result
156}