reifydb_engine/bulk_insert/
validation.rs1use std::iter;
5
6use reifydb_core::{
7 interface::catalog::{column::Column, ringbuffer::RingBuffer, series::Series, table::Table},
8 value::column::buffer::ColumnBuffer,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
16 if rows.is_empty() {
17 return Ok(Vec::new());
18 }
19
20 let num_cols = table.columns.len();
21 let num_rows = rows.len();
22
23 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
24 let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
25
26 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
27}
28
29pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
30 if rows.is_empty() {
31 return Ok(Vec::new());
32 }
33
34 let num_cols = ringbuffer.columns.len();
35 let num_rows = rows.len();
36
37 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
38 let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
39
40 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
41}
42
43pub fn reorder_rows_unvalidated(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
44 if rows.is_empty() {
45 return Ok(Vec::new());
46 }
47
48 let num_cols = table.columns.len();
49 let num_rows = rows.len();
50
51 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
52
53 Ok(columns_to_rows(&column_data, num_rows, num_cols))
54}
55
56pub fn reorder_rows_unvalidated_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
57 if rows.is_empty() {
58 return Ok(Vec::new());
59 }
60
61 let num_cols = ringbuffer.columns.len();
62 let num_rows = rows.len();
63
64 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
65
66 Ok(columns_to_rows(&column_data, num_rows, num_cols))
67}
68
69pub fn validate_and_coerce_rows_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
70 if rows.is_empty() {
71 return Ok(Vec::new());
72 }
73
74 let num_cols = series.columns.len();
75 let num_rows = rows.len();
76
77 let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
78 let coerced_columns = coerce_columns(&column_data, &series.columns, num_rows)?;
79
80 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
81}
82
83pub fn reorder_rows_unvalidated_series(rows: &[Params], series: &Series) -> Result<Vec<Vec<Value>>> {
84 if rows.is_empty() {
85 return Ok(Vec::new());
86 }
87
88 let num_cols = series.columns.len();
89 let num_rows = rows.len();
90
91 let column_data = collect_rows_to_columns(rows, &series.columns, &series.name)?;
92
93 Ok(columns_to_rows(&column_data, num_rows, num_cols))
94}
95
96fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnBuffer>> {
97 let num_cols = columns.len();
98 let mut column_data: Vec<ColumnBuffer> =
99 columns.iter().map(|col| ColumnBuffer::none_typed(col.constraint.get_type(), 0)).collect();
100
101 for params in rows {
102 match params {
103 Params::Named(map) => {
104 for (col_idx, col) in columns.iter().enumerate() {
105 let value = map.get(&col.name).cloned().unwrap_or(Value::none());
106 column_data[col_idx].push_value(value);
107 }
108 }
109 Params::Positional(vals) => {
110 if vals.len() > num_cols {
111 return Err(EngineError::BulkInsertTooManyValues {
112 fragment: Fragment::None,
113 expected: num_cols,
114 actual: vals.len(),
115 }
116 .into());
117 }
118 for (col_data, val) in
119 column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
120 {
121 col_data.push_value(val.cloned().unwrap_or(Value::none()));
122 }
123 }
124 Params::None => {
125 for col_data in column_data.iter_mut() {
126 col_data.push_none();
127 }
128 }
129 }
130 }
131
132 for params in rows {
133 if let Params::Named(map) = params {
134 for name in map.keys() {
135 if !columns.iter().any(|c| &c.name == name) {
136 return Err(EngineError::BulkInsertColumnNotFound {
137 fragment: Fragment::None,
138 table_name: source_name.to_string(),
139 column: name.to_string(),
140 }
141 .into());
142 }
143 }
144 }
145 }
146
147 Ok(column_data)
148}
149
150fn columns_to_rows(columns: &[ColumnBuffer], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
151 let mut result = Vec::with_capacity(num_rows);
152
153 for row_idx in 0..num_rows {
154 let mut row_values = Vec::with_capacity(num_cols);
155 for col in columns.iter().take(num_cols) {
156 row_values.push(col.get_value(row_idx));
157 }
158 result.push(row_values);
159 }
160
161 result
162}