sourcery_core/
repository.rs

1//! Application service orchestration.
2//!
3//! `Repository` coordinates loading aggregates, invoking command handlers, and
4//! appending resulting events to the store.
5//!
6//! Snapshot support is opt-in via `Repository<_, _, Snapshots<_>>`. This keeps
7//! the default repository lightweight: no snapshot load/serialize work and no
8//! serde bounds on aggregate state unless snapshots are enabled.
9
10use std::marker::PhantomData;
11
12use serde::{Serialize, de::DeserializeOwned};
13use thiserror::Error;
14
15use crate::{
16    aggregate::{Aggregate, AggregateBuilder, Handle},
17    codec::{Codec, ProjectionEvent, SerializableEvent},
18    concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
19    projection::{Projection, ProjectionBuilder, ProjectionError},
20    snapshot::{OfferSnapshotError, Snapshot, SnapshotOffer, SnapshotStore},
21    store::{AppendError, EventFilter, EventStore, StoredEvent},
22};
23
24type LoadError<S> =
25    ProjectionError<<S as EventStore>::Error, <<S as EventStore>::Codec as Codec>::Error>;
26
27/// Error type for unchecked command execution (no concurrency variant).
28#[derive(Debug, Error)]
29pub enum CommandError<AggregateError, StoreError, CodecError>
30where
31    StoreError: std::error::Error + 'static,
32    CodecError: std::error::Error + 'static,
33{
34    #[error("aggregate rejected command: {0}")]
35    Aggregate(AggregateError),
36    #[error("failed to rebuild aggregate state: {0}")]
37    Projection(#[source] ProjectionError<StoreError, CodecError>),
38    #[error("failed to encode events: {0}")]
39    Codec(#[source] CodecError),
40    #[error("failed to persist events: {0}")]
41    Store(#[source] StoreError),
42}
43
44/// Error type for snapshot-enabled unchecked command execution.
45#[derive(Debug, Error)]
46pub enum SnapshotCommandError<AggregateError, StoreError, CodecError, SnapshotError>
47where
48    StoreError: std::error::Error + 'static,
49    CodecError: std::error::Error + 'static,
50    SnapshotError: std::error::Error + 'static,
51{
52    #[error("aggregate rejected command: {0}")]
53    Aggregate(AggregateError),
54    #[error("failed to rebuild aggregate state: {0}")]
55    Projection(#[source] ProjectionError<StoreError, CodecError>),
56    #[error("failed to encode events: {0}")]
57    Codec(#[source] CodecError),
58    #[error("failed to persist events: {0}")]
59    Store(#[source] StoreError),
60    #[error("snapshot operation failed: {0}")]
61    Snapshot(#[source] SnapshotError),
62}
63
64/// Error type for optimistic command execution (includes concurrency).
65#[derive(Debug, Error)]
66pub enum OptimisticCommandError<AggregateError, Position, StoreError, CodecError>
67where
68    Position: std::fmt::Debug,
69    StoreError: std::error::Error + 'static,
70    CodecError: std::error::Error + 'static,
71{
72    #[error("aggregate rejected command: {0}")]
73    Aggregate(AggregateError),
74    #[error(transparent)]
75    Concurrency(ConcurrencyConflict<Position>),
76    #[error("failed to rebuild aggregate state: {0}")]
77    Projection(#[source] ProjectionError<StoreError, CodecError>),
78    #[error("failed to encode events: {0}")]
79    Codec(#[source] CodecError),
80    #[error("failed to persist events: {0}")]
81    Store(#[source] StoreError),
82}
83
84/// Error type for snapshot-enabled optimistic command execution (includes
85/// concurrency).
86#[derive(Debug, Error)]
87pub enum OptimisticSnapshotCommandError<
88    AggregateError,
89    Position,
90    StoreError,
91    CodecError,
92    SnapshotError,
93> where
94    Position: std::fmt::Debug,
95    StoreError: std::error::Error + 'static,
96    CodecError: std::error::Error + 'static,
97    SnapshotError: std::error::Error + 'static,
98{
99    #[error("aggregate rejected command: {0}")]
100    Aggregate(AggregateError),
101    #[error(transparent)]
102    Concurrency(ConcurrencyConflict<Position>),
103    #[error("failed to rebuild aggregate state: {0}")]
104    Projection(#[source] ProjectionError<StoreError, CodecError>),
105    #[error("failed to encode events: {0}")]
106    Codec(#[source] CodecError),
107    #[error("failed to persist events: {0}")]
108    Store(#[source] StoreError),
109    #[error("snapshot operation failed: {0}")]
110    Snapshot(#[source] SnapshotError),
111}
112
113/// Result type alias for unchecked command execution.
114pub type UncheckedCommandResult<A, S> = Result<
115    (),
116    CommandError<
117        <A as Aggregate>::Error,
118        <S as EventStore>::Error,
119        <<S as EventStore>::Codec as Codec>::Error,
120    >,
121>;
122
123/// Result type alias for snapshot-enabled unchecked command execution.
124pub type UncheckedSnapshotCommandResult<A, S, SS> = Result<
125    (),
126    SnapshotCommandError<
127        <A as Aggregate>::Error,
128        <S as EventStore>::Error,
129        <<S as EventStore>::Codec as Codec>::Error,
130        <SS as SnapshotStore>::Error,
131    >,
132>;
133
134/// Result type alias for optimistic command execution.
135pub type OptimisticCommandResult<A, S> = Result<
136    (),
137    OptimisticCommandError<
138        <A as Aggregate>::Error,
139        <S as EventStore>::Position,
140        <S as EventStore>::Error,
141        <<S as EventStore>::Codec as Codec>::Error,
142    >,
143>;
144
145/// Result type alias for snapshot-enabled optimistic command execution.
146pub type OptimisticSnapshotCommandResult<A, S, SS> = Result<
147    (),
148    OptimisticSnapshotCommandError<
149        <A as Aggregate>::Error,
150        <S as EventStore>::Position,
151        <S as EventStore>::Error,
152        <<S as EventStore>::Codec as Codec>::Error,
153        <SS as SnapshotStore>::Error,
154    >,
155>;
156
157/// Result type alias for retry operations (optimistic, no snapshots).
158pub type RetryResult<A, S> = Result<
159    usize,
160    OptimisticCommandError<
161        <A as Aggregate>::Error,
162        <S as EventStore>::Position,
163        <S as EventStore>::Error,
164        <<S as EventStore>::Codec as Codec>::Error,
165    >,
166>;
167
168/// Result type alias for retry operations (optimistic, snapshots enabled).
169pub type SnapshotRetryResult<A, S, SS> = Result<
170    usize,
171    OptimisticSnapshotCommandError<
172        <A as Aggregate>::Error,
173        <S as EventStore>::Position,
174        <S as EventStore>::Error,
175        <<S as EventStore>::Codec as Codec>::Error,
176        <SS as SnapshotStore>::Error,
177    >,
178>;
179
/// Result of rebuilding an aggregate, together with the bookkeeping the
/// command paths need afterwards.
struct LoadedAggregate<A, Pos> {
    /// Aggregate state after all loaded events have been applied.
    aggregate: A,
    /// Position of the last applied event; when no events were applied this
    /// is the snapshot position if a snapshot was used, otherwise `None`.
    version: Option<Pos>,
    /// Number of events replayed by this load (every event of the stream
    /// when no snapshot was used).
    events_since_snapshot: u64,
}
185
186fn aggregate_event_filters<S, E>(
187    aggregate_kind: &str,
188    aggregate_id: &S::Id,
189    after: Option<S::Position>,
190) -> Vec<EventFilter<S::Id, S::Position>>
191where
192    S: EventStore,
193    E: ProjectionEvent,
194{
195    E::EVENT_KINDS
196        .iter()
197        .map(|kind| {
198            let mut filter =
199                EventFilter::for_aggregate(*kind, aggregate_kind, aggregate_id.clone());
200            if let Some(position) = after {
201                filter = filter.after(position);
202            }
203            filter
204        })
205        .collect()
206}
207
208fn apply_stored_events<A, S>(
209    aggregate: &mut A,
210    codec: &S::Codec,
211    events: &[StoredEvent<S::Id, S::Position, S::Metadata>],
212) -> Result<Option<S::Position>, crate::codec::EventDecodeError<<S::Codec as Codec>::Error>>
213where
214    S: EventStore,
215    A: Aggregate<Id = S::Id>,
216    A::Event: ProjectionEvent,
217{
218    let mut last_event_position: Option<S::Position> = None;
219
220    for stored in events {
221        let event = A::Event::from_stored(&stored.kind, &stored.data, codec)?;
222        aggregate.apply(&event);
223        last_event_position = Some(stored.position);
224    }
225
226    Ok(last_event_position)
227}
228
/// Marker wrapper used as the third type parameter of [`Repository`] to turn
/// on snapshot support; the wrapped value is the snapshot store.
pub struct Snapshots<SS>(pub SS);
231
232/// Repository.
233pub struct Repository<
234    S,
235    C = Optimistic,
236    M = crate::snapshot::NoSnapshots<<S as EventStore>::Id, <S as EventStore>::Position>,
237> where
238    S: EventStore,
239    C: ConcurrencyStrategy,
240{
241    pub(crate) store: S,
242    snapshots: M,
243    _concurrency: PhantomData<C>,
244}
245
246impl<S> Repository<S>
247where
248    S: EventStore,
249{
250    #[must_use]
251    pub const fn new(store: S) -> Self {
252        Self {
253            store,
254            snapshots: crate::snapshot::NoSnapshots::new(),
255            _concurrency: PhantomData,
256        }
257    }
258}
259
260impl<S, M> Repository<S, Optimistic, M>
261where
262    S: EventStore,
263{
264    /// Disable optimistic concurrency checking for this repository.
265    #[must_use]
266    pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M> {
267        Repository {
268            store: self.store,
269            snapshots: self.snapshots,
270            _concurrency: PhantomData,
271        }
272    }
273}
274
275impl<S, C, M> Repository<S, C, M>
276where
277    S: EventStore,
278    C: ConcurrencyStrategy,
279{
280    #[must_use]
281    pub const fn event_store(&self) -> &S {
282        &self.store
283    }
284
285    pub fn build_projection<P>(&self) -> ProjectionBuilder<'_, S, P>
286    where
287        P: Projection<Id = S::Id>,
288    {
289        ProjectionBuilder::new(&self.store)
290    }
291
292    #[must_use]
293    pub fn with_snapshots<SS>(self, snapshots: SS) -> Repository<S, C, Snapshots<SS>>
294    where
295        SS: SnapshotStore<Id = S::Id, Position = S::Position>,
296    {
297        Repository {
298            store: self.store,
299            snapshots: Snapshots(snapshots),
300            _concurrency: PhantomData,
301        }
302    }
303}
304
305impl<S, C> Repository<S, C, crate::snapshot::NoSnapshots<S::Id, S::Position>>
306where
307    S: EventStore,
308    C: ConcurrencyStrategy,
309{
310    pub const fn aggregate_builder<A>(&self) -> AggregateBuilder<'_, Self, A>
311    where
312        A: Aggregate<Id = S::Id>,
313    {
314        AggregateBuilder::new(self)
315    }
316
317    /// Load an aggregate by replaying all events (no snapshots).
318    ///
319    /// # Errors
320    ///
321    /// Returns [`ProjectionError`] if the store fails to load events or if an
322    /// event cannot be decoded into the aggregate's event sum type.
323    pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
324    where
325        A: Aggregate<Id = S::Id>,
326        A::Event: ProjectionEvent,
327    {
328        Ok(self.load_aggregate::<A>(id).await?.aggregate)
329    }
330
331    async fn load_aggregate<A>(
332        &self,
333        id: &S::Id,
334    ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
335    where
336        A: Aggregate<Id = S::Id>,
337        A::Event: ProjectionEvent,
338    {
339        let filters = aggregate_event_filters::<S, A::Event>(A::KIND, id, None);
340
341        let events = self
342            .store
343            .load_events(&filters)
344            .await
345            .map_err(ProjectionError::Store)?;
346
347        let codec = self.store.codec();
348        let mut aggregate = A::default();
349        let version = apply_stored_events::<A, S>(&mut aggregate, codec, &events)
350            .map_err(ProjectionError::EventDecode)?;
351
352        Ok(LoadedAggregate {
353            aggregate,
354            version,
355            events_since_snapshot: events.len() as u64,
356        })
357    }
358}
359
360impl<S> Repository<S, Unchecked>
361where
362    S: EventStore,
363{
364    /// Execute a command with last-writer-wins semantics (no concurrency
365    /// checking).
366    ///
367    /// # Errors
368    ///
369    /// Returns [`CommandError`] when the aggregate rejects the command, events
370    /// cannot be encoded, the store fails to persist, or the aggregate
371    /// cannot be rebuilt.
372    pub async fn execute_command<A, Cmd>(
373        &mut self,
374        id: &S::Id,
375        command: &Cmd,
376        metadata: &S::Metadata,
377    ) -> UncheckedCommandResult<A, S>
378    where
379        A: Aggregate<Id = S::Id> + Handle<Cmd>,
380        A::Event: ProjectionEvent + SerializableEvent,
381        Cmd: Sync,
382        S::Metadata: Clone,
383    {
384        let LoadedAggregate { aggregate, .. } = self
385            .load_aggregate::<A>(id)
386            .await
387            .map_err(CommandError::Projection)?;
388
389        let new_events =
390            Handle::<Cmd>::handle(&aggregate, command).map_err(CommandError::Aggregate)?;
391
392        if new_events.is_empty() {
393            return Ok(());
394        }
395
396        drop(aggregate);
397
398        let mut tx = self.store.begin::<Unchecked>(A::KIND, id.clone(), None);
399        for event in new_events {
400            tx.append(event, metadata.clone())
401                .map_err(CommandError::Codec)?;
402        }
403        tx.commit().await.map_err(CommandError::Store)?;
404        Ok(())
405    }
406}
407
408impl<S> Repository<S, Optimistic>
409where
410    S: EventStore,
411{
412    /// Execute a command using optimistic concurrency control.
413    ///
414    /// # Errors
415    ///
416    /// Returns [`OptimisticCommandError::Concurrency`] if the stream version
417    /// changed between loading and committing. Other variants cover
418    /// aggregate validation, encoding, persistence, and projection rebuild
419    /// errors.
420    pub async fn execute_command<A, Cmd>(
421        &mut self,
422        id: &S::Id,
423        command: &Cmd,
424        metadata: &S::Metadata,
425    ) -> OptimisticCommandResult<A, S>
426    where
427        A: Aggregate<Id = S::Id> + Handle<Cmd>,
428        A::Event: ProjectionEvent + SerializableEvent,
429        Cmd: Sync,
430        S::Metadata: Clone,
431    {
432        let LoadedAggregate {
433            aggregate, version, ..
434        } = self
435            .load_aggregate::<A>(id)
436            .await
437            .map_err(OptimisticCommandError::Projection)?;
438
439        let new_events = Handle::<Cmd>::handle(&aggregate, command)
440            .map_err(OptimisticCommandError::Aggregate)?;
441
442        if new_events.is_empty() {
443            return Ok(());
444        }
445
446        drop(aggregate);
447
448        let mut tx = self.store.begin::<Optimistic>(A::KIND, id.clone(), version);
449        for event in new_events {
450            tx.append(event, metadata.clone())
451                .map_err(OptimisticCommandError::Codec)?;
452        }
453
454        if let Err(e) = tx.commit().await {
455            match e {
456                AppendError::Conflict(c) => return Err(OptimisticCommandError::Concurrency(c)),
457                AppendError::Store(s) => return Err(OptimisticCommandError::Store(s)),
458            }
459        }
460
461        Ok(())
462    }
463
464    /// Execute a command with automatic retry on concurrency conflicts.
465    ///
466    /// # Errors
467    ///
468    /// Returns the last error if all retries are exhausted, or a
469    /// non-concurrency error immediately.
470    pub async fn execute_with_retry<A, Cmd>(
471        &mut self,
472        id: &S::Id,
473        command: &Cmd,
474        metadata: &S::Metadata,
475        max_retries: usize,
476    ) -> RetryResult<A, S>
477    where
478        A: Aggregate<Id = S::Id> + Handle<Cmd>,
479        A::Event: ProjectionEvent + SerializableEvent,
480        Cmd: Sync,
481        S::Metadata: Clone,
482    {
483        for attempt in 1..=max_retries {
484            match self.execute_command::<A, Cmd>(id, command, metadata).await {
485                Ok(()) => return Ok(attempt),
486                Err(OptimisticCommandError::Concurrency(_)) => {}
487                Err(e) => return Err(e),
488            }
489        }
490
491        self.execute_command::<A, Cmd>(id, command, metadata)
492            .await
493            .map(|()| max_retries + 1)
494    }
495}
496
497impl<S, SS, C> Repository<S, C, Snapshots<SS>>
498where
499    S: EventStore,
500    SS: SnapshotStore<Id = S::Id, Position = S::Position>,
501    C: ConcurrencyStrategy,
502{
503    #[must_use]
504    pub const fn snapshot_store(&self) -> &SS {
505        &self.snapshots.0
506    }
507
508    pub const fn aggregate_builder<A>(&self) -> AggregateBuilder<'_, Self, A>
509    where
510        A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
511    {
512        AggregateBuilder::new(self)
513    }
514
515    /// Load an aggregate using snapshots when available.
516    ///
517    /// # Errors
518    ///
519    /// Returns [`ProjectionError`] if the store fails to load events, if an
520    /// event cannot be decoded, or if a stored snapshot cannot be
521    /// deserialized (which indicates snapshot corruption).
522    pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
523    where
524        A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
525        A::Event: ProjectionEvent,
526    {
527        Ok(self.load_aggregate::<A>(id).await?.aggregate)
528    }
529
530    async fn load_aggregate<A>(
531        &self,
532        id: &S::Id,
533    ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
534    where
535        A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
536        A::Event: ProjectionEvent,
537    {
538        let codec = self.store.codec();
539
540        let snapshot_result = self
541            .snapshots
542            .0
543            .load(A::KIND, id)
544            .await
545            .inspect_err(|e| {
546                tracing::error!(
547                    error = %e,
548                    "failed to load snapshot, falling back to full replay"
549                );
550            })
551            .ok()
552            .flatten();
553
554        let (mut aggregate, snapshot_position) = if let Some(snapshot) = snapshot_result {
555            let restored: A = codec
556                .deserialize(&snapshot.data)
557                .map_err(ProjectionError::SnapshotDeserialize)?;
558            (restored, Some(snapshot.position))
559        } else {
560            (A::default(), None)
561        };
562
563        let filters = aggregate_event_filters::<S, A::Event>(A::KIND, id, snapshot_position);
564
565        let events = self
566            .store
567            .load_events(&filters)
568            .await
569            .map_err(ProjectionError::Store)?;
570
571        let last_event_position = apply_stored_events::<A, S>(&mut aggregate, codec, &events)
572            .map_err(ProjectionError::EventDecode)?;
573
574        let version = last_event_position.or(snapshot_position);
575
576        Ok(LoadedAggregate {
577            aggregate,
578            version,
579            events_since_snapshot: events.len() as u64,
580        })
581    }
582}
583
584impl<S, SS> Repository<S, Unchecked, Snapshots<SS>>
585where
586    S: EventStore,
587    SS: SnapshotStore<Id = S::Id, Position = S::Position>,
588{
589    /// Execute a command with last-writer-wins semantics and optional
590    /// snapshotting.
591    ///
592    /// # Errors
593    ///
594    /// Returns [`SnapshotCommandError`] when the aggregate rejects the command,
595    /// events cannot be encoded, the store fails to persist, snapshot
596    /// persistence fails, or the aggregate cannot be rebuilt.
597    ///
598    /// # Panics
599    ///
600    /// Panics if the store reports `None` from `stream_version` after a
601    /// successful append. This indicates a bug in the event store
602    /// implementation.
603    pub async fn execute_command<A, Cmd>(
604        &mut self,
605        id: &S::Id,
606        command: &Cmd,
607        metadata: &S::Metadata,
608    ) -> UncheckedSnapshotCommandResult<A, S, SS>
609    where
610        A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
611        A::Event: ProjectionEvent + SerializableEvent,
612        Cmd: Sync,
613        S::Metadata: Clone,
614    {
615        let LoadedAggregate {
616            aggregate,
617            events_since_snapshot,
618            ..
619        } = self
620            .load_aggregate::<A>(id)
621            .await
622            .map_err(SnapshotCommandError::Projection)?;
623
624        let new_events =
625            Handle::<Cmd>::handle(&aggregate, command).map_err(SnapshotCommandError::Aggregate)?;
626
627        if new_events.is_empty() {
628            return Ok(());
629        }
630
631        let total_events_since_snapshot = events_since_snapshot + new_events.len() as u64;
632
633        let mut aggregate = aggregate;
634        for event in &new_events {
635            aggregate.apply(event);
636        }
637
638        let mut tx = self.store.begin::<Unchecked>(A::KIND, id.clone(), None);
639        for event in new_events {
640            tx.append(event, metadata.clone())
641                .map_err(SnapshotCommandError::Codec)?;
642        }
643        tx.commit().await.map_err(SnapshotCommandError::Store)?;
644
645        let new_position = self
646            .store
647            .stream_version(A::KIND, id)
648            .await
649            .map_err(SnapshotCommandError::Store)?
650            .expect("stream should have events after append");
651
652        let codec = self.store.codec().clone();
653        let offer_result =
654            self.snapshots
655                .0
656                .offer_snapshot(A::KIND, id, total_events_since_snapshot, move || {
657                    Ok(Snapshot {
658                        position: new_position,
659                        data: codec.serialize(&aggregate)?,
660                    })
661                });
662
663        match offer_result.await {
664            Ok(SnapshotOffer::Declined | SnapshotOffer::Stored) => {}
665            Err(OfferSnapshotError::Create(e)) => return Err(SnapshotCommandError::Codec(e)),
666            Err(OfferSnapshotError::Snapshot(e)) => return Err(SnapshotCommandError::Snapshot(e)),
667        }
668
669        Ok(())
670    }
671}
672
673impl<S, SS> Repository<S, Optimistic, Snapshots<SS>>
674where
675    S: EventStore,
676    SS: SnapshotStore<Id = S::Id, Position = S::Position>,
677{
678    /// Execute a command using optimistic concurrency control and optional
679    /// snapshotting.
680    ///
681    /// # Errors
682    ///
683    /// Returns [`OptimisticSnapshotCommandError::Concurrency`] if the stream
684    /// version changed between loading and committing. Other variants cover
685    /// aggregate validation, encoding, persistence, snapshot persistence,
686    /// and projection rebuild errors.
687    ///
688    /// # Panics
689    ///
690    /// Panics if the store reports `None` from `stream_version` after a
691    /// successful append. This indicates a bug in the event store
692    /// implementation.
693    pub async fn execute_command<A, Cmd>(
694        &mut self,
695        id: &S::Id,
696        command: &Cmd,
697        metadata: &S::Metadata,
698    ) -> OptimisticSnapshotCommandResult<A, S, SS>
699    where
700        A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
701        A::Event: ProjectionEvent + SerializableEvent,
702        Cmd: Sync,
703        S::Metadata: Clone,
704    {
705        let LoadedAggregate {
706            aggregate,
707            version,
708            events_since_snapshot,
709        } = self
710            .load_aggregate::<A>(id)
711            .await
712            .map_err(OptimisticSnapshotCommandError::Projection)?;
713
714        let new_events = Handle::<Cmd>::handle(&aggregate, command)
715            .map_err(OptimisticSnapshotCommandError::Aggregate)?;
716
717        if new_events.is_empty() {
718            return Ok(());
719        }
720
721        let total_events_since_snapshot = events_since_snapshot + new_events.len() as u64;
722
723        let mut aggregate = aggregate;
724        for event in &new_events {
725            aggregate.apply(event);
726        }
727
728        let mut tx = self.store.begin::<Optimistic>(A::KIND, id.clone(), version);
729        for event in new_events {
730            tx.append(event, metadata.clone())
731                .map_err(OptimisticSnapshotCommandError::Codec)?;
732        }
733
734        if let Err(e) = tx.commit().await {
735            match e {
736                AppendError::Conflict(c) => {
737                    return Err(OptimisticSnapshotCommandError::Concurrency(c));
738                }
739                AppendError::Store(s) => return Err(OptimisticSnapshotCommandError::Store(s)),
740            }
741        }
742
743        let new_position = self
744            .store
745            .stream_version(A::KIND, id)
746            .await
747            .map_err(OptimisticSnapshotCommandError::Store)?
748            .expect("stream should have events after append");
749
750        let codec = self.store.codec().clone();
751        let offer_result =
752            self.snapshots
753                .0
754                .offer_snapshot(A::KIND, id, total_events_since_snapshot, move || {
755                    Ok(Snapshot {
756                        position: new_position,
757                        data: codec.serialize(&aggregate)?,
758                    })
759                });
760
761        match offer_result.await {
762            Ok(SnapshotOffer::Declined | SnapshotOffer::Stored) => {}
763            Err(OfferSnapshotError::Create(e)) => {
764                return Err(OptimisticSnapshotCommandError::Codec(e));
765            }
766            Err(OfferSnapshotError::Snapshot(e)) => {
767                return Err(OptimisticSnapshotCommandError::Snapshot(e));
768            }
769        }
770
771        Ok(())
772    }
773
774    /// Execute a command with automatic retry on concurrency conflicts.
775    ///
776    /// # Errors
777    ///
778    /// Returns the last error if all retries are exhausted, or a
779    /// non-concurrency error immediately.
780    pub async fn execute_with_retry<A, Cmd>(
781        &mut self,
782        id: &S::Id,
783        command: &Cmd,
784        metadata: &S::Metadata,
785        max_retries: usize,
786    ) -> SnapshotRetryResult<A, S, SS>
787    where
788        A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
789        A::Event: ProjectionEvent + SerializableEvent,
790        Cmd: Sync,
791        S::Metadata: Clone,
792    {
793        for attempt in 1..=max_retries {
794            match self.execute_command::<A, Cmd>(id, command, metadata).await {
795                Ok(()) => return Ok(attempt),
796                Err(OptimisticSnapshotCommandError::Concurrency(_)) => {}
797                Err(e) => return Err(e),
798            }
799        }
800
801        self.execute_command::<A, Cmd>(id, command, metadata)
802            .await
803            .map(|()| max_retries + 1)
804    }
805}
806
#[cfg(test)]
mod tests {
    use std::{error::Error, io};

    use super::*;

    /// `CommandError` with `io::Error` standing in for store and codec errors.
    type IoCommandError = CommandError<String, io::Error, io::Error>;

    /// `OptimisticCommandError` over `u64` positions with `io::Error`
    /// standing in for store and codec errors.
    type IoOptimisticError = OptimisticCommandError<String, u64, io::Error, io::Error>;

    // The aggregate variant carries no `#[source]`, so only the message is
    // expected to mention the rejection.
    #[test]
    fn command_error_display_mentions_aggregate() {
        let error = IoCommandError::Aggregate("invalid state".to_string());
        let rendered = error.to_string();

        assert!(rendered.contains("aggregate rejected command"));
        assert!(error.source().is_none());
    }

    // The store variant is annotated with `#[source]`.
    #[test]
    fn command_error_store_has_source() {
        let error = IoCommandError::Store(io::Error::other("store error"));

        assert!(error.source().is_some());
    }

    // The concurrency variant is transparent: its message comes from the
    // wrapped conflict.
    #[test]
    fn optimistic_command_error_concurrency_mentions_conflict() {
        let error = IoOptimisticError::Concurrency(ConcurrencyConflict {
            expected: Some(1u64),
            actual: Some(2u64),
        });
        let rendered = error.to_string();

        assert!(rendered.contains("concurrency conflict"));
        assert!(error.source().is_none());
    }
}