Skip to main content

reifydb_engine/bulk_insert/
validation.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Row validation and column mapping for bulk inserts with batch coercion.
5
6use std::iter;
7
8use reifydb_core::{
9	interface::catalog::{column::Column, ringbuffer::RingBuffer, table::Table},
10	value::column::data::ColumnData,
11};
12use reifydb_type::{fragment::Fragment, params::Params, value::Value};
13
14use super::coerce::coerce_columns;
15use crate::{Result, error::EngineError};
16
17/// Validate and coerce all rows for a table in columnar batch mode.
18///
19/// Processes all rows at once by:
20/// 1. Collecting params into columnar format
21/// 2. Coercing each column's data in one batch using `cast_column_data`
22/// 3. Extracting coerced values back to row format
23///
24/// Returns `Vec<Vec<Value>>` where outer vec is rows, inner is column values in table column order.
25pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
26	if rows.is_empty() {
27		return Ok(Vec::new());
28	}
29
30	let num_cols = table.columns.len();
31	let num_rows = rows.len();
32
33	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
34	let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
35
36	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
37}
38
39/// Validate and coerce all rows for a ring buffer in columnar batch mode.
40pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
41	if rows.is_empty() {
42		return Ok(Vec::new());
43	}
44
45	let num_cols = ringbuffer.columns.len();
46	let num_rows = rows.len();
47
48	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
49	let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
50
51	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
52}
53
54/// Reorder all rows for a table without coercion (trusted mode).
55///
56/// Used when validation is skipped for pre-validated internal data.
57pub fn reorder_rows_trusted(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
58	if rows.is_empty() {
59		return Ok(Vec::new());
60	}
61
62	let num_cols = table.columns.len();
63	let num_rows = rows.len();
64
65	// Build columnar data from params (no coercion)
66	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
67
68	// Convert directly to row format without coercion
69	Ok(columns_to_rows(&column_data, num_rows, num_cols))
70}
71
72/// Reorder all rows for a ring buffer without coercion (trusted mode).
73pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
74	if rows.is_empty() {
75		return Ok(Vec::new());
76	}
77
78	let num_cols = ringbuffer.columns.len();
79	let num_rows = rows.len();
80
81	// Build columnar data from params (no coercion)
82	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
83
84	// Convert directly to row format without coercion
85	Ok(columns_to_rows(&column_data, num_rows, num_cols))
86}
87
88/// Collect rows (params) into columnar format.
89///
90/// Returns `Vec<ColumnData>` where each entry contains all values for that column.
91fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnData>> {
92	let num_cols = columns.len();
93	let mut column_data: Vec<ColumnData> =
94		columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
95
96	for params in rows {
97		match params {
98			Params::Named(map) => {
99				// For each column, look up value in map or use Undefined
100				for (col_idx, col) in columns.iter().enumerate() {
101					let value = map.get(&col.name).cloned().unwrap_or(Value::none());
102					column_data[col_idx].push_value(value);
103				}
104			}
105			Params::Positional(vals) => {
106				if vals.len() > num_cols {
107					return Err(EngineError::BulkInsertTooManyValues {
108						fragment: Fragment::None,
109						expected: num_cols,
110						actual: vals.len(),
111					}
112					.into());
113				}
114				for (col_data, val) in
115					column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
116				{
117					col_data.push_value(val.cloned().unwrap_or(Value::none()));
118				}
119			}
120			Params::None => {
121				for col_data in column_data.iter_mut() {
122					col_data.push_none();
123				}
124			}
125		}
126	}
127
128	for params in rows {
129		if let Params::Named(map) = params {
130			for name in map.keys() {
131				if !columns.iter().any(|c| &c.name == name) {
132					return Err(EngineError::BulkInsertColumnNotFound {
133						fragment: Fragment::None,
134						table_name: source_name.to_string(),
135						column: name.to_string(),
136					}
137					.into());
138				}
139			}
140		}
141	}
142
143	Ok(column_data)
144}
145
146/// Convert columnar data back to row format.
147fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
148	let mut result = Vec::with_capacity(num_rows);
149
150	for row_idx in 0..num_rows {
151		let mut row_values = Vec::with_capacity(num_cols);
152		for col in columns.iter().take(num_cols) {
153			row_values.push(col.get_value(row_idx));
154		}
155		result.push(row_values);
156	}
157
158	result
159}