gluesql_core/executor/
update.rs

1use {
2    super::{context::RowContext, evaluate::evaluate},
3    crate::{
4        ast::{Assignment, ColumnDef, ColumnUniqueOption, ForeignKey},
5        data::{Key, Row, Value},
6        result::{Error, Result},
7        store::GStore,
8    },
9    futures::stream::{self, StreamExt, TryStreamExt},
10    serde::Serialize,
11    std::{borrow::Cow, fmt::Debug, sync::Arc},
12    thiserror::Error,
13};
14
15#[derive(Error, Serialize, Debug, PartialEq, Eq)]
16pub enum UpdateError {
17    #[error("column not found {0}")]
18    ColumnNotFound(String),
19
20    #[error("update on primary key is not supported: {0}")]
21    UpdateOnPrimaryKeyNotSupported(String),
22
23    #[error("conflict on schema, row data does not fit to schema")]
24    ConflictOnSchema,
25
26    #[error(
27        "cannot find referenced value on {table_name}.{column_name} with value {referenced_value:?}"
28    )]
29    CannotFindReferencedValue {
30        table_name: String,
31        column_name: String,
32        referenced_value: String,
33    },
34}
35
36pub struct Update<'a, T: GStore> {
37    storage: &'a T,
38    table_name: &'a str,
39    fields: &'a [Assignment],
40    column_defs: Option<&'a [ColumnDef]>,
41}
42
43impl<'a, T: GStore> Update<'a, T> {
44    pub fn new(
45        storage: &'a T,
46        table_name: &'a str,
47        fields: &'a [Assignment],
48        column_defs: Option<&'a [ColumnDef]>,
49    ) -> Result<Self> {
50        if let Some(column_defs) = column_defs {
51            for assignment in fields {
52                let Assignment { id, .. } = assignment;
53
54                if column_defs.iter().all(|col_def| &col_def.name != id) {
55                    return Err(UpdateError::ColumnNotFound(id.to_owned()).into());
56                } else if column_defs.iter().any(|ColumnDef { name, unique, .. }| {
57                    name == id && matches!(unique, Some(ColumnUniqueOption { is_primary: true }))
58                }) {
59                    return Err(UpdateError::UpdateOnPrimaryKeyNotSupported(id.to_owned()).into());
60                }
61            }
62        }
63
64        Ok(Self {
65            storage,
66            table_name,
67            fields,
68            column_defs,
69        })
70    }
71
72    pub async fn apply(&self, row: Row, foreign_keys: &[ForeignKey]) -> Result<Row> {
73        let context = RowContext::new(self.table_name, Cow::Borrowed(&row), None);
74        let context = Some(Arc::new(context));
75
76        let assignments = stream::iter(self.fields.iter())
77            .then(|assignment| {
78                let Assignment {
79                    id,
80                    value: value_expr,
81                } = assignment;
82                let context = context.as_ref().map(Arc::clone);
83
84                async move {
85                    let evaluated = evaluate(self.storage, context, None, value_expr).await?;
86                    let value = match self.column_defs {
87                        Some(column_defs) => {
88                            let ColumnDef {
89                                data_type,
90                                nullable,
91                                ..
92                            } = column_defs
93                                .iter()
94                                .find(|column_def| id == &column_def.name)
95                                .ok_or(UpdateError::ConflictOnSchema)?;
96
97                            evaluated.try_into_value(data_type, *nullable)?
98                        }
99                        None => evaluated.try_into()?,
100                    };
101
102                    Ok::<_, Error>((id.as_ref(), value))
103                }
104            })
105            .and_then(|(id, value)| async move {
106                if value == Value::Null {
107                    return Ok((id, value));
108                }
109
110                for foreign_key in foreign_keys {
111                    let ForeignKey {
112                        referencing_column_name,
113                        referenced_table_name,
114                        referenced_column_name,
115                        ..
116                    } = foreign_key;
117
118                    if referencing_column_name != id {
119                        continue;
120                    }
121
122                    let no_referenced = self
123                        .storage
124                        .fetch_data(referenced_table_name, &Key::try_from(&value)?)
125                        .await?
126                        .is_none();
127
128                    if no_referenced {
129                        return Err(UpdateError::CannotFindReferencedValue {
130                            table_name: referenced_table_name.to_owned(),
131                            column_name: referenced_column_name.to_owned(),
132                            referenced_value: String::from(value),
133                        }
134                        .into());
135                    }
136                }
137
138                Ok((id, value))
139            })
140            .try_collect::<Vec<(&str, Value)>>()
141            .await?;
142
143        Ok(match row {
144            Row::Vec { columns, values } => {
145                let values = columns
146                    .iter()
147                    .zip(values)
148                    .map(|(column, value)| {
149                        assignments
150                            .iter()
151                            .find_map(|(id, new_value)| (column == id).then_some(new_value.clone()))
152                            .unwrap_or(value)
153                    })
154                    .collect();
155
156                Row::Vec { columns, values }
157            }
158            Row::Map(values) => {
159                let assignments = assignments
160                    .into_iter()
161                    .map(|(id, value)| (id.to_owned(), value));
162
163                let mut new_values = values;
164                new_values.extend(assignments);
165                Row::Map(new_values)
166            }
167        })
168    }
169}