obix 0.2.26

Implementation of outbox backed by PG / sqlx
Documentation
use std::{marker::PhantomData, sync::Arc};

use es_entity::hooks::{CommitHook, HookOperation, PreCommitRet};
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::broadcast;

use crate::out::event::{EphemeralEventType, EphemeralOutboxEvent};
use crate::tables::MailboxTables;

pub struct EphemeralEvent<P> {
    pub event_type: EphemeralEventType,
    pub payload: P,
}

pub struct PersistEphemeralEvents<P, Tables>
where
    P: Serialize + DeserializeOwned + Send + Sync + 'static + Unpin,
    Tables: MailboxTables,
{
    sender: broadcast::Sender<Arc<EphemeralOutboxEvent<P>>>,
    pre_commit_events: Vec<EphemeralEvent<P>>,
    post_commit_events: Vec<EphemeralOutboxEvent<P>>,
    _phantom: PhantomData<Tables>,
}

impl<P, Tables> PersistEphemeralEvents<P, Tables>
where
    P: Serialize + DeserializeOwned + Send + Sync + 'static + Unpin,
    Tables: MailboxTables,
{
    pub fn new(
        sender: broadcast::Sender<Arc<EphemeralOutboxEvent<P>>>,
        event_type: EphemeralEventType,
        event: impl Into<P>,
    ) -> Self {
        Self {
            sender,
            pre_commit_events: vec![EphemeralEvent {
                event_type,
                payload: event.into(),
            }],
            post_commit_events: Vec::new(),
            _phantom: PhantomData,
        }
    }
}

impl<P, Tables> CommitHook for PersistEphemeralEvents<P, Tables>
where
    P: Serialize + DeserializeOwned + Send + Sync + 'static + Unpin,
    Tables: MailboxTables,
{
    async fn pre_commit(
        mut self,
        mut op: HookOperation<'_>,
    ) -> Result<PreCommitRet<'_, Self>, sqlx::Error> {
        for event in self.pre_commit_events.drain(..) {
            let persisted =
                Tables::persist_ephemeral_event_in_op(&mut op, event.event_type, event.payload)
                    .await?;
            self.post_commit_events.push(persisted);
        }
        PreCommitRet::ok(self, op)
    }

    fn post_commit(self) {
        for event in self.post_commit_events {
            let _ = self.sender.send(Arc::new(event));
        }
    }

    fn merge(&mut self, other: &mut Self) -> bool {
        self.pre_commit_events.append(&mut other.pre_commit_events);
        true
    }
}