pub struct AggregateStore { /* private fields */ }
Central registry that manages aggregate instance lifecycles.
The store handles directory creation, actor spawning, and handle caching.
It is Clone + Send + Sync – cloning shares the underlying cache.
§Examples
use eventfold_es::{AggregateStore, CommandContext};
let store = AggregateStore::open("/tmp/my-app").await?;
// Use store.get::<MyAggregate>("instance-id") to get handles
Implementations§
impl AggregateStore
pub async fn get<A: Aggregate>(&self, id: &str) -> Result<AggregateHandle<A>>
Get a handle to an aggregate instance, spawning its actor if needed.
If the actor is already running (cached), returns a clone of the existing handle. Otherwise, creates the stream directory on disk and spawns a new actor thread.
§Arguments
id - Unique instance identifier within the aggregate type.
§Returns
An AggregateHandle for sending commands and reading state.
§Errors
Returns std::io::Error if directory creation or event log opening fails.
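The get-or-spawn behaviour described above, together with the note that cloning the store shares the underlying cache, can be illustrated with a small std-only sketch. The names `SketchStore`, `Handle`, `serial`, and `spawned` are hypothetical and are not part of eventfold_es; the real store spawns an actor and creates a directory, which this sketch replaces with a plain map entry.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for an aggregate handle; cloning it is cheap.
#[derive(Clone, Debug, PartialEq)]
pub struct Handle {
    pub id: String,
    /// Order in which the (pretend) actor was spawned.
    pub serial: usize,
}

/// Hypothetical store: every clone points at the same cache through the `Arc`.
#[derive(Clone, Default)]
pub struct SketchStore {
    cache: Arc<Mutex<HashMap<String, Handle>>>,
}

impl SketchStore {
    /// Return a clone of the cached handle for `id`, creating it on first use.
    pub fn get(&self, id: &str) -> Handle {
        let mut cache = self.cache.lock().unwrap();
        let next_serial = cache.len();
        cache
            .entry(id.to_string())
            .or_insert_with(|| Handle {
                id: id.to_string(),
                serial: next_serial,
            })
            .clone()
    }

    /// Number of handles created so far across all clones of this store.
    pub fn spawned(&self) -> usize {
        self.cache.lock().unwrap().len()
    }
}
```

In this sketch, asking a cloned store for an ID that the original store already served returns an equal handle and does not increase `spawned()`.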
pub fn builder(base_dir: impl AsRef<Path>) -> AggregateStoreBuilder
Create a builder for configuring projections and other options.
§Arguments
base_dir - Root directory for all event store data.
§Returns
An AggregateStoreBuilder that can register projections before opening.
§Examples
use eventfold_es::AggregateStore;
let store = AggregateStore::builder("/tmp/my-app")
// .projection::<MyProjection>()
.open()
.await?;
pub fn projection<P: Projection>(&self) -> Result<P>
Catch up and return the current state of a registered projection.
Triggers a lazy catch-up before returning: reads any new events from subscribed streams. Returns a clone of the projection state.
This method is synchronous (not async). It uses std::sync locks
and blocking I/O internally. For embedded use, catch_up is fast
for incremental updates. Callers that need an async boundary can
wrap this in tokio::task::spawn_blocking.
§Returns
A clone of the projection’s current state after catching up.
§Errors
Returns io::ErrorKind::NotFound if the projection is not registered.
Returns io::Error if catching up on events fails.
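As a rough illustration of the lazy catch-up described above, the following std-only sketch folds only the log entries that arrived since the last call and then returns a clone of the state. `SketchRunner`, `CounterState`, and the use of a slice of `i64` deltas as the event log are assumptions made for the example; the real runner reads from subscribed streams on disk.

```rust
/// Hypothetical projection state: a running total of integer deltas.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CounterState {
    pub total: i64,
}

/// Hypothetical runner that remembers how many log entries it has applied.
#[derive(Default)]
pub struct SketchRunner {
    state: CounterState,
    /// Number of entries already folded into `state`.
    offset: usize,
}

impl SketchRunner {
    /// Apply entries past the stored offset, then return a clone of the state.
    pub fn projection(&mut self, log: &[i64]) -> CounterState {
        for delta in log.iter().skip(self.offset) {
            self.state.total += delta;
        }
        // Never move the offset backwards if a shorter log is passed in.
        self.offset = self.offset.max(log.len());
        self.state.clone()
    }
}
```

Calling `projection` twice with the same log applies nothing the second time, which is why incremental catch-up is cheap in this model.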
pub fn rebuild_projection<P: Projection>(&self) -> Result<()>
Delete the checkpoint for projection P and replay all events from scratch.
This acquires the projections read-lock, downcasts to the concrete
ProjectionRunner<P>, and calls rebuild() which:
- Deletes the checkpoint file
- Resets internal state to ProjectionCheckpoint::default()
- Calls catch_up() to replay all events from offset 0
- Saves the new checkpoint
Blocking I/O – if called from an async context,
wrap this in tokio::task::spawn_blocking.
§Errors
Returns io::ErrorKind::NotFound if the projection is not registered.
Returns io::Error if deleting the checkpoint or catching up fails.
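The reset-then-replay sequence listed above can be sketched in memory as follows. `RebuildableRunner` and `Checkpoint` are hypothetical names, the log is a slice of `i64` deltas, and deleting and saving the checkpoint file is reduced to overwriting a struct field; none of this is the crate's actual implementation.

```rust
/// Hypothetical checkpoint: folded state plus the offset it was folded up to.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Checkpoint {
    pub state: i64,
    pub offset: usize,
}

/// Hypothetical runner that can resume from a checkpoint or rebuild from zero.
pub struct RebuildableRunner {
    checkpoint: Checkpoint,
}

impl RebuildableRunner {
    pub fn from_checkpoint(checkpoint: Checkpoint) -> Self {
        Self { checkpoint }
    }

    pub fn checkpoint(&self) -> &Checkpoint {
        &self.checkpoint
    }

    /// Fold every log entry at or after the stored offset.
    pub fn catch_up(&mut self, log: &[i64]) {
        for delta in log.iter().skip(self.checkpoint.offset) {
            self.checkpoint.state += delta;
        }
        self.checkpoint.offset = self.checkpoint.offset.max(log.len());
    }

    /// Discard the checkpoint, then replay the whole log from offset 0.
    pub fn rebuild(&mut self, log: &[i64]) {
        self.checkpoint = Checkpoint::default();
        self.catch_up(log);
    }
}
```

The difference shows up when the stored checkpoint is stale or wrong: `catch_up` carries the bad state forward, whereas `rebuild` recomputes it from the log alone.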
pub async fn run_process_managers(&self) -> Result<ProcessManagerReport>
Run all registered process managers through a catch-up pass.
For each process manager:
- Catch up on subscribed streams, collecting command envelopes.
- Dispatch each envelope to the target aggregate via the type registry.
- Write failed dispatches to the per-PM dead-letter log.
- Save the process manager checkpoint after all envelopes are handled.
§Returns
A ProcessManagerReport summarizing how many envelopes were
dispatched and how many were dead-lettered.
§Errors
Returns io::Error if catching up or saving checkpoints fails.
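The dispatch and dead-letter steps above might look roughly like the following std-only sketch. `Envelope`, `Report`, and `run_pass` are hypothetical; the `dispatch` closure stands in for the type registry, the dead-letter log is a plain `Vec`, and checkpoint saving is left out.

```rust
/// Hypothetical command envelope emitted by a process manager.
#[derive(Clone, Debug, PartialEq)]
pub struct Envelope {
    pub aggregate_type: String,
    pub instance_id: String,
    pub command: String,
}

/// Hypothetical summary of one pass.
#[derive(Debug, Default, PartialEq)]
pub struct Report {
    pub dispatched: usize,
    pub dead_lettered: usize,
}

/// Dispatch each envelope; record failures together with their reason.
pub fn run_pass<F>(
    envelopes: &[Envelope],
    dispatch: F,
    dead_letters: &mut Vec<(Envelope, String)>,
) -> Report
where
    F: Fn(&Envelope) -> Result<(), String>,
{
    let mut report = Report::default();
    for envelope in envelopes {
        match dispatch(envelope) {
            Ok(()) => report.dispatched += 1,
            Err(reason) => {
                // A failed dispatch does not abort the pass; it is set aside.
                dead_letters.push((envelope.clone(), reason));
                report.dead_lettered += 1;
            }
        }
    }
    report
}
```

The design point this sketch tries to capture is that a failed envelope is counted and set aside rather than turned into an error for the whole pass.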
pub fn layout(&self) -> &StreamLayout
Returns a reference to the underlying storage layout.
pub async fn list_streams(&self, aggregate_type: Option<&str>) -> Result<Vec<(String, String)>>
List all known (aggregate_type, instance_id) pairs.
When aggregate_type is Some, returns only streams for that type.
When None, returns streams across all aggregate types. Results are
sorted by aggregate type then instance ID.
§Arguments
aggregate_type - Optional filter. When Some, only streams for that aggregate type are returned. When None, all streams are returned.
§Returns
A sorted Vec<(String, String)> of (aggregate_type, instance_id)
pairs. Returns an empty vector if no matching streams exist.
§Errors
Returns std::io::Error if reading the directory fails.
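The filtering and ordering rules above can be shown on an in-memory list. The function `filter_and_sort` is a hypothetical helper written for this illustration; the real method discovers the pairs by reading directories.

```rust
/// Keep the pairs matching the optional type filter, sorted by type then ID.
pub fn filter_and_sort(
    known: &[(String, String)],
    aggregate_type: Option<&str>,
) -> Vec<(String, String)> {
    let mut out: Vec<(String, String)> = known
        .iter()
        // `None` keeps everything; `Some(t)` keeps only pairs whose type equals `t`.
        .filter(|(ty, _)| aggregate_type.map_or(true, |wanted| ty.as_str() == wanted))
        .cloned()
        .collect();
    // Tuples compare lexicographically: aggregate type first, then instance ID.
    out.sort();
    out
}
```

A filter that matches nothing yields an empty vector rather than an error, mirroring the documented return value.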
pub async fn read_events(&self, aggregate_type: &str, instance_id: &str) -> Result<Vec<Event>>
Read all raw events from a stream identified by aggregate type and instance ID.
Returns the events in the order they were appended. Does not spawn an actor or acquire a write lock on the stream.
§Arguments
aggregate_type - The aggregate type name (e.g. "counter").
instance_id - The unique instance identifier within that type.
§Returns
A Vec<eventfold::Event> containing all events in the stream.
Returns Ok(vec![]) if the stream directory exists but no events
have been written yet.
§Errors
Returns std::io::Error with ErrorKind::NotFound if the stream
directory does not exist (i.e. the stream was never created).
Returns std::io::Error for other I/O failures during reading.
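The distinction between a missing stream (NotFound) and an existing but empty stream (an empty vector) can be sketched with std file I/O. The helper `read_lines` and the file name `events.jsonl` are assumptions for illustration only; this page does not state the actual on-disk file name, and the sketch returns raw lines rather than parsed events.

```rust
use std::fs;
use std::io::{self, BufRead, BufReader, ErrorKind};
use std::path::Path;

/// Read every line of a hypothetical `events.jsonl` inside `stream_dir`.
pub fn read_lines(stream_dir: &Path) -> io::Result<Vec<String>> {
    // A stream that was never created has no directory at all.
    if !stream_dir.is_dir() {
        return Err(io::Error::new(
            ErrorKind::NotFound,
            "stream directory does not exist",
        ));
    }
    let file = match fs::File::open(stream_dir.join("events.jsonl")) {
        Ok(file) => file,
        // The directory exists but nothing has been appended yet.
        Err(e) if e.kind() == ErrorKind::NotFound => return Ok(Vec::new()),
        Err(e) => return Err(e),
    };
    // Lines come back in file order, i.e. the order they were appended.
    BufReader::new(file).lines().collect()
}
```

Because the sketch only opens the file for reading, it needs no writer and no lock, which is the property the method description emphasizes.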
pub async fn inject_event<A: Aggregate>(&self, instance_id: &str, event: Event, opts: InjectOptions) -> Result<()>
Append a pre-validated event directly to a stream, bypassing command validation.
This is the primary entry point for relay-sync scenarios where events have already been validated on the originating client. The event is written as-is to the stream’s JSONL log, projections are caught up, and process managers are optionally triggered.
§Deduplication
If event.id is Some(id) and that ID has already been seen by this
store instance, the call returns Ok(()) immediately without writing.
Events with event.id = None are never deduplicated.
§Actor interaction
If a live actor exists for the target stream, the event is injected
through the actor’s channel (preserving the actor’s exclusive writer
ownership). Otherwise, a temporary EventWriter is opened directly.
§Arguments
instance_id - Unique instance identifier within the aggregate type.
event - A pre-validated eventfold::Event to append as-is.
opts - Controls whether process managers run after injection.
§Returns
Ok(()) on success (including dedup no-ops).
§Errors
Returns std::io::Error if directory creation, event writing, or
projection catch-up fails.
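The deduplication rule described above (an ID that has already been seen is a no-op, and events without an ID are always written) can be modelled with a `HashSet`. `SketchEvent` and `SketchLog` are hypothetical types. Unlike the real method, which returns Ok(()) in both cases, this sketch returns a `bool` so the two outcomes are visible.

```rust
use std::collections::HashSet;

/// Hypothetical event with an optional ID used for deduplication.
#[derive(Clone, Debug, PartialEq)]
pub struct SketchEvent {
    pub id: Option<String>,
    pub payload: String,
}

/// Hypothetical in-memory log that remembers which IDs it has seen.
#[derive(Default)]
pub struct SketchLog {
    seen: HashSet<String>,
    pub entries: Vec<SketchEvent>,
}

impl SketchLog {
    /// Append the event unless its ID was seen before.
    /// Returns `true` if the event was written, `false` for a dedup no-op.
    pub fn inject(&mut self, event: SketchEvent) -> bool {
        if let Some(id) = &event.id {
            // `HashSet::insert` returns false when the value was already present.
            if !self.seen.insert(id.clone()) {
                return false;
            }
        }
        // Events with `id == None` skip the check and are always appended.
        self.entries.push(event);
        true
    }
}
```

As in the description, the seen-ID set lives in the store instance, so in this model a fresh `SketchLog` would accept an ID that an earlier instance had already written.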
Trait Implementations§
impl Clone for AggregateStore
fn clone(&self) -> AggregateStore
fn clone_from(&mut self, source: &Self)
Performs copy-assignment from source.