mire 0.1.1

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
//! Replica-safe projection leases.
//!
//! At-most-one-active-leader-per-subscription across the cluster, with
//! bounded-time failover when the leader dies. See `tasks/replica-leases.md`
//! for the design and proof sketch; the short version:
//!
//! - A worker [`try_acquire`]s a lease. If the row already exists, is
//!   still valid (`leased_until >= now()`), and is held by a different
//!   `worker_id`, acquire returns [`AcquireOutcome::Held`]. Otherwise the
//!   row is upserted atomically and the new `fence_token` is published into
//!   `es_subscriptions.fence_token` in the same transaction.
//! - The acquired worker [`heartbeat`]s periodically. A 0-row heartbeat
//!   means the worker has been fenced (someone else stole the lease, or
//!   the row was tampered with) and the runner must stop dispatching.
//! - Every [`checkpoint`] carries the worker's fence_token and matches
//!   strictly (`fence_token = $known`). A fenced checkpoint silently
//!   becomes a no-op rather than overwriting the new leader's writes.
//! - On graceful shutdown the worker [`release`]s the lease so failover
//!   doesn't have to wait out the TTL.
//!
//! The public surface is exposed under `#[doc(hidden)]` so concurrency
//! tests can drive it directly; production code should use the
//! `ProjectionRunner` which wraps these calls correctly.

use std::time::Duration;

use chrono::{DateTime, Utc};
use sqlx::{PgPool, Postgres, Row, postgres::PgConnection};

use crate::EventStoreError;

/// Outcome of [`try_acquire`].
///
/// `Acquired { fence_token }` means this worker now holds the lease; the
/// token is what [`heartbeat`], [`checkpoint`] and [`release`] expect as
/// their `fence_token` argument. `Held` means another worker still holds an
/// unexpired lease.
///
/// This is a small plain value, so it derives `Clone`, `Copy`, `PartialEq`
/// and `Eq` in addition to `Debug` for convenient comparison in callers and
/// tests.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AcquireOutcome {
    /// This worker holds the lease under `fence_token`.
    Acquired { fence_token: i64 },
    /// Another worker holds a lease that has not yet expired.
    Held,
}

/// Snapshot of a lease row as seen in Postgres. Returned by
/// [`status`] for the public `ProjectionRunner::lease_status` API.
///
/// Derives `PartialEq`/`Eq` (every field supports them) so snapshots can be
/// compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaseStatus {
    /// Worker id recorded on the lease row by the last successful acquire.
    pub worker_id: String,
    /// Fencing token of that holder; bumped when a different worker steals
    /// a stale lease.
    pub fence_token: i64,
    /// Lease deadline; once `now()` passes it, another worker may steal.
    pub leased_until: DateTime<Utc>,
    /// Time of the last successful acquire or heartbeat.
    pub heartbeat_at: DateTime<Utc>,
}

/// Try to acquire (or steal a stale) lease for `subscription_id`.
///
/// Three statements in one transaction:
/// 1. Ensure the `es_subscriptions` row exists (idempotent insert).
/// 2. Upsert the lease row.
///    - A brand-new lease row is seeded with a `fence_token` one past the
///      token already published in `es_subscriptions`. A lease row that was
///      deleted and recreated therefore can never restart the counter below
///      the value checkpoints are filtered against.
///    - On conflict, `fence_token` is bumped only when the existing row is
///      stealable AND belongs to a different worker. A self re-acquire keeps
///      the same token.
/// 3. Publish the new `fence_token` into `es_subscriptions` so a stale
///    leader's checkpoint cannot win the race against ours.
///
/// Returns `AcquireOutcome::Acquired { fence_token }` on success or
/// `AcquireOutcome::Held` if another worker still holds it.
///
/// # Errors
///
/// Returns an error if beginning the transaction, any statement, or the
/// commit fails. The transaction is not committed in that case.
pub async fn try_acquire(
    pool: &PgPool,
    subscription_id: &str,
    worker_id: &str,
    ttl: Duration,
) -> Result<AcquireOutcome, EventStoreError> {
    let ttl_secs = ttl.as_secs_f64();
    let mut tx = pool.begin().await?;

    // 1. Make sure the subscription row exists so steps 2 and 3 have a target.
    sqlx::query(
        "INSERT INTO es_subscriptions (subscription_id, last_position)
         VALUES ($1, 0)
         ON CONFLICT (subscription_id) DO NOTHING",
    )
    .bind(subscription_id)
    .execute(&mut *tx)
    .await?;

    // 2. Acquire or steal.
    //
    //    Fresh insert: seed the token from the one already published in
    //    es_subscriptions (COALESCE covers a NULL column). A hard-coded 1
    //    would let a deleted-and-recreated lease row start *below* the
    //    published token. Step 3 would then skip its update, and every
    //    checkpoint would silently miss its WHERE clause while heartbeats
    //    keep succeeding.
    //
    //    Conflict: EXCLUDED.fence_token is ignored. Self re-acquire (same
    //    worker_id) is allowed and keeps the fence_token stable. The CASE
    //    only increments when a different worker is taking over a stale
    //    lease. The DO UPDATE WHERE yields no row (=> Held) when a different
    //    worker's lease is still valid.
    let row = sqlx::query(
        "INSERT INTO es_projection_leases
            (subscription_id, worker_id, fence_token, leased_until, heartbeat_at)
         VALUES (
            $1,
            $2,
            COALESCE(
                (SELECT s.fence_token
                   FROM es_subscriptions s
                  WHERE s.subscription_id = $1),
                0
            ) + 1,
            now() + ($3 || ' seconds')::interval,
            now()
         )
         ON CONFLICT (subscription_id) DO UPDATE
            SET worker_id    = EXCLUDED.worker_id,
                fence_token  = CASE
                    WHEN es_projection_leases.worker_id = EXCLUDED.worker_id
                        THEN es_projection_leases.fence_token
                    ELSE es_projection_leases.fence_token + 1
                END,
                leased_until = EXCLUDED.leased_until,
                heartbeat_at = now()
            WHERE es_projection_leases.leased_until < now()
               OR es_projection_leases.worker_id    = EXCLUDED.worker_id
         RETURNING fence_token",
    )
    .bind(subscription_id)
    .bind(worker_id)
    .bind(ttl_secs.to_string())
    .fetch_optional(&mut *tx)
    .await?;

    let Some(row) = row else {
        // Another worker still holds the lease.
        tx.rollback().await?;
        return Ok(AcquireOutcome::Held);
    };

    let fence_token: i64 = row.get("fence_token");

    // 3. Publish the new fence_token to es_subscriptions atomically with
    //    the lease upsert, so any stale leader's `fence_token = $old`
    //    checkpoint immediately starts missing the WHERE clause. The
    //    monotonic guard keeps a self re-acquire (same token) a no-op; the
    //    NULL branch lets a subscription row without a token be claimed.
    sqlx::query(
        "UPDATE es_subscriptions
            SET fence_token = $1
          WHERE subscription_id = $2
            AND (fence_token IS NULL OR fence_token < $1)",
    )
    .bind(fence_token)
    .bind(subscription_id)
    .execute(&mut *tx)
    .await?;

    tx.commit().await?;
    Ok(AcquireOutcome::Acquired { fence_token })
}

/// Push the lease deadline `ttl` into the future and stamp `heartbeat_at`.
///
/// The update only matches while both `worker_id` and `fence_token` on the
/// lease row are still ours, so it doubles as a fencing check.
/// - `Ok(true)`: exactly one row was updated and the lease is still held.
/// - `Ok(false)`: the lease has been fenced, and the caller should stop
///   dispatching.
///
/// The caller is expected to pass a dedicated, held connection rather than
/// going through the pool, so a saturated pool cannot starve heartbeats.
pub async fn heartbeat(
    conn: &mut PgConnection,
    subscription_id: &str,
    worker_id: &str,
    fence_token: i64,
    ttl: Duration,
) -> Result<bool, EventStoreError> {
    const EXTEND_LEASE: &str = "UPDATE es_projection_leases
            SET leased_until = now() + ($1 || ' seconds')::interval,
                heartbeat_at = now()
          WHERE subscription_id = $2
            AND worker_id       = $3
            AND fence_token     = $4";

    // The TTL travels as text and is concatenated into an interval server-side.
    let ttl_text = ttl.as_secs_f64().to_string();

    let done = sqlx::query(EXTEND_LEASE)
        .bind(ttl_text)
        .bind(subscription_id)
        .bind(worker_id)
        .bind(fence_token)
        .execute(conn)
        .await?;

    Ok(done.rows_affected() == 1)
}

/// Give the lease up on graceful shutdown (best effort).
///
/// Parks `leased_until` at PostgreSQL's `-infinity`, so any other worker's
/// next [`try_acquire`] treats the lease as stale and can take it without
/// waiting out the TTL. The row itself is kept, so the `fence_token` counter
/// is not reset.
///
/// Only matches while `worker_id` and `fence_token` are still ours.
/// - `Ok(true)`: we were still the holder.
/// - `Ok(false)`: someone else had already taken over.
pub async fn release<'e, E>(
    executor: E,
    subscription_id: &str,
    worker_id: &str,
    fence_token: i64,
) -> Result<bool, EventStoreError>
where
    E: sqlx::Executor<'e, Database = Postgres>,
{
    const PARK_LEASE: &str = "UPDATE es_projection_leases
            SET leased_until = '-infinity'
          WHERE subscription_id = $1
            AND worker_id       = $2
            AND fence_token     = $3";

    let outcome = sqlx::query(PARK_LEASE)
        .bind(subscription_id)
        .bind(worker_id)
        .bind(fence_token)
        .execute(executor)
        .await?;

    let still_holder = outcome.rows_affected() == 1;
    Ok(still_holder)
}

/// Advance the subscription cursor, guarded by the caller's fence token.
///
/// The `UPDATE` only matches while `es_subscriptions.fence_token` still
/// equals `fence_token` (strict equality). A worker that has been
/// superseded therefore writes nothing instead of clobbering the new
/// leader's cursor. `last_transaction_id` is sent as text and cast to
/// `xid8` server-side.
///
/// Returns `Ok(true)` when exactly one row was updated, and `Ok(false)` when
/// the token no longer matches (we have been fenced).
pub async fn checkpoint(
    pool: &PgPool,
    subscription_id: &str,
    fence_token: i64,
    last_position: i64,
    last_transaction_id: u64,
) -> Result<bool, EventStoreError> {
    const ADVANCE_CURSOR: &str = "UPDATE es_subscriptions
            SET last_position       = $1,
                last_transaction_id = $2::text::xid8,
                updated_at          = now()
          WHERE subscription_id = $3
            AND fence_token     = $4";

    // The u64 is bound as its decimal text form; the SQL casts text -> xid8.
    let xid_text = last_transaction_id.to_string();

    let result = sqlx::query(ADVANCE_CURSOR)
        .bind(last_position)
        .bind(xid_text)
        .bind(subscription_id)
        .bind(fence_token)
        .execute(pool)
        .await?;

    Ok(matches!(result.rows_affected(), 1))
}

/// Read the current lease row, if one exists.
///
/// Returns `Ok(None)` when no lease row has been created for
/// `subscription_id`.
///
/// [`release`] parks `leased_until` at PostgreSQL's `-infinity`. That value
/// is outside the range of `DateTime<Utc>`, so decoding it directly would
/// fail, and `Row::get` turns decode failures into a panic. The query
/// therefore reports such a row with `leased_until` at the Unix epoch. That
/// is still in the past, so the lease still reads as expired and stealable.
///
/// # Errors
///
/// Returns an error if the query fails or a column cannot be decoded.
/// Decode failures are no longer panics.
pub async fn status(
    pool: &PgPool,
    subscription_id: &str,
) -> Result<Option<LeaseStatus>, EventStoreError> {
    let row = sqlx::query(
        "SELECT worker_id,
                fence_token,
                CASE WHEN leased_until = '-infinity'::timestamptz
                     THEN 'epoch'::timestamptz
                     ELSE leased_until
                END AS leased_until,
                heartbeat_at
           FROM es_projection_leases
          WHERE subscription_id = $1",
    )
    .bind(subscription_id)
    .fetch_optional(pool)
    .await?;

    let Some(row) = row else {
        return Ok(None);
    };

    // `try_get` propagates decode errors through `?` instead of panicking.
    Ok(Some(LeaseStatus {
        worker_id: row.try_get("worker_id")?,
        fence_token: row.try_get("fence_token")?,
        leased_until: row.try_get("leased_until")?,
        heartbeat_at: row.try_get("heartbeat_at")?,
    }))
}