Skip to main content

sourcery_core/
repository.rs

1//! Command execution and aggregate lifecycle management.
2//!
3//! The [`Repository`] orchestrates the core event sourcing workflow:
4//!
5//! 1. Load aggregate state by replaying events
6//! 2. Execute commands via [`Handle<C>`](crate::aggregate::Handle)
7//! 3. Persist resulting events transactionally
8//! 4. Load projections from event streams
9//!
10//! # Quick Example
11//!
12//! ```ignore
13//! let repo = Repository::new(store);
14//!
15//! // Execute a command
16//! repo.execute_command::<Account, Deposit>(&id, &cmd, &metadata).await?;
17//!
18//! // Load aggregate state
19//! let account: Account = repo.load(&id).await?;
20//!
21//! // Load a projection
22//! let report = repo.load_projection::<Report>(&()).await?;
23//! ```
24//!
25//! See the [quickstart example](https://github.com/danieleades/sourcery/blob/main/examples/quickstart.rs)
26//! for a complete working example.
27
28use std::{collections::HashMap, convert::Infallible, marker::PhantomData};
29
30use nonempty::NonEmpty;
31use serde::{Serialize, de::DeserializeOwned};
32use thiserror::Error;
33
34use crate::{
35    aggregate::{Aggregate, Handle},
36    concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
37    event::{EventKind, ProjectionEvent},
38    projection::{HandlerError, Projection, ProjectionError, ProjectionFilters},
39    snapshot::{OfferSnapshotError, Snapshot, SnapshotOffer, SnapshotStore},
40    store::{
41        CommitError, EventFilter, EventStore, GloballyOrderedStore, OptimisticCommitError,
42        StoredEvents,
43    },
44    subscription::{SubscribableStore, SubscriptionBuilder},
45};
46
47type LoadError<S> = ProjectionError<<S as EventStore>::Error>;
48
49/// Error type for command execution across all repository modes.
50#[derive(Debug, Error)]
51pub enum CommandError<AggregateError, ConcurrencyError, StoreError, SnapshotError>
52where
53    ConcurrencyError: std::error::Error + 'static,
54    StoreError: std::error::Error + 'static,
55    SnapshotError: std::error::Error + 'static,
56{
57    #[error("aggregate rejected command: {0}")]
58    Aggregate(AggregateError),
59    #[error(transparent)]
60    Concurrency(ConcurrencyError),
61    #[error("failed to rebuild aggregate state: {0}")]
62    Projection(#[source] ProjectionError<StoreError>),
63    #[error("failed to persist events: {0}")]
64    Store(#[source] StoreError),
65    #[error("snapshot operation failed: {0}")]
66    Snapshot(#[source] SnapshotError),
67}
68
69/// Result type alias for unchecked command execution.
70pub type UncheckedCommandResult<A, S> = Result<
71    (),
72    CommandError<<A as Aggregate>::Error, Infallible, <S as EventStore>::Error, Infallible>,
73>;
74
75/// Result type alias for snapshot-enabled unchecked command execution.
76pub type UncheckedSnapshotCommandResult<A, S, SS> = Result<
77    (),
78    CommandError<
79        <A as Aggregate>::Error,
80        Infallible,
81        <S as EventStore>::Error,
82        <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
83    >,
84>;
85
86/// Result type alias for optimistic command execution.
87pub type OptimisticCommandResult<A, S> = Result<
88    (),
89    CommandError<
90        <A as Aggregate>::Error,
91        ConcurrencyConflict<<S as EventStore>::Position>,
92        <S as EventStore>::Error,
93        Infallible,
94    >,
95>;
96
97/// Result type alias for snapshot-enabled optimistic command execution.
98pub type OptimisticSnapshotCommandResult<A, S, SS> = Result<
99    (),
100    CommandError<
101        <A as Aggregate>::Error,
102        ConcurrencyConflict<<S as EventStore>::Position>,
103        <S as EventStore>::Error,
104        <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
105    >,
106>;
107
108/// Result type alias for retry operations (optimistic, no snapshots).
109pub type RetryResult<A, S> = Result<
110    usize,
111    CommandError<
112        <A as Aggregate>::Error,
113        ConcurrencyConflict<<S as EventStore>::Position>,
114        <S as EventStore>::Error,
115        Infallible,
116    >,
117>;
118
119/// Result type alias for retry operations (optimistic, snapshots enabled).
120pub type SnapshotRetryResult<A, S, SS> = Result<
121    usize,
122    CommandError<
123        <A as Aggregate>::Error,
124        ConcurrencyConflict<<S as EventStore>::Position>,
125        <S as EventStore>::Error,
126        <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
127    >,
128>;
129
130#[doc(hidden)]
131pub enum CommitPolicyError<C, S> {
132    Concurrency(C),
133    Store(S),
134}
135
136#[doc(hidden)]
137pub trait CommitPolicy<S: EventStore>: private::Sealed {
138    type ConcurrencyError: std::error::Error + 'static;
139
140    fn commit<E>(
141        store: &S,
142        kind: &str,
143        id: &S::Id,
144        expected: Option<S::Position>,
145        events: NonEmpty<E>,
146        metadata: &S::Metadata,
147    ) -> impl std::future::Future<
148        Output = Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>,
149    > + Send
150    where
151        E: EventKind + Serialize + Send + Sync,
152        S::Metadata: Clone;
153}
154
155#[doc(hidden)]
156pub trait SnapshotPolicy<S: EventStore, A: Aggregate<Id = S::Id>>: private::Sealed {
157    type SnapshotError: std::error::Error + 'static;
158    type Prepared;
159
160    fn load_base(
161        &self,
162        kind: &str,
163        id: &S::Id,
164    ) -> impl std::future::Future<Output = (A, Option<S::Position>)> + Send;
165
166    fn prepare_snapshot(&self, aggregate: A, events: &NonEmpty<A::Event>) -> Self::Prepared;
167
168    fn offer_snapshot(
169        &self,
170        kind: &str,
171        id: &S::Id,
172        events_since_snapshot: u64,
173        new_position: S::Position,
174        prepared: Self::Prepared,
175    ) -> impl std::future::Future<Output = Result<(), Self::SnapshotError>> + Send;
176}
177
178struct LoadedAggregate<A, Pos> {
179    aggregate: A,
180    version: Option<Pos>,
181    events_since_snapshot: u64,
182}
183
184fn aggregate_event_filters<S, E>(
185    aggregate_kind: &str,
186    aggregate_id: &S::Id,
187    after: Option<&S::Position>,
188) -> Vec<EventFilter<S::Id, S::Position>>
189where
190    S: EventStore,
191    E: ProjectionEvent,
192{
193    E::EVENT_KINDS
194        .iter()
195        .map(|kind| {
196            let mut filter =
197                EventFilter::for_aggregate(*kind, aggregate_kind, aggregate_id.clone());
198            if let Some(position) = after {
199                filter = filter.after(position.clone());
200            }
201            filter
202        })
203        .collect()
204}
205
206fn apply_stored_events<A, S>(
207    aggregate: &mut A,
208    store: &S,
209    events: &StoredEvents<S::Id, S::Position, S::Data, S::Metadata>,
210) -> Result<Option<S::Position>, crate::event::EventDecodeError<S::Error>>
211where
212    S: EventStore,
213    S::Position: Clone,
214    A: Aggregate<Id = S::Id>,
215    A::Event: ProjectionEvent,
216{
217    let mut last_event_position: Option<S::Position> = None;
218
219    for stored in events {
220        let event = A::Event::from_stored(stored, store)?;
221        aggregate.apply(&event);
222        last_event_position = Some(stored.position());
223    }
224
225    Ok(last_event_position)
226}
227
228fn replay_projection_events<P, S>(
229    projection: &mut P,
230    events: &StoredEvents<S::Id, S::Position, S::Data, S::Metadata>,
231    handlers: &HashMap<&'static str, crate::projection::EventHandler<P, S>>,
232    store: &S,
233) -> Result<Option<S::Position>, ProjectionError<S::Error>>
234where
235    S: EventStore,
236    S::Position: Clone,
237    P: ProjectionFilters<Id = S::Id>,
238{
239    let mut last_position = None;
240
241    for stored in events {
242        let aggregate_id = stored.aggregate_id();
243        let kind = stored.kind();
244        let metadata = stored.metadata();
245
246        if let Some(handler) = handlers.get(kind) {
247            (handler)(projection, aggregate_id, stored, metadata, store).map_err(|error| {
248                match error {
249                    HandlerError::Store(error) => {
250                        ProjectionError::EventDecode(crate::event::EventDecodeError::Store(error))
251                    }
252                    HandlerError::EventDecode(error) => ProjectionError::EventDecode(error),
253                }
254            })?;
255        }
256        last_position = Some(stored.position());
257    }
258
259    Ok(last_position)
260}
261
262/// Snapshot-enabled repository mode wrapper.
263///
264/// This is an implementation detail of [`Repository`]'s type-state pattern for
265/// snapshot support. You should never construct this type directly.
266///
267/// # Usage
268///
269/// Enable snapshots via [`Repository::with_snapshots()`]:
270///
271/// ```ignore
272/// let repo = Repository::new(store)
273///     .with_snapshots(snapshot_store);
274/// ```
275pub struct Snapshots<SS>(
276    /// The underlying snapshot store implementation.
277    ///
278    /// This field is public for trait implementation purposes only.
279    /// Do not access it directly.
280    #[doc(hidden)]
281    pub SS,
282);
283
284impl<Id, SS> SnapshotStore<Id> for Snapshots<SS>
285where
286    Id: Send + Sync + 'static,
287    SS: SnapshotStore<Id>,
288{
289    type Error = SS::Error;
290    type Position = SS::Position;
291
292    async fn load<T>(
293        &self,
294        kind: &str,
295        id: &Id,
296    ) -> Result<Option<Snapshot<Self::Position, T>>, Self::Error>
297    where
298        T: DeserializeOwned,
299    {
300        self.0.load(kind, id).await
301    }
302
303    async fn offer_snapshot<CE, T, Create>(
304        &self,
305        kind: &str,
306        id: &Id,
307        events_since_last_snapshot: u64,
308        create_snapshot: Create,
309    ) -> Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>
310    where
311        CE: std::error::Error + Send + Sync + 'static,
312        T: Serialize,
313        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send,
314    {
315        self.0
316            .offer_snapshot(kind, id, events_since_last_snapshot, create_snapshot)
317            .await
318    }
319}
320
321mod private {
322    pub trait Sealed {}
323
324    impl Sealed for crate::concurrency::Unchecked {}
325    impl Sealed for crate::concurrency::Optimistic {}
326    impl<Pos> Sealed for crate::snapshot::NoSnapshots<Pos> {}
327    impl<SS> Sealed for super::Snapshots<SS> {}
328}
329
330impl<S> CommitPolicy<S> for Unchecked
331where
332    S: EventStore,
333{
334    type ConcurrencyError = Infallible;
335
336    async fn commit<E>(
337        store: &S,
338        kind: &str,
339        id: &S::Id,
340        _expected: Option<S::Position>,
341        events: NonEmpty<E>,
342        metadata: &S::Metadata,
343    ) -> Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>
344    where
345        E: EventKind + Serialize + Send + Sync,
346        S::Metadata: Clone,
347    {
348        let committed = store
349            .commit_events(kind, id, events, metadata)
350            .await
351            .map_err(|e| match e {
352                CommitError::Store(err) | CommitError::Serialization { source: err, .. } => {
353                    CommitPolicyError::Store(err)
354                }
355            })?;
356
357        Ok(committed.last_position)
358    }
359}
360
361impl<S> CommitPolicy<S> for Optimistic
362where
363    S: EventStore,
364{
365    type ConcurrencyError = ConcurrencyConflict<S::Position>;
366
367    async fn commit<E>(
368        store: &S,
369        kind: &str,
370        id: &S::Id,
371        expected: Option<S::Position>,
372        events: NonEmpty<E>,
373        metadata: &S::Metadata,
374    ) -> Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>
375    where
376        E: EventKind + Serialize + Send + Sync,
377        S::Metadata: Clone,
378    {
379        let committed = store
380            .commit_events_optimistic(kind, id, expected, events, metadata)
381            .await
382            .map_err(|e| match e {
383                OptimisticCommitError::Conflict(conflict) => {
384                    CommitPolicyError::Concurrency(conflict)
385                }
386                OptimisticCommitError::Store(err)
387                | OptimisticCommitError::Serialization { source: err, .. } => {
388                    CommitPolicyError::Store(err)
389                }
390            })?;
391
392        Ok(committed.last_position)
393    }
394}
395
396impl<S, A> SnapshotPolicy<S, A> for crate::snapshot::NoSnapshots<S::Position>
397where
398    S: EventStore,
399    A: Aggregate<Id = S::Id>,
400{
401    type Prepared = ();
402    type SnapshotError = Infallible;
403
404    async fn load_base(&self, _kind: &str, _id: &S::Id) -> (A, Option<S::Position>) {
405        (A::default(), None)
406    }
407
408    fn prepare_snapshot(&self, _aggregate: A, _events: &NonEmpty<A::Event>) -> Self::Prepared {}
409
410    async fn offer_snapshot(
411        &self,
412        _kind: &str,
413        _id: &S::Id,
414        _events_since_snapshot: u64,
415        _new_position: S::Position,
416        _prepared: Self::Prepared,
417    ) -> Result<(), Self::SnapshotError> {
418        Ok(())
419    }
420}
421
422impl<S, A, SS> SnapshotPolicy<S, A> for Snapshots<SS>
423where
424    S: EventStore,
425    A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned + Send,
426    SS: SnapshotStore<S::Id, Position = S::Position>,
427{
428    type Prepared = A;
429    type SnapshotError = SS::Error;
430
431    async fn load_base(&self, kind: &str, id: &S::Id) -> (A, Option<S::Position>) {
432        let snapshot_result = self
433            .0
434            .load::<A>(kind, id)
435            .await
436            .inspect_err(|e| {
437                tracing::error!(
438                    error = %e,
439                    "failed to load snapshot, falling back to full replay"
440                );
441            })
442            .ok()
443            .flatten();
444
445        if let Some(snapshot) = snapshot_result {
446            (snapshot.data, Some(snapshot.position))
447        } else {
448            (A::default(), None)
449        }
450    }
451
452    fn prepare_snapshot(&self, mut aggregate: A, events: &NonEmpty<A::Event>) -> Self::Prepared {
453        for event in events {
454            aggregate.apply(event);
455        }
456        aggregate
457    }
458
459    async fn offer_snapshot(
460        &self,
461        kind: &str,
462        id: &S::Id,
463        events_since_snapshot: u64,
464        new_position: S::Position,
465        prepared: Self::Prepared,
466    ) -> Result<(), Self::SnapshotError> {
467        let offer_result = self.0.offer_snapshot(
468            kind,
469            id,
470            events_since_snapshot,
471            move || -> Result<Snapshot<S::Position, A>, Infallible> {
472                Ok(Snapshot {
473                    position: new_position,
474                    data: prepared,
475                })
476            },
477        );
478
479        match offer_result.await {
480            Ok(SnapshotOffer::Declined | SnapshotOffer::Stored)
481            | Err(OfferSnapshotError::Create(_)) => Ok(()),
482            Err(OfferSnapshotError::Snapshot(e)) => Err(e),
483        }
484    }
485}
486
487/// Repository type alias with optimistic concurrency and no snapshots.
488///
489/// This configuration provides version-checked writes without snapshot support.
490/// It is equivalent to:
491/// ```ignore
492/// Repository<S, Optimistic, NoSnapshots<<S as EventStore>::Position>>
493/// ```
494///
495/// # Example
496///
497/// ```ignore
498/// use sourcery::{Repository, store::inmemory};
499///
500/// let store = inmemory::Store::new();
501/// let repo: OptimisticRepository<_> = Repository::new(store);
502/// ```
503pub type OptimisticRepository<S> =
504    Repository<S, Optimistic, crate::snapshot::NoSnapshots<<S as EventStore>::Position>>;
505
506/// Repository type alias with unchecked concurrency and no snapshots.
507///
508/// This configuration skips version checking, allowing last-writer-wins
509/// semantics. Use when concurrent writes are impossible or acceptable.
510///
511/// Equivalent to:
512/// ```ignore
513/// Repository<S, Unchecked, NoSnapshots<<S as EventStore>::Position>>
514/// ```
515pub type UncheckedRepository<S> =
516    Repository<S, Unchecked, crate::snapshot::NoSnapshots<<S as EventStore>::Position>>;
517
518/// Repository type alias with optimistic concurrency and snapshot support.
519///
520/// This configuration enables snapshot support for faster aggregate loading
521/// with version-checked writes. Requires aggregate state to implement
522/// `Serialize + DeserializeOwned`.
523///
524/// Equivalent to:
525/// ```ignore
526/// Repository<S, Optimistic, SS>
527/// ```
528///
529/// # Example
530///
531/// ```ignore
532/// use sourcery::{Repository, store::inmemory, snapshot::inmemory};
533///
534/// let store = inmemory::Store::new();
535/// let snapshot_store = inmemory::Store::every(100);
536/// let repo: OptimisticSnapshotRepository<_, _> = Repository::new(store)
537///     .with_snapshots(snapshot_store);
538/// ```
539pub type OptimisticSnapshotRepository<S, SS> = Repository<S, Optimistic, SS>;
540
541/// Command execution and aggregate lifecycle orchestrator.
542///
543/// Repository manages the complete event sourcing workflow: loading aggregates
544/// by replaying events, executing commands through handlers, and persisting
545/// resulting events transactionally.
546///
547/// # Usage
548///
549/// ```ignore
550/// // Create repository
551/// let repo = Repository::new(store);
552///
553/// // Execute commands
554/// repo.execute_command::<Account, Deposit>(&id, &cmd, &metadata).await?;
555///
556/// // Load aggregate state
557/// let account: Account = repo.load(&id).await?;
558///
559/// // Load projections
560/// let report = repo.load_projection::<InventoryReport>(&()).await?;
561///
562/// // Enable snapshots for faster loading
563/// let repo_with_snaps = repo.with_snapshots(snapshot_store);
564/// ```
565///
566/// # Type Aliases
567///
568/// Use these type aliases for common configurations:
569///
570/// - [`OptimisticRepository<S>`] - Version-checked writes, no snapshots
571/// - [`UncheckedRepository<S>`] - Last-writer-wins, no snapshots
572/// - [`OptimisticSnapshotRepository<S, SS>`] - Version-checked writes with
573///   snapshots
574///
575/// # Concurrency Strategies
576///
577/// - **Optimistic** (default): Detects conflicts via version checking. Use
578///   [`execute_with_retry()`](Self::execute_with_retry) to automatically retry
579///   on conflicts.
580/// - **Unchecked**: Last-writer-wins semantics. Use only when concurrent writes
581///   are impossible or acceptable.
582///
583/// # See Also
584///
585/// - [quickstart example](https://github.com/danieleades/sourcery/blob/main/examples/quickstart.rs)
586///   - Complete workflow
587/// - [`execute_command()`](Self::execute_command) - Command execution
588/// - [`load()`](Self::load) - Aggregate loading
589/// - [`load_projection()`](Self::load_projection) - Projection loading
590pub struct Repository<
591    S,
592    C = Optimistic,
593    M = crate::snapshot::NoSnapshots<<S as EventStore>::Position>,
594> where
595    S: EventStore,
596    C: ConcurrencyStrategy,
597{
598    pub(crate) store: S,
599    snapshots: M,
600    _concurrency: PhantomData<C>,
601}
602
603impl<S> Repository<S>
604where
605    S: EventStore,
606{
607    #[must_use]
608    pub const fn new(store: S) -> Self {
609        Self {
610            store,
611            snapshots: crate::snapshot::NoSnapshots::new(),
612            _concurrency: PhantomData,
613        }
614    }
615}
616
617impl<S, M> Repository<S, Optimistic, M>
618where
619    S: EventStore,
620{
621    /// Disable optimistic concurrency checking for this repository.
622    #[must_use]
623    pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M> {
624        Repository {
625            store: self.store,
626            snapshots: self.snapshots,
627            _concurrency: PhantomData,
628        }
629    }
630}
631
632impl<S, C, M> Repository<S, C, M>
633where
634    S: EventStore,
635    C: ConcurrencyStrategy,
636{
637    #[must_use]
638    pub const fn event_store(&self) -> &S {
639        &self.store
640    }
641
642    #[must_use]
643    pub fn with_snapshots<SS>(self, snapshots: SS) -> Repository<S, C, Snapshots<SS>>
644    where
645        SS: SnapshotStore<S::Id, Position = S::Position>,
646    {
647        Repository {
648            store: self.store,
649            snapshots: Snapshots(snapshots),
650            _concurrency: PhantomData,
651        }
652    }
653
654    /// Load a projection by replaying events (one-shot query, no snapshots).
655    ///
656    /// Filter configuration is defined centrally in the projection's
657    /// [`ProjectionFilters`] implementation. The `instance_id` parameterises
658    /// which events to load.
659    ///
660    /// # Errors
661    ///
662    /// Returns [`ProjectionError`] when the store fails to load events or when
663    /// an event cannot be deserialised.
664    #[tracing::instrument(
665        skip(self, instance_id),
666        fields(
667            projection_type = std::any::type_name::<P>(),
668        )
669    )]
670    pub async fn load_projection<P>(
671        &self,
672        instance_id: &P::InstanceId,
673    ) -> Result<P, ProjectionError<S::Error>>
674    where
675        P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
676        P::InstanceId: Send + Sync,
677        M: Sync,
678    {
679        tracing::debug!("loading projection");
680
681        let filters = P::filters::<S>(instance_id);
682        let (event_filters, handlers) = filters.into_event_filters(None);
683
684        let events = self
685            .store
686            .load_events(&event_filters)
687            .await
688            .map_err(ProjectionError::Store)?;
689
690        let mut projection = P::init(instance_id);
691        let event_count = events.len();
692        tracing::debug!(
693            events_to_replay = event_count,
694            "replaying events into projection"
695        );
696
697        let _ = replay_projection_events(&mut projection, &events, &handlers, &self.store)?;
698
699        tracing::info!(events_applied = event_count, "projection loaded");
700        Ok(projection)
701    }
702
703    /// Load an aggregate, using snapshots when configured.
704    ///
705    /// # Errors
706    ///
707    /// Returns [`ProjectionError`] if the store fails to load events or if an
708    /// event cannot be decoded into the aggregate's event sum type.
709    pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
710    where
711        A: Aggregate<Id = S::Id>,
712        A::Event: ProjectionEvent,
713        M: SnapshotPolicy<S, A> + Sync,
714    {
715        Ok(self.load_aggregate::<A>(id).await?.aggregate)
716    }
717
718    async fn load_aggregate<A>(
719        &self,
720        id: &S::Id,
721    ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
722    where
723        A: Aggregate<Id = S::Id>,
724        A::Event: ProjectionEvent,
725        M: SnapshotPolicy<S, A> + Sync,
726    {
727        let (mut aggregate, snapshot_position) = self.snapshots.load_base(A::KIND, id).await;
728
729        let filters =
730            aggregate_event_filters::<S, A::Event>(A::KIND, id, snapshot_position.as_ref());
731
732        let events = self
733            .store
734            .load_events(&filters)
735            .await
736            .map_err(ProjectionError::Store)?;
737
738        let last_event_position = apply_stored_events::<A, S>(&mut aggregate, &self.store, &events)
739            .map_err(ProjectionError::EventDecode)?;
740
741        let version = last_event_position.or(snapshot_position);
742
743        Ok(LoadedAggregate {
744            aggregate,
745            version,
746            events_since_snapshot: events.len() as u64,
747        })
748    }
749
750    /// Execute a command using the repository's configured concurrency and
751    /// snapshot strategy.
752    ///
753    /// # Errors
754    ///
755    /// Returns [`CommandError`] when the aggregate rejects the command, events
756    /// cannot be encoded, the store fails to persist, snapshot persistence
757    /// fails, or the aggregate cannot be rebuilt. Optimistic repositories
758    /// return [`CommandError::Concurrency`] on conflicts.
759    pub async fn execute_command<A, Cmd>(
760        &self,
761        id: &S::Id,
762        command: &Cmd,
763        metadata: &S::Metadata,
764    ) -> Result<(), CommandError<A::Error, C::ConcurrencyError, S::Error, M::SnapshotError>>
765    where
766        A: Aggregate<Id = S::Id> + Handle<Cmd>,
767        A::Event: ProjectionEvent + EventKind + Serialize + Send + Sync,
768        Cmd: Sync,
769        S::Metadata: Clone,
770        C: CommitPolicy<S>,
771        M: SnapshotPolicy<S, A> + Sync,
772    {
773        let LoadedAggregate {
774            aggregate,
775            version,
776            events_since_snapshot,
777        } = self
778            .load_aggregate::<A>(id)
779            .await
780            .map_err(CommandError::Projection)?;
781
782        let new_events =
783            Handle::<Cmd>::handle(&aggregate, command).map_err(CommandError::Aggregate)?;
784
785        let Some(events) = NonEmpty::from_vec(new_events) else {
786            return Ok(());
787        };
788
789        let total_events_since_snapshot = events_since_snapshot + events.len() as u64;
790
791        let prepared = self.snapshots.prepare_snapshot(aggregate, &events);
792
793        let new_position =
794            C::commit::<A::Event>(&self.store, A::KIND, id, version, events, metadata)
795                .await
796                .map_err(|e| match e {
797                    CommitPolicyError::Concurrency(conflict) => CommandError::Concurrency(conflict),
798                    CommitPolicyError::Store(err) => CommandError::Store(err),
799                })?;
800
801        self.snapshots
802            .offer_snapshot(
803                A::KIND,
804                id,
805                total_events_since_snapshot,
806                new_position,
807                prepared,
808            )
809            .await
810            .map_err(CommandError::Snapshot)?;
811
812        Ok(())
813    }
814}
815
816impl<S, C, SS> Repository<S, C, Snapshots<SS>>
817where
818    S: EventStore + GloballyOrderedStore,
819    S::Position: Ord,
820    C: ConcurrencyStrategy,
821{
822    /// Load a projection with snapshot support.
823    ///
824    /// Loads the most recent snapshot (if available), replays events from that
825    /// position, and offers a new snapshot after loading.
826    ///
827    /// # Errors
828    ///
829    /// Returns [`ProjectionError`] when the store fails to load events or when
830    /// an event cannot be deserialised.
831    #[tracing::instrument(
832        skip(self, instance_id),
833        fields(
834            projection_type = std::any::type_name::<P>(),
835        )
836    )]
837    pub async fn load_projection_with_snapshot<P>(
838        &self,
839        instance_id: &P::InstanceId,
840    ) -> Result<P, ProjectionError<S::Error>>
841    where
842        P: Projection
843            + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
844            + Serialize
845            + DeserializeOwned
846            + Sync,
847        P::InstanceId: Send + Sync,
848        SS: SnapshotStore<P::InstanceId, Position = S::Position>,
849    {
850        tracing::debug!("loading projection with snapshot");
851
852        let snapshot_result = self
853            .snapshots
854            .0
855            .load::<P>(P::KIND, instance_id)
856            .await
857            .inspect_err(|e| {
858                tracing::error!(error = %e, "failed to load projection snapshot");
859            })
860            .ok()
861            .flatten();
862
863        let (mut projection, snapshot_position) = if let Some(snapshot) = snapshot_result {
864            (snapshot.data, Some(snapshot.position))
865        } else {
866            (P::init(instance_id), None)
867        };
868
869        let filters = P::filters::<S>(instance_id);
870        let (event_filters, handlers) = filters.into_event_filters(snapshot_position.as_ref());
871
872        let events = self
873            .store
874            .load_events(&event_filters)
875            .await
876            .map_err(ProjectionError::Store)?;
877
878        let event_count = events.len();
879        let last_position =
880            replay_projection_events(&mut projection, &events, &handlers, &self.store)?;
881
882        if event_count > 0
883            && let Some(position) = last_position
884        {
885            let projection_ref = &projection;
886            let offer = self.snapshots.0.offer_snapshot(
887                P::KIND,
888                instance_id,
889                event_count as u64,
890                move || -> Result<Snapshot<S::Position, &P>, std::convert::Infallible> {
891                    Ok(Snapshot {
892                        position,
893                        data: projection_ref,
894                    })
895                },
896            );
897
898            if let Err(e) = offer.await {
899                tracing::error!(error = %e, "failed to store projection snapshot");
900            }
901        }
902
903        tracing::info!(events_applied = event_count, "projection loaded");
904        Ok(projection)
905    }
906}
907
908impl<S, SS, C> Repository<S, C, Snapshots<SS>>
909where
910    S: EventStore,
911    SS: SnapshotStore<S::Id, Position = S::Position>,
912    C: ConcurrencyStrategy,
913{
914    #[must_use]
915    pub const fn snapshot_store(&self) -> &SS {
916        &self.snapshots.0
917    }
918}
919
920impl<S, C, M> Repository<S, C, M>
921where
922    S: EventStore,
923    C: ConcurrencyStrategy,
924{
925    /// Start a continuous subscription for a projection.
926    ///
927    /// Returns a [`SubscriptionBuilder`] that can be configured with callbacks
928    /// before starting. The subscription replays historical events first
929    /// (catch-up phase), then processes live events as they are committed.
930    ///
931    /// Subscription snapshots are disabled. Use [`subscribe_with_snapshots()`]
932    /// to provide a snapshot store for the subscription.
933    ///
934    /// [`subscribe_with_snapshots()`]: Self::subscribe_with_snapshots
935    ///
936    /// # Example
937    ///
938    /// ```ignore
939    /// let subscription = repo
940    ///     .subscribe::<Dashboard>(())
941    ///     .on_update(|d| println!("{d:?}"))
942    ///     .start()
943    ///     .await?;
944    /// ```
945    pub fn subscribe<P>(
946        &self,
947        instance_id: P::InstanceId,
948    ) -> SubscriptionBuilder<S, P, crate::snapshot::NoSnapshots<S::Position>>
949    where
950        S: SubscribableStore + Clone + 'static,
951        S::Position: Ord,
952        P: Projection
953            + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
954            + Serialize
955            + DeserializeOwned
956            + Send
957            + Sync
958            + 'static,
959        P::InstanceId: Clone + Send + Sync + 'static,
960        P::Metadata: Send,
961    {
962        SubscriptionBuilder::new(
963            self.store.clone(),
964            crate::snapshot::NoSnapshots::new(),
965            instance_id,
966        )
967    }
968
969    /// Start a continuous subscription with an explicit snapshot store.
970    ///
971    /// The snapshot store is keyed by `P::InstanceId` and tracks the
972    /// subscription's position for faster restart.
973    pub fn subscribe_with_snapshots<P, SS>(
974        &self,
975        instance_id: P::InstanceId,
976        snapshots: SS,
977    ) -> SubscriptionBuilder<S, P, SS>
978    where
979        S: SubscribableStore + Clone + 'static,
980        S::Position: Ord,
981        P: Projection
982            + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
983            + Serialize
984            + DeserializeOwned
985            + Send
986            + Sync
987            + 'static,
988        P::InstanceId: Clone + Send + Sync + 'static,
989        P::Metadata: Send,
990        SS: SnapshotStore<P::InstanceId, Position = S::Position> + Send + Sync + 'static,
991    {
992        SubscriptionBuilder::new(self.store.clone(), snapshots, instance_id)
993    }
994}
995
996impl<S, M> Repository<S, Optimistic, M>
997where
998    S: EventStore,
999{
1000    /// Execute a command with automatic retry on concurrency conflicts.
1001    ///
1002    /// # Errors
1003    ///
1004    /// Returns the last error if all retries are exhausted, or a
1005    /// non-concurrency error immediately.
1006    pub async fn execute_with_retry<A, Cmd>(
1007        &self,
1008        id: &S::Id,
1009        command: &Cmd,
1010        metadata: &S::Metadata,
1011        max_retries: usize,
1012    ) -> Result<
1013        usize,
1014        CommandError<A::Error, ConcurrencyConflict<S::Position>, S::Error, M::SnapshotError>,
1015    >
1016    where
1017        A: Aggregate<Id = S::Id> + Handle<Cmd>,
1018        A::Event: ProjectionEvent + EventKind + serde::Serialize + Send + Sync,
1019        Cmd: Sync,
1020        S::Metadata: Clone,
1021        M: SnapshotPolicy<S, A> + Sync,
1022    {
1023        for attempt in 1..=max_retries {
1024            match self.execute_command::<A, Cmd>(id, command, metadata).await {
1025                Ok(()) => return Ok(attempt),
1026                Err(CommandError::Concurrency(_)) => {}
1027                Err(e) => return Err(e),
1028            }
1029        }
1030
1031        self.execute_command::<A, Cmd>(id, command, metadata)
1032            .await
1033            .map(|()| max_retries + 1)
1034    }
1035}
1036
1037#[cfg(test)]
1038mod tests {
1039    use std::{error::Error, io};
1040
1041    use super::*;
1042
1043    #[test]
1044    fn command_error_display_mentions_aggregate() {
1045        let error: CommandError<String, Infallible, io::Error, Infallible> =
1046            CommandError::Aggregate("invalid state".to_string());
1047        let msg = error.to_string();
1048        assert!(msg.contains("aggregate rejected command"));
1049        assert!(error.source().is_none());
1050    }
1051
1052    #[test]
1053    fn command_error_store_has_source() {
1054        let error: CommandError<String, Infallible, io::Error, Infallible> =
1055            CommandError::Store(io::Error::other("store error"));
1056        assert!(error.source().is_some());
1057    }
1058
1059    #[test]
1060    fn optimistic_command_error_concurrency_mentions_conflict() {
1061        let conflict = ConcurrencyConflict {
1062            expected: Some(1u64),
1063            actual: Some(2u64),
1064        };
1065        let error: CommandError<String, ConcurrencyConflict<u64>, io::Error, Infallible> =
1066            CommandError::Concurrency(conflict);
1067        let msg = error.to_string();
1068        assert!(msg.contains("concurrency conflict"));
1069        assert!(error.source().is_none());
1070    }
1071}