1use std::{collections::HashMap, convert::Infallible, marker::PhantomData};
29
30use nonempty::NonEmpty;
31use serde::{Serialize, de::DeserializeOwned};
32use thiserror::Error;
33
34use crate::{
35 aggregate::{Aggregate, Handle},
36 concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
37 event::{EventKind, ProjectionEvent},
38 projection::{HandlerError, Projection, ProjectionError, ProjectionFilters},
39 snapshot::{OfferSnapshotError, Snapshot, SnapshotOffer, SnapshotStore},
40 store::{
41 CommitError, EventFilter, EventStore, GloballyOrderedStore, OptimisticCommitError,
42 StoredEvents,
43 },
44 subscription::{SubscribableStore, SubscriptionBuilder},
45};
46
47type LoadError<S> = ProjectionError<<S as EventStore>::Error>;
48
49#[derive(Debug, Error)]
51pub enum CommandError<AggregateError, ConcurrencyError, StoreError, SnapshotError>
52where
53 ConcurrencyError: std::error::Error + 'static,
54 StoreError: std::error::Error + 'static,
55 SnapshotError: std::error::Error + 'static,
56{
57 #[error("aggregate rejected command: {0}")]
58 Aggregate(AggregateError),
59 #[error(transparent)]
60 Concurrency(ConcurrencyError),
61 #[error("failed to rebuild aggregate state: {0}")]
62 Projection(#[source] ProjectionError<StoreError>),
63 #[error("failed to persist events: {0}")]
64 Store(#[source] StoreError),
65 #[error("snapshot operation failed: {0}")]
66 Snapshot(#[source] SnapshotError),
67}
68
69pub type UncheckedCommandResult<A, S> = Result<
71 (),
72 CommandError<<A as Aggregate>::Error, Infallible, <S as EventStore>::Error, Infallible>,
73>;
74
75pub type UncheckedSnapshotCommandResult<A, S, SS> = Result<
77 (),
78 CommandError<
79 <A as Aggregate>::Error,
80 Infallible,
81 <S as EventStore>::Error,
82 <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
83 >,
84>;
85
86pub type OptimisticCommandResult<A, S> = Result<
88 (),
89 CommandError<
90 <A as Aggregate>::Error,
91 ConcurrencyConflict<<S as EventStore>::Position>,
92 <S as EventStore>::Error,
93 Infallible,
94 >,
95>;
96
97pub type OptimisticSnapshotCommandResult<A, S, SS> = Result<
99 (),
100 CommandError<
101 <A as Aggregate>::Error,
102 ConcurrencyConflict<<S as EventStore>::Position>,
103 <S as EventStore>::Error,
104 <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
105 >,
106>;
107
108pub type RetryResult<A, S> = Result<
110 usize,
111 CommandError<
112 <A as Aggregate>::Error,
113 ConcurrencyConflict<<S as EventStore>::Position>,
114 <S as EventStore>::Error,
115 Infallible,
116 >,
117>;
118
119pub type SnapshotRetryResult<A, S, SS> = Result<
121 usize,
122 CommandError<
123 <A as Aggregate>::Error,
124 ConcurrencyConflict<<S as EventStore>::Position>,
125 <S as EventStore>::Error,
126 <SS as SnapshotStore<<S as EventStore>::Id>>::Error,
127 >,
128>;
129
/// Failure reported by a [`CommitPolicy`]: either the policy's concurrency
/// error (`C`) or an error from the underlying event store (`S`).
///
/// `Debug` is derived so values can be logged and used with
/// `Result::unwrap` / `Result::expect`.
#[doc(hidden)]
#[derive(Debug)]
pub enum CommitPolicyError<C, S> {
    /// The commit policy reported a concurrency error.
    Concurrency(C),
    /// The event store reported an error while committing.
    Store(S),
}
135
136#[doc(hidden)]
137pub trait CommitPolicy<S: EventStore>: private::Sealed {
138 type ConcurrencyError: std::error::Error + 'static;
139
140 fn commit<E>(
141 store: &S,
142 kind: &str,
143 id: &S::Id,
144 expected: Option<S::Position>,
145 events: NonEmpty<E>,
146 metadata: &S::Metadata,
147 ) -> impl std::future::Future<
148 Output = Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>,
149 > + Send
150 where
151 E: EventKind + Serialize + Send + Sync,
152 S::Metadata: Clone;
153}
154
155#[doc(hidden)]
156pub trait SnapshotPolicy<S: EventStore, A: Aggregate<Id = S::Id>>: private::Sealed {
157 type SnapshotError: std::error::Error + 'static;
158 type Prepared;
159
160 fn load_base(
161 &self,
162 kind: &str,
163 id: &S::Id,
164 ) -> impl std::future::Future<Output = (A, Option<S::Position>)> + Send;
165
166 fn prepare_snapshot(&self, aggregate: A, events: &NonEmpty<A::Event>) -> Self::Prepared;
167
168 fn offer_snapshot(
169 &self,
170 kind: &str,
171 id: &S::Id,
172 events_since_snapshot: u64,
173 new_position: S::Position,
174 prepared: Self::Prepared,
175 ) -> impl std::future::Future<Output = Result<(), Self::SnapshotError>> + Send;
176}
177
/// Outcome of loading an aggregate: its state plus the bookkeeping needed to
/// commit new events and to decide about snapshots.
struct LoadedAggregate<A, Pos> {
    /// The aggregate after replaying stored events on top of its base state.
    aggregate: A,
    /// Position of the last replayed event, falling back to the snapshot
    /// position; `None` when neither exists.
    version: Option<Pos>,
    /// Number of stored events replayed on top of the base state.
    events_since_snapshot: u64,
}
183
184fn aggregate_event_filters<S, E>(
185 aggregate_kind: &str,
186 aggregate_id: &S::Id,
187 after: Option<&S::Position>,
188) -> Vec<EventFilter<S::Id, S::Position>>
189where
190 S: EventStore,
191 E: ProjectionEvent,
192{
193 E::EVENT_KINDS
194 .iter()
195 .map(|kind| {
196 let mut filter =
197 EventFilter::for_aggregate(*kind, aggregate_kind, aggregate_id.clone());
198 if let Some(position) = after {
199 filter = filter.after(position.clone());
200 }
201 filter
202 })
203 .collect()
204}
205
206fn apply_stored_events<A, S>(
207 aggregate: &mut A,
208 store: &S,
209 events: &StoredEvents<S::Id, S::Position, S::Data, S::Metadata>,
210) -> Result<Option<S::Position>, crate::event::EventDecodeError<S::Error>>
211where
212 S: EventStore,
213 S::Position: Clone,
214 A: Aggregate<Id = S::Id>,
215 A::Event: ProjectionEvent,
216{
217 let mut last_event_position: Option<S::Position> = None;
218
219 for stored in events {
220 let event = A::Event::from_stored(stored, store)?;
221 aggregate.apply(&event);
222 last_event_position = Some(stored.position());
223 }
224
225 Ok(last_event_position)
226}
227
228fn replay_projection_events<P, S>(
229 projection: &mut P,
230 events: &StoredEvents<S::Id, S::Position, S::Data, S::Metadata>,
231 handlers: &HashMap<&'static str, crate::projection::EventHandler<P, S>>,
232 store: &S,
233) -> Result<Option<S::Position>, ProjectionError<S::Error>>
234where
235 S: EventStore,
236 S::Position: Clone,
237 P: ProjectionFilters<Id = S::Id>,
238{
239 let mut last_position = None;
240
241 for stored in events {
242 let aggregate_id = stored.aggregate_id();
243 let kind = stored.kind();
244 let metadata = stored.metadata();
245
246 if let Some(handler) = handlers.get(kind) {
247 (handler)(projection, aggregate_id, stored, metadata, store).map_err(|error| {
248 match error {
249 HandlerError::Store(error) => {
250 ProjectionError::EventDecode(crate::event::EventDecodeError::Store(error))
251 }
252 HandlerError::EventDecode(error) => ProjectionError::EventDecode(error),
253 }
254 })?;
255 }
256 last_position = Some(stored.position());
257 }
258
259 Ok(last_position)
260}
261
/// Newtype around a snapshot store `SS`.
///
/// `Repository::with_snapshots` wraps the store it is given in this type, and
/// the wrapper is what carries the `SnapshotPolicy` implementation in this
/// file.
pub struct Snapshots<SS>(
    /// The wrapped snapshot store.
    #[doc(hidden)]
    pub SS,
);
283
284impl<Id, SS> SnapshotStore<Id> for Snapshots<SS>
285where
286 Id: Send + Sync + 'static,
287 SS: SnapshotStore<Id>,
288{
289 type Error = SS::Error;
290 type Position = SS::Position;
291
292 async fn load<T>(
293 &self,
294 kind: &str,
295 id: &Id,
296 ) -> Result<Option<Snapshot<Self::Position, T>>, Self::Error>
297 where
298 T: DeserializeOwned,
299 {
300 self.0.load(kind, id).await
301 }
302
303 async fn offer_snapshot<CE, T, Create>(
304 &self,
305 kind: &str,
306 id: &Id,
307 events_since_last_snapshot: u64,
308 create_snapshot: Create,
309 ) -> Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>
310 where
311 CE: std::error::Error + Send + Sync + 'static,
312 T: Serialize,
313 Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send,
314 {
315 self.0
316 .offer_snapshot(kind, id, events_since_last_snapshot, create_snapshot)
317 .await
318 }
319}
320
321mod private {
322 pub trait Sealed {}
323
324 impl Sealed for crate::concurrency::Unchecked {}
325 impl Sealed for crate::concurrency::Optimistic {}
326 impl<Pos> Sealed for crate::snapshot::NoSnapshots<Pos> {}
327 impl<SS> Sealed for super::Snapshots<SS> {}
328}
329
330impl<S> CommitPolicy<S> for Unchecked
331where
332 S: EventStore,
333{
334 type ConcurrencyError = Infallible;
335
336 async fn commit<E>(
337 store: &S,
338 kind: &str,
339 id: &S::Id,
340 _expected: Option<S::Position>,
341 events: NonEmpty<E>,
342 metadata: &S::Metadata,
343 ) -> Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>
344 where
345 E: EventKind + Serialize + Send + Sync,
346 S::Metadata: Clone,
347 {
348 let committed = store
349 .commit_events(kind, id, events, metadata)
350 .await
351 .map_err(|e| match e {
352 CommitError::Store(err) | CommitError::Serialization { source: err, .. } => {
353 CommitPolicyError::Store(err)
354 }
355 })?;
356
357 Ok(committed.last_position)
358 }
359}
360
361impl<S> CommitPolicy<S> for Optimistic
362where
363 S: EventStore,
364{
365 type ConcurrencyError = ConcurrencyConflict<S::Position>;
366
367 async fn commit<E>(
368 store: &S,
369 kind: &str,
370 id: &S::Id,
371 expected: Option<S::Position>,
372 events: NonEmpty<E>,
373 metadata: &S::Metadata,
374 ) -> Result<S::Position, CommitPolicyError<Self::ConcurrencyError, S::Error>>
375 where
376 E: EventKind + Serialize + Send + Sync,
377 S::Metadata: Clone,
378 {
379 let committed = store
380 .commit_events_optimistic(kind, id, expected, events, metadata)
381 .await
382 .map_err(|e| match e {
383 OptimisticCommitError::Conflict(conflict) => {
384 CommitPolicyError::Concurrency(conflict)
385 }
386 OptimisticCommitError::Store(err)
387 | OptimisticCommitError::Serialization { source: err, .. } => {
388 CommitPolicyError::Store(err)
389 }
390 })?;
391
392 Ok(committed.last_position)
393 }
394}
395
396impl<S, A> SnapshotPolicy<S, A> for crate::snapshot::NoSnapshots<S::Position>
397where
398 S: EventStore,
399 A: Aggregate<Id = S::Id>,
400{
401 type Prepared = ();
402 type SnapshotError = Infallible;
403
404 async fn load_base(&self, _kind: &str, _id: &S::Id) -> (A, Option<S::Position>) {
405 (A::default(), None)
406 }
407
408 fn prepare_snapshot(&self, _aggregate: A, _events: &NonEmpty<A::Event>) -> Self::Prepared {}
409
410 async fn offer_snapshot(
411 &self,
412 _kind: &str,
413 _id: &S::Id,
414 _events_since_snapshot: u64,
415 _new_position: S::Position,
416 _prepared: Self::Prepared,
417 ) -> Result<(), Self::SnapshotError> {
418 Ok(())
419 }
420}
421
422impl<S, A, SS> SnapshotPolicy<S, A> for Snapshots<SS>
423where
424 S: EventStore,
425 A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned + Send,
426 SS: SnapshotStore<S::Id, Position = S::Position>,
427{
428 type Prepared = A;
429 type SnapshotError = SS::Error;
430
431 async fn load_base(&self, kind: &str, id: &S::Id) -> (A, Option<S::Position>) {
432 let snapshot_result = self
433 .0
434 .load::<A>(kind, id)
435 .await
436 .inspect_err(|e| {
437 tracing::error!(
438 error = %e,
439 "failed to load snapshot, falling back to full replay"
440 );
441 })
442 .ok()
443 .flatten();
444
445 if let Some(snapshot) = snapshot_result {
446 (snapshot.data, Some(snapshot.position))
447 } else {
448 (A::default(), None)
449 }
450 }
451
452 fn prepare_snapshot(&self, mut aggregate: A, events: &NonEmpty<A::Event>) -> Self::Prepared {
453 for event in events {
454 aggregate.apply(event);
455 }
456 aggregate
457 }
458
459 async fn offer_snapshot(
460 &self,
461 kind: &str,
462 id: &S::Id,
463 events_since_snapshot: u64,
464 new_position: S::Position,
465 prepared: Self::Prepared,
466 ) -> Result<(), Self::SnapshotError> {
467 let offer_result = self.0.offer_snapshot(
468 kind,
469 id,
470 events_since_snapshot,
471 move || -> Result<Snapshot<S::Position, A>, Infallible> {
472 Ok(Snapshot {
473 position: new_position,
474 data: prepared,
475 })
476 },
477 );
478
479 match offer_result.await {
480 Ok(SnapshotOffer::Declined | SnapshotOffer::Stored)
481 | Err(OfferSnapshotError::Create(_)) => Ok(()),
482 Err(OfferSnapshotError::Snapshot(e)) => Err(e),
483 }
484 }
485}
486
487pub type OptimisticRepository<S> =
504 Repository<S, Optimistic, crate::snapshot::NoSnapshots<<S as EventStore>::Position>>;
505
506pub type UncheckedRepository<S> =
516 Repository<S, Unchecked, crate::snapshot::NoSnapshots<<S as EventStore>::Position>>;
517
518pub type OptimisticSnapshotRepository<S, SS> = Repository<S, Optimistic, SS>;
540
541pub struct Repository<
591 S,
592 C = Optimistic,
593 M = crate::snapshot::NoSnapshots<<S as EventStore>::Position>,
594> where
595 S: EventStore,
596 C: ConcurrencyStrategy,
597{
598 pub(crate) store: S,
599 snapshots: M,
600 _concurrency: PhantomData<C>,
601}
602
603impl<S> Repository<S>
604where
605 S: EventStore,
606{
607 #[must_use]
608 pub const fn new(store: S) -> Self {
609 Self {
610 store,
611 snapshots: crate::snapshot::NoSnapshots::new(),
612 _concurrency: PhantomData,
613 }
614 }
615}
616
617impl<S, M> Repository<S, Optimistic, M>
618where
619 S: EventStore,
620{
621 #[must_use]
623 pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M> {
624 Repository {
625 store: self.store,
626 snapshots: self.snapshots,
627 _concurrency: PhantomData,
628 }
629 }
630}
631
632impl<S, C, M> Repository<S, C, M>
633where
634 S: EventStore,
635 C: ConcurrencyStrategy,
636{
637 #[must_use]
638 pub const fn event_store(&self) -> &S {
639 &self.store
640 }
641
642 #[must_use]
643 pub fn with_snapshots<SS>(self, snapshots: SS) -> Repository<S, C, Snapshots<SS>>
644 where
645 SS: SnapshotStore<S::Id, Position = S::Position>,
646 {
647 Repository {
648 store: self.store,
649 snapshots: Snapshots(snapshots),
650 _concurrency: PhantomData,
651 }
652 }
653
654 #[tracing::instrument(
665 skip(self, instance_id),
666 fields(
667 projection_type = std::any::type_name::<P>(),
668 )
669 )]
670 pub async fn load_projection<P>(
671 &self,
672 instance_id: &P::InstanceId,
673 ) -> Result<P, ProjectionError<S::Error>>
674 where
675 P: ProjectionFilters<Id = S::Id, Metadata = S::Metadata>,
676 P::InstanceId: Send + Sync,
677 M: Sync,
678 {
679 tracing::debug!("loading projection");
680
681 let filters = P::filters::<S>(instance_id);
682 let (event_filters, handlers) = filters.into_event_filters(None);
683
684 let events = self
685 .store
686 .load_events(&event_filters)
687 .await
688 .map_err(ProjectionError::Store)?;
689
690 let mut projection = P::init(instance_id);
691 let event_count = events.len();
692 tracing::debug!(
693 events_to_replay = event_count,
694 "replaying events into projection"
695 );
696
697 let _ = replay_projection_events(&mut projection, &events, &handlers, &self.store)?;
698
699 tracing::info!(events_applied = event_count, "projection loaded");
700 Ok(projection)
701 }
702
703 pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
710 where
711 A: Aggregate<Id = S::Id>,
712 A::Event: ProjectionEvent,
713 M: SnapshotPolicy<S, A> + Sync,
714 {
715 Ok(self.load_aggregate::<A>(id).await?.aggregate)
716 }
717
718 async fn load_aggregate<A>(
719 &self,
720 id: &S::Id,
721 ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
722 where
723 A: Aggregate<Id = S::Id>,
724 A::Event: ProjectionEvent,
725 M: SnapshotPolicy<S, A> + Sync,
726 {
727 let (mut aggregate, snapshot_position) = self.snapshots.load_base(A::KIND, id).await;
728
729 let filters =
730 aggregate_event_filters::<S, A::Event>(A::KIND, id, snapshot_position.as_ref());
731
732 let events = self
733 .store
734 .load_events(&filters)
735 .await
736 .map_err(ProjectionError::Store)?;
737
738 let last_event_position = apply_stored_events::<A, S>(&mut aggregate, &self.store, &events)
739 .map_err(ProjectionError::EventDecode)?;
740
741 let version = last_event_position.or(snapshot_position);
742
743 Ok(LoadedAggregate {
744 aggregate,
745 version,
746 events_since_snapshot: events.len() as u64,
747 })
748 }
749
750 pub async fn execute_command<A, Cmd>(
760 &self,
761 id: &S::Id,
762 command: &Cmd,
763 metadata: &S::Metadata,
764 ) -> Result<(), CommandError<A::Error, C::ConcurrencyError, S::Error, M::SnapshotError>>
765 where
766 A: Aggregate<Id = S::Id> + Handle<Cmd>,
767 A::Event: ProjectionEvent + EventKind + Serialize + Send + Sync,
768 Cmd: Sync,
769 S::Metadata: Clone,
770 C: CommitPolicy<S>,
771 M: SnapshotPolicy<S, A> + Sync,
772 {
773 let LoadedAggregate {
774 aggregate,
775 version,
776 events_since_snapshot,
777 } = self
778 .load_aggregate::<A>(id)
779 .await
780 .map_err(CommandError::Projection)?;
781
782 let new_events =
783 Handle::<Cmd>::handle(&aggregate, command).map_err(CommandError::Aggregate)?;
784
785 let Some(events) = NonEmpty::from_vec(new_events) else {
786 return Ok(());
787 };
788
789 let total_events_since_snapshot = events_since_snapshot + events.len() as u64;
790
791 let prepared = self.snapshots.prepare_snapshot(aggregate, &events);
792
793 let new_position =
794 C::commit::<A::Event>(&self.store, A::KIND, id, version, events, metadata)
795 .await
796 .map_err(|e| match e {
797 CommitPolicyError::Concurrency(conflict) => CommandError::Concurrency(conflict),
798 CommitPolicyError::Store(err) => CommandError::Store(err),
799 })?;
800
801 self.snapshots
802 .offer_snapshot(
803 A::KIND,
804 id,
805 total_events_since_snapshot,
806 new_position,
807 prepared,
808 )
809 .await
810 .map_err(CommandError::Snapshot)?;
811
812 Ok(())
813 }
814}
815
816impl<S, C, SS> Repository<S, C, Snapshots<SS>>
817where
818 S: EventStore + GloballyOrderedStore,
819 S::Position: Ord,
820 C: ConcurrencyStrategy,
821{
822 #[tracing::instrument(
832 skip(self, instance_id),
833 fields(
834 projection_type = std::any::type_name::<P>(),
835 )
836 )]
837 pub async fn load_projection_with_snapshot<P>(
838 &self,
839 instance_id: &P::InstanceId,
840 ) -> Result<P, ProjectionError<S::Error>>
841 where
842 P: Projection
843 + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
844 + Serialize
845 + DeserializeOwned
846 + Sync,
847 P::InstanceId: Send + Sync,
848 SS: SnapshotStore<P::InstanceId, Position = S::Position>,
849 {
850 tracing::debug!("loading projection with snapshot");
851
852 let snapshot_result = self
853 .snapshots
854 .0
855 .load::<P>(P::KIND, instance_id)
856 .await
857 .inspect_err(|e| {
858 tracing::error!(error = %e, "failed to load projection snapshot");
859 })
860 .ok()
861 .flatten();
862
863 let (mut projection, snapshot_position) = if let Some(snapshot) = snapshot_result {
864 (snapshot.data, Some(snapshot.position))
865 } else {
866 (P::init(instance_id), None)
867 };
868
869 let filters = P::filters::<S>(instance_id);
870 let (event_filters, handlers) = filters.into_event_filters(snapshot_position.as_ref());
871
872 let events = self
873 .store
874 .load_events(&event_filters)
875 .await
876 .map_err(ProjectionError::Store)?;
877
878 let event_count = events.len();
879 let last_position =
880 replay_projection_events(&mut projection, &events, &handlers, &self.store)?;
881
882 if event_count > 0
883 && let Some(position) = last_position
884 {
885 let projection_ref = &projection;
886 let offer = self.snapshots.0.offer_snapshot(
887 P::KIND,
888 instance_id,
889 event_count as u64,
890 move || -> Result<Snapshot<S::Position, &P>, std::convert::Infallible> {
891 Ok(Snapshot {
892 position,
893 data: projection_ref,
894 })
895 },
896 );
897
898 if let Err(e) = offer.await {
899 tracing::error!(error = %e, "failed to store projection snapshot");
900 }
901 }
902
903 tracing::info!(events_applied = event_count, "projection loaded");
904 Ok(projection)
905 }
906}
907
908impl<S, SS, C> Repository<S, C, Snapshots<SS>>
909where
910 S: EventStore,
911 SS: SnapshotStore<S::Id, Position = S::Position>,
912 C: ConcurrencyStrategy,
913{
914 #[must_use]
915 pub const fn snapshot_store(&self) -> &SS {
916 &self.snapshots.0
917 }
918}
919
920impl<S, C, M> Repository<S, C, M>
921where
922 S: EventStore,
923 C: ConcurrencyStrategy,
924{
925 pub fn subscribe<P>(
946 &self,
947 instance_id: P::InstanceId,
948 ) -> SubscriptionBuilder<S, P, crate::snapshot::NoSnapshots<S::Position>>
949 where
950 S: SubscribableStore + Clone + 'static,
951 S::Position: Ord,
952 P: Projection
953 + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
954 + Serialize
955 + DeserializeOwned
956 + Send
957 + Sync
958 + 'static,
959 P::InstanceId: Clone + Send + Sync + 'static,
960 P::Metadata: Send,
961 {
962 SubscriptionBuilder::new(
963 self.store.clone(),
964 crate::snapshot::NoSnapshots::new(),
965 instance_id,
966 )
967 }
968
969 pub fn subscribe_with_snapshots<P, SS>(
974 &self,
975 instance_id: P::InstanceId,
976 snapshots: SS,
977 ) -> SubscriptionBuilder<S, P, SS>
978 where
979 S: SubscribableStore + Clone + 'static,
980 S::Position: Ord,
981 P: Projection
982 + ProjectionFilters<Id = S::Id, Metadata = S::Metadata>
983 + Serialize
984 + DeserializeOwned
985 + Send
986 + Sync
987 + 'static,
988 P::InstanceId: Clone + Send + Sync + 'static,
989 P::Metadata: Send,
990 SS: SnapshotStore<P::InstanceId, Position = S::Position> + Send + Sync + 'static,
991 {
992 SubscriptionBuilder::new(self.store.clone(), snapshots, instance_id)
993 }
994}
995
996impl<S, M> Repository<S, Optimistic, M>
997where
998 S: EventStore,
999{
1000 pub async fn execute_with_retry<A, Cmd>(
1007 &self,
1008 id: &S::Id,
1009 command: &Cmd,
1010 metadata: &S::Metadata,
1011 max_retries: usize,
1012 ) -> Result<
1013 usize,
1014 CommandError<A::Error, ConcurrencyConflict<S::Position>, S::Error, M::SnapshotError>,
1015 >
1016 where
1017 A: Aggregate<Id = S::Id> + Handle<Cmd>,
1018 A::Event: ProjectionEvent + EventKind + serde::Serialize + Send + Sync,
1019 Cmd: Sync,
1020 S::Metadata: Clone,
1021 M: SnapshotPolicy<S, A> + Sync,
1022 {
1023 for attempt in 1..=max_retries {
1024 match self.execute_command::<A, Cmd>(id, command, metadata).await {
1025 Ok(()) => return Ok(attempt),
1026 Err(CommandError::Concurrency(_)) => {}
1027 Err(e) => return Err(e),
1028 }
1029 }
1030
1031 self.execute_command::<A, Cmd>(id, command, metadata)
1032 .await
1033 .map(|()| max_retries + 1)
1034 }
1035}
1036
#[cfg(test)]
mod tests {
    use std::{error::Error, io};

    use super::*;

    /// `CommandError` with no concurrency or snapshot failure modes.
    type PlainCommandError = CommandError<String, Infallible, io::Error, Infallible>;

    /// An aggregate rejection is named in the message and has no source.
    #[test]
    fn command_error_display_mentions_aggregate() {
        let error = PlainCommandError::Aggregate("invalid state".to_string());
        let rendered = error.to_string();

        assert!(rendered.contains("aggregate rejected command"));
        assert!(error.source().is_none());
    }

    /// A store failure exposes the wrapped error as its source.
    #[test]
    fn command_error_store_has_source() {
        let error = PlainCommandError::Store(io::Error::other("store error"));

        assert!(error.source().is_some());
    }

    /// A concurrency error displays the conflict's own message and, being
    /// transparent, reports the conflict's (absent) source.
    #[test]
    fn optimistic_command_error_concurrency_mentions_conflict() {
        let error: CommandError<String, ConcurrencyConflict<u64>, io::Error, Infallible> =
            CommandError::Concurrency(ConcurrencyConflict {
                expected: Some(1u64),
                actual: Some(2u64),
            });
        let rendered = error.to_string();

        assert!(rendered.contains("concurrency conflict"));
        assert!(error.source().is_none());
    }
}