Expand description
CQRS (Command Query Responsibility Segregation) for Armature
This crate provides CQRS pattern implementation with command/query separation.
§Features
- Command Bus - Execute commands (writes)
- Query Bus - Execute queries (reads)
- Projections - Build read models from events
- Type-safe - Strong typing with compile-time safety
- Async - Full async/await support
§Quick Start
ⓘ
use armature_cqrs::*;
use async_trait::async_trait;
// Define a command
struct CreateUserCommand {
email: String,
}
impl Command for CreateUserCommand {
type Result = String; // User ID
}
// Define command handler
struct CreateUserHandler;
#[async_trait]
impl CommandHandler<CreateUserCommand> for CreateUserHandler {
async fn handle(&self, command: CreateUserCommand) -> Result<String, CommandError> {
// Business logic here
Ok(format!("user-{}", uuid::Uuid::new_v4()))
}
}
// Define a query
struct GetUserQuery {
user_id: String,
}
impl Query for GetUserQuery {
type Result = User;
}
// Define query handler
struct GetUserHandler;
#[async_trait]
impl QueryHandler<GetUserQuery> for GetUserHandler {
async fn handle(&self, query: GetUserQuery) -> Result<User, QueryError> {
// Fetch from read model
Ok(User {
id: query.user_id,
email: "alice@example.com".to_string(),
})
}
}
// Use the buses
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create buses
let command_bus = CommandBus::new();
let query_bus = QueryBus::new();
// Register handlers
command_bus.register::<CreateUserCommand, _>(CreateUserHandler);
query_bus.register::<GetUserQuery, _>(GetUserHandler);
// Execute command
let user_id = command_bus.execute(CreateUserCommand {
email: "alice@example.com".to_string(),
}).await?;
// Execute query
let user = query_bus.execute(GetUserQuery { user_id }).await?;
println!("User: {:?}", user);
Ok(())
}§Projections
ⓘ
use armature_events::Event;
use async_trait::async_trait;
struct UserListProjection {
// Read model storage
}
#[async_trait]
impl Projection for UserListProjection {
async fn project(&self, event: &dyn Event) -> Result<(), ProjectionError> {
match event.event_name() {
"user_created" => {
// Update read model
}
"user_deleted" => {
// Update read model
}
_ => {}
}
Ok(())
}
}§Integration with Event Sourcing
ⓘ
use armature_eventsourcing::*;
use armature_events::EventBus;
// Command handler uses aggregate repository
#[async_trait]
impl CommandHandler<CreateUserCommand> for CreateUserHandler {
async fn handle(&self, command: CreateUserCommand) -> Result<String, CommandError> {
let mut user = UserAggregate::new_instance(uuid::Uuid::new_v4().to_string());
// Add domain event
user.create(command.email)?;
// Save aggregate (persists events)
self.repository.save(&mut user).await?;
// Publish events to event bus for projections
for event in user.uncommitted_events() {
self.event_bus.publish(event.clone()).await?;
}
Ok(user.aggregate_id().to_string())
}
}Re-exports§
pub use command::Command;pub use command::CommandBus;pub use command::CommandError;pub use command::CommandHandler;pub use projection::Projection;pub use projection::ProjectionError;pub use projection::ProjectionEventHandler;pub use projection::ProjectionManager;pub use query::Query;pub use query::QueryBus;pub use query::QueryError;pub use query::QueryHandler;
Modules§
- command
- Command handling for CQRS
- projection
- Projections for read models
- query
- Query handling for CQRS