reifydb_engine/bulk_insert/
validation.rs1use std::iter;
5
6use reifydb_core::{
7 interface::catalog::{column::Column, ringbuffer::RingBuffer, series::Series, table::Table},
8 value::column::buffer::ColumnBuffer,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
24 if rows.is_empty() {
25 return Ok(Vec::new());
26 }
27
28 let num_cols = table.columns.len();
29 let num_rows = rows.len();
30
31 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
32 let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
33
34 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
35}
36
37pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
39 if rows.is_empty() {
40 return Ok(Vec::new());
41 }
42
43 let num_cols = ringbuffer.columns.len();
44 let num_rows = rows.len();
45
46 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
47 let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
48
49 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
50}
51
52pub fn reorder_rows_unvalidated(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
58 if rows.is_empty() {
59 return Ok(Vec::new());
60 }
61
62 let num_cols = table.columns.len();
63 let num_rows = rows.len();
64
65 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
67
68 Ok(columns_to_rows(&column_data, num_rows, num_cols))
70}
71
72pub fn reorder_rows_unvalidated_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
75 if rows.is_empty() {
76 return Ok(Vec::new());
77 }
78
79 let num_cols = ringbuffer.columns.len();
80 let num_rows = rows.len();
81
82 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
84
85 Ok(columns_to_rows(&column_data, num_rows, num_cols))
87}
88
89pub fn validate_and_coerce_rows_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
91 if rows.is_empty() {
92 return Ok(Vec::new());
93 }
94
95 let num_cols = series.columns.len();
96 let num_rows = rows.len();
97
98 let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
99 let coerced_columns = coerce_columns(&column_data, &series.columns, num_rows)?;
100
101 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
102}
103
104pub fn reorder_rows_unvalidated_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
107 if rows.is_empty() {
108 return Ok(Vec::new());
109 }
110
111 let num_cols = series.columns.len();
112 let num_rows = rows.len();
113
114 let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
115
116 Ok(columns_to_rows(&column_data, num_rows, num_cols))
117}
118
119fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnBuffer>> {
123 let num_cols = columns.len();
124 let mut column_data: Vec<ColumnBuffer> =
125 columns.iter().map(|col| ColumnBuffer::none_typed(col.constraint.get_type(), 0)).collect();
126
127 for params in rows {
128 match params {
129 Params::Named(map) => {
130 for (col_idx, col) in columns.iter().enumerate() {
132 let value = map.get(&col.name).cloned().unwrap_or(Value::none());
133 column_data[col_idx].push_value(value);
134 }
135 }
136 Params::Positional(vals) => {
137 if vals.len() > num_cols {
138 return Err(EngineError::BulkInsertTooManyValues {
139 fragment: Fragment::None,
140 expected: num_cols,
141 actual: vals.len(),
142 }
143 .into());
144 }
145 for (col_data, val) in
146 column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
147 {
148 col_data.push_value(val.cloned().unwrap_or(Value::none()));
149 }
150 }
151 Params::None => {
152 for col_data in column_data.iter_mut() {
153 col_data.push_none();
154 }
155 }
156 }
157 }
158
159 for params in rows {
160 if let Params::Named(map) = params {
161 for name in map.keys() {
162 if !columns.iter().any(|c| &c.name == name) {
163 return Err(EngineError::BulkInsertColumnNotFound {
164 fragment: Fragment::None,
165 table_name: source_name.to_string(),
166 column: name.to_string(),
167 }
168 .into());
169 }
170 }
171 }
172 }
173
174 Ok(column_data)
175}
176
177fn columns_to_rows(columns: &[ColumnBuffer], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
179 let mut result = Vec::with_capacity(num_rows);
180
181 for row_idx in 0..num_rows {
182 let mut row_values = Vec::with_capacity(num_cols);
183 for col in columns.iter().take(num_cols) {
184 row_values.push(col.get_value(row_idx));
185 }
186 result.push(row_values);
187 }
188
189 result
190}