gluesql_core/executor/
validate.rs

1use {
2    crate::{
3        ast::{ColumnDef, ColumnUniqueOption},
4        data::{Key, Value},
5        result::Result,
6        store::{DataRow, Store},
7    },
8    futures::stream::TryStreamExt,
9    im::HashSet,
10    serde::Serialize,
11    std::fmt::Debug,
12    thiserror::Error as ThisError,
13    utils::Vector,
14};
15
16#[derive(ThisError, Debug, PartialEq, Serialize)]
17pub enum ValidateError {
18    #[error("conflict! storage row has no column on index {0}")]
19    ConflictOnStorageColumnIndex(usize),
20
21    #[error("conflict! schemaless row found in schema based data")]
22    ConflictOnUnexpectedSchemalessRowFound,
23
24    #[error("duplicate entry '{}' for unique column '{1}'", String::from(.0))]
25    DuplicateEntryOnUniqueField(Value, String),
26
27    #[error("duplicate entry '{0:?}' for primary_key field")]
28    DuplicateEntryOnPrimaryKeyField(Key),
29}
30
31pub enum ColumnValidation<'column_def> {
32    /// `INSERT`
33    All(&'column_def [ColumnDef]),
34    /// `UPDATE`
35    SpecifiedColumns(&'column_def [ColumnDef], Vec<String>),
36}
37
38#[derive(Debug)]
39struct UniqueConstraint {
40    column_index: usize,
41    column_name: String,
42    keys: HashSet<Key>,
43}
44
45impl UniqueConstraint {
46    fn new(column_index: usize, column_name: String) -> Self {
47        Self {
48            column_index,
49            column_name,
50            keys: HashSet::new(),
51        }
52    }
53
54    fn add(self, value: &Value) -> Result<Self> {
55        let new_key = self.check(value)?;
56
57        if matches!(new_key, Key::None) {
58            return Ok(self);
59        }
60
61        let keys = self.keys.update(new_key);
62
63        Ok(Self {
64            column_index: self.column_index,
65            column_name: self.column_name,
66            keys,
67        })
68    }
69
70    fn check(&self, value: &Value) -> Result<Key> {
71        let key = Key::try_from(value)?;
72
73        if !self.keys.contains(&key) {
74            Ok(key)
75        } else {
76            Err(ValidateError::DuplicateEntryOnUniqueField(
77                value.clone(),
78                self.column_name.to_owned(),
79            )
80            .into())
81        }
82    }
83}
84
85pub async fn validate_unique<T: Store>(
86    storage: &T,
87    table_name: &str,
88    column_validation: ColumnValidation<'_>,
89    row_iter: impl Iterator<Item = &[Value]> + Clone,
90) -> Result<()> {
91    enum Columns {
92        /// key index
93        PrimaryKeyOnly(usize),
94        /// `[(key_index, table_name)]`
95        All(Vec<(usize, String)>),
96    }
97
98    let columns = match &column_validation {
99        ColumnValidation::All(column_defs) => {
100            let primary_key_index = column_defs
101                .iter()
102                .enumerate()
103                .find(|(_, ColumnDef { unique, .. })| {
104                    unique == &Some(ColumnUniqueOption { is_primary: true })
105                })
106                .map(|(i, _)| i);
107            let other_unique_column_def_count = column_defs
108                .iter()
109                .filter(|ColumnDef { unique, .. }| {
110                    unique == &Some(ColumnUniqueOption { is_primary: false })
111                })
112                .count();
113
114            match (primary_key_index, other_unique_column_def_count) {
115                (Some(primary_key_index), 0) => Columns::PrimaryKeyOnly(primary_key_index),
116                _ => Columns::All(fetch_all_unique_columns(column_defs)),
117            }
118        }
119        ColumnValidation::SpecifiedColumns(column_defs, specified_columns) => Columns::All(
120            fetch_specified_unique_columns(column_defs, specified_columns),
121        ),
122    };
123
124    match columns {
125        Columns::PrimaryKeyOnly(primary_key_index) => {
126            for primary_key in
127                row_iter.filter_map(|row| row.get(primary_key_index).map(Key::try_from))
128            {
129                let key = primary_key?;
130
131                if storage.fetch_data(table_name, &key).await?.is_some() {
132                    return Err(ValidateError::DuplicateEntryOnPrimaryKeyField(key).into());
133                }
134            }
135
136            Ok(())
137        }
138        Columns::All(columns) => {
139            let unique_constraints: Vec<_> = create_unique_constraints(columns, row_iter)?.into();
140            if unique_constraints.is_empty() {
141                return Ok(());
142            }
143
144            let unique_constraints = &unique_constraints;
145            storage
146                .scan_data(table_name)
147                .await?
148                .try_for_each(|(_, data_row)| async {
149                    let values = match data_row {
150                        DataRow::Vec(values) => values,
151                        DataRow::Map(_) => {
152                            return Err(
153                                ValidateError::ConflictOnUnexpectedSchemalessRowFound.into()
154                            );
155                        }
156                    };
157
158                    unique_constraints.iter().try_for_each(|constraint| {
159                        let col_idx = constraint.column_index;
160                        let val = values
161                            .get(col_idx)
162                            .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
163
164                        constraint.check(val)?;
165
166                        Ok(())
167                    })
168                })
169                .await
170        }
171    }
172}
173
174fn create_unique_constraints<'a>(
175    unique_columns: Vec<(usize, String)>,
176    row_iter: impl Iterator<Item = &'a [Value]> + Clone,
177) -> Result<Vector<UniqueConstraint>> {
178    unique_columns
179        .into_iter()
180        .try_fold(Vector::new(), |constraints, col| {
181            let (col_idx, col_name) = col;
182            let new_constraint = UniqueConstraint::new(col_idx, col_name);
183            let new_constraint = row_iter
184                .clone()
185                .try_fold(new_constraint, |constraint, row| {
186                    let val = row
187                        .get(col_idx)
188                        .ok_or(ValidateError::ConflictOnStorageColumnIndex(col_idx))?;
189
190                    constraint.add(val)
191                })?;
192            Ok(constraints.push(new_constraint))
193        })
194}
195
196fn fetch_all_unique_columns(column_defs: &[ColumnDef]) -> Vec<(usize, String)> {
197    column_defs
198        .iter()
199        .enumerate()
200        .filter_map(|(i, table_col)| table_col.unique.map(|_| (i, table_col.name.to_owned())))
201        .collect()
202}
203
204fn fetch_specified_unique_columns(
205    all_column_defs: &[ColumnDef],
206    specified_columns: &[String],
207) -> Vec<(usize, String)> {
208    all_column_defs
209        .iter()
210        .enumerate()
211        .filter_map(|(i, table_col)| {
212            (table_col.unique.is_some()
213                && specified_columns.iter().any(|col| col == &table_col.name))
214            .then_some((i, table_col.name.to_owned()))
215        })
216        .collect()
217}