eventcore 0.1.8

Multi-stream aggregateless event sourcing library with type-driven development
Documentation
eventcore-0.1.8 has been yanked.

eventcore

Core library for EventCore - the multi-stream event sourcing framework.

Installation

[dependencies]
eventcore = "0.1"
eventcore-macros = "0.1"  # For #[derive(Command)] and helper macros

You'll also need an event store adapter:

eventcore-postgres = "0.1"  # For production
# or
eventcore-memory = "0.1"    # For testing

Core Concepts

Commands

Commands are the heart of EventCore. Each command:

  • Declares which streams it reads (automatically with #[derive(Command)])
  • Rebuilds state from events
  • Executes business logic
  • Returns events to append

The Modern Way: Using Derive Macros

use eventcore::{prelude::*, require, emit, CommandLogic};
use eventcore_macros::Command;

#[derive(Command)]
struct TransferMoney {
    #[stream]  // Automatically included in read_streams()
    from_account: StreamId,
    #[stream]  // Automatically included in read_streams()
    to_account: StreamId,
    amount: Money,
}

#[async_trait]
impl CommandLogic for TransferMoney {
    type State = TransferState;
    type Event = BankingEvent;

    fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
        match &event.payload {
            BankingEvent::Deposited { account, amount } => {
                state.credit(account, *amount);
            }
            BankingEvent::Withdrawn { account, amount } => {
                state.debit(account, *amount);
            }
        }
    }

    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        _: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        require!(state.has_account(&self.from_account), "Source account not found");
        require!(state.has_account(&self.to_account), "Target account not found");
        require!(state.balance(&self.from_account) >= self.amount, "Insufficient funds");

        let mut events = vec![];
        
        emit!(events, &read_streams, self.from_account, 
            BankingEvent::Withdrawn { 
                account: self.from_account.to_string(), 
                amount: self.amount 
            });
            
        emit!(events, &read_streams, self.to_account,
            BankingEvent::Deposited { 
                account: self.to_account.to_string(), 
                amount: self.amount 
            });

        Ok(events)
    }
}

The Classic Way: Manual Implementation

If you prefer explicit control, you can implement everything manually:

#[derive(Clone)]
struct TransferMoney {
    from: StreamId,
    to: StreamId,
    amount: Money,
}

// Manual implementation of CommandStreams
impl CommandStreams for TransferMoney {
    type StreamSet = (); // Your own phantom type
    
    fn read_streams(&self) -> Vec<StreamId> {
        vec![self.from.clone(), self.to.clone()]
    }
}

// Implementation of CommandLogic
#[async_trait]
impl CommandLogic for TransferMoney {
    type State = TransferState;
    type Event = BankingEvent;
    
    // ... rest of implementation
}

Type-Safe Domain Modeling

Use nutype for domain types that validate at construction:

use nutype::nutype;

#[nutype(
    sanitize(trim),
    validate(not_empty, len_char_max = 50),
    derive(Debug, Clone, PartialEq, Eq, AsRef, Deref)
)]
pub struct AccountId(String);

#[nutype(
    validate(greater_or_equal = 0),
    derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)
)]
pub struct Money(i64);

Dynamic Stream Discovery

Commands can discover additional streams during execution:

async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    state: Self::State,
    input: Self::Input,
    stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    require!(state.is_valid(), "Invalid state");
    
    if input.amount > Money::from_cents(1000000)? {
        let approval_stream = StreamId::try_new(format!("approval-{}", input.from))?;
        stream_resolver.add_streams(vec![approval_stream]);
    }
    
    let mut events = vec![];
    emit!(events, &read_streams, input.account, AccountEvent::Updated { 
        amount: input.amount 
    });
    
    Ok(events)
}

Event Store Usage

use eventcore::{CommandExecutor, EventStore};
use eventcore_macros::Command;
use eventcore_postgres::PostgresEventStore;

// Initialize store
let store = PostgresEventStore::new(config).await?;
store.initialize().await?;

#[derive(Command)]
struct OpenAccount {
    #[stream]
    account_id: StreamId,
    holder_name: String,
    initial_deposit: Money,
}

let executor = CommandExecutor::new(store);
let command = OpenAccount {
    account_id: StreamId::try_new("account-12345")?,
    holder_name: "Alice".to_string(),
    initial_deposit: Money::from_cents(50000)?,
};

let result = executor.execute(&command, command).await?;

API Reference

Core Traits

  • Command - Business logic implementation (use #[derive(Command)] for less boilerplate)
  • EventStore - Event persistence abstraction
  • Projection - Read model builders

Core Types

  • StreamId - Validated stream identifier
  • EventId - UUIDv7 for chronological ordering
  • EventVersion - Optimistic concurrency control
  • CommandError - Typed error handling

Utilities

  • CommandExecutor - Handles execution flow
  • StreamResolver - Dynamic stream discovery
  • ReadStreams<T> - Type-safe stream access
  • StreamWrite<T,E> - Type-safe event writing

Helper Macros

  • require!(condition, message) - Business rule validation
  • emit!(events, read_streams, stream, event) - Type-safe event generation
  • #[derive(Command)] - Generates complete CommandStreams implementation:
    • Automatically sets type Input = Self
    • Creates type StreamSet = CommandNameStreamSet
    • Implements read_streams() based on #[stream] fields
    • Enables implementation of just CommandLogic trait for 50% less boilerplate

Testing

EventCore provides comprehensive testing utilities that work seamlessly with derived commands:

use eventcore::testing::*;
use eventcore_macros::Command;

#[derive(Command)]
struct TransferMoney {
    #[stream]
    from_account: StreamId,
    #[stream] 
    to_account: StreamId,
    amount: Money,
}

#[tokio::test]
async fn test_transfer_with_macros() {
    let alice = StreamId::try_new("account-alice").unwrap();
    let bob = StreamId::try_new("account-bob").unwrap();
    
    let harness = CommandTestHarness::new()
        .given_events(vec![
            AccountOpened { account: alice.clone(), initial: Money::from_cents(100000) },
            AccountOpened { account: bob.clone(), initial: Money::zero() },
        ])
        .when(TransferMoney { 
            from_account: alice.clone(), 
            to_account: bob.clone(), 
            amount: Money::from_cents(10000),
        })
        .then_expect_events(vec![
            Withdrawn { account: alice, amount: Money::from_cents(10000) },
            Deposited { account: bob, amount: Money::from_cents(10000) },
        ]);
        
    harness.run().await.unwrap();
}

Performance

EventCore is designed for high throughput:

  • Zero-copy event processing where possible
  • Efficient stream merging algorithms
  • Connection pooling for database adapters
  • Automatic retry with exponential backoff

See Also