Skip to main content

reifydb_engine/bulk_insert/
validation.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use std::iter;
5
6use reifydb_core::{
7	interface::catalog::{column::Column, ringbuffer::RingBuffer, series::Series, table::Table},
8	value::column::buffer::ColumnBuffer,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
16	if rows.is_empty() {
17		return Ok(Vec::new());
18	}
19
20	let num_cols = table.columns.len();
21	let num_rows = rows.len();
22
23	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
24	let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
25
26	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
27}
28
29pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
30	if rows.is_empty() {
31		return Ok(Vec::new());
32	}
33
34	let num_cols = ringbuffer.columns.len();
35	let num_rows = rows.len();
36
37	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
38	let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
39
40	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
41}
42
43pub fn reorder_rows_unvalidated(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
44	if rows.is_empty() {
45		return Ok(Vec::new());
46	}
47
48	let num_cols = table.columns.len();
49	let num_rows = rows.len();
50
51	let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
52
53	Ok(columns_to_rows(&column_data, num_rows, num_cols))
54}
55
56pub fn reorder_rows_unvalidated_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
57	if rows.is_empty() {
58		return Ok(Vec::new());
59	}
60
61	let num_cols = ringbuffer.columns.len();
62	let num_rows = rows.len();
63
64	let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
65
66	Ok(columns_to_rows(&column_data, num_rows, num_cols))
67}
68
69pub fn validate_and_coerce_rows_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
70	if rows.is_empty() {
71		return Ok(Vec::new());
72	}
73
74	let num_cols = series.columns.len();
75	let num_rows = rows.len();
76
77	let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
78	let coerced_columns = coerce_columns(&column_data, &series.columns, num_rows)?;
79
80	Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
81}
82
83pub fn reorder_rows_unvalidated_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
84	if rows.is_empty() {
85		return Ok(Vec::new());
86	}
87
88	let num_cols = series.columns.len();
89	let num_rows = rows.len();
90
91	let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
92
93	Ok(columns_to_rows(&column_data, num_rows, num_cols))
94}
95
96fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnBuffer>> {
97	let num_cols = columns.len();
98	let mut column_data: Vec<ColumnBuffer> =
99		columns.iter().map(|col| ColumnBuffer::none_typed(col.constraint.get_type(), 0)).collect();
100
101	for params in rows {
102		match params {
103			Params::Named(map) => {
104				for (col_idx, col) in columns.iter().enumerate() {
105					let value = map.get(&col.name).cloned().unwrap_or(Value::none());
106					column_data[col_idx].push_value(value);
107				}
108			}
109			Params::Positional(vals) => {
110				if vals.len() > num_cols {
111					return Err(EngineError::BulkInsertTooManyValues {
112						fragment: Fragment::None,
113						expected: num_cols,
114						actual: vals.len(),
115					}
116					.into());
117				}
118				for (col_data, val) in
119					column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
120				{
121					col_data.push_value(val.cloned().unwrap_or(Value::none()));
122				}
123			}
124			Params::None => {
125				for col_data in column_data.iter_mut() {
126					col_data.push_none();
127				}
128			}
129		}
130	}
131
132	for params in rows {
133		if let Params::Named(map) = params {
134			for name in map.keys() {
135				if !columns.iter().any(|c| &c.name == name) {
136					return Err(EngineError::BulkInsertColumnNotFound {
137						fragment: Fragment::None,
138						table_name: source_name.to_string(),
139						column: name.to_string(),
140					}
141					.into());
142				}
143			}
144		}
145	}
146
147	Ok(column_data)
148}
149
150fn columns_to_rows(columns: &[ColumnBuffer], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
151	let mut result = Vec::with_capacity(num_rows);
152
153	for row_idx in 0..num_rows {
154		let mut row_values = Vec::with_capacity(num_cols);
155		for col in columns.iter().take(num_cols) {
156			row_values.push(col.get_value(row_idx));
157		}
158		result.push(row_values);
159	}
160
161	result
162}