obix 0.2.24

Implementation of outbox backed by PG / sqlx
Documentation
mod config;
mod error;
mod event;
mod job;

use es_entity::clock::ClockHandle;
use serde::Serialize;

pub use config::*;
pub use error::*;
pub use event::*;
pub use job::{InboxHandler, InboxResult};

use crate::tables::MailboxTables;

#[derive(Clone)]
pub struct Inbox<Tables = crate::tables::DefaultMailboxTables> {
    pool: sqlx::PgPool,
    spawner: job::InboxJobSpawner<Tables>,
    clock: ClockHandle,
    _phantom: std::marker::PhantomData<Tables>,
}

impl<Tables> Inbox<Tables>
where
    Tables: MailboxTables,
{
    pub fn new<H>(
        pool: &sqlx::PgPool,
        jobs: &mut job::Jobs,
        config: InboxConfig,
        handler: H,
    ) -> Self
    where
        H: InboxHandler,
    {
        let initializer = job::InboxJobInitializer::<H, Tables>::new(
            pool,
            handler,
            config.job_type.clone(),
            config.retry_settings.clone(),
            config.clock.clone(),
        );

        let spawner = jobs.add_initializer(initializer);

        Self {
            pool: pool.clone(),
            spawner,
            _phantom: std::marker::PhantomData,
            clock: config.clock.clone(),
        }
    }

    pub async fn begin_op(&self) -> Result<es_entity::DbOp<'static>, sqlx::Error> {
        es_entity::DbOp::init_with_clock(&self.pool, &self.clock).await
    }

    pub async fn persist_and_queue_job<P>(
        &self,
        idempotency_key: impl Into<InboxIdempotencyKey>,
        event: P,
    ) -> Result<es_entity::Idempotent<InboxEventId>, InboxError>
    where
        P: Serialize + Send + Sync,
    {
        let mut op = self.begin_op().await?;
        let res = self
            .persist_and_queue_job_in_op(&mut op, idempotency_key, event)
            .await?;
        op.commit().await?;
        Ok(res)
    }

    pub async fn persist_and_queue_job_in_op<P>(
        &self,
        op: &mut impl es_entity::AtomicOperation,
        idempotency_key: impl Into<InboxIdempotencyKey>,
        event: P,
    ) -> Result<es_entity::Idempotent<InboxEventId>, InboxError>
    where
        P: Serialize + Send + Sync,
    {
        let idempotency_key = idempotency_key.into();

        let Some(id) = Tables::insert_inbox_event(op, &idempotency_key, &event).await? else {
            return Ok(es_entity::Idempotent::AlreadyApplied);
        };

        let config = job::InboxJobData::<Tables> {
            inbox_event_id: id,
            _phantom: std::marker::PhantomData,
        };

        self.spawner.spawn_in_op(op, id, config).await?;

        Ok(es_entity::Idempotent::Executed(id))
    }

    pub async fn find_event_by_id(&self, id: InboxEventId) -> Result<InboxEvent, InboxError> {
        Tables::find_inbox_event_by_id(&self.pool, id).await
    }

    pub async fn list_failed(&self, limit: usize) -> Result<Vec<InboxEvent>, InboxError> {
        Tables::list_inbox_events_by_status(&self.pool, InboxEventStatus::Failed, limit).await
    }
}