mire 0.1.1

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio and sqlx).
Documentation
use sqlx::{PgPool, Row};

use crate::{EventStore, EventStoreError, RecordedEvent};

/// A resumable, persisted cursor over the event log.
///
/// The cursor is the pair `(transaction_id, global_position)` instead of a
/// lone position, so an event whose transaction committed out of sequence
/// order is never skipped (see the schema notes on `transaction_id`). Polling
/// yields only events from fully-committed transactions, which makes it always
/// safe to advance the checkpoint.
pub struct Subscription {
    /// Key of this subscription's row in `es_subscriptions`.
    id: String,
    /// Event store that polls read from.
    store: EventStore,
    /// Pool used to load and persist the checkpoint row.
    db: PgPool,
    /// Limit handed to the store on every poll.
    batch_size: i64,
    /// Transaction-id half of the cursor.
    last_transaction_id: u64,
    /// Global-position half of the cursor.
    last_position: i64,
}

impl Subscription {
    pub async fn create(
        store: EventStore,
        db: PgPool,
        id: impl Into<String>,
        batch_size: i64,
    ) -> Result<Self, EventStoreError> {
        let id = id.into();

        sqlx::query(
            "INSERT INTO es_subscriptions (subscription_id, last_position)
             VALUES ($1, 0)
             ON CONFLICT (subscription_id) DO NOTHING",
        )
        .bind(&id)
        .execute(&db)
        .await?;

        let row = sqlx::query(
            "SELECT last_position, last_transaction_id::text AS last_transaction_id
             FROM es_subscriptions WHERE subscription_id = $1",
        )
        .bind(&id)
        .fetch_one(&db)
        .await?;

        let last_position: i64 = row.get("last_position");
        let last_transaction_id: String = row.get("last_transaction_id");

        Ok(Self {
            id,
            store,
            db,
            last_transaction_id: last_transaction_id.parse().unwrap_or(0),
            last_position,
            batch_size,
        })
    }

    pub async fn poll(&mut self) -> Result<Vec<RecordedEvent>, EventStoreError> {
        let events = self
            .store
            .read_all_after(
                self.last_transaction_id,
                self.last_position,
                self.batch_size,
            )
            .await?;
        self.advance(&events);
        Ok(events)
    }

    pub async fn poll_category(
        &mut self,
        category: &str,
    ) -> Result<Vec<RecordedEvent>, EventStoreError> {
        let events = self
            .store
            .read_category_after(
                category,
                self.last_transaction_id,
                self.last_position,
                self.batch_size,
            )
            .await?;
        self.advance(&events);
        Ok(events)
    }

    /// Move the in-memory cursor to the last event of a batch. Events are
    /// ordered by `(transaction_id, global_position)`, so the last one carries
    /// the furthest cursor.
    fn advance(&mut self, events: &[RecordedEvent]) {
        if let Some(last) = events.last() {
            self.last_transaction_id = last.transaction_id;
            self.last_position = last.global_position;
        }
    }

    pub async fn checkpoint(&self) -> Result<(), EventStoreError> {
        sqlx::query(
            "UPDATE es_subscriptions
             SET last_position = $1, last_transaction_id = $2::text::xid8, updated_at = now()
             WHERE subscription_id = $3",
        )
        .bind(self.last_position)
        .bind(self.last_transaction_id.to_string())
        .bind(&self.id)
        .execute(&self.db)
        .await?;
        Ok(())
    }

    pub async fn reset(&mut self) -> Result<(), EventStoreError> {
        self.last_position = 0;
        self.last_transaction_id = 0;
        sqlx::query(
            "UPDATE es_subscriptions
             SET last_position = 0, last_transaction_id = '0'::xid8, updated_at = now()
             WHERE subscription_id = $1",
        )
        .bind(&self.id)
        .execute(&self.db)
        .await?;
        Ok(())
    }

    pub fn position(&self) -> i64 {
        self.last_position
    }
}