cqrs-rust-lib 0.7.0

An opinionated implementation of CQRS/Event Sourcing with pluggable storage backends (InMemory, PostgreSQL, MongoDB, SurrealDB)
Documentation

cqrs-rust-lib

A comprehensive Command Query Responsibility Segregation (CQRS) and Event Sourcing library for Rust applications.

Features

  • CQRS and Event Sourcing core primitives
  • Split Aggregate / CommandHandler traits (Single Responsibility)
  • Structured domain error system with CqrsError and define_domain_errors! macro
  • Multiple storage backends:
    • In-memory (testing/development)
    • MongoDB (feature: mongodb)
    • PostgreSQL (feature: postgres)
  • REST routers with Axum and OpenAPI integration (feature: utoipa)
  • Audit log router for event history
  • Typed read storage and snapshot support
  • Pluggable dispatchers for denormalization/projections

Installation

Add this to your Cargo.toml:

[dependencies]
cqrs-rust-lib = "0.1.0"

Cargo features

Feature Description
mongodb MongoDB event persistence
postgres PostgreSQL event persistence and read utilities
utoipa REST routers and OpenAPI/Swagger integration
mcp MCP server integration (experimental)
all Enables utoipa, mongodb, and postgres
# PostgreSQL + REST
cqrs-rust-lib = { version = "0.1.0", features = ["postgres", "utoipa"] }

# MongoDB only
cqrs-rust-lib = { version = "0.1.0", features = ["mongodb"] }

Quick Start

1. Define your domain

use cqrs_rust_lib::{Aggregate, CommandHandler, CqrsContext, CqrsError, Event};
use http::StatusCode;
use serde::{Deserialize, Serialize};

// Events
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AccountEvent {
    Opened { owner: String },
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

impl Event for AccountEvent {
    fn event_type(&self) -> String {
        match self {
            Self::Opened { .. } => "opened".into(),
            Self::Deposited { .. } => "deposited".into(),
            Self::Withdrawn { .. } => "withdrawn".into(),
        }
    }
}

// Commands
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CreateCommand { Open { owner: String } }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateCommand { Deposit { amount: i64 }, Withdraw { amount: i64 } }

// Aggregate
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Account {
    pub id: String,
    pub balance: i64,
}

#[async_trait::async_trait]
impl Aggregate for Account {
    const TYPE: &'static str = "account";
    type Event = AccountEvent;
    type Error = CqrsError;

    fn aggregate_id(&self) -> String { self.id.clone() }
    fn with_aggregate_id(mut self, id: String) -> Self { self.id = id; self }

    fn apply(&mut self, event: Self::Event) -> Result<(), Self::Error> {
        match event {
            AccountEvent::Opened { .. } => {}
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrawn { amount } => self.balance -= amount,
        }
        Ok(())
    }

    fn error(status: StatusCode, details: &str) -> Self::Error {
        CqrsError::from_status(status, details)
    }
}

#[async_trait::async_trait]
impl CommandHandler for Account {
    type CreateCommand = CreateCommand;
    type UpdateCommand = UpdateCommand;
    type Services = ();

    async fn handle_create(
        &self, command: Self::CreateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            CreateCommand::Open { owner } => Ok(vec![AccountEvent::Opened { owner }]),
        }
    }

    async fn handle_update(
        &self, command: Self::UpdateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            UpdateCommand::Deposit { amount } => Ok(vec![AccountEvent::Deposited { amount }]),
            UpdateCommand::Withdraw { amount } => Ok(vec![AccountEvent::Withdrawn { amount }]),
        }
    }
}

2. Create the engine and execute commands

use cqrs_rust_lib::es::{inmemory::InMemoryPersist, EventStoreImpl};
use cqrs_rust_lib::{CqrsCommandEngine, CqrsContext};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = EventStoreImpl::new(InMemoryPersist::<Account>::new());
    let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));

    let ctx = CqrsContext::default();
    let id = engine.execute_create(CreateCommand::Open { owner: "Alice".into() }, &ctx).await?;
    engine.execute_update(&id, UpdateCommand::Deposit { amount: 100 }, &ctx).await?;
    Ok(())
}

Domain Error Codes

Define structured, domain-specific error codes with the define_domain_errors! macro:

use cqrs_rust_lib::{define_domain_errors, CqrsError, CqrsErrorCode};
use http::StatusCode;

define_domain_errors! {
    domain: "account",
    prefix: 10,
    errors: {
        InsufficientFunds => (1, StatusCode::BAD_REQUEST, "INSUFFICIENT_FUNDS"),
        InvalidAmount     => (2, StatusCode::BAD_REQUEST, "INVALID_AMOUNT"),
        AccountClosed     => (3, StatusCode::GONE, "ACCOUNT_CLOSED"),
    }
}

impl From<ErrorCode> for CqrsError {
    fn from(e: ErrorCode) -> Self {
        e.error(e.to_string())
    }
}

Use in command handlers:

if self.balance < amount {
    return Err(ErrorCode::InsufficientFunds.error(
        format!("Cannot withdraw {}, balance is {}", amount, self.balance)
    ));
}

API response:

{
  "domain": "account",
  "code": "ACCOUNT_INSUFFICIENT_FUNDS",
  "internalCode": 10001,
  "status": 400,
  "message": "Cannot withdraw 500, balance is 200"
}

See docs/migration_guide/domain_errors.md for the full migration guide.

Custom Aggregate ID Generation

By default, aggregate IDs are generated as UUID v4. For patterns like Association as Aggregate — where the ID must be deterministic and derived from the command — you can provide a custom AggregateIdGenerator.

Example: UserRight (association between User and Right)

use cqrs_rust_lib::{
    Aggregate, CommandHandler, AggregateIdGenerator, CqrsContext,
    CqrsError, Event, CqrsCommandEngine,
    es::{inmemory::InMemoryPersist, EventStoreImpl},
};
use serde::{Deserialize, Serialize};
use http::StatusCode;

// --- Events ---

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum UserRightEvent {
    Granted { user_id: String, right_id: String },
    Revoked,
}

impl Event for UserRightEvent {
    fn event_type(&self) -> String {
        match self {
            Self::Granted { .. } => "granted".into(),
            Self::Revoked => "revoked".into(),
        }
    }
}

// --- Commands ---

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrantRight {
    pub user_id: String,
    pub right_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserRightUpdateCommand {
    Revoke,
}

// --- Aggregate ---

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UserRight {
    pub id: String,
    pub user_id: String,
    pub right_id: String,
    pub active: bool,
}

#[async_trait::async_trait]
impl Aggregate for UserRight {
    const TYPE: &'static str = "user_right";
    type Event = UserRightEvent;
    type Error = CqrsError;

    fn aggregate_id(&self) -> String { self.id.clone() }
    fn with_aggregate_id(mut self, id: String) -> Self { self.id = id; self }

    fn apply(&mut self, event: Self::Event) -> Result<(), Self::Error> {
        match event {
            UserRightEvent::Granted { user_id, right_id } => {
                self.user_id = user_id;
                self.right_id = right_id;
                self.active = true;
            }
            UserRightEvent::Revoked => { self.active = false; }
        }
        Ok(())
    }

    fn error(status: StatusCode, details: &str) -> Self::Error {
        CqrsError::from_status(status, details)
    }
}

#[async_trait::async_trait]
impl CommandHandler for UserRight {
    type CreateCommand = GrantRight;
    type UpdateCommand = UserRightUpdateCommand;
    type Services = ();

    async fn handle_create(
        &self, cmd: Self::CreateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        Ok(vec![UserRightEvent::Granted {
            user_id: cmd.user_id,
            right_id: cmd.right_id,
        }])
    }

    async fn handle_update(
        &self, cmd: Self::UpdateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match cmd {
            UserRightUpdateCommand::Revoke => Ok(vec![UserRightEvent::Revoked]),
        }
    }
}

Custom ID generator

The aggregate ID is derived from the association components, making it deterministic and idempotent:

struct UserRightIdGenerator;

impl AggregateIdGenerator<UserRight> for UserRightIdGenerator {
    fn next_id(&self, cmd: &GrantRight, _ctx: &CqrsContext) -> String {
        format!("{}_{}", cmd.user_id, cmd.right_id)
    }
}

Wiring the engine

let store = EventStoreImpl::new(InMemoryPersist::<UserRight>::new());
let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}))
    .with_id_generator(Box::new(UserRightIdGenerator));

let ctx = CqrsContext::default();

// The aggregate ID will be "user_42_right_7" (deterministic)
let id = engine.execute_create(
    GrantRight { user_id: "user_42".into(), right_id: "right_7".into() },
    &ctx,
).await?;
assert_eq!(id, "user_42_right_7");

// Later, revoke using the same deterministic ID
engine.execute_update(&id, UserRightUpdateCommand::Revoke, &ctx).await?;

Without with_id_generator, the engine falls back to UUID v4 generation (default behavior, no breaking change).

Storage Backends

PostgreSQL (feature: postgres)

use cqrs_rust_lib::es::{postgres::PostgresPersist, EventStoreImpl};
use std::sync::Arc;
use tokio_postgres::NoTls;

let (client, conn) = tokio_postgres::connect("postgres://user:pass@localhost/db", NoTls).await?;
tokio::spawn(async move { let _ = conn.await; });
let client = Arc::new(client);

let store = EventStoreImpl::new(PostgresPersist::<Account>::new(client.clone()));
let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));

MongoDB (feature: mongodb)

Same pattern with es::mongodb::MongoDBPersist.

REST Routers (feature: utoipa)

Axum routers with auto-generated OpenAPI schemas:

  • Write router: rest::CQRSWriteRouter::routes(Arc<CqrsCommandEngine<A>>)
  • Read router: rest::CQRSReadRouter::routes(repository, Aggregate::TYPE)
  • Audit log router: rest::CQRSAuditLogRouter::routes(event_store, tag)

See example/todolist/src/api.rs for complete wiring with Swagger UI.

Architecture

Aggregate (state + events)     CommandHandler (commands -> events)
         \                       /
          CqrsCommandEngine ----+---- EventStore (persist)
                |                         |
           Dispatchers              Storage backends
          (projections)          (InMemory/PG/Mongo)

Key Types

Type Description
Aggregate Domain state, event application, identity
CommandHandler Command processing, business validation
CqrsCommandEngine Orchestrates command execution
EventStore / EventStoreImpl Event persistence abstraction
CqrsError Unified structured error type
CqrsErrorCode Trait for domain-specific error codes
CqrsContext Carries user, request ID, correlation
Dispatcher React to persisted events (projections)
View / ViewElements Read model projections

Examples

Example Storage Features
example/bank MongoDB Domain errors, commands, queries, views
example/todolist PostgreSQL REST API, Swagger UI, snapshots, integration tests

Run todolist

cargo run -p todolist -- start --pg-uri="postgres://user:pass@localhost:5432/db" --http-port=8081

Run tests

cargo test             # lib tests
cargo test -p todolist # integration tests

Migration Guides

License

This project is licensed under the terms found in the LICENSE file.

Contributing

Contributions are welcome! Please see CONTRIBUTING.md for details.