cala_ledger/outbox/
publisher.rs1use cala_types::outbox::OutboxEventPayload;
2
3use super::ObixOutbox;
4
5#[derive(Debug, Clone)]
6pub struct OutboxPublisher {
7 inner: ObixOutbox,
8}
9
10impl OutboxPublisher {
11 pub async fn init(pool: &sqlx::PgPool) -> Result<Self, sqlx::Error> {
12 let outbox = ObixOutbox::init(pool, Default::default()).await?;
13 Ok(Self { inner: outbox })
14 }
15
16 pub async fn publish_entity_events<Entity, Event>(
17 &self,
18 op: &mut impl es_entity::AtomicOperation,
19 _: Entity,
20 new_events: es_entity::LastPersisted<'_, Event>,
21 ) -> Result<(), sqlx::Error>
22 where
23 Event: es_entity::EsEvent,
24 for<'a> &'a Event: Into<OutboxEventPayload>,
25 {
26 self.inner
27 .publish_all_persisted(op, new_events.map(|e| &e.event))
28 .await
29 }
30
31 pub async fn publish_all(
32 &self,
33 op: &mut impl es_entity::AtomicOperation,
34 events: impl Iterator<Item = OutboxEventPayload>,
35 ) -> Result<(), sqlx::Error> {
36 self.inner.publish_all_persisted(op, events).await
37 }
38
39 pub fn inner(&self) -> &ObixOutbox {
40 &self.inner
41 }
42}