gluesql_core/executor/validate.rs

1use {
2    crate::{
3        ast::{ColumnDef, ColumnUniqueOption},
4        data::{Key, Value},
5        result::Result,
6        store::{DataRow, Store},
7    },
8    futures::stream::TryStreamExt,
9    im::HashSet,
10    serde::Serialize,
11    std::fmt::Debug,
12    thiserror::Error as ThisError,
13    utils::Vector,
14};
15
/// Errors raised while validating `UNIQUE` / `PRIMARY KEY` constraints.
#[derive(ThisError, Debug, PartialEq, Serialize)]
pub enum ValidateError {
    /// A row (stored or new) has fewer values than the constraint's column index.
    #[error("conflict! storage row has no column on index {0}")]
    ConflictOnStorageColumnIndex(usize),

    /// A stored row is a `DataRow::Map` although the table has a column schema.
    #[error("conflict! schemaless row found in schema based data")]
    ConflictOnUnexpectedSchemalessRowFound,

    /// `(value, column_name)`: the value is already taken in that unique column.
    #[error("duplicate entry '{}' for unique column '{1}'", String::from(.0))]
    DuplicateEntryOnUniqueField(Value, String),

    /// The primary key is already taken.
    #[error("duplicate entry '{0:?}' for primary_key field")]
    DuplicateEntryOnPrimaryKeyField(Key),
}
30
/// Selects which unique columns `validate_unique` checks.
pub enum ColumnValidation<'column_def> {
    /// `INSERT`: every unique / primary-key column in the column definitions is checked.
    All(&'column_def [ColumnDef]),
    /// `UPDATE`: only unique columns whose names appear in the `Vec<String>` are checked.
    SpecifiedColumns(&'column_def [ColumnDef], Vec<String>),
}
37
/// Keys already taken in one unique column, collected from the rows being written.
#[derive(Debug)]
struct UniqueConstraint {
    /// Position of the column within a row's value slice.
    column_index: usize,
    /// Column name, reported in `DuplicateEntryOnUniqueField` errors.
    column_name: String,
    /// Non-`Key::None` keys recorded through `add`.
    keys: HashSet<Key>,
}
44
45impl UniqueConstraint {
46    fn new(column_index: usize, column_name: String) -> Self {
47        Self {
48            column_index,
49            column_name,
50            keys: HashSet::new(),
51        }
52    }
53
54    fn add(self, value: &Value) -> Result<Self> {
55        let new_key = self.check(value)?;
56
57        if matches!(new_key, Key::None) {
58            return Ok(self);
59        }
60
61        let keys = self.keys.update(new_key);
62
63        Ok(Self {
64            column_index: self.column_index,
65            column_name: self.column_name,
66            keys,
67        })
68    }
69
70    fn check(&self, value: &Value) -> Result<Key> {
71        let key = Key::try_from(value)?;
72
73        if self.keys.contains(&key) {
74            Err(
75                ValidateError::DuplicateEntryOnUniqueField(value.clone(), self.column_name.clone())
76                    .into(),
77            )
78        } else {
79            Ok(key)
80        }
81    }
82}
83
84pub async fn validate_unique<T: Store>(
85    storage: &T,
86    table_name: &str,
87    column_validation: ColumnValidation<'_>,
88    row_iter: impl Iterator<Item = &[Value]> + Clone,
89) -> Result<()> {
90    enum Columns {
91        /// key index
92        PrimaryKeyOnly(usize),
93        /// `[(key_index, table_name)]`
94        All(Vec<(usize, String)>),
95    }
96
97    let columns = match &column_validation {
98        ColumnValidation::All(column_defs) => {
99            let primary_key_index = column_defs
100                .iter()
101                .enumerate()
102                .find(|(_, ColumnDef { unique, .. })| {
103                    unique == &Some(ColumnUniqueOption { is_primary: true })
104                })
105                .map(|(i, _)| i);
106            let other_unique_column_def_count = column_defs
107                .iter()
108                .filter(|ColumnDef { unique, .. }| {
109                    unique == &Some(ColumnUniqueOption { is_primary: false })
110                })
111                .count();
112
113            match (primary_key_index, other_unique_column_def_count) {
114                (Some(primary_key_index), 0) => Columns::PrimaryKeyOnly(primary_key_index),
115                _ => Columns::All(fetch_all_unique_columns(column_defs)),
116            }
117        }
118        ColumnValidation::SpecifiedColumns(column_defs, specified_columns) => Columns::All(
119            fetch_specified_unique_columns(column_defs, specified_columns),
120        ),
121    };
122
123    match columns {
124        Columns::PrimaryKeyOnly(primary_key_index) => {
125            for primary_key in
126                row_iter.filter_map(|row| row.get(primary_key_index).map(Key::try_from))
127            {
128                let key = primary_key?;
129
130                if storage.fetch_data(table_name, &key).await?.is_some() {
131                    return Err(ValidateError::DuplicateEntryOnPrimaryKeyField(key).into());
132                }
133            }
134
135            Ok(())
136        }
137        Columns::All(columns) => {
138            let unique_constraints: Vec<_> = create_unique_constraints(columns, &row_iter)?.into();
139            if unique_constraints.is_empty() {
140                return Ok(());
141            }
142
143            let unique_constraints = &unique_constraints;
144            storage
145                .scan_data(table_name)
146                .await?
147                .try_for_each(|(_, data_row)| async {
148                    let values = match data_row {
149                        DataRow::Vec(values) => values,
150                        DataRow::Map(_) => {
151                            return Err(
152                                ValidateError::ConflictOnUnexpectedSchemalessRowFound.into()
153                            );
154                        }
155                    };
156
157                    unique_constraints.iter().try_for_each(|constraint| {
158                        let col_idx = constraint.column_index;
159                        let val = values
160                            .get(col_idx)
161                            .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
162
163                        constraint.check(val)?;
164
165                        Ok(())
166                    })
167                })
168                .await
169        }
170    }
171}
172
173fn create_unique_constraints<'a>(
174    unique_columns: Vec<(usize, String)>,
175    row_iter: &(impl Iterator<Item = &'a [Value]> + Clone),
176) -> Result<Vector<UniqueConstraint>> {
177    unique_columns
178        .into_iter()
179        .try_fold(Vector::new(), |constraints, col| {
180            let (col_idx, col_name) = col;
181            let new_constraint = UniqueConstraint::new(col_idx, col_name);
182            let new_constraint = row_iter
183                .clone()
184                .try_fold(new_constraint, |constraint, row| {
185                    let val = row
186                        .get(col_idx)
187                        .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
188
189                    constraint.add(val)
190                })?;
191            Ok(constraints.push(new_constraint))
192        })
193}
194
195fn fetch_all_unique_columns(column_defs: &[ColumnDef]) -> Vec<(usize, String)> {
196    column_defs
197        .iter()
198        .enumerate()
199        .filter_map(|(i, table_col)| table_col.unique.map(|_| (i, table_col.name.clone())))
200        .collect()
201}
202
203fn fetch_specified_unique_columns(
204    all_column_defs: &[ColumnDef],
205    specified_columns: &[String],
206) -> Vec<(usize, String)> {
207    all_column_defs
208        .iter()
209        .enumerate()
210        .filter_map(|(i, table_col)| {
211            (table_col.unique.is_some()
212                && specified_columns.iter().any(|col| col == &table_col.name))
213            .then_some((i, table_col.name.clone()))
214        })
215        .collect()
216}