1use std::fmt;
2
3use crate::{
4 CompiledStatement, DataModelError, DataValue, QueryField, QueryFilter, TableName,
5 compile_filters, quote_identifier, render_placeholder, require_non_empty,
6};
7
/// Isolation level attached to a transaction plan.
///
/// The [`fmt::Display`] implementation renders each level as the upper-case
/// keyword text listed on the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TransactionIsolation {
    /// Rendered as `READ COMMITTED`.
    ReadCommitted,
    /// Rendered as `REPEATABLE READ`.
    RepeatableRead,
    /// Rendered as `SERIALIZABLE`.
    Serializable,
}

impl fmt::Display for TransactionIsolation {
    /// Writes the keyword text for this level.
    ///
    /// The text is emitted with `write_str`, so width, fill, and alignment
    /// flags in the format spec are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let keyword = match self {
            Self::ReadCommitted => "READ COMMITTED",
            Self::RepeatableRead => "REPEATABLE READ",
            Self::Serializable => "SERIALIZABLE",
        };
        f.write_str(keyword)
    }
}
24
/// A write described by the resource it touches and the action performed.
///
/// Values built through [`DomainWrite::new`] have both strings passed through
/// `require_non_empty`; the fields are public, so direct construction skips
/// that check.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DomainWrite {
    /// Name of the resource being written.
    pub resource: String,
    /// Name of the action applied to the resource.
    pub action: String,
}
30
31impl DomainWrite {
32 pub fn new(
33 resource: impl Into<String>,
34 action: impl Into<String>,
35 ) -> Result<Self, DataModelError> {
36 Ok(Self {
37 resource: require_non_empty("write_resource", resource.into())?,
38 action: require_non_empty("write_action", action.into())?,
39 })
40 }
41}
42
/// Kind of statement a mutation compiles to.
///
/// The [`fmt::Display`] implementation renders each action as the lower-case
/// name listed on the variants.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MutationAction {
    /// Rendered as `insert`.
    Insert,
    /// Rendered as `update`.
    Update,
    /// Rendered as `upsert`.
    Upsert,
    /// Rendered as `delete`.
    Delete,
}

impl fmt::Display for MutationAction {
    /// Writes the lower-case name of this action.
    ///
    /// The text is emitted with `write_str`, so width, fill, and alignment
    /// flags in the format spec are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Insert => "insert",
            Self::Update => "update",
            Self::Upsert => "upsert",
            Self::Delete => "delete",
        };
        f.write_str(label)
    }
}
61
/// One column assignment of a mutation: the target field and the value bound
/// to it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MutationField {
    /// Column being written; its `as_str()` form is passed through
    /// `quote_identifier` when SQL is rendered.
    pub field: QueryField,
    /// Value cloned into the compiled statement's bind values.
    pub value: DataValue,
}
67
/// Declarative description of a single-table mutation that can be compiled to
/// SQL text plus bind values via `MutationSpec::compile`.
///
/// Which of the list fields are consulted depends on `action`; fields that do
/// not apply to the action are left unread when compiling.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MutationSpec {
    /// Table the statement targets.
    pub table: TableName,
    /// Statement kind; selects which compile routine runs.
    pub action: MutationAction,
    /// Filters rendered into the `WHERE` clause (joined with ` AND `) for
    /// update and delete.
    pub predicates: Vec<QueryFilter>,
    /// Column/value pairs written by insert, update, and upsert, in the order
    /// they were added.
    pub assignments: Vec<MutationField>,
    /// Columns listed in the `ON CONFLICT (...)` target of an upsert.
    pub conflict_fields: Vec<QueryField>,
}
76
77impl MutationSpec {
78 pub fn new(table: impl Into<String>, action: MutationAction) -> Result<Self, DataModelError> {
79 Ok(Self {
80 table: TableName::new(table)?,
81 action,
82 predicates: Vec::new(),
83 assignments: Vec::new(),
84 conflict_fields: Vec::new(),
85 })
86 }
87
88 pub fn with_predicate(mut self, predicate: QueryFilter) -> Self {
89 self.predicates.push(predicate);
90 self
91 }
92
93 pub fn with_assignment(
94 mut self,
95 field: impl Into<String>,
96 value: impl Into<DataValue>,
97 ) -> Result<Self, DataModelError> {
98 self.assignments.push(MutationField {
99 field: QueryField::new(field)?,
100 value: value.into(),
101 });
102 Ok(self)
103 }
104
105 pub fn on_conflict_field(mut self, field: impl Into<String>) -> Result<Self, DataModelError> {
106 self.conflict_fields.push(QueryField::new(field)?);
107 Ok(self)
108 }
109
110 pub fn compile(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
111 match self.action {
112 MutationAction::Insert => self.compile_insert(start_index),
113 MutationAction::Update => self.compile_update(start_index),
114 MutationAction::Upsert => self.compile_upsert(start_index),
115 MutationAction::Delete => self.compile_delete(start_index),
116 }
117 }
118
119 fn compile_insert(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
120 if self.assignments.is_empty() {
121 return Err(DataModelError::MissingMutationAssignments {
122 table: self.table.to_string(),
123 action: self.action,
124 });
125 }
126
127 let columns = self
128 .assignments
129 .iter()
130 .map(|assignment| quote_identifier(assignment.field.as_str()))
131 .collect::<Vec<_>>()
132 .join(", ");
133 let placeholders = (start_index..start_index + self.assignments.len())
134 .map(render_placeholder)
135 .collect::<Vec<_>>()
136 .join(", ");
137
138 Ok(CompiledStatement {
139 sql: format!(
140 "INSERT INTO {} ({columns}) VALUES ({placeholders})",
141 quote_identifier(self.table.as_str())
142 ),
143 bind_values: self
144 .assignments
145 .iter()
146 .map(|assignment| assignment.value.clone())
147 .collect(),
148 })
149 }
150
151 fn compile_update(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
152 if self.assignments.is_empty() {
153 return Err(DataModelError::MissingMutationAssignments {
154 table: self.table.to_string(),
155 action: self.action,
156 });
157 }
158 if self.predicates.is_empty() {
159 return Err(DataModelError::MissingMutationPredicates {
160 table: self.table.to_string(),
161 action: self.action,
162 });
163 }
164
165 let set_clause = self
166 .assignments
167 .iter()
168 .enumerate()
169 .map(|(offset, assignment)| {
170 format!(
171 "{} = {}",
172 quote_identifier(assignment.field.as_str()),
173 render_placeholder(start_index + offset)
174 )
175 })
176 .collect::<Vec<_>>()
177 .join(", ");
178
179 let (where_clauses, mut bind_values, _) =
180 compile_filters(&self.predicates, start_index + self.assignments.len())?;
181 let mut assignment_values = self
182 .assignments
183 .iter()
184 .map(|assignment| assignment.value.clone())
185 .collect::<Vec<_>>();
186 assignment_values.append(&mut bind_values);
187
188 Ok(CompiledStatement {
189 sql: format!(
190 "UPDATE {} SET {set_clause} WHERE {}",
191 quote_identifier(self.table.as_str()),
192 where_clauses.join(" AND ")
193 ),
194 bind_values: assignment_values,
195 })
196 }
197
198 fn compile_upsert(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
199 if self.assignments.is_empty() {
200 return Err(DataModelError::MissingMutationAssignments {
201 table: self.table.to_string(),
202 action: self.action,
203 });
204 }
205 if self.conflict_fields.is_empty() {
206 return Err(DataModelError::MissingConflictFields {
207 table: self.table.to_string(),
208 });
209 }
210
211 let insert = self.compile_insert(start_index)?;
212 let conflict = self
213 .conflict_fields
214 .iter()
215 .map(|field| quote_identifier(field.as_str()))
216 .collect::<Vec<_>>()
217 .join(", ");
218 let update_clause = self
219 .assignments
220 .iter()
221 .map(|assignment| {
222 format!(
223 "{} = EXCLUDED.{}",
224 quote_identifier(assignment.field.as_str()),
225 quote_identifier(assignment.field.as_str())
226 )
227 })
228 .collect::<Vec<_>>()
229 .join(", ");
230
231 Ok(CompiledStatement {
232 sql: format!(
233 "{} ON CONFLICT ({conflict}) DO UPDATE SET {update_clause}",
234 insert.sql
235 ),
236 bind_values: insert.bind_values,
237 })
238 }
239
240 fn compile_delete(&self, start_index: usize) -> Result<CompiledStatement, DataModelError> {
241 if self.predicates.is_empty() {
242 return Err(DataModelError::MissingMutationPredicates {
243 table: self.table.to_string(),
244 action: self.action,
245 });
246 }
247
248 let (where_clauses, bind_values, _) = compile_filters(&self.predicates, start_index)?;
249 Ok(CompiledStatement {
250 sql: format!(
251 "DELETE FROM {} WHERE {}",
252 quote_identifier(self.table.as_str()),
253 where_clauses.join(" AND ")
254 ),
255 bind_values,
256 })
257 }
258}
259
/// Description of a transaction: its name, isolation level, the domain writes
/// it covers, and the names of follow-up jobs and events.
///
/// NOTE(review): nothing in this file consumes the plan; how the job and event
/// names are acted on (presumably once the transaction commits) should be
/// confirmed against the executor.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransactionPlan {
    /// Name of the transaction; passed through `require_non_empty` by
    /// `TransactionPlan::new`.
    pub name: String,
    /// Isolation level requested for the transaction.
    pub isolation: TransactionIsolation,
    /// Writes registered on the plan, in insertion order.
    pub writes: Vec<DomainWrite>,
    /// Job names registered via `with_after_commit_job`, in insertion order.
    pub after_commit_jobs: Vec<String>,
    /// Event names registered via `with_after_commit_event`, in insertion
    /// order.
    pub after_commit_events: Vec<String>,
}
268
269impl TransactionPlan {
270 pub fn new(
271 name: impl Into<String>,
272 isolation: TransactionIsolation,
273 ) -> Result<Self, DataModelError> {
274 Ok(Self {
275 name: require_non_empty("transaction_name", name.into())?,
276 isolation,
277 writes: Vec::new(),
278 after_commit_jobs: Vec::new(),
279 after_commit_events: Vec::new(),
280 })
281 }
282
283 pub fn with_write(mut self, write: DomainWrite) -> Self {
284 self.writes.push(write);
285 self
286 }
287
288 pub fn with_after_commit_job(
289 mut self,
290 job_name: impl Into<String>,
291 ) -> Result<Self, DataModelError> {
292 self.after_commit_jobs
293 .push(require_non_empty("after_commit_job", job_name.into())?);
294 Ok(self)
295 }
296
297 pub fn with_after_commit_event(
298 mut self,
299 event_name: impl Into<String>,
300 ) -> Result<Self, DataModelError> {
301 self.after_commit_events
302 .push(require_non_empty("after_commit_event", event_name.into())?);
303 Ok(self)
304 }
305}
306
/// SQL text for a whole transaction: the opening statement, the closing
/// statement, and the compiled statements in between.
///
/// NOTE(review): nothing in this file constructs this type; the field meanings
/// below are taken from the field names and should be confirmed against the
/// code that builds it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompiledTransaction {
    /// Presumably the statement that opens the transaction.
    pub begin_sql: String,
    /// Presumably the statement that commits the transaction.
    pub commit_sql: String,
    /// Compiled statements carried by the transaction.
    pub statements: Vec<CompiledStatement>,
}