gluesql_core/executor/
validate.rs1use {
2 crate::{
3 ast::{ColumnDef, ColumnUniqueOption},
4 data::{Key, Value},
5 result::Result,
6 store::{DataRow, Store},
7 },
8 futures::stream::TryStreamExt,
9 im::HashSet,
10 serde::Serialize,
11 std::fmt::Debug,
12 thiserror::Error as ThisError,
13 utils::Vector,
14};
15
16#[derive(ThisError, Debug, PartialEq, Serialize)]
17pub enum ValidateError {
18 #[error("conflict! storage row has no column on index {0}")]
19 ConflictOnStorageColumnIndex(usize),
20
21 #[error("conflict! schemaless row found in schema based data")]
22 ConflictOnUnexpectedSchemalessRowFound,
23
24 #[error("duplicate entry '{}' for unique column '{1}'", String::from(.0))]
25 DuplicateEntryOnUniqueField(Value, String),
26
27 #[error("duplicate entry '{0:?}' for primary_key field")]
28 DuplicateEntryOnPrimaryKeyField(Key),
29}
30
31pub enum ColumnValidation<'column_def> {
32 All(&'column_def [ColumnDef]),
34 SpecifiedColumns(&'column_def [ColumnDef], Vec<String>),
36}
37
38#[derive(Debug)]
39struct UniqueConstraint {
40 column_index: usize,
41 column_name: String,
42 keys: HashSet<Key>,
43}
44
45impl UniqueConstraint {
46 fn new(column_index: usize, column_name: String) -> Self {
47 Self {
48 column_index,
49 column_name,
50 keys: HashSet::new(),
51 }
52 }
53
54 fn add(self, value: &Value) -> Result<Self> {
55 let new_key = self.check(value)?;
56
57 if matches!(new_key, Key::None) {
58 return Ok(self);
59 }
60
61 let keys = self.keys.update(new_key);
62
63 Ok(Self {
64 column_index: self.column_index,
65 column_name: self.column_name,
66 keys,
67 })
68 }
69
70 fn check(&self, value: &Value) -> Result<Key> {
71 let key = Key::try_from(value)?;
72
73 if !self.keys.contains(&key) {
74 Ok(key)
75 } else {
76 Err(ValidateError::DuplicateEntryOnUniqueField(
77 value.clone(),
78 self.column_name.to_owned(),
79 )
80 .into())
81 }
82 }
83}
84
85pub async fn validate_unique<T: Store>(
86 storage: &T,
87 table_name: &str,
88 column_validation: ColumnValidation<'_>,
89 row_iter: impl Iterator<Item = &[Value]> + Clone,
90) -> Result<()> {
91 enum Columns {
92 PrimaryKeyOnly(usize),
94 All(Vec<(usize, String)>),
96 }
97
98 let columns = match &column_validation {
99 ColumnValidation::All(column_defs) => {
100 let primary_key_index = column_defs
101 .iter()
102 .enumerate()
103 .find(|(_, ColumnDef { unique, .. })| {
104 unique == &Some(ColumnUniqueOption { is_primary: true })
105 })
106 .map(|(i, _)| i);
107 let other_unique_column_def_count = column_defs
108 .iter()
109 .filter(|ColumnDef { unique, .. }| {
110 unique == &Some(ColumnUniqueOption { is_primary: false })
111 })
112 .count();
113
114 match (primary_key_index, other_unique_column_def_count) {
115 (Some(primary_key_index), 0) => Columns::PrimaryKeyOnly(primary_key_index),
116 _ => Columns::All(fetch_all_unique_columns(column_defs)),
117 }
118 }
119 ColumnValidation::SpecifiedColumns(column_defs, specified_columns) => Columns::All(
120 fetch_specified_unique_columns(column_defs, specified_columns),
121 ),
122 };
123
124 match columns {
125 Columns::PrimaryKeyOnly(primary_key_index) => {
126 for primary_key in
127 row_iter.filter_map(|row| row.get(primary_key_index).map(Key::try_from))
128 {
129 let key = primary_key?;
130
131 if storage.fetch_data(table_name, &key).await?.is_some() {
132 return Err(ValidateError::DuplicateEntryOnPrimaryKeyField(key).into());
133 }
134 }
135
136 Ok(())
137 }
138 Columns::All(columns) => {
139 let unique_constraints: Vec<_> = create_unique_constraints(columns, row_iter)?.into();
140 if unique_constraints.is_empty() {
141 return Ok(());
142 }
143
144 let unique_constraints = &unique_constraints;
145 storage
146 .scan_data(table_name)
147 .await?
148 .try_for_each(|(_, data_row)| async {
149 let values = match data_row {
150 DataRow::Vec(values) => values,
151 DataRow::Map(_) => {
152 return Err(
153 ValidateError::ConflictOnUnexpectedSchemalessRowFound.into()
154 );
155 }
156 };
157
158 unique_constraints.iter().try_for_each(|constraint| {
159 let col_idx = constraint.column_index;
160 let val = values
161 .get(col_idx)
162 .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
163
164 constraint.check(val)?;
165
166 Ok(())
167 })
168 })
169 .await
170 }
171 }
172}
173
174fn create_unique_constraints<'a>(
175 unique_columns: Vec<(usize, String)>,
176 row_iter: impl Iterator<Item = &'a [Value]> + Clone,
177) -> Result<Vector<UniqueConstraint>> {
178 unique_columns
179 .into_iter()
180 .try_fold(Vector::new(), |constraints, col| {
181 let (col_idx, col_name) = col;
182 let new_constraint = UniqueConstraint::new(col_idx, col_name);
183 let new_constraint = row_iter
184 .clone()
185 .try_fold(new_constraint, |constraint, row| {
186 let val = row
187 .get(col_idx)
188 .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
189
190 constraint.add(val)
191 })?;
192 Ok(constraints.push(new_constraint))
193 })
194}
195
196fn fetch_all_unique_columns(column_defs: &[ColumnDef]) -> Vec<(usize, String)> {
197 column_defs
198 .iter()
199 .enumerate()
200 .filter_map(|(i, table_col)| table_col.unique.map(|_| (i, table_col.name.to_owned())))
201 .collect()
202}
203
204fn fetch_specified_unique_columns(
205 all_column_defs: &[ColumnDef],
206 specified_columns: &[String],
207) -> Vec<(usize, String)> {
208 all_column_defs
209 .iter()
210 .enumerate()
211 .filter_map(|(i, table_col)| {
212 (table_col.unique.is_some()
213 && specified_columns.iter().any(|col| col == &table_col.name))
214 .then_some((i, table_col.name.to_owned()))
215 })
216 .collect()
217}