Skip to main content

Repository

Struct Repository 

Source
pub struct Repository<S, C = Optimistic, M = NoSnapshots<<S as EventStore>::Position>>{ /* private fields */ }
Expand description

Command execution and aggregate lifecycle orchestrator.

Repository manages the complete event-sourcing workflow: loading aggregates by replaying events, executing commands through handlers, and persisting the resulting events transactionally.

§Usage

// Create repository
let repo = Repository::new(store);

// Execute commands
repo.execute_command::<Account, Deposit>(&id, &cmd, &metadata).await?;

// Load aggregate state
let account: Account = repo.load(&id).await?;

// Load projections
let report = repo.load_projection::<InventoryReport>(&()).await?;

// Enable snapshots for faster loading
let repo_with_snaps = repo.with_snapshots(snapshot_store);

§Type Aliases

Use these type aliases for common configurations:

§Concurrency Strategies

  • Optimistic (default): Detects conflicts via version checking. Use execute_with_retry() to automatically retry on conflicts.
  • Unchecked: Last-writer-wins semantics. Use only when concurrent writes are impossible or acceptable.

§See Also

Implementations§

Source§

impl<S> Repository<S>
where S: EventStore,

Source

pub const fn new(store: S) -> Self

Source§

impl<S, M> Repository<S, Optimistic, M>
where S: EventStore,

Source

pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M>

Disable optimistic concurrency checking for this repository.

Source§

impl<S, C, M> Repository<S, C, M>

Source

pub const fn event_store(&self) -> &S

Source

pub fn with_snapshots<SS>( self, snapshots: SS, ) -> Repository<S, C, Snapshots<SS>>
where SS: SnapshotStore<S::Id, Position = S::Position>,

Source

pub async fn load_projection<P>( &self, instance_id: &P::InstanceId, ) -> Result<P, ProjectionError<S::Error>>
where P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>, P::InstanceId: Send + Sync, M: Sync,

Load a projection by replaying events (one-shot query, no snapshots).

Filter configuration is defined centrally in the projection’s ProjectionFilters implementation. The instance_id parameterises which events to load.

§Errors

Returns ProjectionError when the store fails to load events or when an event cannot be deserialised.

Source

pub async fn load<A>( &self, id: &S::Id, ) -> Result<A, ProjectionError<<S as EventStore>::Error>>
where A: Aggregate<Id = S::Id>, A::Event: ProjectionEvent, M: SnapshotPolicy<S, A> + Sync,

Load an aggregate, using snapshots when configured.

§Errors

Returns ProjectionError if the store fails to load events or if an event cannot be decoded into the aggregate’s event sum type.

Source

pub async fn execute_command<A, Cmd>( &self, id: &S::Id, command: &Cmd, metadata: &S::Metadata, ) -> Result<(), CommandError<A::Error, C::ConcurrencyError, S::Error, M::SnapshotError>>
where A: Aggregate<Id = S::Id> + Handle<Cmd>, A::Event: ProjectionEvent + EventKind + Serialize + Send + Sync, Cmd: Sync, S::Metadata: Clone, C: CommitPolicy<S>, M: SnapshotPolicy<S, A> + Sync,

Execute a command using the repository’s configured concurrency and snapshot strategy.

§Errors

Returns CommandError when the aggregate rejects the command, events cannot be encoded, the store fails to persist, snapshot persistence fails, or the aggregate cannot be rebuilt. Optimistic repositories return CommandError::Concurrency on conflicts.

Source§

impl<S, C, SS> Repository<S, C, Snapshots<SS>>

Source

pub async fn load_projection_with_snapshot<P>( &self, instance_id: &P::InstanceId, ) -> Result<P, ProjectionError<S::Error>>
where P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Sync, P::InstanceId: Send + Sync, SS: SnapshotStore<P::InstanceId, Position = S::Position>,

Load a projection with snapshot support.

Loads the most recent snapshot (if available), replays events from that position, and offers a new snapshot after loading.

§Errors

Returns ProjectionError when the store fails to load events or when an event cannot be deserialised.

Source§

impl<S, SS, C> Repository<S, C, Snapshots<SS>>
where S: EventStore, SS: SnapshotStore<S::Id, Position = S::Position>, C: ConcurrencyStrategy,

Source

pub const fn snapshot_store(&self) -> &SS

Source§

impl<S, C, M> Repository<S, C, M>

Source

pub fn subscribe<P>( &self, instance_id: P::InstanceId, ) -> SubscriptionBuilder<S, P, NoSnapshots<S::Position>>
where S: SubscribableStore + Clone + 'static, S::Position: Ord, P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static, P::InstanceId: Clone + Send + Sync + 'static, P::Metadata: Send,

Start a continuous subscription for a projection.

Returns a SubscriptionBuilder that can be configured with callbacks before starting. The subscription replays historical events first (catch-up phase), then processes live events as they are committed.

Subscriptions started with this method do not use snapshots. To provide a snapshot store for the subscription, use subscribe_with_snapshots() instead.

§Example
let subscription = repo
    .subscribe::<Dashboard>(())
    .on_update(|d| println!("{d:?}"))
    .start()
    .await?;
Source

pub fn subscribe_with_snapshots<P, SS>( &self, instance_id: P::InstanceId, snapshots: SS, ) -> SubscriptionBuilder<S, P, SS>
where S: SubscribableStore + Clone + 'static, S::Position: Ord, P: Projection + ProjectionFilters<Id = S::Id, Metadata = S::Metadata> + Serialize + DeserializeOwned + Send + Sync + 'static, P::InstanceId: Clone + Send + Sync + 'static, P::Metadata: Send, SS: SnapshotStore<P::InstanceId, Position = S::Position> + Send + Sync + 'static,

Start a continuous subscription with an explicit snapshot store.

The snapshot store is keyed by P::InstanceId and tracks the subscription’s position for faster restart.

Source§

impl<S, M> Repository<S, Optimistic, M>
where S: EventStore,

Source

pub async fn execute_with_retry<A, Cmd>( &self, id: &S::Id, command: &Cmd, metadata: &S::Metadata, max_retries: usize, ) -> Result<usize, CommandError<A::Error, ConcurrencyConflict<S::Position>, S::Error, M::SnapshotError>>
where A: Aggregate<Id = S::Id> + Handle<Cmd>, A::Event: ProjectionEvent + EventKind + Serialize + Send + Sync, Cmd: Sync, S::Metadata: Clone, M: SnapshotPolicy<S, A> + Sync,

Execute a command with automatic retry on concurrency conflicts.

§Errors

Returns the last error once all retries have been exhausted. A non-concurrency error is returned immediately, without retrying.

Auto Trait Implementations§

§

impl<S, C, M> Freeze for Repository<S, C, M>
where S: Freeze, M: Freeze,

§

impl<S, C, M> RefUnwindSafe for Repository<S, C, M>

§

impl<S, C, M> Send for Repository<S, C, M>
where M: Send,

§

impl<S, C, M> Sync for Repository<S, C, M>
where M: Sync,

§

impl<S, C, M> Unpin for Repository<S, C, M>
where S: Unpin, M: Unpin, C: Unpin,

§

impl<S, C, M> UnwindSafe for Repository<S, C, M>
where S: UnwindSafe, M: UnwindSafe, C: UnwindSafe,

Blanket Implementations§

Source§

impl<T> Any for T
where T: 'static + ?Sized,

Source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
Source§

impl<T> Borrow<T> for T
where T: ?Sized,

Source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
Source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

Source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
Source§

impl<T> From<T> for T

Source§

fn from(t: T) -> T

Returns the argument unchanged.

Source§

impl<T> Instrument for T

Source§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Source§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Source§

impl<T, U> Into<U> for T
where U: From<T>,

Source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

Source§

type Error = Infallible

The type returned in the event of a conversion error.
Source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
Source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

Source§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
Source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
Source§

impl<T> WithSubscriber for T

Source§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Source§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more