cala_ledger/outbox/
publisher.rs

1use cala_types::outbox::OutboxEventPayload;
2use es_entity::clock::ClockHandle;
3
4use super::ObixOutbox;
5
6#[derive(Debug, Clone)]
7pub struct OutboxPublisher {
8    inner: ObixOutbox,
9}
10
11impl OutboxPublisher {
12    pub async fn init(pool: &sqlx::PgPool, clock: &ClockHandle) -> Result<Self, sqlx::Error> {
13        let config = obix::MailboxConfig::builder()
14            .clock(clock.clone())
15            .build()
16            .expect("MailboxConfig");
17        let outbox = ObixOutbox::init(pool, config).await?;
18        Ok(Self { inner: outbox })
19    }
20
21    pub async fn publish_entity_events<Entity, Event>(
22        &self,
23        op: &mut impl es_entity::AtomicOperation,
24        _: Entity,
25        new_events: es_entity::LastPersisted<'_, Event>,
26    ) -> Result<(), sqlx::Error>
27    where
28        Event: es_entity::EsEvent,
29        for<'a> &'a Event: Into<OutboxEventPayload>,
30    {
31        self.inner
32            .publish_all_persisted(op, new_events.map(|e| &e.event))
33            .await
34    }
35
36    pub async fn publish_all(
37        &self,
38        op: &mut impl es_entity::AtomicOperation,
39        events: impl Iterator<Item = OutboxEventPayload>,
40    ) -> Result<(), sqlx::Error> {
41        self.inner.publish_all_persisted(op, events).await
42    }
43
44    pub fn inner(&self) -> &ObixOutbox {
45        &self.inner
46    }
47}