cala_ledger/outbox/
publisher.rs1use cala_types::outbox::OutboxEventPayload;
2use es_entity::clock::ClockHandle;
3
4use super::ObixOutbox;
5
6#[derive(Debug, Clone)]
7pub struct OutboxPublisher {
8 inner: ObixOutbox,
9}
10
11impl OutboxPublisher {
12 pub async fn init(pool: &sqlx::PgPool, clock: &ClockHandle) -> Result<Self, sqlx::Error> {
13 let config = obix::MailboxConfig::builder()
14 .clock(clock.clone())
15 .build()
16 .expect("MailboxConfig");
17 let outbox = ObixOutbox::init(pool, config).await?;
18 Ok(Self { inner: outbox })
19 }
20
21 pub async fn publish_entity_events<Entity, Event>(
22 &self,
23 op: &mut impl es_entity::AtomicOperation,
24 _: Entity,
25 new_events: es_entity::LastPersisted<'_, Event>,
26 ) -> Result<(), sqlx::Error>
27 where
28 Event: es_entity::EsEvent,
29 for<'a> &'a Event: Into<OutboxEventPayload>,
30 {
31 self.inner
32 .publish_all_persisted(op, new_events.map(|e| &e.event))
33 .await
34 }
35
36 pub async fn publish_all(
37 &self,
38 op: &mut impl es_entity::AtomicOperation,
39 events: impl Iterator<Item = OutboxEventPayload>,
40 ) -> Result<(), sqlx::Error> {
41 self.inner.publish_all_persisted(op, events).await
42 }
43
44 pub fn inner(&self) -> &ObixOutbox {
45 &self.inner
46 }
47}