Trait EventStore 

pub trait EventStore {
    // Required methods
    fn read_stream<E>(
        &self,
        stream_id: StreamId,
    ) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send
       where E: Event;
    fn append_events(
        &self,
        writes: StreamWrites,
    ) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send;
}

Trait defining the contract for event store implementations.

Event stores provide two core operations:

  1. Read events from streams for state reconstruction
  2. Atomically append events to streams with version checking

The EventStore trait hides implementation details of how backends achieve atomicity (PostgreSQL uses ACID transactions, in-memory uses locks, etc.). Library consumers interact with simple read/append operations.

Implementations include:

  • eventcore-postgres: Production PostgreSQL backend with ACID guarantees
  • eventcore-memory: In-memory backend for testing

§Immutability Guarantee

Event stores are append-only. Once an event is written, it must never be modified or deleted. This is a fundamental invariant of event sourcing: events represent facts that have already occurred in the business domain.

Implementations MUST ensure this immutability through whatever mechanisms their storage backend provides:

  • SQL databases: Use triggers/rules to reject UPDATE/DELETE operations
  • Purpose-built event stores (e.g., Kurrent/EventStoreDB): Rely on native append-only semantics
  • In-memory stores: May omit enforcement (test-only, ephemeral)

The eventcore-postgres backend enforces immutability via database triggers that raise errors on any attempt to UPDATE or DELETE event records.
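For an in-memory store, one way to get append-only behaviour without runtime checks is to expose no mutating operation other than append. The sketch below is an illustration only: `AppendOnlyLog` is a hypothetical type, not part of eventcore, and it says nothing about how eventcore-memory is actually written.

```rust
/// Hypothetical append-only log (not an eventcore type).
/// The only write operation it offers is `append`.
#[derive(Debug)]
pub struct AppendOnlyLog<E> {
    // Private field: code outside this module cannot edit or remove entries.
    events: Vec<E>,
}

impl<E> AppendOnlyLog<E> {
    /// Creates an empty log.
    pub fn new() -> Self {
        Self { events: Vec::new() }
    }

    /// Appends an event and returns the new length of the log.
    pub fn append(&mut self, event: E) -> usize {
        self.events.push(event);
        self.events.len()
    }

    /// Read-only view of everything recorded so far, oldest first.
    pub fn events(&self) -> &[E] {
        &self.events
    }
}
```

Because `events()` hands out a shared slice, callers can read history but have no path to rewrite it.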

Required Methods§

fn read_stream<E>(
    &self,
    stream_id: StreamId,
) -> impl Future<Output = Result<EventStreamReader<E>, EventStoreError>> + Send
where E: Event,

Read all events from a stream.

Loads the complete event history from a stream for state reconstruction. Events are returned in stream version order (oldest to newest).

The generic type parameter E is the consumer’s event payload type. Callers must specify which event type they expect from the stream.
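State reconstruction from an ordered history is typically a left fold over the events. The sketch below shows that idea on a plain slice; `AccountEvent`, `Balance`, `apply`, and `reconstruct` are hypothetical names chosen for illustration, and the slice stands in for whatever the reader handle yields, since its API is not described here.

```rust
/// Hypothetical event payload type (an assumption for this sketch).
#[derive(Debug, Clone, PartialEq)]
pub enum AccountEvent {
    Deposited(u64),
    Withdrew(u64),
}

/// Hypothetical state rebuilt from the event history.
#[derive(Debug, Default, PartialEq)]
pub struct Balance(pub u64);

/// Applies a single event to the current state.
pub fn apply(state: Balance, event: &AccountEvent) -> Balance {
    match event {
        AccountEvent::Deposited(amount) => Balance(state.0 + amount),
        // Saturating subtraction keeps the sketch panic-free.
        AccountEvent::Withdrew(amount) => Balance(state.0.saturating_sub(*amount)),
    }
}

/// Folds events, oldest to newest, starting from the default state.
pub fn reconstruct(events: &[AccountEvent]) -> Balance {
    events.iter().fold(Balance::default(), apply)
}
```

The fold depends on the oldest-to-newest ordering: the same events replayed in the same order always produce the same state.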

§Parameters
  • stream_id - Identifier of the stream to read
§Returns
  • Ok(EventStreamReader<E>) - Handle for reading events from the stream
  • Err(EventStoreError) - If the stream cannot be read

fn append_events(
    &self,
    writes: StreamWrites,
) -> impl Future<Output = Result<EventStreamSlice, EventStoreError>> + Send

Atomically append events to multiple streams with optimistic concurrency control.

This method provides the core write operation for event sourcing. It atomically appends events to one or more streams while enforcing version constraints to prevent concurrent modification conflicts.

§Atomicity Guarantee

All events in the write batch are committed atomically - either all events are persisted or none are. If any stream’s version check fails, the entire operation is rolled back and no events are written.
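A lock-based backend could provide this all-or-nothing behaviour by validating every stream before writing to any of them, all under one lock. The following is a minimal synchronous sketch under stated assumptions: `MemoryStore`, `StreamWrite`, and `AppendError` are hypothetical stand-ins rather than eventcore types, a stream's version is taken to be its event count, and each stream is assumed to appear at most once per batch.

```rust
use std::collections::HashMap;
use std::sync::Mutex;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum AppendError {
    VersionConflict,
}

/// One stream's share of a batch (hypothetical shape).
pub struct StreamWrite {
    pub stream_id: String,
    pub expected_version: usize,
    pub events: Vec<String>,
}

/// Hypothetical in-memory store: stream id -> ordered events.
#[derive(Default)]
pub struct MemoryStore {
    streams: Mutex<HashMap<String, Vec<String>>>,
}

impl MemoryStore {
    /// Appends the whole batch or nothing; returns the number of events written.
    pub fn append_events(&self, batch: Vec<StreamWrite>) -> Result<usize, AppendError> {
        // A single lock covers both phases, so no other writer can interleave.
        let mut streams = self.streams.lock().unwrap();

        // Phase 1: check every stream's version before touching any stream.
        for write in &batch {
            let current = streams.get(&write.stream_id).map_or(0, Vec::len);
            if current != write.expected_version {
                return Err(AppendError::VersionConflict);
            }
        }

        // Phase 2: all checks passed, so apply every write.
        let mut written = 0;
        for write in batch {
            written += write.events.len();
            streams.entry(write.stream_id).or_default().extend(write.events);
        }
        Ok(written)
    }

    /// Current version of a stream (0 if it does not exist yet).
    pub fn version(&self, stream_id: &str) -> usize {
        self.streams.lock().unwrap().get(stream_id).map_or(0, Vec::len)
    }
}
```

Separating validation from mutation means a failed check never needs an explicit rollback; a transactional backend would reach the same result by aborting the transaction.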

§Optimistic Concurrency Control

Each stream write includes an expected version. The store verifies that each stream’s current version matches the expected version before writing. If any version mismatch is detected, the operation fails with EventStoreError::VersionConflict.

This prevents lost updates when multiple commands attempt to modify the same stream(s) concurrently. The caller should retry the entire command execution (reload state, re-validate, re-generate events) when conflicts occur.
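The reload, re-validate, re-generate cycle can be sketched as a bounded retry loop. This is an illustration under assumptions, not eventcore's execute(): `CounterStream`, `AppendError`, and `execute_with_retry` are hypothetical, the store is synchronous and holds a single stream, and the version is the event count.

```rust
use std::sync::Mutex;

/// Hypothetical error type for this sketch.
#[derive(Debug, PartialEq)]
pub enum AppendError {
    VersionConflict,
}

/// Hypothetical single-stream store; the version is the event count.
#[derive(Default)]
pub struct CounterStream {
    events: Mutex<Vec<i64>>,
}

impl CounterStream {
    /// Returns a snapshot of the full history.
    pub fn read(&self) -> Vec<i64> {
        self.events.lock().unwrap().clone()
    }

    /// Appends only if the stream is still at `expected_version`.
    pub fn append(&self, expected_version: usize, event: i64) -> Result<(), AppendError> {
        let mut events = self.events.lock().unwrap();
        if events.len() != expected_version {
            return Err(AppendError::VersionConflict);
        }
        events.push(event);
        Ok(())
    }
}

/// Runs `decide` against fresh state until the append succeeds or attempts
/// run out. Returns the number of attempts that were needed.
pub fn execute_with_retry<F>(
    store: &CounterStream,
    max_attempts: usize,
    mut decide: F,
) -> Result<usize, AppendError>
where
    F: FnMut(&[i64]) -> i64,
{
    for attempt in 1..=max_attempts {
        let history = store.read(); // reload state
        let event = decide(&history); // re-validate and re-generate the event
        match store.append(history.len(), event) {
            Ok(()) => return Ok(attempt),
            // Another writer got in first: loop and try again on fresh state.
            Err(AppendError::VersionConflict) => {}
        }
    }
    Err(AppendError::VersionConflict)
}
```

Bounding the attempts keeps a heavily contended stream from retrying forever; the right limit and any backoff policy depend on the workload.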

§Parameters
  • writes - Collection of events to append, organized by stream with expected versions
§Returns
  • Ok(EventStreamSlice) - Events successfully appended to all streams
  • Err(EventStoreError::VersionConflict) - One or more streams had version mismatches
§Examples
let writes = StreamWrites::new()
    .register_stream(stream_id.clone(), StreamVersion::new(0))
    .and_then(|writes| writes.append(event1))
    .and_then(|writes| writes.append(event2))
    .expect("builder pattern should succeed");

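match store.append_events(writes).await {
    Ok(_) => println!("Events persisted"),
    Err(EventStoreError::VersionConflict) => println!("Concurrent modification detected"),
    // Catch-all keeps the match exhaustive for any other store error.
    Err(_) => println!("Append failed"),
}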

Dyn Compatibility§

This trait is not dyn compatible.

In older versions of Rust, dyn compatibility was called "object safety", so this trait is not object safe.

Implementations on Foreign Types§

impl<T> EventStore for &T
where T: EventStore + Sync,

Blanket implementation that lets the EventStore trait work with references.

This enables passing both owned and borrowed event stores to execute():

  • execute(store, command) - owned value
  • execute(&store, command) - borrowed reference

This is idiomatic Rust: traits that only need &self methods should work with references to avoid forcing consumers to clone or move stores.
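The forwarding pattern behind such a blanket implementation can be shown on a small synchronous trait. Everything in this sketch is a hypothetical stand-in: `Store`, `MemoryStore`, and this `execute` are not the eventcore items, and the `Sync` bound from the real impl is omitted because nothing here crosses threads.

```rust
/// Hypothetical trait whose methods only need `&self`.
pub trait Store {
    fn stream_count(&self) -> usize;
}

/// Blanket impl: a reference to any `Store` is itself a `Store`.
impl<T: Store> Store for &T {
    fn stream_count(&self) -> usize {
        // `self` is `&&T`; dereference twice to call the impl on `T`.
        (**self).stream_count()
    }
}

/// Hypothetical concrete store.
pub struct MemoryStore {
    pub streams: Vec<String>,
}

impl Store for MemoryStore {
    fn stream_count(&self) -> usize {
        self.streams.len()
    }
}

/// Stand-in for a function that takes the store by value.
pub fn execute<S: Store>(store: S) -> usize {
    store.stream_count()
}
```

With the blanket impl in place, `execute(&store)` and `execute(store)` both type-check, so callers can keep ownership of the store when they still need it afterwards.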

Implementors§