Skip to main content

AggregateStore

Struct AggregateStore 

Source
pub struct AggregateStore { /* private fields */ }
Expand description

Central registry that manages aggregate instance lifecycles.

The store handles directory creation, actor spawning, and handle caching. It is Clone + Send + Sync – cloning shares the underlying cache.

§Examples

use eventfold_es::{AggregateStore, CommandContext};

let store = AggregateStore::open("/tmp/my-app").await?;
// Use store.get::<MyAggregate>("instance-id") to get handles

Implementations§

Source§

impl AggregateStore

Source

pub async fn open(base_dir: impl AsRef<Path>) -> Result<Self>

Open or create a store rooted at base_dir.

Creates the metadata directory if it does not exist.

§Arguments
  • base_dir - Root directory for all event store data.
§Returns

A new AggregateStore ready to spawn aggregate actors.

§Errors

Returns std::io::Error if the metadata directory cannot be created.

Source

pub async fn get<A: Aggregate>(&self, id: &str) -> Result<AggregateHandle<A>>

Get a handle to an aggregate instance, spawning its actor if needed.

If the actor is already running (cached), returns a clone of the existing handle. Otherwise, creates the stream directory on disk and spawns a new actor thread.

§Arguments
  • id - Unique instance identifier within the aggregate type.
§Returns

An AggregateHandle for sending commands and reading state.

§Errors

Returns std::io::Error if directory creation or event log opening fails.

Source

pub async fn list<A: Aggregate>(&self) -> Result<Vec<String>>

List all instance IDs for a given aggregate type.

§Returns

A sorted Vec<String> of instance IDs. Returns an empty vector if no instances of the given aggregate type exist.

§Errors

Returns std::io::Error if reading the directory fails.

Source

pub fn builder(base_dir: impl AsRef<Path>) -> AggregateStoreBuilder

Create a builder for configuring projections and other options.

§Arguments
  • base_dir - Root directory for all event store data.
§Returns

An AggregateStoreBuilder that can register projections before opening.

§Examples
use eventfold_es::AggregateStore;

let store = AggregateStore::builder("/tmp/my-app")
    // .projection::<MyProjection>()
    .open()
    .await?;
Source

pub fn projection<P: Projection>(&self) -> Result<P>

Catch up and return the current state of a registered projection.

Triggers a lazy catch-up before returning: reads any new events from subscribed streams. Returns a clone of the projection state.

This method is synchronous (not async). It uses std::sync locks and blocking I/O internally. For embedded use, catch_up is fast for incremental updates. Callers that need an async boundary can wrap this in tokio::task::spawn_blocking.

§Returns

A clone of the projection’s current state after catching up.

§Errors

Returns io::ErrorKind::NotFound if the projection is not registered. Returns io::Error if catching up on events fails.

Source

pub fn rebuild_projection<P: Projection>(&self) -> Result<()>

Delete the checkpoint for projection P and replay all events from scratch.

This acquires the projections read-lock, downcasts to the concrete ProjectionRunner<P>, and calls rebuild() which:

  1. Deletes the checkpoint file
  2. Resets internal state to ProjectionCheckpoint::default()
  3. Calls catch_up() to replay all events from offset 0
  4. Saves the new checkpoint

Blocking I/O – if called from an async context, wrap this in tokio::task::spawn_blocking.

§Errors

Returns io::ErrorKind::NotFound if the projection is not registered. Returns io::Error if deleting the checkpoint or catching up fails.

Source

pub async fn run_process_managers(&self) -> Result<ProcessManagerReport>

Run all registered process managers through a catch-up pass.

For each process manager:

  1. Catch up on subscribed streams, collecting command envelopes.
  2. Dispatch each envelope to the target aggregate via the type registry.
  3. Write failed dispatches to the per-PM dead-letter log.
  4. Save the process manager checkpoint after all envelopes are handled.
§Returns

A ProcessManagerReport summarizing how many envelopes were dispatched and how many were dead-lettered.

§Errors

Returns io::Error if catching up or saving checkpoints fails.

Source

pub fn layout(&self) -> &StreamLayout

Returns a reference to the underlying storage layout.

Source

pub async fn list_streams( &self, aggregate_type: Option<&str>, ) -> Result<Vec<(String, String)>>

List all known (aggregate_type, instance_id) pairs.

When aggregate_type is Some, returns only streams for that type. When None, returns streams across all aggregate types. Results are sorted by aggregate type then instance ID.

§Arguments
  • aggregate_type - Optional filter. When Some, only streams for that aggregate type are returned. When None, all streams are returned.
§Returns

A sorted Vec<(String, String)> of (aggregate_type, instance_id) pairs. Returns an empty vector if no matching streams exist.

§Errors

Returns std::io::Error if reading the directory fails.

Source

pub async fn read_events( &self, aggregate_type: &str, instance_id: &str, ) -> Result<Vec<Event>>

Read all raw events from a stream identified by aggregate type and instance ID.

Returns the events in the order they were appended. Does not spawn an actor or acquire a write lock on the stream.

§Arguments
  • aggregate_type - The aggregate type name (e.g. "counter").
  • instance_id - The unique instance identifier within that type.
§Returns

A Vec<eventfold::Event> containing all events in the stream. Returns Ok(vec![]) if the stream directory exists but no events have been written yet.

§Errors

Returns std::io::Error with ErrorKind::NotFound if the stream directory does not exist (i.e. the stream was never created). Returns std::io::Error for other I/O failures during reading.

Source

pub async fn inject_event<A: Aggregate>( &self, instance_id: &str, event: Event, opts: InjectOptions, ) -> Result<()>

Append a pre-validated event directly to a stream, bypassing command validation.

This is the primary entry point for relay-sync scenarios where events have already been validated on the originating client. The event is written as-is to the stream’s JSONL log, projections are caught up, and process managers are optionally triggered.

§Deduplication

If event.id is Some(id) and that ID has already been seen by this store instance, the call returns Ok(()) immediately without writing. Events with event.id = None are never deduplicated.

§Actor interaction

If a live actor exists for the target stream, the event is injected through the actor’s channel (preserving the actor’s exclusive writer ownership). Otherwise, a temporary EventWriter is opened directly.

§Arguments
  • instance_id - Unique instance identifier within the aggregate type.
  • event - A pre-validated eventfold::Event to append as-is.
  • opts - Controls whether process managers run after injection.
§Returns

Ok(()) on success (including dedup no-ops).

§Errors

Returns std::io::Error if directory creation, event writing, or projection catch-up fails.

Trait Implementations§

Source§

impl Clone for AggregateStore

Source§

fn clone(&self) -> AggregateStore

Returns a duplicate of the value. Read more
1.0.0 · Source§

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source. Read more
Source§

impl Debug for AggregateStore

Source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> CloneToUninit for T
where T: Clone,

Source§

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T> ToOwned for T
where T: Clone,

Source§

type Owned = T

The resulting type after obtaining ownership.
Source§

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning. Read more
Source§

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning. Read more
Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more