cala_ledger/outbox/
publisher.rs

1use cala_types::outbox::OutboxEventPayload;
2
3use super::ObixOutbox;
4
5#[derive(Debug, Clone)]
6pub struct OutboxPublisher {
7    inner: ObixOutbox,
8}
9
10impl OutboxPublisher {
11    pub async fn init(pool: &sqlx::PgPool) -> Result<Self, sqlx::Error> {
12        let outbox = ObixOutbox::init(pool, Default::default()).await?;
13        Ok(Self { inner: outbox })
14    }
15
16    pub async fn publish_entity_events<Entity, Event>(
17        &self,
18        op: &mut impl es_entity::AtomicOperation,
19        _: Entity,
20        new_events: es_entity::LastPersisted<'_, Event>,
21    ) -> Result<(), sqlx::Error>
22    where
23        Event: es_entity::EsEvent,
24        for<'a> &'a Event: Into<OutboxEventPayload>,
25    {
26        self.inner
27            .publish_all_persisted(op, new_events.map(|e| &e.event))
28            .await
29    }
30
31    pub async fn publish_all(
32        &self,
33        op: &mut impl es_entity::AtomicOperation,
34        events: impl Iterator<Item = OutboxEventPayload>,
35    ) -> Result<(), sqlx::Error> {
36        self.inner.publish_all_persisted(op, events).await
37    }
38
39    pub fn inner(&self) -> &ObixOutbox {
40        &self.inner
41    }
42}