gluesql_core/executor/
update.rs1use {
2 super::{context::RowContext, evaluate::evaluate},
3 crate::{
4 ast::{Assignment, ColumnDef, ColumnUniqueOption, ForeignKey},
5 data::{Key, Row, Value},
6 result::{Error, Result},
7 store::GStore,
8 },
9 futures::stream::{self, StreamExt, TryStreamExt},
10 serde::Serialize,
11 std::{borrow::Cow, fmt::Debug, sync::Arc},
12 thiserror::Error,
13};
14
15#[derive(Error, Serialize, Debug, PartialEq, Eq)]
16pub enum UpdateError {
17 #[error("column not found {0}")]
18 ColumnNotFound(String),
19
20 #[error("update on primary key is not supported: {0}")]
21 UpdateOnPrimaryKeyNotSupported(String),
22
23 #[error("conflict on schema, row data does not fit to schema")]
24 ConflictOnSchema,
25
26 #[error(
27 "cannot find referenced value on {table_name}.{column_name} with value {referenced_value:?}"
28 )]
29 CannotFindReferencedValue {
30 table_name: String,
31 column_name: String,
32 referenced_value: String,
33 },
34}
35
36pub struct Update<'a, T: GStore> {
37 storage: &'a T,
38 table_name: &'a str,
39 fields: &'a [Assignment],
40 column_defs: Option<&'a [ColumnDef]>,
41}
42
43impl<'a, T: GStore> Update<'a, T> {
44 pub fn new(
45 storage: &'a T,
46 table_name: &'a str,
47 fields: &'a [Assignment],
48 column_defs: Option<&'a [ColumnDef]>,
49 ) -> Result<Self> {
50 if let Some(column_defs) = column_defs {
51 for assignment in fields {
52 let Assignment { id, .. } = assignment;
53
54 if column_defs.iter().all(|col_def| &col_def.name != id) {
55 return Err(UpdateError::ColumnNotFound(id.to_owned()).into());
56 } else if column_defs.iter().any(|ColumnDef { name, unique, .. }| {
57 name == id && matches!(unique, Some(ColumnUniqueOption { is_primary: true }))
58 }) {
59 return Err(UpdateError::UpdateOnPrimaryKeyNotSupported(id.to_owned()).into());
60 }
61 }
62 }
63
64 Ok(Self {
65 storage,
66 table_name,
67 fields,
68 column_defs,
69 })
70 }
71
72 pub async fn apply(&self, row: Row, foreign_keys: &[ForeignKey]) -> Result<Row> {
73 let context = RowContext::new(self.table_name, Cow::Borrowed(&row), None);
74 let context = Some(Arc::new(context));
75
76 let assignments = stream::iter(self.fields.iter())
77 .then(|assignment| {
78 let Assignment {
79 id,
80 value: value_expr,
81 } = assignment;
82 let context = context.as_ref().map(Arc::clone);
83
84 async move {
85 let evaluated = evaluate(self.storage, context, None, value_expr).await?;
86 let value = match self.column_defs {
87 Some(column_defs) => {
88 let ColumnDef {
89 data_type,
90 nullable,
91 ..
92 } = column_defs
93 .iter()
94 .find(|column_def| id == &column_def.name)
95 .ok_or(UpdateError::ConflictOnSchema)?;
96
97 evaluated.try_into_value(data_type, *nullable)?
98 }
99 None => evaluated.try_into()?,
100 };
101
102 Ok::<_, Error>((id.as_ref(), value))
103 }
104 })
105 .and_then(|(id, value)| async move {
106 if value == Value::Null {
107 return Ok((id, value));
108 }
109
110 for foreign_key in foreign_keys {
111 let ForeignKey {
112 referencing_column_name,
113 referenced_table_name,
114 referenced_column_name,
115 ..
116 } = foreign_key;
117
118 if referencing_column_name != id {
119 continue;
120 }
121
122 let no_referenced = self
123 .storage
124 .fetch_data(referenced_table_name, &Key::try_from(&value)?)
125 .await?
126 .is_none();
127
128 if no_referenced {
129 return Err(UpdateError::CannotFindReferencedValue {
130 table_name: referenced_table_name.to_owned(),
131 column_name: referenced_column_name.to_owned(),
132 referenced_value: String::from(value),
133 }
134 .into());
135 }
136 }
137
138 Ok((id, value))
139 })
140 .try_collect::<Vec<(&str, Value)>>()
141 .await?;
142
143 Ok(match row {
144 Row::Vec { columns, values } => {
145 let values = columns
146 .iter()
147 .zip(values)
148 .map(|(column, value)| {
149 assignments
150 .iter()
151 .find_map(|(id, new_value)| (column == id).then_some(new_value.clone()))
152 .unwrap_or(value)
153 })
154 .collect();
155
156 Row::Vec { columns, values }
157 }
158 Row::Map(values) => {
159 let assignments = assignments
160 .into_iter()
161 .map(|(id, value)| (id.to_owned(), value));
162
163 let mut new_values = values;
164 new_values.extend(assignments);
165 Row::Map(new_values)
166 }
167 })
168 }
169}