Skip to main content

reifydb_engine/bulk_insert/
validation.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::iter;
5
6use reifydb_core::{
7	interface::catalog::{column::Column, ringbuffer::RingBuffer, series::Series, table::Table},
8	value::column::buffer::ColumnBuffer,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15/// Validate and coerce all rows for a table in columnar batch mode.
16///
17/// Processes all rows at once by:
18/// 1. Collecting params into columnar format
19/// 2. Coercing each column's data in one batch using `cast_column_data`
20/// 3. Extracting coerced values back to row format
21///
22/// Returns `Vec<Vec<Value>>` where outer vec is rows, inner is column values in table column order.
23pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
24	if rows.is_empty() {
25		return Ok(Vec::new());
26	}
27
28	let num_cols = table.columns.len();
29	let num_rows = rows.len();
30
31	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
32	let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
33
34	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
35}
36
37/// Validate and coerce all rows for a ring buffer in columnar batch mode.
38pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
39	if rows.is_empty() {
40		return Ok(Vec::new());
41	}
42
43	let num_cols = ringbuffer.columns.len();
44	let num_rows = rows.len();
45
46	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
47	let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
48
49	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
50}
51
52/// Reorder all rows for a table without coercion.
53///
54/// Used by `Unchecked` mode when validation is skipped for pre-validated
55/// internal data. The caller is responsible for ensuring the rows already
56/// conform to the table's column types.
57pub fn reorder_rows_unvalidated(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
58	if rows.is_empty() {
59		return Ok(Vec::new());
60	}
61
62	let num_cols = table.columns.len();
63	let num_rows = rows.len();
64
65	// Build columnar data from params (no coercion)
66	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
67
68	// Convert directly to row format without coercion
69	Ok(columns_to_rows(&column_data, num_rows, num_cols))
70}
71
72/// Reorder all rows for a ring buffer without coercion. Companion to
73/// `reorder_rows_unvalidated` for ring buffers.
74pub fn reorder_rows_unvalidated_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
75	if rows.is_empty() {
76		return Ok(Vec::new());
77	}
78
79	let num_cols = ringbuffer.columns.len();
80	let num_rows = rows.len();
81
82	// Build columnar data from params (no coercion)
83	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
84
85	// Convert directly to row format without coercion
86	Ok(columns_to_rows(&column_data, num_rows, num_cols))
87}
88
89/// Validate and coerce all rows for a series in columnar batch mode.
90pub fn validate_and_coerce_rows_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
91	if rows.is_empty() {
92		return Ok(Vec::new());
93	}
94
95	let num_cols = series.columns.len();
96	let num_rows = rows.len();
97
98	let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
99	let coerced_columns = coerce_columns(&column_data, &series.columns, num_rows)?;
100
101	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
102}
103
104/// Reorder all rows for a series without coercion. Companion to
105/// `reorder_rows_unvalidated` for series.
106pub fn reorder_rows_unvalidated_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
107	if rows.is_empty() {
108		return Ok(Vec::new());
109	}
110
111	let num_cols = series.columns.len();
112	let num_rows = rows.len();
113
114	let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
115
116	Ok(columns_to_rows(&column_data, num_rows, num_cols))
117}
118
119/// Collect rows (params) into columnar format.
120///
121/// Returns `Vec<ColumnBuffer>` where each entry contains all values for that column.
122fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnBuffer>> {
123	let num_cols = columns.len();
124	let mut column_data: Vec<ColumnBuffer> =
125		columns.iter().map(|col| ColumnBuffer::none_typed(col.constraint.get_type(), 0)).collect();
126
127	for params in rows {
128		match params {
129			Params::Named(map) => {
130				// For each column, look up value in map or use Undefined
131				for (col_idx, col) in columns.iter().enumerate() {
132					let value = map.get(&col.name).cloned().unwrap_or(Value::none());
133					column_data[col_idx].push_value(value);
134				}
135			}
136			Params::Positional(vals) => {
137				if vals.len() > num_cols {
138					return Err(EngineError::BulkInsertTooManyValues {
139						fragment: Fragment::None,
140						expected: num_cols,
141						actual: vals.len(),
142					}
143					.into());
144				}
145				for (col_data, val) in
146					column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
147				{
148					col_data.push_value(val.cloned().unwrap_or(Value::none()));
149				}
150			}
151			Params::None => {
152				for col_data in column_data.iter_mut() {
153					col_data.push_none();
154				}
155			}
156		}
157	}
158
159	for params in rows {
160		if let Params::Named(map) = params {
161			for name in map.keys() {
162				if !columns.iter().any(|c| &c.name == name) {
163					return Err(EngineError::BulkInsertColumnNotFound {
164						fragment: Fragment::None,
165						table_name: source_name.to_string(),
166						column: name.to_string(),
167					}
168					.into());
169				}
170			}
171		}
172	}
173
174	Ok(column_data)
175}
176
177/// Convert columnar data back to row format.
178fn columns_to_rows(columns: &[ColumnBuffer], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
179	let mut result = Vec::with_capacity(num_rows);
180
181	for row_idx in 0..num_rows {
182		let mut row_values = Vec::with_capacity(num_cols);
183		for col in columns.iter().take(num_cols) {
184			row_values.push(col.get_value(row_idx));
185		}
186		result.push(row_values);
187	}
188
189	result
190}