eventcore-0.1.0 has been yanked.
eventcore
Core library for EventCore - the multi-stream event sourcing framework.
Installation
[dependencies]
eventcore = "0.1"
eventcore-macros = "0.1"
You'll also need an event store adapter, such as the PostgreSQL adapter or the in-memory one:
eventcore-postgres = "0.1"
eventcore-memory = "0.1"
Core Concepts
Commands
Commands are the heart of EventCore. Each command:
- Declares which streams it reads (automatically with #[derive(Command)])
- Rebuilds state from events
- Executes business logic
- Returns events to append
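The four steps above can be sketched without any framework. The following is a minimal, std-only illustration, not EventCore's implementation: `Event`, `Balance`, `Withdraw`, `execute`, and `stream_of` are hypothetical names invented for this example, and the real executor also deals with persistence and concurrency.

```rust
// Hypothetical, std-only sketch of the command lifecycle.
// None of these names are EventCore APIs.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Deposited { stream: String, amount: u64 },
    Withdrawn { stream: String, amount: u64 },
}

#[derive(Debug, Default)]
struct Balance(u64);

struct Withdraw {
    account: String, // the single stream this command declares
    amount: u64,
}

impl Withdraw {
    // Step 1: declare which streams the command reads.
    fn read_streams(&self) -> Vec<String> {
        vec![self.account.clone()]
    }

    // Step 2: rebuild state by folding in one stored event at a time.
    fn apply(&self, state: &mut Balance, event: &Event) {
        match event {
            Event::Deposited { amount, .. } => state.0 += amount,
            Event::Withdrawn { amount, .. } => state.0 = state.0.saturating_sub(*amount),
        }
    }

    // Steps 3 and 4: check business rules, then return the events to append.
    fn handle(&self, state: &Balance) -> Result<Vec<Event>, String> {
        if state.0 < self.amount {
            return Err("Insufficient funds".to_string());
        }
        Ok(vec![Event::Withdrawn {
            stream: self.account.clone(),
            amount: self.amount,
        }])
    }
}

// Which stream an event belongs to.
fn stream_of(event: &Event) -> &String {
    match event {
        Event::Deposited { stream, .. } | Event::Withdrawn { stream, .. } => stream,
    }
}

// Stand-in for an executor: read the declared streams, fold, then handle.
fn execute(cmd: &Withdraw, history: &[Event]) -> Result<Vec<Event>, String> {
    let streams = cmd.read_streams();
    let mut state = Balance::default();
    for event in history.iter().filter(|e| streams.contains(stream_of(e))) {
        cmd.apply(&mut state, event);
    }
    cmd.handle(&state)
}
```

In this sketch, events from streams the command did not declare are ignored when the state is rebuilt, so the business rule only ever sees the history it asked for.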
The Modern Way: Using Derive Macros
use eventcore::{prelude::*, require, emit, CommandLogic};
use eventcore_macros::Command;

#[derive(Command)]
struct TransferMoney {
    #[stream] from_account: StreamId,
    #[stream] to_account: StreamId,
    amount: Money,
}

#[async_trait]
impl CommandLogic for TransferMoney {
    type State = TransferState;
    type Event = BankingEvent;

    fn apply(&self, state: &mut Self::State, event: &StoredEvent<Self::Event>) {
        match &event.payload {
            BankingEvent::Deposited { account, amount } => {
                state.credit(account, *amount);
            }
            BankingEvent::Withdrawn { account, amount } => {
                state.debit(account, *amount);
            }
        }
    }

    async fn handle(
        &self,
        read_streams: ReadStreams<Self::StreamSet>,
        state: Self::State,
        _: &mut StreamResolver,
    ) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
        require!(state.has_account(&self.from_account), "Source account not found");
        require!(state.has_account(&self.to_account), "Target account not found");
        require!(state.balance(&self.from_account) >= self.amount, "Insufficient funds");

        let mut events = vec![];
        emit!(events, &read_streams, self.from_account,
            BankingEvent::Withdrawn {
                account: self.from_account.to_string(),
                amount: self.amount
            });
        emit!(events, &read_streams, self.to_account,
            BankingEvent::Deposited {
                account: self.to_account.to_string(),
                amount: self.amount
            });
        Ok(events)
    }
}
The Classic Way: Manual Implementation
If you prefer explicit control, you can implement everything manually:
#[derive(Clone)]
struct TransferMoney {
    from: StreamId,
    to: StreamId,
    amount: Money,
}

impl CommandStreams for TransferMoney {
    type StreamSet = ();

    fn read_streams(&self) -> Vec<StreamId> {
        vec![self.from.clone(), self.to.clone()]
    }
}

#[async_trait]
impl CommandLogic for TransferMoney {
    type State = TransferState;
    type Event = BankingEvent;

    // apply() and handle() are implemented as in the derive example above.
}
Type-Safe Domain Modeling
Use nutype for domain types that validate at construction:
use nutype::nutype;

#[nutype(
    sanitize(trim),
    validate(not_empty, len_char_max = 50),
    derive(Debug, Clone, PartialEq, Eq, AsRef, Deref)
)]
pub struct AccountId(String);

#[nutype(
    validate(greater_or_equal = 0),
    derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)
)]
pub struct Money(i64);
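To make "validate at construction" concrete, here is a hand-written approximation of the same rules as the `AccountId` declaration above (trim, not empty, at most 50 characters). It is only a sketch: the `AccountIdError` enum and the `as_str` accessor are assumptions made for this example, and the code nutype actually generates has its own error type and trait impls.

```rust
// Hand-written approximation of a validated newtype.
// `AccountIdError` and `as_str` are hypothetical, for illustration only.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AccountId(String);

#[derive(Debug, PartialEq, Eq)]
pub enum AccountIdError {
    Empty,
    TooLong,
}

impl AccountId {
    /// The only way to build an `AccountId`: sanitize first, then validate.
    pub fn try_new(raw: &str) -> Result<Self, AccountIdError> {
        let trimmed = raw.trim(); // mirrors sanitize(trim)
        if trimmed.is_empty() {
            return Err(AccountIdError::Empty); // mirrors validate(not_empty)
        }
        if trimmed.chars().count() > 50 {
            return Err(AccountIdError::TooLong); // mirrors len_char_max = 50
        }
        Ok(Self(trimmed.to_string()))
    }

    /// Read-only access to the validated value.
    pub fn as_str(&self) -> &str {
        &self.0
    }
}
```

Because the inner field is private and `try_new` is the only constructor, any `AccountId` that reaches a command has already passed validation.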
Dynamic Stream Discovery
Commands can discover additional streams during execution:
async fn handle(
    &self,
    read_streams: ReadStreams<Self::StreamSet>,
    state: Self::State,
    input: Self::Input,
    stream_resolver: &mut StreamResolver,
) -> CommandResult<Vec<StreamWrite<Self::StreamSet, Self::Event>>> {
    require!(state.is_valid(), "Invalid state");

    if input.amount > Money::from_cents(1000000)? {
        let approval_stream = StreamId::try_new(format!("approval-{}", input.from))?;
        stream_resolver.add_streams(vec![approval_stream]);
    }

    let mut events = vec![];
    emit!(events, &read_streams, input.account, AccountEvent::Updated {
        amount: input.amount
    });
    Ok(events)
}
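One way an executor could honour such a request is to run the command again with the larger stream set until nothing new is asked for. The sketch below illustrates that idea only; it assumes this re-execution behaviour rather than documenting it, and `Resolver`, `handle`, and `execute` here are hypothetical std-only stand-ins, not EventCore types.

```rust
use std::collections::BTreeSet;

// Hypothetical stand-in for a stream resolver: it only collects stream ids.
#[derive(Default)]
struct Resolver {
    requested: Vec<String>,
}

impl Resolver {
    fn add_streams(&mut self, streams: Vec<String>) {
        self.requested.extend(streams);
    }
}

// Hypothetical command logic: large transfers also need an approval stream.
fn handle(
    amount: u64,
    from: &str,
    loaded: &BTreeSet<String>,
    resolver: &mut Resolver,
) -> Vec<String> {
    let approval = format!("approval-{from}");
    if amount > 1_000_000 && !loaded.contains(&approval) {
        resolver.add_streams(vec![approval]);
    }
    vec![format!("updated:{from}:{amount}")]
}

// Assumed executor behaviour: repeat until no new streams are requested.
// Returns the final stream set, the produced events, and the number of passes.
fn execute(amount: u64, from: &str) -> (BTreeSet<String>, Vec<String>, usize) {
    let mut loaded: BTreeSet<String> = BTreeSet::from([from.to_string()]);
    let mut passes = 0;
    loop {
        passes += 1;
        let mut resolver = Resolver::default();
        let events = handle(amount, from, &loaded, &mut resolver);
        let before = loaded.len();
        loaded.extend(resolver.requested);
        if loaded.len() == before {
            return (loaded, events, passes);
        }
        // New streams were requested: drop these events and run again with
        // the larger stream set (state would be rebuilt at this point).
    }
}
```

In this sketch a small transfer finishes in one pass, while a transfer above the threshold takes a second pass once the approval stream has been added.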
Event Store Usage
use eventcore::{CommandExecutor, EventStore};
use eventcore_macros::Command;
use eventcore_postgres::PostgresEventStore;

let store = PostgresEventStore::new(config).await?;
store.initialize().await?;

#[derive(Command)]
struct OpenAccount {
    #[stream]
    account_id: StreamId,
    holder_name: String,
    initial_deposit: Money,
}

let executor = CommandExecutor::new(store);
let command = OpenAccount {
    account_id: StreamId::try_new("account-12345")?,
    holder_name: "Alice".to_string(),
    initial_deposit: Money::from_cents(50000)?,
};
let result = executor.execute(&command, command).await?;
API Reference
Core Traits
Command - Business logic implementation (use #[derive(Command)] for less boilerplate)
EventStore - Event persistence abstraction
Projection - Read model builders
Core Types
StreamId - Validated stream identifier
EventId - UUIDv7 for chronological ordering
EventVersion - Optimistic concurrency control
CommandError - Typed error handling
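As background on what a version buys you, optimistic concurrency control generally means an append succeeds only if the stream is still at the version the writer last read. The sketch below shows that general technique with a hypothetical in-memory store; `InMemoryStore`, `AppendError`, and the use of `u64` versions are assumptions for illustration and say nothing about how EventVersion or the real adapters are implemented.

```rust
use std::collections::HashMap;

// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
enum AppendError {
    VersionConflict { expected: u64, actual: u64 },
}

// Hypothetical store: each stream is an ordered log of event payloads.
#[derive(Default)]
struct InMemoryStore {
    streams: HashMap<String, Vec<String>>,
}

impl InMemoryStore {
    // The version of a stream is the number of events it holds (0 if absent).
    fn version(&self, stream: &str) -> u64 {
        self.streams.get(stream).map_or(0, |events| events.len() as u64)
    }

    // Append only if nobody else has written since `expected` was read.
    fn append(
        &mut self,
        stream: &str,
        expected: u64,
        events: Vec<String>,
    ) -> Result<u64, AppendError> {
        let actual = self.version(stream);
        if actual != expected {
            return Err(AppendError::VersionConflict { expected, actual });
        }
        let log = self.streams.entry(stream.to_string()).or_default();
        log.extend(events);
        Ok(log.len() as u64)
    }
}
```

A writer that gets `VersionConflict` has to re-read the stream and rebuild its state before trying again, which is why conflicts and retries tend to go together.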
Utilities
CommandExecutor - Handles execution flow
StreamResolver - Dynamic stream discovery
ReadStreams<T> - Type-safe stream access
StreamWrite<T,E> - Type-safe event writing
Helper Macros
require!(condition, message) - Business rule validation
emit!(events, read_streams, stream, event) - Type-safe event generation
#[derive(Command)] - Generates a complete CommandStreams implementation:
- Automatically sets type Input = Self
- Creates type StreamSet = CommandNameStreamSet
- Implements read_streams() based on #[stream] fields
- Leaves only the CommandLogic trait to implement, for 50% less boilerplate
Testing
EventCore provides testing utilities that work with derived commands:
use eventcore::testing::*;
use eventcore_macros::Command;

#[derive(Command)]
struct TransferMoney {
    #[stream]
    from_account: StreamId,
    #[stream]
    to_account: StreamId,
    amount: Money,
}

#[tokio::test]
async fn test_transfer_with_macros() {
    let alice = StreamId::try_new("account-alice").unwrap();
    let bob = StreamId::try_new("account-bob").unwrap();

    let harness = CommandTestHarness::new()
        .given_events(vec![
            AccountOpened { account: alice.clone(), initial: Money::from_cents(100000) },
            AccountOpened { account: bob.clone(), initial: Money::zero() },
        ])
        .when(TransferMoney {
            from_account: alice.clone(),
            to_account: bob.clone(),
            amount: Money::from_cents(10000),
        })
        .then_expect_events(vec![
            Withdrawn { account: alice, amount: Money::from_cents(10000) },
            Deposited { account: bob, amount: Money::from_cents(10000) },
        ]);

    harness.run().await.unwrap();
}
Performance
EventCore is designed for high throughput:
- Zero-copy event processing where possible
- Efficient stream merging algorithms
- Connection pooling for database adapters
- Automatic retry with exponential backoff
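For readers unfamiliar with the last point, exponential backoff doubles the wait between attempts up to a cap. The following is a generic, std-only sketch of that technique; the function names, the blocking `sleep`, and the delay values are assumptions made for illustration and do not reflect EventCore's actual retry policy or configuration.

```rust
use std::time::Duration;

// Delay before retrying after failed attempt `attempt` (0-based):
// base * 2^attempt, capped at `max`. Overflow saturates to the cap.
fn backoff_delay(attempt: u32, base: Duration, max: Duration) -> Duration {
    let factor = 2u32.checked_pow(attempt).unwrap_or(u32::MAX);
    base.checked_mul(factor).unwrap_or(max).min(max)
}

// Run `op` until it succeeds or `max_attempts` calls have failed.
// `op` receives the 0-based attempt number; it is always called at least once.
fn retry<T, E>(
    max_attempts: u32,
    base: Duration,
    max: Duration,
    mut op: impl FnMut(u32) -> Result<T, E>,
) -> Result<T, E> {
    let mut attempt = 0;
    loop {
        match op(attempt) {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(err) if attempt + 1 >= max_attempts => return Err(err),
            Err(_) => {
                // Blocking sleep keeps the sketch std-only; async code
                // would await a timer instead.
                std::thread::sleep(backoff_delay(attempt, base, max));
                attempt += 1;
            }
        }
    }
}
```

With a 1 ms base and a 4 ms cap, the waits in this sketch are 1 ms, 2 ms, 4 ms, and then 4 ms for every later attempt.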
See Also