reifydb_engine/bulk_insert/
validation.rs1use reifydb_core::{
7 interface::catalog::{column::ColumnDef, ringbuffer::RingBufferDef, table::TableDef},
8 value::column::data::ColumnData,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::coerce::coerce_columns;
13use crate::{Result, error::EngineError};
14
15pub fn validate_and_coerce_rows(rows: &[Params], table: &TableDef) -> Result<Vec<Vec<Value>>> {
24 if rows.is_empty() {
25 return Ok(Vec::new());
26 }
27
28 let num_cols = table.columns.len();
29 let num_rows = rows.len();
30
31 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
32 let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
33
34 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
35}
36
37pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> Result<Vec<Vec<Value>>> {
39 if rows.is_empty() {
40 return Ok(Vec::new());
41 }
42
43 let num_cols = ringbuffer.columns.len();
44 let num_rows = rows.len();
45
46 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
47 let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
48
49 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
50}
51
52pub fn reorder_rows_trusted(rows: &[Params], table: &TableDef) -> Result<Vec<Vec<Value>>> {
56 if rows.is_empty() {
57 return Ok(Vec::new());
58 }
59
60 let num_cols = table.columns.len();
61 let num_rows = rows.len();
62
63 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
65
66 Ok(columns_to_rows(&column_data, num_rows, num_cols))
68}
69
70pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> Result<Vec<Vec<Value>>> {
72 if rows.is_empty() {
73 return Ok(Vec::new());
74 }
75
76 let num_cols = ringbuffer.columns.len();
77 let num_rows = rows.len();
78
79 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
81
82 Ok(columns_to_rows(&column_data, num_rows, num_cols))
84}
85
86fn collect_rows_to_columns(rows: &[Params], columns: &[ColumnDef], source_name: &str) -> Result<Vec<ColumnData>> {
90 let num_cols = columns.len();
91 let mut column_data: Vec<ColumnData> =
92 columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
93
94 for params in rows {
95 match params {
96 Params::Named(map) => {
97 for (col_idx, col) in columns.iter().enumerate() {
99 let value = map.get(&col.name).cloned().unwrap_or(Value::none());
100 column_data[col_idx].push_value(value);
101 }
102 }
103 Params::Positional(vals) => {
104 if vals.len() > num_cols {
105 return Err(EngineError::BulkInsertTooManyValues {
106 fragment: Fragment::None,
107 expected: num_cols,
108 actual: vals.len(),
109 }
110 .into());
111 }
112 for col_idx in 0..num_cols {
113 let value = vals.get(col_idx).cloned().unwrap_or(Value::none());
114 column_data[col_idx].push_value(value);
115 }
116 }
117 Params::None => {
118 for col_idx in 0..num_cols {
119 column_data[col_idx].push_none();
120 }
121 }
122 }
123 }
124
125 for params in rows {
126 if let Params::Named(map) = params {
127 for name in map.keys() {
128 if !columns.iter().any(|c| &c.name == name) {
129 return Err(EngineError::BulkInsertColumnNotFound {
130 fragment: Fragment::None,
131 table_name: source_name.to_string(),
132 column: name.to_string(),
133 }
134 .into());
135 }
136 }
137 }
138 }
139
140 Ok(column_data)
141}
142
143fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
145 let mut result = Vec::with_capacity(num_rows);
146
147 for row_idx in 0..num_rows {
148 let mut row_values = Vec::with_capacity(num_cols);
149 for col_idx in 0..num_cols {
150 row_values.push(columns[col_idx].get_value(row_idx));
151 }
152 result.push(row_values);
153 }
154
155 result
156}