pub struct Repository<S, C = Optimistic, M = NoSnapshots<<S as EventStore>::Position>>where
S: EventStore,
C: ConcurrencyStrategy,{ /* private fields */ }Expand description
Command execution and aggregate lifecycle orchestrator.
Repository manages the complete event sourcing workflow: loading aggregates by replaying events, executing commands through handlers, and persisting resulting events transactionally.
§Usage
// Create repository
let repo = Repository::new(store);
// Execute commands
repo.execute_command::<Account, Deposit>(&id, &cmd, &metadata).await?;
// Load aggregate state
let account: Account = repo.load(&id).await?;
// Load projections
let report = repo.load_projection::<InventoryReport>(&()).await?;
// Enable snapshots for faster loading
let repo_with_snaps = repo.with_snapshots(snapshot_store);§Type Aliases
Use these type aliases for common configurations:
OptimisticRepository<S>- Version-checked writes, no snapshotsUncheckedRepository<S>- Last-writer-wins, no snapshotsOptimisticSnapshotRepository<S, SS>- Version-checked writes with snapshots
§Concurrency Strategies
- Optimistic (default): Detects conflicts via version checking. Use
execute_with_retry()to automatically retry on conflicts. - Unchecked: Last-writer-wins semantics. Use only when concurrent writes are impossible or acceptable.
§See Also
- quickstart example
- Complete workflow
execute_command()- Command executionload()- Aggregate loadingload_projection()- Projection loading
Implementations§
Source§impl<S> Repository<S>where
S: EventStore,
impl<S> Repository<S>where
S: EventStore,
Source§impl<S, M> Repository<S, Optimistic, M>where
S: EventStore,
impl<S, M> Repository<S, Optimistic, M>where
S: EventStore,
Sourcepub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M>
pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M>
Disable optimistic concurrency checking for this repository.
Source§impl<S, C, M> Repository<S, C, M>where
S: EventStore,
C: ConcurrencyStrategy,
impl<S, C, M> Repository<S, C, M>where
S: EventStore,
C: ConcurrencyStrategy,
pub const fn event_store(&self) -> &S
pub fn with_snapshots<SS>( self, snapshots: SS, ) -> Repository<S, C, Snapshots<SS>>
Sourcepub async fn load_projection<P>(
&self,
instance_id: &P::InstanceId,
) -> Result<P, ProjectionError<S::Error>>where
P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
P::InstanceId: Send + Sync,
M: Sync,
pub async fn load_projection<P>(
&self,
instance_id: &P::InstanceId,
) -> Result<P, ProjectionError<S::Error>>where
P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
P::InstanceId: Send + Sync,
M: Sync,
Load a projection by replaying events (one-shot query, no snapshots).
Filter configuration is defined centrally in the projection’s
ProjectionFilters implementation. The instance_id parameterises
which events to load.
§Errors
Returns ProjectionError when the store fails to load events or when
an event cannot be deserialised.
Sourcepub async fn load<A>(
&self,
id: &S::Id,
) -> Result<A, ProjectionError<<S as EventStore>::Error>>
pub async fn load<A>( &self, id: &S::Id, ) -> Result<A, ProjectionError<<S as EventStore>::Error>>
Load an aggregate, using snapshots when configured.
§Errors
Returns ProjectionError if the store fails to load events or if an
event cannot be decoded into the aggregate’s event sum type.
Sourcepub async fn execute_command<A, Cmd>(
&self,
id: &S::Id,
command: &Cmd,
metadata: &S::Metadata,
) -> Result<(), CommandError<A::Error, C::ConcurrencyError, S::Error, M::SnapshotError>>
pub async fn execute_command<A, Cmd>( &self, id: &S::Id, command: &Cmd, metadata: &S::Metadata, ) -> Result<(), CommandError<A::Error, C::ConcurrencyError, S::Error, M::SnapshotError>>
Execute a command using the repository’s configured concurrency and snapshot strategy.
§Errors
Returns CommandError when the aggregate rejects the command, events
cannot be encoded, the store fails to persist, snapshot persistence
fails, or the aggregate cannot be rebuilt. Optimistic repositories
return CommandError::Concurrency on conflicts.
Source§impl<S, C, SS> Repository<S, C, Snapshots<SS>>
impl<S, C, SS> Repository<S, C, Snapshots<SS>>
Sourcepub async fn load_projection_with_snapshot<P>(
&self,
instance_id: &P::InstanceId,
) -> Result<P, ProjectionError<S::Error>>where
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Sync,
P::InstanceId: Send + Sync,
SS: SnapshotStore<P::InstanceId, Position = S::Position>,
pub async fn load_projection_with_snapshot<P>(
&self,
instance_id: &P::InstanceId,
) -> Result<P, ProjectionError<S::Error>>where
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Sync,
P::InstanceId: Send + Sync,
SS: SnapshotStore<P::InstanceId, Position = S::Position>,
Load a projection with snapshot support.
Loads the most recent snapshot (if available), replays events from that position, and offers a new snapshot after loading.
§Errors
Returns ProjectionError when the store fails to load events or when
an event cannot be deserialised.
Source§impl<S, SS, C> Repository<S, C, Snapshots<SS>>
impl<S, SS, C> Repository<S, C, Snapshots<SS>>
pub const fn snapshot_store(&self) -> &SS
Source§impl<S, C, M> Repository<S, C, M>where
S: EventStore,
C: ConcurrencyStrategy,
impl<S, C, M> Repository<S, C, M>where
S: EventStore,
C: ConcurrencyStrategy,
Sourcepub fn subscribe<P>(
&self,
instance_id: P::InstanceId,
) -> SubscriptionBuilder<S, P, NoSnapshots<S::Position>>where
S: SubscribableStore + Clone + 'static,
S::Position: Ord,
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static,
P::InstanceId: Clone + Send + Sync + 'static,
P::Metadata: Send,
pub fn subscribe<P>(
&self,
instance_id: P::InstanceId,
) -> SubscriptionBuilder<S, P, NoSnapshots<S::Position>>where
S: SubscribableStore + Clone + 'static,
S::Position: Ord,
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static,
P::InstanceId: Clone + Send + Sync + 'static,
P::Metadata: Send,
Start a continuous subscription for a projection.
Returns a SubscriptionBuilder that can be configured with callbacks
before starting. The subscription replays historical events first
(catch-up phase), then processes live events as they are committed.
Subscription snapshots are disabled. Use subscribe_with_snapshots()
to provide a snapshot store for the subscription.
§Example
let subscription = repo
.subscribe::<Dashboard>(())
.on_update(|d| println!("{d:?}"))
.start()
.await?;Sourcepub fn subscribe_with_snapshots<P, SS>(
&self,
instance_id: P::InstanceId,
snapshots: SS,
) -> SubscriptionBuilder<S, P, SS>where
S: SubscribableStore + Clone + 'static,
S::Position: Ord,
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static,
P::InstanceId: Clone + Send + Sync + 'static,
P::Metadata: Send,
SS: SnapshotStore<P::InstanceId, Position = S::Position> + Send + Sync + 'static,
pub fn subscribe_with_snapshots<P, SS>(
&self,
instance_id: P::InstanceId,
snapshots: SS,
) -> SubscriptionBuilder<S, P, SS>where
S: SubscribableStore + Clone + 'static,
S::Position: Ord,
P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static,
P::InstanceId: Clone + Send + Sync + 'static,
P::Metadata: Send,
SS: SnapshotStore<P::InstanceId, Position = S::Position> + Send + Sync + 'static,
Start a continuous subscription with an explicit snapshot store.
The snapshot store is keyed by P::InstanceId and tracks the
subscription’s position for faster restart.
Source§impl<S, M> Repository<S, Optimistic, M>where
S: EventStore,
impl<S, M> Repository<S, Optimistic, M>where
S: EventStore,
Sourcepub async fn execute_with_retry<A, Cmd>(
&self,
id: &S::Id,
command: &Cmd,
metadata: &S::Metadata,
max_retries: usize,
) -> Result<usize, CommandError<A::Error, ConcurrencyConflict<S::Position>, S::Error, M::SnapshotError>>
pub async fn execute_with_retry<A, Cmd>( &self, id: &S::Id, command: &Cmd, metadata: &S::Metadata, max_retries: usize, ) -> Result<usize, CommandError<A::Error, ConcurrencyConflict<S::Position>, S::Error, M::SnapshotError>>
Execute a command with automatic retry on concurrency conflicts.
§Errors
Returns the last error if all retries are exhausted, or a non-concurrency error immediately.