evento 2.0.0-alpha.8

Event sourcing and CQRS toolkit with SQL persistence, projections, and subscriptions

Evento

More information about this crate can be found in the crate documentation.

Features

  • Event Sourcing: Store state changes as immutable events with a complete audit trail
  • CQRS Pattern: Separate read and write models for scalable architectures
  • SQL Database Support: Built-in support for SQLite, PostgreSQL, and MySQL
  • Embedded Storage: Fjall key-value store for embedded applications
  • Event Handlers: Async event processing with automatic retries
  • Event Subscriptions: Continuous event stream processing with cursor tracking
  • Projections: Build read models by replaying events
  • Database Migrations: Automated schema management
  • Compact Serialization: Fast binary serialization with bitcode
  • Type Safety: Fully typed events and aggregates with compile-time guarantees
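
To make the "Event Sourcing" and "Projections" items concrete, here is a minimal, library-free sketch of the underlying idea: state is never stored directly, it is rebuilt by folding an ordered event log into a view. The names `UserEvent`, `apply`, `replay`, and `sample_log` are hypothetical and exist only for this illustration; they are not part of evento's API.

```rust
// Hypothetical event type for illustration only; evento generates its own
// event types through the #[evento::aggregator] macro.
#[derive(Debug, Clone)]
enum UserEvent {
    Created { name: String, email: String },
    EmailChanged { email: String },
}

// The read model (view) that the events are folded into.
#[derive(Debug, Default, PartialEq)]
struct UserView {
    name: String,
    email: String,
}

// Apply a single event to the view. Each event only describes a change,
// so the view is updated in place.
fn apply(view: &mut UserView, event: &UserEvent) {
    match event {
        UserEvent::Created { name, email } => {
            view.name = name.clone();
            view.email = email.clone();
        }
        UserEvent::EmailChanged { email } => {
            view.email = email.clone();
        }
    }
}

// Rebuild the view from scratch by replaying the log in order.
fn replay(events: &[UserEvent]) -> UserView {
    let mut view = UserView::default();
    for event in events {
        apply(&mut view, event);
    }
    view
}

// A small example log: one creation followed by one email change.
fn sample_log() -> Vec<UserEvent> {
    vec![
        UserEvent::Created {
            name: "John Doe".to_string(),
            email: "john@example.com".to_string(),
        },
        UserEvent::EmailChanged {
            email: "newemail@example.com".to_string(),
        },
    ]
}
```

Calling `replay(&sample_log())` yields a view whose email is the most recent one, while the log itself still records the original address. That retained history is what provides the audit trail.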

Installation

Add to your Cargo.toml:

[dependencies]
# Cargo skips pre-release versions unless the requirement names one explicitly.
evento = { version = "2.0.0-alpha.8", features = ["sqlite"] }
bitcode = "0.6"

Usage Example

This example also uses tokio, anyhow, and sqlx directly, so add those crates to your Cargo.toml alongside evento.

use evento::{Executor, metadata::{Event, Metadata}, projection::{Action, Projection}};

// Define events using an enum
#[evento::aggregator]
pub enum User {
    UserCreated {
        name: String,
        email: String,
    },
    UserEmailChanged {
        email: String,
    },
}

// Define a view/projection
#[derive(Default)]
pub struct UserView {
    pub name: String,
    pub email: String,
}

// Define handler functions
#[evento::handler]
async fn on_user_created<E: Executor>(
    event: Event<UserCreated>,
    action: Action<'_, UserView, E>,
) -> anyhow::Result<()> {
    match action {
        Action::Apply(view) => {
            view.name = event.data.name.clone();
            view.email = event.data.email.clone();
        }
        Action::Handle(_context) => {
            // In Action::Handle, perform side effects like sending emails
        }
    };
    Ok(())
}

#[evento::handler]
async fn on_email_changed<E: Executor>(
    event: Event<UserEmailChanged>,
    action: Action<'_, UserView, E>,
) -> anyhow::Result<()> {
    if let Action::Apply(view) = action {
        view.email = event.data.email.clone();
    }
    Ok(())
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Setup SQLite executor
    let pool = sqlx::SqlitePool::connect("sqlite:events.db").await?;
    let mut conn = pool.acquire().await?;

    // Run migrations
    evento::sql_migrator::new()?
        .run(&mut *conn, &evento::migrator::Plan::apply_all())
        .await?;

    let executor: evento::Sqlite = pool.into();

    // Create and save events
    let user_id = evento::create()
        .event(&UserCreated {
            name: "John Doe".to_string(),
            email: "john@example.com".to_string(),
        })
        .metadata(&Metadata::default())
        .commit(&executor)
        .await?;

    // Update user
    evento::aggregator(&user_id)
        .original_version(1)
        .event(&UserEmailChanged {
            email: "newemail@example.com".to_string(),
        })
        .metadata(&Metadata::default())
        .commit(&executor)
        .await?;

    // Build projection and load state
    let projection = Projection::<UserView, _>::new("users")
        .handler(on_user_created())
        .handler(on_email_changed());

    let result = projection
        .load::<User>(&user_id)
        .execute(&executor)
        .await?;

    if let Some(user) = result {
        println!("User: {} ({})", user.name, user.email);
    }

    Ok(())
}
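
The `.original_version(1)` call in the example above reads like an optimistic concurrency check: the writer states which version of the aggregate its change was based on. This README does not spell out those semantics, so treat that reading as an assumption. The sketch below shows the general technique with a hypothetical in-memory store (`InMemoryStore`, `append`, and `AppendError` are illustrative names, not evento types).

```rust
use std::collections::HashMap;

// Hypothetical error type: the caller's expected version did not match
// the version currently stored.
#[derive(Debug, PartialEq)]
enum AppendError {
    VersionConflict { expected: u32, actual: u32 },
}

// Hypothetical in-memory event store keyed by aggregate id.
// The version of a stream is simply the number of events it holds.
#[derive(Default)]
struct InMemoryStore {
    streams: HashMap<String, Vec<String>>,
}

impl InMemoryStore {
    // Append `event` only if the stream is still at `expected_version`.
    // Returns the new version on success.
    fn append(
        &mut self,
        aggregate_id: &str,
        expected_version: u32,
        event: &str,
    ) -> Result<u32, AppendError> {
        let stream = self.streams.entry(aggregate_id.to_string()).or_default();
        let actual = stream.len() as u32;
        if actual != expected_version {
            // Someone else wrote first; reject instead of overwriting.
            return Err(AppendError::VersionConflict {
                expected: expected_version,
                actual,
            });
        }
        stream.push(event.to_string());
        Ok(actual + 1)
    }
}
```

With this scheme, two writers that both loaded version 1 cannot both succeed: the second append is rejected with a conflict, and the caller can reload the aggregate and retry.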

Continuous Subscriptions

use std::time::Duration;

// Reuses `executor`, `on_user_created`, and `on_email_changed` from the usage example above.
// Start a subscription that continuously processes events
let subscription = Projection::<UserView, _>::new("user-processor")
    .handler(on_user_created())
    .handler(on_email_changed())
    .subscription()
    .routing_key("users")
    .chunk_size(100)
    .retry(5)
    .delay(Duration::from_secs(10))
    .start(&executor)
    .await?;

// On application shutdown
subscription.shutdown().await?;
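
The builder above combines a chunk size, a retry count, and a delay. How evento interprets each setting internally is not described here, so the following is only a rough, library-free sketch of the general pattern: read events in bounded chunks starting from a cursor, retry a failing event a limited number of times with a pause in between, and advance the cursor only past events that succeeded. `run_once` and its parameters are hypothetical names for this illustration.

```rust
use std::{thread, time::Duration};

// Process events starting at `cursor`.
// Returns Ok(new_cursor) when every event was handled, or Err(cursor)
// pointing at the event that kept failing after `max_retries` retries.
fn run_once<F>(
    events: &[&str],
    mut cursor: usize,
    chunk_size: usize,
    max_retries: u32,
    delay: Duration,
    mut handle: F,
) -> Result<usize, usize>
where
    F: FnMut(&str) -> Result<(), String>,
{
    // Guard against a zero chunk size, which would never make progress.
    let chunk_size = chunk_size.max(1);

    while cursor < events.len() {
        // Read at most `chunk_size` events per fetch.
        let end = (cursor + chunk_size).min(events.len());
        for &event in &events[cursor..end] {
            let mut attempt = 0;
            // Retry the same event until it succeeds or the budget is spent.
            while let Err(_reason) = handle(event) {
                attempt += 1;
                if attempt > max_retries {
                    // Leave the cursor on the failing event so a later run
                    // resumes exactly there.
                    return Err(cursor);
                }
                thread::sleep(delay);
            }
            // Only successful events move the cursor forward.
            cursor += 1;
        }
    }
    Ok(cursor)
}
```

Because the cursor never moves past a failed event, restarting the loop from the returned position does not skip work. It can, however, re-run a handler whose side effect succeeded just before a crash, so handlers in this style should be idempotent.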

Feature Flags

  • macro (enabled by default) - Procedural macros for cleaner code
  • group - Multi-executor support for querying across databases
  • rw - Read-write split executor for CQRS patterns
  • sql - Enable all SQL database backends (SQLite, MySQL, PostgreSQL)
  • sqlite - SQLite support via sqlx
  • mysql - MySQL support via sqlx
  • postgres - PostgreSQL support via sqlx
  • fjall - Embedded key-value storage with Fjall

Minimum Supported Rust Version

Evento's MSRV is 1.75.

Safety

This crate uses #![forbid(unsafe_code)] to ensure everything is implemented in 100% safe Rust.

Examples

The examples directory contains sample applications demonstrating various features:

  • bank - Bank account domain model with commands, queries, and projections
  • bank-axum-sqlite - Integration with Axum web framework and SQLite

Getting Help

If you have questions or need help, please:

License

This project is licensed under the Apache-2.0 license.