mire 0.2.2

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
use std::collections::{BTreeMap, HashMap};

use sqlx::{PgPool, Row};

use crate::{EventStore, EventStoreError, RecordedEvent};

/// Position in the event log, written as `(transaction_id, global_position)`.
/// Tuples compare lexicographically, which is the order the poll queries
/// page through the log in.
type Cursor = (u64, i64);

/// Step `c` back by one position inside the same transaction.
///
/// Persisting the result as a checkpoint means the event sitting at `c` is
/// fetched again after a restart. Only the position component moves; because
/// positions start at 1 the decrement stays within the transaction's own
/// range and never underflows.
fn just_before(c: Cursor) -> Cursor {
    let (transaction_id, position) = c;
    (transaction_id, position - 1)
}

fn cursor_of(e: &RecordedEvent) -> Cursor {
    (e.transaction_id, e.global_position)
}

/// A resumable cursor over the event log with a **per-stream ordering
/// guarantee** (review CORE-2/SUB-1; see `tasks/subscriber-ordering.md`).
///
/// Raw log order is `(transaction_id, global_position)` — xid-assignment
/// order, *not* per-stream version order. A multi-statement transaction
/// (`TransactionScope`, the saga runner) that writes before appending gets
/// its xid early, so a stream's v2 can sort — and become deliverable —
/// before v1. `poll`/`poll_category` therefore reorder:
///
/// - Each fetched batch is re-sorted by `global_position` (per-stream
///   position is version-monotonic, so this is per-stream version order).
/// - An event whose predecessor (`stream_version - 1`) has not been
///   delivered yet is **held** in memory instead of delivered; other
///   streams keep flowing. It is released the moment the predecessor
///   arrives. (The predecessor is always already committed — appends to a
///   stream serialize on the `es_streams` row lock — so the wait terminates
///   with whatever transaction is pinning the xmin horizon.)
/// - The **poll cursor** keeps advancing past held events (no livelock),
///   but the **checkpointable cursor**
///   ([`checkpoint_cursor_after`](Subscription::checkpoint_cursor_after))
///   lags just before the earliest unhandled event, so a crash replays the
///   held window: at-least-once, never lost, never reordered.
///
/// The contract delivered to handlers: **each stream's events arrive in
/// strictly increasing `stream_version` order; redelivery after a restart
/// is a prefix-replay, never a reordering.** Handlers still need
/// idempotency (at-least-once stands); they no longer need
/// order-insensitivity.
///
/// As an always-on backstop, a delivery that would regress a stream
/// (version ≤ something already delivered this session) fails loudly with
/// [`EventStoreError::OrderingViolation`] rather than corrupting a read
/// model.
///
/// **Internal / unblessed (review CORE-4/SUB-2).** This is the low-level
/// poll primitive that [`ProjectionRunner`](crate::ProjectionRunner) is
/// built on; it is meant to be hidden from the rendered docs and is **not**
/// part of the supported public API. (NOTE(review): no `#[doc(hidden)]`
/// attribute is visible on this definition — presumably it is applied at
/// the re-export site; confirm.) Production code should use
/// `ProjectionRunner`, which coordinates replicas via fenced leases. A bare
/// `Subscription` has no lease, so its
/// [`checkpoint`](Subscription::checkpoint)/[`reset`](Subscription::reset)
/// are guarded to only touch *non-lease-managed* subscriptions
/// (`fence_token = 0`) — they can no longer silently clobber a
/// `ProjectionRunner`'s cursor on the same id.
pub struct Subscription {
    /// Value of `es_subscriptions.subscription_id` for this subscription;
    /// also reported in `OrderingViolation` errors.
    id: String,
    /// Source of raw batches (`read_all_after` / `read_category_after`).
    store: EventStore,
    /// Pool used for this type's own SQL: the `es_subscriptions` row and the
    /// per-stream baseline lookup against `es_events`.
    db: PgPool,
    /// Poll cursor: where the next fetch starts. Advances over *fetched*
    /// events, including ones that were held rather than delivered.
    poll_cursor: Cursor,
    /// Passed as the last argument of every store read — presumably the
    /// maximum number of rows per fetch. It is not validated here.
    batch_size: i64,
    /// The persisted checkpoint at session start. Events at or before this
    /// cursor are treated as handled by previous sessions — the baseline
    /// for the one-time per-stream `last_delivered` initialisation.
    session_start: Cursor,
    /// Highest `stream_version` delivered this session, per stream. Lazily
    /// initialised per stream from the log at-or-before `session_start`.
    /// Doubles as the ordering backstop.
    last_delivered: HashMap<String, i64>,
    /// Fetched events waiting for their predecessor, keyed by cursor so the
    /// earliest hold bounds the checkpointable cursor.
    held: BTreeMap<Cursor, RecordedEvent>,
    /// Cursors of the most recent delivered batch, in delivery order —
    /// suffix-minimised by
    /// [`checkpoint_cursor_after`](Subscription::checkpoint_cursor_after) so
    /// a partially handled batch checkpoints safely.
    last_batch_cursors: Vec<Cursor>,
}

impl Subscription {
    pub async fn create(
        store: EventStore,
        db: PgPool,
        id: impl Into<String>,
        batch_size: i64,
    ) -> Result<Self, EventStoreError> {
        let id = id.into();

        sqlx::query(
            "INSERT INTO es_subscriptions (subscription_id, last_position)
             VALUES ($1, 0)
             ON CONFLICT (subscription_id) DO NOTHING",
        )
        .bind(&id)
        .execute(&db)
        .await?;

        let row = sqlx::query(
            "SELECT last_position, last_transaction_id::text AS last_transaction_id
             FROM es_subscriptions WHERE subscription_id = $1",
        )
        .bind(&id)
        .fetch_one(&db)
        .await?;

        let last_position: i64 = row.get("last_position");
        let last_transaction_id: String = row.get("last_transaction_id");
        let cursor: Cursor = (last_transaction_id.parse().unwrap_or(0), last_position);

        Ok(Self {
            id,
            store,
            db,
            poll_cursor: cursor,
            batch_size,
            session_start: cursor,
            last_delivered: HashMap::new(),
            held: BTreeMap::new(),
            last_batch_cursors: Vec::new(),
        })
    }

    /// Read the next raw batch after the poll cursor and pass it through the
    /// ordering gate, returning only the events that are deliverable now, in
    /// per-stream version order (see the type docs).
    ///
    /// An empty result does not necessarily mean the log is exhausted: the
    /// fetched events may all be held waiting for a predecessor, so callers
    /// should keep polling.
    pub async fn poll(&mut self) -> Result<Vec<RecordedEvent>, EventStoreError> {
        let (after_txid, after_position) = self.poll_cursor;
        let fetched = self
            .store
            .read_all_after(after_txid, after_position, self.batch_size)
            .await?;
        self.gate(fetched).await
    }

    /// Same as [`poll`](Self::poll), but the raw batch is read from a single
    /// stream `category` only. The poll cursor and the ordering gate are
    /// shared with `poll`.
    pub async fn poll_category(
        &mut self,
        category: &str,
    ) -> Result<Vec<RecordedEvent>, EventStoreError> {
        let (after_txid, after_position) = self.poll_cursor;
        let fetched = self
            .store
            .read_category_after(category, after_txid, after_position, self.batch_size)
            .await?;
        self.gate(fetched).await
    }

    /// The ordering gate: deliver the raw batch in `global_position` order,
    /// holding back any event whose predecessor hasn't been delivered and
    /// draining holds the moment they unblock, then advance the poll cursor
    /// over the batch.
    ///
    /// The poll cursor is committed **only when the whole batch has been
    /// gated successfully**. If the bulk baseline query fails, or the
    /// ordering backstop fires, the cursor stays where it was: the same
    /// batch is fetched again by the next poll and a later checkpoint cannot
    /// move past events that were never handed to the caller.
    ///
    /// # Errors
    ///
    /// - Any error from the per-stream baseline query. Nothing has been
    ///   mutated at that point, so the poll can simply be retried.
    /// - [`EventStoreError::OrderingViolation`] when an event would regress
    ///   its stream. Violations against the state this poll started with are
    ///   detected before anything is mutated. A violation that only appears
    ///   while delivering (it needs two log entries with the same stream
    ///   version) leaves `last_delivered`/`held` partially updated; discard
    ///   the `Subscription` and start a new session in that case.
    ///
    /// NOTE(review): the backstop also appears reachable on a *legitimate*
    /// redelivery after a restart. If the persisted checkpoint lags behind a
    /// held event while a stream's v1 sits after the checkpoint and its v2
    /// before it (cursor order differs from version order), the baseline for
    /// that stream is 2 and the re-fetched v1 is rejected. Skipping events at
    /// or below the session-start baseline would need that baseline tracked
    /// separately from `last_delivered` — confirm and handle at the struct
    /// level.
    async fn gate(
        &mut self,
        raw: Vec<RecordedEvent>,
    ) -> Result<Vec<RecordedEvent>, EventStoreError> {
        // The raw batch arrives in (txid, pos) order; its last element is the
        // furthest poll cursor. Remember it now (before re-sorting) but do
        // not commit it until every fallible step below has succeeded.
        let next_poll_cursor = raw.last().map(cursor_of);

        let mut batch = raw;
        batch.sort_by_key(|e| e.global_position);

        // Initialise `last_delivered` for every stream this batch is the
        // first sighting of — ONE bulk query per poll, not one per stream
        // (a cold catch-up sees thousands of new streams; per-stream
        // round-trips dominate and can starve the poll loop).
        self.init_unknown_streams(&batch).await?;

        // Pre-flight backstop: an event at or below what its stream had
        // delivered when this poll started is rejected by the loop below no
        // matter what is delivered first, so refuse it here while the
        // session state is still untouched.
        for event in &batch {
            if let Some(&last) = self.last_delivered.get(&event.stream_id) {
                if event.stream_version <= last {
                    return Err(self.ordering_violation(event, last));
                }
            }
        }

        let mut out: Vec<RecordedEvent> = Vec::with_capacity(batch.len());
        for event in batch {
            let last = *self
                .last_delivered
                .get(&event.stream_id)
                .expect("init_unknown_streams populated every batch stream");
            if event.stream_version == last + 1 {
                self.deliver(event, &mut out);
            } else if event.stream_version > last + 1 {
                // Predecessor not delivered yet: park the event. Other
                // streams keep flowing.
                self.held.insert(cursor_of(&event), event);
            } else {
                // Backstop (D-ORD-3, always on): within one session this can
                // only mean checkpoint/log inconsistency or an ordering bug.
                // Refuse rather than regress a read model.
                return Err(self.ordering_violation(&event, last));
            }
        }

        // Success: advance even past events we ended up holding — held
        // events live in memory, not in re-fetches.
        if let Some(cursor) = next_poll_cursor {
            self.poll_cursor = cursor;
        }
        self.last_batch_cursors = out.iter().map(cursor_of).collect();
        Ok(out)
    }

    /// Build the `OrderingViolation` error for `event`, given the version
    /// its stream had already reached.
    fn ordering_violation(&self, event: &RecordedEvent, last_delivered: i64) -> EventStoreError {
        EventStoreError::OrderingViolation {
            subscription_id: self.id.clone(),
            stream_id: event.stream_id.clone(),
            version: event.stream_version,
            last_delivered,
        }
    }

    /// Append `event` to `out`, followed by every held event of the same
    /// stream that it unblocks, in consecutive version order.
    ///
    /// Each released event is removed from `held`. When the chain ends, the
    /// stream's entry in `last_delivered` is set to the last version pushed.
    fn deliver(&mut self, event: RecordedEvent, out: &mut Vec<RecordedEvent>) {
        let stream_id = event.stream_id.clone();
        let mut newest = event.stream_version;
        out.push(event);

        loop {
            // Linear scan of the holds for this stream's next version.
            let wanted = newest + 1;
            let unblocked = self.held.iter().find_map(|(cursor, waiting)| {
                (waiting.stream_id == stream_id && waiting.stream_version == wanted)
                    .then_some(*cursor)
            });
            match unblocked {
                Some(cursor) => {
                    let successor = self.held.remove(&cursor).expect("key just found");
                    newest = successor.stream_version;
                    out.push(successor);
                }
                None => break,
            }
        }

        self.last_delivered.insert(stream_id, newest);
    }

    /// Seed `last_delivered` for every stream in `batch` that this session
    /// has not seen before.
    ///
    /// A stream's baseline is its highest `stream_version` recorded at or
    /// before the session-start checkpoint — those events count as delivered
    /// by previous sessions. Streams with nothing at or before the
    /// checkpoint start at 0. All new streams are resolved with a single
    /// query; if the batch introduces no new stream, no query is issued.
    ///
    /// The map is only touched after the query has succeeded, so a failed
    /// lookup leaves the ordering state exactly as it was.
    async fn init_unknown_streams(
        &mut self,
        batch: &[RecordedEvent],
    ) -> Result<(), EventStoreError> {
        let mut first_sightings: Vec<String> = Vec::new();
        for event in batch {
            if !self.last_delivered.contains_key(&event.stream_id) {
                first_sightings.push(event.stream_id.clone());
            }
        }
        first_sightings.sort_unstable();
        first_sightings.dedup();
        if first_sightings.is_empty() {
            return Ok(());
        }

        let (start_txid, start_position) = self.session_start;
        let baselines: Vec<(String, i64)> = sqlx::query_as(
            "SELECT stream_id, max(stream_version)
             FROM es_events
             WHERE stream_id = ANY($1)
               AND ((transaction_id = $2::text::xid8 AND global_position <= $3)
                    OR transaction_id < $2::text::xid8)
             GROUP BY stream_id",
        )
        .bind(first_sightings.clone())
        .bind(start_txid.to_string())
        .bind(start_position)
        .fetch_all(&self.db)
        .await?;

        // Default every new stream to 0, then overwrite the ones that have
        // history at or before the checkpoint.
        self.last_delivered
            .extend(first_sightings.into_iter().map(|stream_id| (stream_id, 0)));
        self.last_delivered.extend(baselines);
        Ok(())
    }

    /// The safe persisted-checkpoint cursor once the first `handled` events
    /// of the most recent batch have been processed. Never passes an
    /// unhandled event: the rest of the batch, every held event, and
    /// everything beyond the poll cursor all stay re-fetchable. `handled =
    /// batch.len()` is the end-of-batch checkpoint.
    pub fn checkpoint_cursor_after(&self, handled: usize) -> Cursor {
        let mut safe = self.poll_cursor;
        if let Some((first_held, _)) = self.held.iter().next() {
            safe = safe.min(just_before(*first_held));
        }
        if let Some(min_rest) = self.last_batch_cursors.get(handled..).and_then(|rest| {
            // Delivery order is position order, not cursor order — take the
            // true minimum over the unhandled suffix.
            rest.iter().min().copied()
        }) {
            safe = safe.min(just_before(min_rest));
        }
        safe
    }

    /// Write the checkpoint to `es_subscriptions`, counting every event of
    /// the most recent batch as handled.
    ///
    /// The value written is `checkpoint_cursor_after(batch.len())`, so it
    /// still lags behind any held event.
    ///
    /// **Fenced:** the `UPDATE` only matches a row with `fence_token = 0`,
    /// i.e. a subscription that is not lease-managed. When a
    /// `ProjectionRunner` owns this id (its lease raised `fence_token` above
    /// 0) nothing is written and `Ok(())` is still returned, so a bare
    /// `Subscription` cannot clobber the runner's cursor (review
    /// CORE-4/SUB-2).
    pub async fn checkpoint(&self) -> Result<(), EventStoreError> {
        let fully_handled = self.last_batch_cursors.len();
        let safe = self.checkpoint_cursor_after(fully_handled);

        let statement = sqlx::query(
            "UPDATE es_subscriptions
             SET last_position = $1, last_transaction_id = $2::text::xid8, updated_at = now()
             WHERE subscription_id = $3 AND fence_token = 0",
        )
        .bind(safe.1)
        .bind(safe.0.to_string())
        .bind(&self.id);

        statement.execute(&self.db).await?;
        Ok(())
    }

    /// Rewind the cursor to the start. Clears the ordering state too —
    /// replaying from an older cursor legitimately re-presents delivered
    /// events, which must not trip the backstop.
    ///
    /// The persisted cursor is rewound first and the in-memory session state
    /// only afterwards, so a failed `UPDATE` leaves the session exactly as
    /// it was and `reset` can be retried.
    ///
    /// **Fenced** like [`checkpoint`](Self::checkpoint): the `UPDATE` only
    /// matches `fence_token = 0`, so it can't rewind a `ProjectionRunner`'s
    /// persisted cursor. The in-memory state of *this* session is rewound
    /// whenever the statement executes, whether or not it matched a row.
    ///
    /// # Errors
    ///
    /// Returns the converted `sqlx::Error` if the `UPDATE` fails; no state
    /// has been changed in that case.
    pub async fn reset(&mut self) -> Result<(), EventStoreError> {
        sqlx::query(
            "UPDATE es_subscriptions
             SET last_position = 0, last_transaction_id = '0'::xid8, updated_at = now()
             WHERE subscription_id = $1 AND fence_token = 0",
        )
        .bind(&self.id)
        .execute(&self.db)
        .await?;

        // Durable state is rewound; now drop everything this session knew.
        self.poll_cursor = (0, 0);
        self.session_start = (0, 0);
        self.last_delivered.clear();
        self.held.clear();
        self.last_batch_cursors.clear();
        Ok(())
    }

    /// The `global_position` component of the poll cursor — how far fetching
    /// has progressed, which may be ahead of the checkpointable cursor while
    /// events are held.
    pub fn position(&self) -> i64 {
        let (_, global_position) = self.poll_cursor;
        global_position
    }

    /// Number of events currently held awaiting a predecessor. Exposed for
    /// observability: a hold that survives many polls means some transaction
    /// is sitting on the xmin horizon (or, pathologically, an event was
    /// deleted from the log).
    pub fn held_count(&self) -> usize {
        self.held.len()
    }
}