Expand description
Embedded event-sourcing framework built on top of eventfold.
eventfold-es provides the building blocks for event-sourced applications:
aggregates, projections, process managers, and a typed command bus. All
state is persisted to disk via eventfold’s append-only JSONL logs –
no external database required.
§Key Types
| Type | Role |
|---|---|
Aggregate | Domain model: handles commands, emits events, folds state |
AggregateStore | Central registry: spawns actors, caches handles, runs projections |
Projection | Cross-stream read model built from events |
ProcessManager | Cross-aggregate workflow that reacts to events with commands |
CommandBus | Typed command router keyed by TypeId |
AggregateHandle | Async handle to a running aggregate actor |
§Quick Start
use eventfold_es::{
Aggregate, AggregateStore, CommandContext,
};
use serde::{Deserialize, Serialize};
// 1. Define your aggregate.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct Counter { value: u64 }
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
enum CounterEvent { Incremented }
#[derive(Debug, thiserror::Error)]
enum CounterError {}
impl Aggregate for Counter {
const AGGREGATE_TYPE: &'static str = "counter";
type Command = String; // simplified for example
type DomainEvent = CounterEvent;
type Error = CounterError;
fn handle(&self, _cmd: String) -> Result<Vec<CounterEvent>, CounterError> {
Ok(vec![CounterEvent::Incremented])
}
fn apply(mut self, _event: &CounterEvent) -> Self {
self.value += 1;
self
}
}
// 2. Open the store and send commands.
let store = AggregateStore::open("/tmp/my-app").await?;
let handle = store.get::<Counter>("counter-1").await?;
handle.execute("go".into(), CommandContext::default()).await?;
let state = handle.state().await?;
assert_eq!(state.value, 1);See examples/counter.rs for a self-contained runnable example that
demonstrates aggregates, projections, and the command bus.
Structs§
- Aggregate
Handle - Async handle to a running aggregate actor.
- Aggregate
Store - Central registry that manages aggregate instance lifecycles.
- Aggregate
Store Builder - Builder for configuring an
AggregateStorewith projections and process managers. - Command
Bus - Typed command router that maps concrete command types to aggregate handlers.
- Command
Context - Cross-cutting metadata passed alongside a command.
- Command
Envelope - A type-erased command envelope for cross-aggregate dispatch.
- Inject
Options - Options controlling the behaviour of
AggregateStore::inject_event. - Process
Manager Report - Summary of a
run_process_managerspass. - Stream
Layout - Manages the on-disk directory layout for aggregate event streams.
Enums§
- Dispatch
Error - Errors that can occur when dispatching a command.
- Execute
Error - Error returned when executing a command against an aggregate fails.
- State
Error - Error returned when reading the current state of an aggregate fails.
Traits§
- Aggregate
- A domain aggregate whose state is derived from its event history.
- Process
Manager - A cross-aggregate workflow coordinator that reacts to events by producing commands.
- Projection
- A cross-stream read model that consumes events from multiple aggregate streams.
Functions§
- reducer
- Build an
eventfold::ReduceFn<A>that deserializes eacheventfold::EventintoA::DomainEventand delegates toA::apply. - spawn_
actor - Spawn a new aggregate actor for the stream at
stream_dir. - to_
eventfold_ event - Convert a domain event and command context into an
eventfold::Event.