Skip to main content

cratestack_sqlx/query/write/
create.rs

1//! `CreateRecord` — single-row INSERT with policy + audit + event
2//! fan-out. `run()` opens its own tx only when audit/event capture is
3//! enabled; otherwise it goes straight against the pool.
4
5use cratestack_core::{AuditOperation, CoolContext, CoolError, ModelEventKind};
6
7use crate::audit::{build_audit_event, enqueue_audit_event, ensure_audit_table};
8use crate::descriptor::{enqueue_event_outbox, ensure_event_outbox_table};
9use crate::{CreateModelInput, ModelDescriptor, SqlxRuntime, sqlx};
10
11use super::create_exec::create_record_with_executor;
12
13#[derive(Debug, Clone)]
14pub struct CreateRecord<'a, M: 'static, PK: 'static, I> {
15    pub(crate) runtime: &'a SqlxRuntime,
16    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
17    pub(crate) input: I,
18}
19
20impl<'a, M: 'static, PK: 'static, I> CreateRecord<'a, M, PK, I>
21where
22    I: CreateModelInput<M>,
23{
24    pub fn preview_sql(&self) -> String {
25        let values = self.input.sql_values();
26        let placeholders = (1..=values.len())
27            .map(|index| format!("${index}"))
28            .collect::<Vec<_>>()
29            .join(", ");
30        let columns = values
31            .iter()
32            .map(|value| value.column)
33            .collect::<Vec<_>>()
34            .join(", ");
35
36        format!(
37            "INSERT INTO {} ({}) VALUES ({}) RETURNING {}",
38            self.descriptor.table_name,
39            columns,
40            placeholders,
41            self.descriptor.select_projection(),
42        )
43    }
44
45    /// Like [`Self::run`] but participates in a caller-supplied
46    /// transaction. The insert + outbox + audit writes all happen
47    /// inside `tx`; caller commits. Event outbox is *not* drained —
48    /// the outbox row isn't visible to the drain worker until commit.
49    pub async fn run_in_tx<'tx>(
50        self,
51        tx: &mut sqlx::Transaction<'tx, sqlx::Postgres>,
52        ctx: &CoolContext,
53    ) -> Result<M, CoolError>
54    where
55        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
56    {
57        let emits_event = self.descriptor.emits(ModelEventKind::Created);
58        let audit_enabled = self.descriptor.audit_enabled;
59        if emits_event {
60            ensure_event_outbox_table(&mut **tx).await?;
61        }
62        if audit_enabled {
63            ensure_audit_table(self.runtime.pool()).await?;
64        }
65        let record = create_record_with_executor(
66            &mut **tx,
67            self.runtime.pool(),
68            self.descriptor,
69            self.input,
70            ctx,
71        )
72        .await?;
73        if emits_event {
74            enqueue_event_outbox(
75                &mut **tx,
76                self.descriptor.schema_name,
77                ModelEventKind::Created,
78                &record,
79            )
80            .await?;
81        }
82        if audit_enabled {
83            let after = serde_json::to_value(&record).ok();
84            let event =
85                build_audit_event(self.descriptor, AuditOperation::Create, None, after, ctx);
86            enqueue_audit_event(&mut **tx, &event).await?;
87        }
88        Ok(record)
89    }
90
91    pub async fn run(self, ctx: &CoolContext) -> Result<M, CoolError>
92    where
93        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
94    {
95        let emits_event = self.descriptor.emits(ModelEventKind::Created);
96        let audit_enabled = self.descriptor.audit_enabled;
97        let needs_tx = emits_event || audit_enabled;
98        let record = if needs_tx {
99            let mut tx = self
100                .runtime
101                .pool()
102                .begin()
103                .await
104                .map_err(|error| CoolError::Database(error.to_string()))?;
105            if emits_event {
106                ensure_event_outbox_table(&mut *tx).await?;
107            }
108            if audit_enabled {
109                ensure_audit_table(self.runtime.pool()).await?;
110            }
111            let record = create_record_with_executor(
112                &mut *tx,
113                self.runtime.pool(),
114                self.descriptor,
115                self.input,
116                ctx,
117            )
118            .await?;
119            if emits_event {
120                enqueue_event_outbox(
121                    &mut *tx,
122                    self.descriptor.schema_name,
123                    ModelEventKind::Created,
124                    &record,
125                )
126                .await?;
127            }
128            if audit_enabled {
129                let after = serde_json::to_value(&record).ok();
130                let event =
131                    build_audit_event(self.descriptor, AuditOperation::Create, None, after, ctx);
132                enqueue_audit_event(&mut *tx, &event).await?;
133            }
134            tx.commit()
135                .await
136                .map_err(|error| CoolError::Database(error.to_string()))?;
137            record
138        } else {
139            create_record_with_executor(
140                self.runtime.pool(),
141                self.runtime.pool(),
142                self.descriptor,
143                self.input,
144                ctx,
145            )
146            .await?
147        };
148
149        if emits_event {
150            let _ = self.runtime.drain_event_outbox().await;
151        }
152
153        Ok(record)
154    }
155}