mire 0.2.0

A small, generic PostgreSQL event-sourcing library: append-only event streams, aggregates with optimistic concurrency, and subscription-based projections (requires tokio + sqlx)
Documentation
/// Error type for event-store operations.
///
/// `Display` text comes from the `#[error(...)]` attribute on each variant.
/// `From` conversions are generated only for the two `#[from]` variants:
/// `serde_json::Error` becomes [`EventStoreError::Serialization`] and
/// `sqlx::Error` becomes [`EventStoreError::Database`]. Every other variant
/// has to be constructed explicitly.
#[derive(Debug, thiserror::Error)]
pub enum EventStoreError {
    /// The version found for a stream differed from the version the caller
    /// expected. Carries the stream id plus both version numbers so the
    /// caller can report or reconcile the mismatch.
    ///
    /// This variant is never classified as retryable by
    /// [`EventStoreError::is_retryable`], which only looks at database errors.
    #[error(
        "concurrency conflict on stream '{stream_id}': expected version {expected}, found {actual}"
    )]
    ConcurrencyConflict {
        stream_id: String,
        expected: i64,
        actual: i64,
    },

    /// No stream exists under the given id (the payload is the stream id).
    #[error("stream not found: '{0}'")]
    StreamNotFound(String),

    /// A stored event could not be decoded. Identifies the offending event
    /// by stream id, global position and event type.
    ///
    /// The field is named `source`, which `thiserror` exposes through
    /// `std::error::Error::source()`. There is deliberately no `#[from]`
    /// here: this variant must be built by hand with the positional context.
    #[error(
        "failed to deserialize event at position {global_position} in stream '{stream_id}' (type: {event_type}): {source}"
    )]
    Deserialization {
        stream_id: String,
        global_position: i64,
        event_type: String,
        source: serde_json::Error,
    },

    /// A `serde_json` failure without event context.
    ///
    /// Because of `#[from]`, any `serde_json::Error` propagated with `?`
    /// lands here. That includes decode failures which were not explicitly
    /// wrapped in [`EventStoreError::Deserialization`].
    #[error("event serialization error: {0}")]
    Serialization(#[from] serde_json::Error),

    /// The events read for a stream do not match its recorded
    /// `stream_version`: a gap, a non-contiguous version sequence, or a
    /// count/highest-version mismatch. This is a hard integrity failure
    /// (truncated read, partial append, or corrupted log) — surfaced
    /// rather than silently hydrating an aggregate from partial history,
    /// which would let a command be validated against wrong state and
    /// committed (review C1/SNAP-1).
    #[error(
        "stream '{stream_id}' is corrupt: recorded version {recorded_version} but {detail} (read {read_count} events)"
    )]
    StreamCorruption {
        stream_id: String,
        recorded_version: i64,
        read_count: i64,
        detail: String,
    },

    /// The always-on per-stream ordering backstop tripped (review
    /// CORE-2/SUB-1): within one continuous delivery session, an event
    /// arrived whose `stream_version` is not ahead of what this
    /// subscription already delivered for the stream. Either the log/
    /// checkpoint state is inconsistent (e.g. a pre-fix checkpoint that
    /// passed a delivered-out-of-order event) or there is an ordering bug —
    /// both mean a read model could silently regress, so delivery fails
    /// loudly instead of handing the event to a handler.
    #[error(
        "ordering violation on subscription '{subscription_id}', stream '{stream_id}': \
         event v{version} arrived but v{last_delivered} was already delivered \
         (checkpoint/log inconsistency or ordering bug — refusing to corrupt the read model)"
    )]
    OrderingViolation {
        subscription_id: String,
        stream_id: String,
        version: i64,
        last_delivered: i64,
    },

    /// Any error surfaced by `sqlx`. Converted automatically via `#[from]`.
    ///
    /// Use [`EventStoreError::db_kind`] for a coarse classification and
    /// [`EventStoreError::is_retryable`] for a retry decision without
    /// matching on `sqlx` types directly.
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
}

/// Coarse classification of [`EventStoreError::Database`] for callers that
/// want to react differently to common failure modes without depending on
/// `sqlx` types. Use [`EventStoreError::db_kind`] to inspect.
///
/// [`EventStoreError::is_retryable`] treats `Connection`, `Deadlock` and
/// `LockTimeout` as retryable; every other kind is not.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DbErrorKind {
    /// Could not reach the database (network/TLS/IO problem), or the
    /// connection pool timed out or was closed. Retryable.
    Connection,
    /// Postgres `lock_not_available` (SQLSTATE `55P03`). The caller set
    /// `lock_timeout` and a row lock didn't free in time. Retryable.
    LockTimeout,
    /// Postgres `deadlock_detected` (SQLSTATE `40P01`). Retryable.
    Deadlock,
    /// Postgres `unique_violation` (SQLSTATE `23505`). Typically a bug
    /// or genuine duplicate; not blindly retryable.
    UniqueViolation,
    /// Postgres `foreign_key_violation` (SQLSTATE `23503`). Not treated
    /// as retryable.
    ForeignKeyViolation,
    /// Anything else — a database error with an unrecognised (or absent)
    /// SQLSTATE, or any other `sqlx` error not counted as a connection
    /// problem. Inspect the underlying [`sqlx::Error`] if you need more.
    /// Not treated as retryable.
    Other,
}

impl EventStoreError {
    /// Classify a [`Database`](EventStoreError::Database) variant into a
    /// coarse bucket. Returns `None` for non-database variants. Lets
    /// callers branch on common cases (retry vs surface) without
    /// reaching into `sqlx::Error` internals.
    ///
    /// ```rust,ignore
    /// match store.save(&mut agg).await {
    ///     Err(e) if e.db_kind() == Some(DbErrorKind::LockTimeout) => retry_later(),
    ///     Err(e) => bail!("save failed: {e}"),
    ///     Ok(_) => {}
    /// }
    /// ```
    pub fn db_kind(&self) -> Option<DbErrorKind> {
        let Self::Database(e) = self else {
            return None;
        };
        match e {
            sqlx::Error::Io(_) => return Some(DbErrorKind::Connection),
            sqlx::Error::PoolTimedOut => return Some(DbErrorKind::Connection),
            sqlx::Error::PoolClosed => return Some(DbErrorKind::Connection),
            sqlx::Error::Tls(_) => return Some(DbErrorKind::Connection),
            _ => {}
        }
        let code = e.as_database_error().and_then(|d| d.code());
        Some(match code.as_deref() {
            Some("55P03") => DbErrorKind::LockTimeout,
            Some("40P01") => DbErrorKind::Deadlock,
            Some("23505") => DbErrorKind::UniqueViolation,
            Some("23503") => DbErrorKind::ForeignKeyViolation,
            _ => DbErrorKind::Other,
        })
    }

    /// Is this error worth retrying? Returns `true` for transient DB
    /// failures (connection issues, deadlocks) and `false` for
    /// semantic failures (concurrency conflicts, unique violations,
    /// serialization errors).
    pub fn is_retryable(&self) -> bool {
        matches!(
            self.db_kind(),
            Some(DbErrorKind::Connection | DbErrorKind::Deadlock | DbErrorKind::LockTimeout)
        )
    }
}