reifydb_engine/bulk_insert/
validation.rs1use reifydb_core::{
7 interface::catalog::{column::ColumnDef, ringbuffer::RingBufferDef, table::TableDef},
8 value::column::data::ColumnData,
9};
10use reifydb_type::{fragment::Fragment, params::Params, value::Value};
11
12use super::{coerce::coerce_columns, error::BulkInsertError};
13
14pub fn validate_and_coerce_rows(rows: &[Params], table: &TableDef) -> crate::Result<Vec<Vec<Value>>> {
23 if rows.is_empty() {
24 return Ok(Vec::new());
25 }
26
27 let num_cols = table.columns.len();
28 let num_rows = rows.len();
29
30 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
31 let coerced_columns = coerce_columns(&column_data, &table.columns, num_rows)?;
32
33 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
34}
35
36pub fn validate_and_coerce_rows_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> crate::Result<Vec<Vec<Value>>> {
38 if rows.is_empty() {
39 return Ok(Vec::new());
40 }
41
42 let num_cols = ringbuffer.columns.len();
43 let num_rows = rows.len();
44
45 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
46 let coerced_columns = coerce_columns(&column_data, &ringbuffer.columns, num_rows)?;
47
48 Ok(columns_to_rows(&coerced_columns, num_rows, num_cols))
49}
50
51pub fn reorder_rows_trusted(rows: &[Params], table: &TableDef) -> crate::Result<Vec<Vec<Value>>> {
55 if rows.is_empty() {
56 return Ok(Vec::new());
57 }
58
59 let num_cols = table.columns.len();
60 let num_rows = rows.len();
61
62 let column_data = collect_rows_to_columns(rows, &table.columns, &table.name)?;
64
65 Ok(columns_to_rows(&column_data, num_rows, num_cols))
67}
68
69pub fn reorder_rows_trusted_rb(rows: &[Params], ringbuffer: &RingBufferDef) -> crate::Result<Vec<Vec<Value>>> {
71 if rows.is_empty() {
72 return Ok(Vec::new());
73 }
74
75 let num_cols = ringbuffer.columns.len();
76 let num_rows = rows.len();
77
78 let column_data = collect_rows_to_columns(rows, &ringbuffer.columns, &ringbuffer.name)?;
80
81 Ok(columns_to_rows(&column_data, num_rows, num_cols))
83}
84
85fn collect_rows_to_columns(
89 rows: &[Params],
90 columns: &[ColumnDef],
91 source_name: &str,
92) -> crate::Result<Vec<ColumnData>> {
93 let num_cols = columns.len();
94 let mut column_data: Vec<ColumnData> =
95 columns.iter().map(|col| ColumnData::none_typed(col.constraint.get_type(), 0)).collect();
96
97 for params in rows {
98 match params {
99 Params::Named(map) => {
100 for (col_idx, col) in columns.iter().enumerate() {
102 let value = map.get(&col.name).cloned().unwrap_or(Value::none());
103 column_data[col_idx].push_value(value);
104 }
105 }
106 Params::Positional(vals) => {
107 if vals.len() > num_cols {
108 return Err(BulkInsertError::too_many_values(
109 Fragment::None,
110 num_cols,
111 vals.len(),
112 )
113 .into());
114 }
115 for col_idx in 0..num_cols {
116 let value = vals.get(col_idx).cloned().unwrap_or(Value::none());
117 column_data[col_idx].push_value(value);
118 }
119 }
120 Params::None => {
121 for col_idx in 0..num_cols {
122 column_data[col_idx].push_none();
123 }
124 }
125 }
126 }
127
128 for params in rows {
129 if let Params::Named(map) = params {
130 for name in map.keys() {
131 if !columns.iter().any(|c| &c.name == name) {
132 return Err(BulkInsertError::column_not_found(
133 Fragment::None,
134 source_name,
135 name,
136 )
137 .into());
138 }
139 }
140 }
141 }
142
143 Ok(column_data)
144}
145
146fn columns_to_rows(columns: &[ColumnData], num_rows: usize, num_cols: usize) -> Vec<Vec<Value>> {
148 let mut result = Vec::with_capacity(num_rows);
149
150 for row_idx in 0..num_rows {
151 let mut row_values = Vec::with_capacity(num_cols);
152 for col_idx in 0..num_cols {
153 row_values.push(columns[col_idx].get_value(row_idx));
154 }
155 result.push(row_values);
156 }
157
158 result
159}