Skip to main content

cratestack_sqlx/query/batch/
create.rs

1//! `batch_create` driver — opens the outer tx, fans the inputs out to
2//! [`super::create_item::run_create_item`] (one savepoint per input),
3//! commits, drains the outbox.
4
5use cratestack_core::{BatchResponse, CoolContext, CoolError, ModelEventKind};
6
7use crate::audit::ensure_audit_table;
8use crate::descriptor::ensure_event_outbox_table;
9use crate::{CreateModelInput, ModelDescriptor, SqlxRuntime, sqlx};
10
11use super::create_item::run_create_item;
12use super::validate::validate_batch_size;
13
14#[derive(Debug, Clone)]
15pub struct BatchCreate<'a, M: 'static, PK: 'static, I> {
16    pub(crate) runtime: &'a SqlxRuntime,
17    pub(crate) descriptor: &'static ModelDescriptor<M, PK>,
18    pub(crate) inputs: Vec<I>,
19}
20
21impl<'a, M: 'static, PK: 'static, I> BatchCreate<'a, M, PK, I>
22where
23    I: CreateModelInput<M> + Send,
24{
25    pub async fn run(self, ctx: &CoolContext) -> Result<BatchResponse<M>, CoolError>
26    where
27        for<'r> M: Send + Unpin + sqlx::FromRow<'r, sqlx::postgres::PgRow> + serde::Serialize,
28    {
29        validate_batch_size(self.inputs.len())?;
30        // No PK dedup — `CreateModelInput` doesn't expose the PK
31        // generically (and server-generated PKs make duplicates
32        // impossible). Client-supplied PK collisions trip the DB
33        // uniqueness constraint and surface as `CoolError::Database`.
34        // The right primitive for idempotent client-PK ingestion is
35        // `.batch_upsert(...)`.
36        if self.inputs.is_empty() {
37            return Ok(BatchResponse::from_results(vec![]));
38        }
39
40        let emits_event = self.descriptor.emits(ModelEventKind::Created);
41        let audit_enabled = self.descriptor.audit_enabled;
42
43        let mut tx = self
44            .runtime
45            .pool()
46            .begin()
47            .await
48            .map_err(|error| CoolError::Database(error.to_string()))?;
49        if emits_event {
50            ensure_event_outbox_table(&mut *tx).await?;
51        }
52        if audit_enabled {
53            ensure_audit_table(self.runtime.pool()).await?;
54        }
55
56        let mut per_item: Vec<Result<M, CoolError>> = Vec::with_capacity(self.inputs.len());
57        for input in self.inputs {
58            let outcome = run_create_item(
59                &mut tx,
60                self.runtime.pool(),
61                self.descriptor,
62                input,
63                ctx,
64                emits_event,
65                audit_enabled,
66            )
67            .await?;
68            per_item.push(outcome);
69        }
70
71        tx.commit()
72            .await
73            .map_err(|error| CoolError::Database(error.to_string()))?;
74
75        if emits_event {
76            let _ = self.runtime.drain_event_outbox().await;
77        }
78
79        Ok(BatchResponse::from_results(per_item))
80    }
81}