gluesql_core/executor/
validate.rs1use {
2 crate::{
3 ast::{ColumnDef, ColumnUniqueOption},
4 data::{Key, Value},
5 result::Result,
6 store::{DataRow, Store},
7 },
8 futures::stream::TryStreamExt,
9 im::HashSet,
10 serde::Serialize,
11 std::fmt::Debug,
12 thiserror::Error as ThisError,
13 utils::Vector,
14};
15
16#[derive(ThisError, Debug, PartialEq, Serialize)]
17pub enum ValidateError {
18 #[error("conflict! storage row has no column on index {0}")]
19 ConflictOnStorageColumnIndex(usize),
20
21 #[error("conflict! schemaless row found in schema based data")]
22 ConflictOnUnexpectedSchemalessRowFound,
23
24 #[error("duplicate entry '{}' for unique column '{1}'", String::from(.0))]
25 DuplicateEntryOnUniqueField(Value, String),
26
27 #[error("duplicate entry '{0:?}' for primary_key field")]
28 DuplicateEntryOnPrimaryKeyField(Key),
29}
30
31pub enum ColumnValidation<'column_def> {
32 All(&'column_def [ColumnDef]),
34 SpecifiedColumns(&'column_def [ColumnDef], Vec<String>),
36}
37
38#[derive(Debug)]
39struct UniqueConstraint {
40 column_index: usize,
41 column_name: String,
42 keys: HashSet<Key>,
43}
44
45impl UniqueConstraint {
46 fn new(column_index: usize, column_name: String) -> Self {
47 Self {
48 column_index,
49 column_name,
50 keys: HashSet::new(),
51 }
52 }
53
54 fn add(self, value: &Value) -> Result<Self> {
55 let new_key = self.check(value)?;
56
57 if matches!(new_key, Key::None) {
58 return Ok(self);
59 }
60
61 let keys = self.keys.update(new_key);
62
63 Ok(Self {
64 column_index: self.column_index,
65 column_name: self.column_name,
66 keys,
67 })
68 }
69
70 fn check(&self, value: &Value) -> Result<Key> {
71 let key = Key::try_from(value)?;
72
73 if self.keys.contains(&key) {
74 Err(
75 ValidateError::DuplicateEntryOnUniqueField(value.clone(), self.column_name.clone())
76 .into(),
77 )
78 } else {
79 Ok(key)
80 }
81 }
82}
83
84pub async fn validate_unique<T: Store>(
85 storage: &T,
86 table_name: &str,
87 column_validation: ColumnValidation<'_>,
88 row_iter: impl Iterator<Item = &[Value]> + Clone,
89) -> Result<()> {
90 enum Columns {
91 PrimaryKeyOnly(usize),
93 All(Vec<(usize, String)>),
95 }
96
97 let columns = match &column_validation {
98 ColumnValidation::All(column_defs) => {
99 let primary_key_index = column_defs
100 .iter()
101 .enumerate()
102 .find(|(_, ColumnDef { unique, .. })| {
103 unique == &Some(ColumnUniqueOption { is_primary: true })
104 })
105 .map(|(i, _)| i);
106 let other_unique_column_def_count = column_defs
107 .iter()
108 .filter(|ColumnDef { unique, .. }| {
109 unique == &Some(ColumnUniqueOption { is_primary: false })
110 })
111 .count();
112
113 match (primary_key_index, other_unique_column_def_count) {
114 (Some(primary_key_index), 0) => Columns::PrimaryKeyOnly(primary_key_index),
115 _ => Columns::All(fetch_all_unique_columns(column_defs)),
116 }
117 }
118 ColumnValidation::SpecifiedColumns(column_defs, specified_columns) => Columns::All(
119 fetch_specified_unique_columns(column_defs, specified_columns),
120 ),
121 };
122
123 match columns {
124 Columns::PrimaryKeyOnly(primary_key_index) => {
125 for primary_key in
126 row_iter.filter_map(|row| row.get(primary_key_index).map(Key::try_from))
127 {
128 let key = primary_key?;
129
130 if storage.fetch_data(table_name, &key).await?.is_some() {
131 return Err(ValidateError::DuplicateEntryOnPrimaryKeyField(key).into());
132 }
133 }
134
135 Ok(())
136 }
137 Columns::All(columns) => {
138 let unique_constraints: Vec<_> = create_unique_constraints(columns, &row_iter)?.into();
139 if unique_constraints.is_empty() {
140 return Ok(());
141 }
142
143 let unique_constraints = &unique_constraints;
144 storage
145 .scan_data(table_name)
146 .await?
147 .try_for_each(|(_, data_row)| async {
148 let values = match data_row {
149 DataRow::Vec(values) => values,
150 DataRow::Map(_) => {
151 return Err(
152 ValidateError::ConflictOnUnexpectedSchemalessRowFound.into()
153 );
154 }
155 };
156
157 unique_constraints.iter().try_for_each(|constraint| {
158 let col_idx = constraint.column_index;
159 let val = values
160 .get(col_idx)
161 .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
162
163 constraint.check(val)?;
164
165 Ok(())
166 })
167 })
168 .await
169 }
170 }
171}
172
173fn create_unique_constraints<'a>(
174 unique_columns: Vec<(usize, String)>,
175 row_iter: &(impl Iterator<Item = &'a [Value]> + Clone),
176) -> Result<Vector<UniqueConstraint>> {
177 unique_columns
178 .into_iter()
179 .try_fold(Vector::new(), |constraints, col| {
180 let (col_idx, col_name) = col;
181 let new_constraint = UniqueConstraint::new(col_idx, col_name);
182 let new_constraint = row_iter
183 .clone()
184 .try_fold(new_constraint, |constraint, row| {
185 let val = row
186 .get(col_idx)
187 .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
188
189 constraint.add(val)
190 })?;
191 Ok(constraints.push(new_constraint))
192 })
193}
194
195fn fetch_all_unique_columns(column_defs: &[ColumnDef]) -> Vec<(usize, String)> {
196 column_defs
197 .iter()
198 .enumerate()
199 .filter_map(|(i, table_col)| table_col.unique.map(|_| (i, table_col.name.clone())))
200 .collect()
201}
202
203fn fetch_specified_unique_columns(
204 all_column_defs: &[ColumnDef],
205 specified_columns: &[String],
206) -> Vec<(usize, String)> {
207 all_column_defs
208 .iter()
209 .enumerate()
210 .filter_map(|(i, table_col)| {
211 (table_col.unique.is_some()
212 && specified_columns.iter().any(|col| col == &table_col.name))
213 .then_some((i, table_col.name.clone()))
214 })
215 .collect()
216}