Skip to main content

coil_data/
mutation.rs

1use std::fmt;
2
3use crate::{
4    CompiledStatement, DataModelError, DataValue, QueryField, QueryFilter, TableName,
5    compile_filters, quote_identifier, render_placeholder, require_non_empty,
6};
7
/// Transaction isolation levels that can be requested for a transaction.
///
/// The [`fmt::Display`] implementation renders each level in its upper-case,
/// space-separated keyword form (for example `READ COMMITTED`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionIsolation {
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}

impl fmt::Display for TransactionIsolation {
    /// Writes the keyword form of the isolation level.
    ///
    /// The text goes through [`fmt::Formatter::pad`], so width, fill and
    /// alignment flags such as `{:>16}` are honoured the same way they are
    /// for `str` (a precision, if given, truncates). Plain `{}` and
    /// `to_string()` output is exactly the keyword.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::ReadCommitted => "READ COMMITTED",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::Serializable => "SERIALIZABLE",
        };
        f.pad(keyword)
    }
}
24
/// Identifies a domain-level write as a `resource` / `action` pair.
///
/// Prefer [`DomainWrite::new`], which passes both values through
/// `require_non_empty`; building the struct literally skips that check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DomainWrite {
    /// Name of the resource being written.
    pub resource: String,
    /// Name of the action performed on that resource.
    pub action: String,
}
30
31impl DomainWrite {
32    pub fn new(
33        resource: impl Into<String>,
34        action: impl Into<String>,
35    ) -> Result<Self, DataModelError> {
36        Ok(Self {
37            resource: require_non_empty("write_resource", resource.into())?,
38            action: require_non_empty("write_action", action.into())?,
39        })
40    }
41}
42
/// The kind of statement a [`MutationSpec`] compiles to.
///
/// The [`fmt::Display`] implementation renders the variant name in lower case
/// (for example `upsert`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MutationAction {
    /// Rendered as `insert`.
    Insert,
    /// Rendered as `update`.
    Update,
    /// Rendered as `upsert`.
    Upsert,
    /// Rendered as `delete`.
    Delete,
}

impl fmt::Display for MutationAction {
    /// Writes the lower-case action name.
    ///
    /// The text goes through [`fmt::Formatter::pad`], so width, fill and
    /// alignment flags such as `{:<8}` are honoured the same way they are for
    /// `str` (a precision, if given, truncates). Plain `{}` and `to_string()`
    /// output is exactly the action name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Upsert => "upsert",
            Self::Delete => "delete",
        };
        f.pad(label)
    }
}
61
62#[derive(Debug, Clone, PartialEq, Eq)]
63pub struct MutationField {
64    pub field: QueryField,
65    pub value: DataValue,
66}
67
68#[derive(Debug, Clone, PartialEq, Eq)]
69pub struct MutationSpec {
70    pub table: TableName,
71    pub action: MutationAction,
72    pub predicates: Vec<QueryFilter>,
73    pub assignments: Vec<MutationField>,
74    pub conflict_fields: Vec<QueryField>,
75}
76
77impl MutationSpec {
78    pub fn new(table: impl Into<String>, action: MutationAction) -> Result<Self, DataModelError> {
79        Ok(Self {
80            table: TableName::new(table)?,
81            action,
82            predicates: Vec::new(),
83            assignments: Vec::new(),
84            conflict_fields: Vec::new(),
85        })
86    }
87
88    pub fn with_predicate(mut self, predicate: QueryFilter) -> Self {
89        self.predicates.push(predicate);
90        self
91    }
92
93    pub fn with_assignment(
94        mut self,
95        field: impl Into<String>,
96        value: impl Into<DataValue>,
97    ) -> Result<Self, DataModelError> {
98        self.assignments.push(MutationField {
99            field: QueryField::new(field)?,
100            value: value.into(),
101        });
102        Ok(self)
103    }
104
105    pub fn on_conflict_field(mut self, field: impl Into<String>) -> Result<Self, DataModelError> {
106        self.conflict_fields.push(QueryField::new(field)?);
107        Ok(self)
108    }
109
110    pub fn compile(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
111        match self.action {
112            MutationAction::Insert => self.compile_insert(start_index),
113            MutationAction::Update => self.compile_update(start_index),
114            MutationAction::Upsert => self.compile_upsert(start_index),
115            MutationAction::Delete => self.compile_delete(start_index),
116        }
117    }
118
119    fn compile_insert(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
120        if self.assignments.is_empty() {
121            return Err(DataModelError::MissingMutationAssignments {
122                table: self.table.to_string(),
123                action: self.action,
124            });
125        }
126
127        let columns = self
128            .assignments
129            .iter()
130            .map(|assignment| quote_identifier(assignment.field.as_str()))
131            .collect::<Vec<_>>()
132            .join(", ");
133        let placeholders = (start_index..start_index + self.assignments.len())
134            .map(render_placeholder)
135            .collect::<Vec<_>>()
136            .join(", ");
137
138        Ok(CompiledStatement {
139            sql: format!(
140                "INSERT INTO {} ({columns}) VALUES ({placeholders})",
141                quote_identifier(self.table.as_str())
142            ),
143            bind_values: self
144                .assignments
145                .iter()
146                .map(|assignment| assignment.value.clone())
147                .collect(),
148        })
149    }
150
151    fn compile_update(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
152        if self.assignments.is_empty() {
153            return Err(DataModelError::MissingMutationAssignments {
154                table: self.table.to_string(),
155                action: self.action,
156            });
157        }
158        if self.predicates.is_empty() {
159            return Err(DataModelError::MissingMutationPredicates {
160                table: self.table.to_string(),
161                action: self.action,
162            });
163        }
164
165        let set_clause = self
166            .assignments
167            .iter()
168            .enumerate()
169            .map(|(offset, assignment)| {
170                format!(
171                    "{} = {}",
172                    quote_identifier(assignment.field.as_str()),
173                    render_placeholder(start_index + offset)
174                )
175            })
176            .collect::<Vec<_>>()
177            .join(", ");
178
179        let (where_clauses, mut bind_values, _) =
180            compile_filters(&self.predicates, start_index + self.assignments.len())?;
181        let mut assignment_values = self
182            .assignments
183            .iter()
184            .map(|assignment| assignment.value.clone())
185            .collect::<Vec<_>>();
186        assignment_values.append(&mut bind_values);
187
188        Ok(CompiledStatement {
189            sql: format!(
190                "UPDATE {} SET {set_clause} WHERE {}",
191                quote_identifier(self.table.as_str()),
192                where_clauses.join(" AND ")
193            ),
194            bind_values: assignment_values,
195        })
196    }
197
198    fn compile_upsert(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
199        if self.assignments.is_empty() {
200            return Err(DataModelError::MissingMutationAssignments {
201                table: self.table.to_string(),
202                action: self.action,
203            });
204        }
205        if self.conflict_fields.is_empty() {
206            return Err(DataModelError::MissingConflictFields {
207                table: self.table.to_string(),
208            });
209        }
210
211        let insert = self.compile_insert(start_index)?;
212        let conflict = self
213            .conflict_fields
214            .iter()
215            .map(|field| quote_identifier(field.as_str()))
216            .collect::<Vec<_>>()
217            .join(", ");
218        let update_clause = self
219            .assignments
220            .iter()
221            .map(|assignment| {
222                format!(
223                    "{} = EXCLUDED.{}",
224                    quote_identifier(assignment.field.as_str()),
225                    quote_identifier(assignment.field.as_str())
226                )
227            })
228            .collect::<Vec<_>>()
229            .join(", ");
230
231        Ok(CompiledStatement {
232            sql: format!(
233                "{} ON CONFLICT ({conflict}) DO UPDATE SET {update_clause}",
234                insert.sql
235            ),
236            bind_values: insert.bind_values,
237        })
238    }
239
240    fn compile_delete(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
241        if self.predicates.is_empty() {
242            return Err(DataModelError::MissingMutationPredicates {
243                table: self.table.to_string(),
244                action: self.action,
245            });
246        }
247
248        let (where_clauses, bind_values, _) = compile_filters(&self.predicates, start_index)?;
249        Ok(CompiledStatement {
250            sql: format!(
251                "DELETE FROM {} WHERE {}",
252                quote_identifier(self.table.as_str()),
253                where_clauses.join(" AND ")
254            ),
255            bind_values,
256        })
257    }
258}
259
260#[derive(Debug, Clone, PartialEq, Eq)]
261pub struct TransactionPlan {
262    pub name: String,
263    pub isolation: TransactionIsolation,
264    pub writes: Vec<DomainWrite>,
265    pub after_commit_jobs: Vec<String>,
266    pub after_commit_events: Vec<String>,
267}
268
269impl TransactionPlan {
270    pub fn new(
271        name: impl Into<String>,
272        isolation: TransactionIsolation,
273    ) -> Result<Self, DataModelError> {
274        Ok(Self {
275            name: require_non_empty("transaction_name", name.into())?,
276            isolation,
277            writes: Vec::new(),
278            after_commit_jobs: Vec::new(),
279            after_commit_events: Vec::new(),
280        })
281    }
282
283    pub fn with_write(mut self, write: DomainWrite) -> Self {
284        self.writes.push(write);
285        self
286    }
287
288    pub fn with_after_commit_job(
289        mut self,
290        job_name: impl Into<String>,
291    ) -> Result<Self, DataModelError> {
292        self.after_commit_jobs
293            .push(require_non_empty("after_commit_job", job_name.into())?);
294        Ok(self)
295    }
296
297    pub fn with_after_commit_event(
298        mut self,
299        event_name: impl Into<String>,
300    ) -> Result<Self, DataModelError> {
301        self.after_commit_events
302            .push(require_non_empty("after_commit_event", event_name.into())?);
303        Ok(self)
304    }
305}
306
307#[derive(Debug, Clone, PartialEq, Eq)]
308pub struct CompiledTransaction {
309    pub begin_sql: String,
310    pub commit_sql: String,
311    pub statements: Vec<CompiledStatement>,
312}