cqrs-rust-lib
A comprehensive Command Query Responsibility Segregation (CQRS) and Event Sourcing library for Rust applications.
Features
- CQRS and Event Sourcing core primitives
- Split Aggregate / CommandHandler traits (Single Responsibility)
- Structured domain error system with CqrsError and define_domain_errors! macro
- Multiple storage backends:
  - In-memory (testing/development)
  - MongoDB (feature: mongodb)
  - PostgreSQL (feature: postgres)
- REST routers with Axum and OpenAPI integration (feature: utoipa)
- Audit log router for event history
- Typed read storage and snapshot support
- Pluggable dispatchers for denormalization/projections
Installation
Add this to your Cargo.toml:
[dependencies]
cqrs-rust-lib = "0.1.0"
Cargo features
| Feature | Description |
|---------|-------------|
| mongodb | MongoDB event persistence |
| postgres | PostgreSQL event persistence and read utilities |
| utoipa | REST routers and OpenAPI/Swagger integration |
| mcp | MCP server integration (experimental) |
| all | Enables utoipa, mongodb, and postgres |
cqrs-rust-lib = { version = "0.1.0", features = ["postgres", "utoipa"] }
cqrs-rust-lib = { version = "0.1.0", features = ["mongodb"] }
Quick Start
1. Define your domain
use cqrs_rust_lib::{Aggregate, CommandHandler, CqrsContext, CqrsError, Event};
use http::StatusCode;
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum AccountEvent {
    Opened { owner: String },
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

impl Event for AccountEvent {
    fn event_type(&self) -> String {
        match self {
            Self::Opened { .. } => "opened".into(),
            Self::Deposited { .. } => "deposited".into(),
            Self::Withdrawn { .. } => "withdrawn".into(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CreateCommand { Open { owner: String } }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UpdateCommand { Deposit { amount: i64 }, Withdraw { amount: i64 } }

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct Account {
    pub id: String,
    pub balance: i64,
}

#[async_trait::async_trait]
impl Aggregate for Account {
    const TYPE: &'static str = "account";
    type Event = AccountEvent;
    type Error = CqrsError;

    fn aggregate_id(&self) -> String { self.id.clone() }
    fn with_aggregate_id(mut self, id: String) -> Self { self.id = id; self }

    fn apply(&mut self, event: Self::Event) -> Result<(), Self::Error> {
        match event {
            AccountEvent::Opened { .. } => {}
            AccountEvent::Deposited { amount } => self.balance += amount,
            AccountEvent::Withdrawn { amount } => self.balance -= amount,
        }
        Ok(())
    }

    fn error(status: StatusCode, details: &str) -> Self::Error {
        CqrsError::from_status(status, details)
    }
}

#[async_trait::async_trait]
impl CommandHandler for Account {
    type CreateCommand = CreateCommand;
    type UpdateCommand = UpdateCommand;
    type Services = ();

    async fn handle_create(
        &self, command: Self::CreateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            CreateCommand::Open { owner } => Ok(vec![AccountEvent::Opened { owner }]),
        }
    }

    async fn handle_update(
        &self, command: Self::UpdateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match command {
            UpdateCommand::Deposit { amount } => Ok(vec![AccountEvent::Deposited { amount }]),
            UpdateCommand::Withdraw { amount } => Ok(vec![AccountEvent::Withdrawn { amount }]),
        }
    }
}
2. Create the engine and execute commands
use cqrs_rust_lib::es::{inmemory::InMemoryPersist, EventStoreImpl};
use cqrs_rust_lib::{CqrsCommandEngine, CqrsContext};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let store = EventStoreImpl::new(InMemoryPersist::<Account>::new());
    let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));
    let ctx = CqrsContext::default();

    let id = engine.execute_create(CreateCommand::Open { owner: "Alice".into() }, &ctx).await?;
    engine.execute_update(&id, UpdateCommand::Deposit { amount: 100 }, &ctx).await?;
    Ok(())
}
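Conceptually, an event-sourced aggregate is rebuilt by applying its stored events in order, which is the role Aggregate::apply plays in step 1. The following is a minimal, std-only sketch of that idea. The trimmed-down Account and the replay function are illustrative assumptions and do not reproduce the library's engine code.

```rust
// Illustrative only: a std-only model of event replay, not the cqrs-rust-lib API.
#[derive(Debug, Clone, PartialEq)]
enum AccountEvent {
    Opened { owner: String },
    Deposited { amount: i64 },
    Withdrawn { amount: i64 },
}

#[derive(Debug, Default, PartialEq)]
struct Account {
    owner: String,
    balance: i64,
}

impl Account {
    // Mutate state from a single past event; no validation happens here,
    // because events describe facts that were already accepted.
    fn apply(&mut self, event: &AccountEvent) {
        match event {
            AccountEvent::Opened { owner } => self.owner = owner.clone(),
            AccountEvent::Deposited { amount } => self.balance += *amount,
            AccountEvent::Withdrawn { amount } => self.balance -= *amount,
        }
    }
}

// Hypothetical helper: rebuild the current state by folding the history in order.
fn replay(events: &[AccountEvent]) -> Account {
    let mut account = Account::default();
    for event in events {
        account.apply(event);
    }
    account
}
```

Replaying Opened, Deposited { amount: 100 } and Withdrawn { amount: 30 } in that order leaves a balance of 70. Business rules such as rejecting an overdraft belong in the command handler, before the event is emitted.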
Domain Error Codes
Define structured, domain-specific error codes with the define_domain_errors! macro:
use cqrs_rust_lib::{define_domain_errors, CqrsError, CqrsErrorCode};
use http::StatusCode;

define_domain_errors! {
    domain: "account",
    prefix: 10,
    errors: {
        InsufficientFunds => (1, StatusCode::BAD_REQUEST, "INSUFFICIENT_FUNDS"),
        InvalidAmount => (2, StatusCode::BAD_REQUEST, "INVALID_AMOUNT"),
        AccountClosed => (3, StatusCode::GONE, "ACCOUNT_CLOSED"),
    }
}

impl From<ErrorCode> for CqrsError {
    fn from(e: ErrorCode) -> Self {
        e.error(e.to_string())
    }
}
Use in command handlers:
if self.balance < amount {
    return Err(ErrorCode::InsufficientFunds.error(
        format!("Cannot withdraw {}, balance is {}", amount, self.balance)
    ));
}
API response:
{
  "domain": "account",
  "code": "ACCOUNT_INSUFFICIENT_FUNDS",
  "internalCode": 10001,
  "status": 400,
  "message": "Cannot withdraw 500, balance is 200"
}
See docs/migration_guide/domain_errors.md for the full migration guide.
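The sample response suggests how the macro inputs map to the response fields: prefix 10 with error number 1 appears as internalCode 10001, and domain "account" with label "INSUFFICIENT_FUNDS" appears as code "ACCOUNT_INSUFFICIENT_FUNDS". The sketch below encodes that reading. Both formulas and both function names are assumptions inferred from this one example, not taken from the define_domain_errors! implementation.

```rust
// Assumption: these formulas are inferred from the sample response,
// not from the macro's source code.

// Combine the domain prefix and the per-error number: 10 and 1 -> 10001.
fn internal_code(prefix: u32, number: u32) -> u32 {
    prefix * 1000 + number
}

// Build the public code from the domain and the error label:
// "account" + "INSUFFICIENT_FUNDS" -> "ACCOUNT_INSUFFICIENT_FUNDS".
fn public_code(domain: &str, label: &str) -> String {
    format!("{}_{}", domain.to_uppercase(), label)
}
```

If the real scheme matches this sketch, each domain has room for error numbers 1 to 999 before its codes would run into the next prefix.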
Custom Aggregate ID Generation
By default, aggregate IDs are generated as UUID v4. For patterns such as Association as Aggregate, where the ID must be deterministic and derived from the command, you can provide a custom AggregateIdGenerator.
Example: UserRight (association between User and Right)
use cqrs_rust_lib::{
    Aggregate, CommandHandler, AggregateIdGenerator, CqrsContext,
    CqrsError, Event, CqrsCommandEngine,
    es::{inmemory::InMemoryPersist, EventStoreImpl},
};
use serde::{Deserialize, Serialize};
use http::StatusCode;

#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum UserRightEvent {
    Granted { user_id: String, right_id: String },
    Revoked,
}

impl Event for UserRightEvent {
    fn event_type(&self) -> String {
        match self {
            Self::Granted { .. } => "granted".into(),
            Self::Revoked => "revoked".into(),
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GrantRight {
    pub user_id: String,
    pub right_id: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserRightUpdateCommand {
    Revoke,
}

#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct UserRight {
    pub id: String,
    pub user_id: String,
    pub right_id: String,
    pub active: bool,
}

#[async_trait::async_trait]
impl Aggregate for UserRight {
    const TYPE: &'static str = "user_right";
    type Event = UserRightEvent;
    type Error = CqrsError;

    fn aggregate_id(&self) -> String { self.id.clone() }
    fn with_aggregate_id(mut self, id: String) -> Self { self.id = id; self }

    fn apply(&mut self, event: Self::Event) -> Result<(), Self::Error> {
        match event {
            UserRightEvent::Granted { user_id, right_id } => {
                self.user_id = user_id;
                self.right_id = right_id;
                self.active = true;
            }
            UserRightEvent::Revoked => { self.active = false; }
        }
        Ok(())
    }

    fn error(status: StatusCode, details: &str) -> Self::Error {
        CqrsError::from_status(status, details)
    }
}

#[async_trait::async_trait]
impl CommandHandler for UserRight {
    type CreateCommand = GrantRight;
    type UpdateCommand = UserRightUpdateCommand;
    type Services = ();

    async fn handle_create(
        &self, cmd: Self::CreateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        Ok(vec![UserRightEvent::Granted {
            user_id: cmd.user_id,
            right_id: cmd.right_id,
        }])
    }

    async fn handle_update(
        &self, cmd: Self::UpdateCommand, _svc: &(), _ctx: &CqrsContext,
    ) -> Result<Vec<Self::Event>, Self::Error> {
        match cmd {
            UserRightUpdateCommand::Revoke => Ok(vec![UserRightEvent::Revoked]),
        }
    }
}
Custom ID generator
The aggregate ID is derived from the association components, making it deterministic and idempotent:
struct UserRightIdGenerator;

impl AggregateIdGenerator<UserRight> for UserRightIdGenerator {
    fn next_id(&self, cmd: &GrantRight, _ctx: &CqrsContext) -> String {
        format!("{}_{}", cmd.user_id, cmd.right_id)
    }
}
Wiring the engine
let store = EventStoreImpl::new(InMemoryPersist::<UserRight>::new());
let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}))
    .with_id_generator(Box::new(UserRightIdGenerator));
let ctx = CqrsContext::default();

let id = engine.execute_create(
    GrantRight { user_id: "user_42".into(), right_id: "right_7".into() },
    &ctx,
).await?;
assert_eq!(id, "user_42_right_7");

engine.execute_update(&id, UserRightUpdateCommand::Revoke, &ctx).await?;
Without with_id_generator, the engine falls back to UUID v4 generation (default behavior, no breaking change).
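One design point to keep in mind with derived IDs: joining components with a separator is only unambiguous if the separator cannot occur inside a component. The std-only sketch below shows the issue and one possible workaround. Both helper functions are hypothetical; only the "{user_id}_{right_id}" format comes from the example above.

```rust
// Hypothetical helpers for illustration; not part of cqrs-rust-lib.

// Same scheme as UserRightIdGenerator: join the two components with '_'.
fn simple_id(user_id: &str, right_id: &str) -> String {
    format!("{}_{}", user_id, right_id)
}

// Variant that prefixes each component with its byte length, so two
// different (user_id, right_id) pairs cannot produce the same string.
fn length_prefixed_id(user_id: &str, right_id: &str) -> String {
    format!("{}:{}_{}:{}", user_id.len(), user_id, right_id.len(), right_id)
}
```

With the simple scheme, ("a_b", "c") and ("a", "b_c") both map to "a_b_c", while the length-prefixed variant keeps them apart. If component IDs are UUIDs or otherwise never contain the separator, the simple scheme is sufficient.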
Storage Backends
PostgreSQL (feature: postgres)
use cqrs_rust_lib::es::{postgres::PostgresPersist, EventStoreImpl};
use cqrs_rust_lib::CqrsCommandEngine;
use std::sync::Arc;
use tokio_postgres::NoTls;

let (client, conn) = tokio_postgres::connect("postgres://user:pass@localhost/db", NoTls).await?;
// The connection object drives the socket I/O, so it must run on its own task.
tokio::spawn(async move {
    if let Err(e) = conn.await {
        eprintln!("postgres connection error: {}", e);
    }
});

let client = Arc::new(client);
let store = EventStoreImpl::new(PostgresPersist::<Account>::new(client.clone()));
let engine = CqrsCommandEngine::new(store, vec![], (), Box::new(|_e| {}));
MongoDB (feature: mongodb)
Follows the same pattern as PostgreSQL, using es::mongodb::MongoDBPersist as the persistence layer.
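The reason backends are interchangeable is that the engine only talks to a persistence abstraction. A rough, std-only sketch of that design follows. The Persist trait, the InMemory struct and record_deposit are hypothetical stand-ins, and the library's actual trait is likely async and typed per aggregate.

```rust
use std::collections::HashMap;

// Hypothetical trait standing in for the library's persistence abstraction.
trait Persist {
    fn append(&mut self, aggregate_id: &str, events: Vec<String>);
    fn load(&self, aggregate_id: &str) -> Vec<String>;
}

// In-memory backend: events grouped by aggregate ID, kept in insertion order.
#[derive(Default)]
struct InMemory {
    streams: HashMap<String, Vec<String>>,
}

impl Persist for InMemory {
    fn append(&mut self, aggregate_id: &str, events: Vec<String>) {
        self.streams
            .entry(aggregate_id.to_string())
            .or_default()
            .extend(events);
    }

    fn load(&self, aggregate_id: &str) -> Vec<String> {
        self.streams.get(aggregate_id).cloned().unwrap_or_default()
    }
}

// Code written against the trait does not change when the backend does.
// Returns the number of events stored for the account after the append.
fn record_deposit<P: Persist>(store: &mut P, account_id: &str, amount: i64) -> usize {
    store.append(account_id, vec![format!("deposited:{}", amount)]);
    store.load(account_id).len()
}
```

Switching to another backend would mean providing another Persist implementation and changing only the line that constructs the store.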
REST Routers (feature: utoipa)
Axum routers with auto-generated OpenAPI schemas:
- Write router: rest::CQRSWriteRouter::routes(Arc<CqrsCommandEngine<A>>)
- Read router: rest::CQRSReadRouter::routes(repository, Aggregate::TYPE)
- Audit log router: rest::CQRSAuditLogRouter::routes(event_store, tag)
See example/todolist/src/api.rs for complete wiring with Swagger UI.
Architecture
Aggregate (state + events)     CommandHandler (commands -> events)
              \                    /
        CqrsCommandEngine ----+---- EventStore (persist)
              |                          |
         Dispatchers              Storage backends
        (projections)           (InMemory/PG/Mongo)
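To make the Dispatchers branch of the diagram concrete, here is a small, std-only sketch of a projection: events are appended to a log first, then handed to each dispatcher, which updates a denormalized read model. All types and the persist_and_dispatch function are hypothetical, and the library's Dispatcher trait may look different.

```rust
use std::collections::HashMap;

// Hypothetical event type carrying the aggregate ID for simplicity.
#[derive(Debug, Clone)]
enum AccountEvent {
    Deposited { account_id: String, amount: i64 },
    Withdrawn { account_id: String, amount: i64 },
}

// A dispatcher reacts to events that have already been persisted.
trait Dispatcher {
    fn dispatch(&mut self, events: &[AccountEvent]);
}

// Read model: a denormalized balance per account, cheap to query.
#[derive(Default)]
struct BalanceView {
    balances: HashMap<String, i64>,
}

impl Dispatcher for BalanceView {
    fn dispatch(&mut self, events: &[AccountEvent]) {
        for event in events {
            match event {
                AccountEvent::Deposited { account_id, amount } => {
                    *self.balances.entry(account_id.clone()).or_insert(0) += *amount;
                }
                AccountEvent::Withdrawn { account_id, amount } => {
                    *self.balances.entry(account_id.clone()).or_insert(0) -= *amount;
                }
            }
        }
    }
}

// Write side: append to the event log first, then notify every dispatcher.
fn persist_and_dispatch(
    log: &mut Vec<AccountEvent>,
    dispatchers: &mut [&mut dyn Dispatcher],
    events: Vec<AccountEvent>,
) {
    log.extend(events.iter().cloned());
    for dispatcher in dispatchers.iter_mut() {
        dispatcher.dispatch(&events);
    }
}
```

The read model never validates anything; it only reflects events the write side has already accepted, so it can be dropped and rebuilt from the log at any time.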
Key Types
| Type | Description |
|------|-------------|
| Aggregate | Domain state, event application, identity |
| CommandHandler | Command processing, business validation |
| CqrsCommandEngine | Orchestrates command execution |
| EventStore / EventStoreImpl | Event persistence abstraction |
| CqrsError | Unified structured error type |
| CqrsErrorCode | Trait for domain-specific error codes |
| CqrsContext | Carries user, request ID, correlation |
| Dispatcher | React to persisted events (projections) |
| View / ViewElements | Read model projections |
Examples
| Example | Storage | Features |
|---------|---------|----------|
| example/bank | MongoDB | Domain errors, commands, queries, views |
| example/todolist | PostgreSQL | REST API, Swagger UI, snapshots, integration tests |
Run todolist
cargo run -p todolist -- start --pg-uri="postgres://user:pass@localhost:5432/db" --http-port=8081
Run tests
cargo test
cargo test -p todolist
Migration Guides
- Domain errors: docs/migration_guide/domain_errors.md
License
This project is licensed under the terms found in the LICENSE file.
Contributing
Contributions are welcome! Please see CONTRIBUTING.md for details.