Crate eventfold_es

Embedded event-sourcing framework built on top of eventfold.

eventfold-es provides the building blocks for event-sourced applications: aggregates, projections, process managers, and a typed command bus. All state is persisted to disk via eventfold’s append-only JSONL logs – no external database required.
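The core idea behind an append-only event log is that current state is never stored directly; it is recomputed by folding over the recorded events. The sketch below illustrates that idea using only the standard library. `Account`, `AccountEvent`, and `replay` are hypothetical names chosen for illustration and are not part of eventfold-es; persistence and JSONL encoding are left out.

```rust
// Hypothetical event type for illustration; not part of eventfold-es.
#[derive(Debug, Clone, PartialEq)]
enum AccountEvent {
    Deposited(u64),
    Withdrawn(u64),
}

// Hypothetical state type; it starts empty and is rebuilt from events.
#[derive(Debug, Default, PartialEq)]
struct Account {
    balance: u64,
}

impl Account {
    // Fold a single event into the state and return the new state.
    fn apply(mut self, event: &AccountEvent) -> Self {
        match event {
            AccountEvent::Deposited(n) => self.balance += *n,
            AccountEvent::Withdrawn(n) => {
                self.balance = self.balance.saturating_sub(*n);
            }
        }
        self
    }
}

// Rebuild the current state by replaying the log from the beginning.
// In a real system the slice would be read from an append-only file.
fn replay(log: &[AccountEvent]) -> Account {
    log.iter()
        .fold(Account::default(), |state, event| state.apply(event))
}
```

Calling `replay` on a log containing `Deposited(10)` followed by `Withdrawn(3)` rebuilds an `Account` with a balance of 7. Because the log is only ever appended to, replaying it always reproduces the same state.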

Key Types

Type              Role
Aggregate         Domain model: handles commands, emits events, folds state
AggregateStore    Central registry: spawns actors, caches handles, runs projections
Projection        Cross-stream read model built from events
ProcessManager    Cross-aggregate workflow that reacts to events with commands
CommandBus        Typed command router keyed by TypeId
AggregateHandle   Async handle to a running aggregate actor
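To make "typed command router keyed by TypeId" concrete, here is a minimal standard-library sketch of the general technique: each handler is stored under the `TypeId` of the command type it accepts, and dispatch looks the handler up by the type of the value passed in. `TinyBus`, `Increment`, and `Rename` are hypothetical names for illustration only; the real `CommandBus` has its own API, which this sketch does not attempt to reproduce.

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Type-erased handler: takes a boxed command, returns a description.
type Handler = Box<dyn Fn(Box<dyn Any>) -> String>;

// Hypothetical router for illustration; not the eventfold-es CommandBus.
#[derive(Default)]
struct TinyBus {
    routes: HashMap<TypeId, Handler>,
}

impl TinyBus {
    // Register a handler under the TypeId of its command type `C`.
    fn register<C: Any>(&mut self, handler: impl Fn(C) -> String + 'static) {
        self.routes.insert(
            TypeId::of::<C>(),
            Box::new(move |cmd: Box<dyn Any>| {
                // The map key guarantees the boxed value is a `C`.
                let cmd = cmd.downcast::<C>().expect("route keyed by TypeId of C");
                handler(*cmd)
            }),
        );
    }

    // Look up the handler by the command's type; None if unregistered.
    fn dispatch<C: Any>(&self, cmd: C) -> Option<String> {
        let handler = self.routes.get(&TypeId::of::<C>())?;
        let boxed: Box<dyn Any> = Box::new(cmd);
        Some(handler(boxed))
    }
}

// Hypothetical command types.
struct Increment(u64);
struct Rename(String);
```

With handlers registered for `Increment` and `Rename`, `dispatch` routes each value to the matching handler without the caller naming it, and returns `None` for any type that was never registered.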

Quick Start

use eventfold_es::{
    Aggregate, AggregateStore, CommandContext,
};
use serde::{Deserialize, Serialize};

// 1. Define your aggregate.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
struct Counter { value: u64 }

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", content = "data")]
enum CounterEvent { Incremented }

#[derive(Debug, thiserror::Error)]
enum CounterError {}

impl Aggregate for Counter {
    const AGGREGATE_TYPE: &'static str = "counter";
    type Command = String;  // simplified for example
    type DomainEvent = CounterEvent;
    type Error = CounterError;

    fn handle(&self, _cmd: String) -> Result<Vec<CounterEvent>, CounterError> {
        Ok(vec![CounterEvent::Incremented])
    }
    fn apply(mut self, _event: &CounterEvent) -> Self {
        self.value += 1;
        self
    }
}

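// 2. Open the store and send commands.
// The calls below use `.await?`, so they are wrapped in an async fn here;
// `run` is an illustrative name, to be called from your async runtime.
async fn run() -> Result<(), Box<dyn std::error::Error>> {
    let store = AggregateStore::open("/tmp/my-app").await?;
    let handle = store.get::<Counter>("counter-1").await?;
    handle.execute("go".into(), CommandContext::default()).await?;

    let state = handle.state().await?;
    assert_eq!(state.value, 1);
    Ok(())
}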

See examples/counter.rs for a self-contained runnable example that demonstrates aggregates, projections, and the command bus.
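The Quick Start uses `String` as the command type for brevity. As a rough sketch of what a fuller aggregate might look like, the following standard-library example uses a command enum and a non-empty error type, and mirrors the `handle`/`apply` split shown above as inherent methods rather than implementing the `Aggregate` trait. `CounterCommand`, the `AlreadyZero` variant, and the `execute` helper are assumptions made for illustration; `execute` only imitates the decide-then-fold cycle in memory and does not persist anything.

```rust
use std::fmt;

#[derive(Debug, Default, Clone, PartialEq)]
struct Counter {
    value: u64,
}

// Hypothetical command enum replacing the `String` placeholder.
enum CounterCommand {
    Increment,
    Decrement,
}

#[derive(Debug, Clone, PartialEq)]
enum CounterEvent {
    Incremented,
    Decremented,
}

// Hypothetical domain error: a rejected command emits no events.
#[derive(Debug, PartialEq)]
enum CounterError {
    AlreadyZero,
}

impl fmt::Display for CounterError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "counter is already zero")
    }
}

impl std::error::Error for CounterError {}

impl Counter {
    // Decide: validate the command against current state, emit events.
    fn handle(&self, cmd: CounterCommand) -> Result<Vec<CounterEvent>, CounterError> {
        match cmd {
            CounterCommand::Increment => Ok(vec![CounterEvent::Incremented]),
            CounterCommand::Decrement if self.value == 0 => Err(CounterError::AlreadyZero),
            CounterCommand::Decrement => Ok(vec![CounterEvent::Decremented]),
        }
    }

    // Fold: apply an already-accepted event; no validation happens here.
    fn apply(mut self, event: &CounterEvent) -> Self {
        match event {
            CounterEvent::Incremented => self.value += 1,
            CounterEvent::Decremented => self.value -= 1,
        }
        self
    }
}

// Hypothetical in-memory stand-in for one command execution.
fn execute(state: Counter, cmd: CounterCommand) -> Result<Counter, CounterError> {
    let events = state.handle(cmd)?;
    Ok(events.iter().fold(state, |s, e| s.apply(e)))
}
```

Keeping validation in `handle` and leaving `apply` free of checks means replaying stored events can never fail, since every event in the log was accepted at the time it was written.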

Structs

AggregateHandle
Async handle to a running aggregate actor.
AggregateStore
Central registry that manages aggregate instance lifecycles.
AggregateStoreBuilder
Builder for configuring an AggregateStore with projections and process managers.
CommandBus
Typed command router that maps concrete command types to aggregate handlers.
CommandContext
Cross-cutting metadata passed alongside a command.
CommandEnvelope
A type-erased command envelope for cross-aggregate dispatch.
InjectOptions
Options controlling the behaviour of AggregateStore::inject_event.
ProcessManagerReport
Summary of a run_process_managers pass.
StreamLayout
Manages the on-disk directory layout for aggregate event streams.

Enums

DispatchError
Errors that can occur when dispatching a command.
ExecuteError
Error returned when executing a command against an aggregate fails.
StateError
Error returned when reading the current state of an aggregate fails.

Traits

Aggregate
A domain aggregate whose state is derived from its event history.
ProcessManager
A cross-aggregate workflow coordinator that reacts to events by producing commands.
Projection
A cross-stream read model that consumes events from multiple aggregate streams.

Functions

reducer
Build an eventfold::ReduceFn<A> that deserializes each eventfold::Event into A::DomainEvent and delegates to A::apply.
spawn_actor
Spawn a new aggregate actor for the stream at stream_dir.
to_eventfold_event
Convert a domain event and command context into an eventfold::Event.