reifydb_engine/bulk_insert/
validation.rs1use std::iter;
7
8use reifydb_core::{
9 interface::catalog::{column::Column, ringbuffer::RingBuffer, table::Table},
10 value::column::data::ColumnData,
11};
12use reifydb_type::{fragment::Fragment, params::Params, value::Value};
13
14use super::coerce::coerce_columns;
15use crate::{Result, error::EngineError};
16
17pub fn validate_and_coerce_rows(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
26 if rows.is_empty() {
27 return Ok(Vec::new());
28 }
29
30 let num_cols = table.columns.len();
31 let num_rows = rows.len();
32
33 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
34 let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
35
36 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
37}
38
39pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
41 if rows.is_empty() {
42 return Ok(Vec::new());
43 }
44
45 let num_cols = ringbuffer.columns.len();
46 let num_rows = rows.len();
47
48 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
49 let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
50
51 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
52}
53
54pub fn reorder_rows_trusted(rows: &[Params], table: &Table) -> Result<Vec<Vec<Value>>> {
58 if rows.is_empty() {
59 return Ok(Vec::new());
60 }
61
62 let num_cols = table.columns.len();
63 let num_rows = rows.len();
64
65 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
67
68 Ok(columns_to_rows(&column_data, num_rows, num_cols))
70}
71
72pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBuffer) -> Result<Vec<Vec<Value>>> {
74 if rows.is_empty() {
75 return Ok(Vec::new());
76 }
77
78 let num_cols = ringbuffer.columns.len();
79 let num_rows = rows.len();
80
81 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
83
84 Ok(columns_to_rows(&column_data, num_rows, num_cols))
86}
87
88fn collect_rows_to_columns(rows: &[Params], columns: &[Column], source_name: &str) -> Result<Vec<ColumnData>> {
92 let num_cols = columns.len();
93 let mut column_data: Vec<ColumnData> =
94 columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
95
96 for params in rows {
97 match params {
98 Params::Named(map) => {
99 for (col_idx, col) in columns.iter().enumerate() {
101 let value = map.get(&col.name).cloned().unwrap_or(Value::none());
102 column_data[col_idx].push_value(value);
103 }
104 }
105 Params::Positional(vals) => {
106 if vals.len() > num_cols {
107 return Err(EngineError::BulkInsertTooManyValues {
108 fragment: Fragment::None,
109 expected: num_cols,
110 actual: vals.len(),
111 }
112 .into());
113 }
114 for (col_data, val) in
115 column_data.iter_mut().zip(vals.iter().map(Some).chain(iter::repeat(None)))
116 {
117 col_data.push_value(val.cloned().unwrap_or(Value::none()));
118 }
119 }
120 Params::None => {
121 for col_data in column_data.iter_mut() {
122 col_data.push_none();
123 }
124 }
125 }
126 }
127
128 for params in rows {
129 if let Params::Named(map) = params {
130 for name in map.keys() {
131 if !columns.iter().any(|c| &c.name == name) {
132 return Err(EngineError::BulkInsertColumnNotFound {
133 fragment: Fragment::None,
134 table_name: source_name.to_string(),
135 column: name.to_string(),
136 }
137 .into());
138 }
139 }
140 }
141 }
142
143 Ok(column_data)
144}
145
146fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
148 let mut result = Vec::with_capacity(num_rows);
149
150 for row_idx in 0..num_rows {
151 let mut row_values = Vec::with_capacity(num_cols);
152 for col in columns.iter().take(num_cols) {
153 row_values.push(col.get_value(row_idx));
154 }
155 result.push(row_values);
156 }
157
158 result
159}