cratestack_sqlx/query/write/
create.rs1use cratestack_core::{AuditOperation, CoolContext, CoolError, ModelEventKind};
6
7use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
8use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
9use crate::{CreateModelInput, ModelDescriptor, SqlxRuntime, sqlx};
10
11use super::create_exec::create_record_with_executor;
12
13#[derive(Debug, Clone)]
14pub struct CreateRecord<'a, M: 'static, PK: 'static, I> {
15 pub(crate) runtime: &'a SqlxRuntime,
16 pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
17 pub(crate) input: I,
18}
19
20impl<'a, M: 'static, PK: 'static, I> CreateRecord<'a, M, PK, I>
21where
22 I: CreateModelInput<M>,
23{
24 pub fn preview_sql(&self) -> String {
25 let values = self.input.sql_values();
26 let placeholders = (1..=values.len())
27 .map(|index| format!("${index}"))
28 .collect::<Vec<_>>()
29 .join(", ");
30 let columns = values
31 .iter()
32 .map(|value| value.column)
33 .collect::<Vec<_>>()
34 .join(", ");
35
36 format!(
37 "INSERT INTO {} ({}) VALUES ({}) RETURNING {}",
38 self.descriptor.table_name,
39 columns,
40 placeholders,
41 self.descriptor.select_projection(),
42 )
43 }
44
45 pub async fn run_in_tx<'tx>(
50 self,
51 tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
52 ctx: &CoolContext,
53 ) -> Result<M, CoolError>
54 where
55 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
56 {
57 let emits_event = self.descriptor.emits(ModelEventKind::Created);
58 let audit_enabled = self.descriptor.audit_enabled;
59 if emits_event {
60 ensure_event_outbox_table(&mut **tx).await?;
61 }
62 if audit_enabled {
63 ensure_audit_table(self.runtime.pool()).await?;
64 }
65 let record = create_record_with_executor(
66 &mut **tx,
67 self.runtime.pool(),
68 self.descriptor,
69 self.input,
70 ctx,
71 )
72 .await?;
73 if emits_event {
74 enqueue_event_outbox(
75 &mut **tx,
76 self.descriptor.schema_name,
77 ModelEventKind::Created,
78 &record,
79 )
80 .await?;
81 }
82 if audit_enabled {
83 let after = serde_json::to_value(&record).ok();
84 let event =
85 build_audit_event(self.descriptor, AuditOperation::Create, None, after, ctx);
86 enqueue_audit_event(&mut **tx, &event).await?;
87 }
88 Ok(record)
89 }
90
91 pub async fn run(self, ctx: &CoolContext) -> Result<M, CoolError>
92 where
93 for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
94 {
95 let emits_event = self.descriptor.emits(ModelEventKind::Created);
96 let audit_enabled = self.descriptor.audit_enabled;
97 let needs_tx = emits_event || audit_enabled;
98 let record = if needs_tx {
99 let mut tx = self
100 .runtime
101 .pool()
102 .begin()
103 .await
104 .map_err(|error| CoolError::Database(error.to_string()))?;
105 if emits_event {
106 ensure_event_outbox_table(&mut *tx).await?;
107 }
108 if audit_enabled {
109 ensure_audit_table(self.runtime.pool()).await?;
110 }
111 let record = create_record_with_executor(
112 &mut *tx,
113 self.runtime.pool(),
114 self.descriptor,
115 self.input,
116 ctx,
117 )
118 .await?;
119 if emits_event {
120 enqueue_event_outbox(
121 &mut *tx,
122 self.descriptor.schema_name,
123 ModelEventKind::Created,
124 &record,
125 )
126 .await?;
127 }
128 if audit_enabled {
129 let after = serde_json::to_value(&record).ok();
130 let event =
131 build_audit_event(self.descriptor, AuditOperation::Create, None, after, ctx);
132 enqueue_audit_event(&mut *tx, &event).await?;
133 }
134 tx.commit()
135 .await
136 .map_err(|error| CoolError::Database(error.to_string()))?;
137 record
138 } else {
139 create_record_with_executor(
140 self.runtime.pool(),
141 self.runtime.pool(),
142 self.descriptor,
143 self.input,
144 ctx,
145 )
146 .await?
147 };
148
149 if emits_event {
150 let _ = self.runtime.drain_event_outbox().await;
151 }
152
153 Ok(record)
154 }
155}