1use std::marker::PhantomData;
11
12use serde::{Serialize, de::DeserializeOwned};
13use thiserror::Error;
14
15use crate::{
16 aggregate::{Aggregate, AggregateBuilder, Handle},
17 codec::{Codec, ProjectionEvent, SerializableEvent},
18 concurrency::{ConcurrencyConflict, ConcurrencyStrategy, Optimistic, Unchecked},
19 projection::{Projection, ProjectionBuilder, ProjectionError},
20 snapshot::{OfferSnapshotError, Snapshot, SnapshotOffer, SnapshotStore},
21 store::{AppendError, EventFilter, EventStore, StoredEvent},
22};
23
24type LoadError<S> =
25 ProjectionError<<S as EventStore>::Error, <<S as EventStore>::Codec as Codec>::Error>;
26
27#[derive(Debug, Error)]
29pub enum CommandError<AggregateError, StoreError, CodecError>
30where
31 StoreError: std::error::Error + 'static,
32 CodecError: std::error::Error + 'static,
33{
34 #[error("aggregate rejected command: {0}")]
35 Aggregate(AggregateError),
36 #[error("failed to rebuild aggregate state: {0}")]
37 Projection(#[source] ProjectionError<StoreError, CodecError>),
38 #[error("failed to encode events: {0}")]
39 Codec(#[source] CodecError),
40 #[error("failed to persist events: {0}")]
41 Store(#[source] StoreError),
42}
43
44#[derive(Debug, Error)]
46pub enum SnapshotCommandError<AggregateError, StoreError, CodecError, SnapshotError>
47where
48 StoreError: std::error::Error + 'static,
49 CodecError: std::error::Error + 'static,
50 SnapshotError: std::error::Error + 'static,
51{
52 #[error("aggregate rejected command: {0}")]
53 Aggregate(AggregateError),
54 #[error("failed to rebuild aggregate state: {0}")]
55 Projection(#[source] ProjectionError<StoreError, CodecError>),
56 #[error("failed to encode events: {0}")]
57 Codec(#[source] CodecError),
58 #[error("failed to persist events: {0}")]
59 Store(#[source] StoreError),
60 #[error("snapshot operation failed: {0}")]
61 Snapshot(#[source] SnapshotError),
62}
63
64#[derive(Debug, Error)]
66pub enum OptimisticCommandError<AggregateError, Position, StoreError, CodecError>
67where
68 Position: std::fmt::Debug,
69 StoreError: std::error::Error + 'static,
70 CodecError: std::error::Error + 'static,
71{
72 #[error("aggregate rejected command: {0}")]
73 Aggregate(AggregateError),
74 #[error(transparent)]
75 Concurrency(ConcurrencyConflict<Position>),
76 #[error("failed to rebuild aggregate state: {0}")]
77 Projection(#[source] ProjectionError<StoreError, CodecError>),
78 #[error("failed to encode events: {0}")]
79 Codec(#[source] CodecError),
80 #[error("failed to persist events: {0}")]
81 Store(#[source] StoreError),
82}
83
84#[derive(Debug, Error)]
87pub enum OptimisticSnapshotCommandError<
88 AggregateError,
89 Position,
90 StoreError,
91 CodecError,
92 SnapshotError,
93> where
94 Position: std::fmt::Debug,
95 StoreError: std::error::Error + 'static,
96 CodecError: std::error::Error + 'static,
97 SnapshotError: std::error::Error + 'static,
98{
99 #[error("aggregate rejected command: {0}")]
100 Aggregate(AggregateError),
101 #[error(transparent)]
102 Concurrency(ConcurrencyConflict<Position>),
103 #[error("failed to rebuild aggregate state: {0}")]
104 Projection(#[source] ProjectionError<StoreError, CodecError>),
105 #[error("failed to encode events: {0}")]
106 Codec(#[source] CodecError),
107 #[error("failed to persist events: {0}")]
108 Store(#[source] StoreError),
109 #[error("snapshot operation failed: {0}")]
110 Snapshot(#[source] SnapshotError),
111}
112
113pub type UncheckedCommandResult<A, S> = Result<
115 (),
116 CommandError<
117 <A as Aggregate>::Error,
118 <S as EventStore>::Error,
119 <<S as EventStore>::Codec as Codec>::Error,
120 >,
121>;
122
123pub type UncheckedSnapshotCommandResult<A, S, SS> = Result<
125 (),
126 SnapshotCommandError<
127 <A as Aggregate>::Error,
128 <S as EventStore>::Error,
129 <<S as EventStore>::Codec as Codec>::Error,
130 <SS as SnapshotStore>::Error,
131 >,
132>;
133
134pub type OptimisticCommandResult<A, S> = Result<
136 (),
137 OptimisticCommandError<
138 <A as Aggregate>::Error,
139 <S as EventStore>::Position,
140 <S as EventStore>::Error,
141 <<S as EventStore>::Codec as Codec>::Error,
142 >,
143>;
144
145pub type OptimisticSnapshotCommandResult<A, S, SS> = Result<
147 (),
148 OptimisticSnapshotCommandError<
149 <A as Aggregate>::Error,
150 <S as EventStore>::Position,
151 <S as EventStore>::Error,
152 <<S as EventStore>::Codec as Codec>::Error,
153 <SS as SnapshotStore>::Error,
154 >,
155>;
156
157pub type RetryResult<A, S> = Result<
159 usize,
160 OptimisticCommandError<
161 <A as Aggregate>::Error,
162 <S as EventStore>::Position,
163 <S as EventStore>::Error,
164 <<S as EventStore>::Codec as Codec>::Error,
165 >,
166>;
167
168pub type SnapshotRetryResult<A, S, SS> = Result<
170 usize,
171 OptimisticSnapshotCommandError<
172 <A as Aggregate>::Error,
173 <S as EventStore>::Position,
174 <S as EventStore>::Error,
175 <<S as EventStore>::Codec as Codec>::Error,
176 <SS as SnapshotStore>::Error,
177 >,
178>;
179
180struct LoadedAggregate<A, Pos> {
181 aggregate: A,
182 version: Option<Pos>,
183 events_since_snapshot: u64,
184}
185
186fn aggregate_event_filters<S, E>(
187 aggregate_kind: &str,
188 aggregate_id: &S::Id,
189 after: Option<S::Position>,
190) -> Vec<EventFilter<S::Id, S::Position>>
191where
192 S: EventStore,
193 E: ProjectionEvent,
194{
195 E::EVENT_KINDS
196 .iter()
197 .map(|kind| {
198 let mut filter =
199 EventFilter::for_aggregate(*kind, aggregate_kind, aggregate_id.clone());
200 if let Some(position) = after {
201 filter = filter.after(position);
202 }
203 filter
204 })
205 .collect()
206}
207
208fn apply_stored_events<A, S>(
209 aggregate: &mut A,
210 codec: &S::Codec,
211 events: &[StoredEvent<S::Id, S::Position, S::Metadata>],
212) -> Result<Option<S::Position>, crate::codec::EventDecodeError<<S::Codec as Codec>::Error>>
213where
214 S: EventStore,
215 A: Aggregate<Id = S::Id>,
216 A::Event: ProjectionEvent,
217{
218 let mut last_event_position: Option<S::Position> = None;
219
220 for stored in events {
221 let event = A::Event::from_stored(&stored.kind, &stored.data, codec)?;
222 aggregate.apply(&event);
223 last_event_position = Some(stored.position);
224 }
225
226 Ok(last_event_position)
227}
228
229pub struct Snapshots<SS>(pub SS);
231
232pub struct Repository<
234 S,
235 C = Optimistic,
236 M = crate::snapshot::NoSnapshots<<S as EventStore>::Id, <S as EventStore>::Position>,
237> where
238 S: EventStore,
239 C: ConcurrencyStrategy,
240{
241 pub(crate) store: S,
242 snapshots: M,
243 _concurrency: PhantomData<C>,
244}
245
246impl<S> Repository<S>
247where
248 S: EventStore,
249{
250 #[must_use]
251 pub const fn new(store: S) -> Self {
252 Self {
253 store,
254 snapshots: crate::snapshot::NoSnapshots::new(),
255 _concurrency: PhantomData,
256 }
257 }
258}
259
260impl<S, M> Repository<S, Optimistic, M>
261where
262 S: EventStore,
263{
264 #[must_use]
266 pub fn without_concurrency_checking(self) -> Repository<S, Unchecked, M> {
267 Repository {
268 store: self.store,
269 snapshots: self.snapshots,
270 _concurrency: PhantomData,
271 }
272 }
273}
274
275impl<S, C, M> Repository<S, C, M>
276where
277 S: EventStore,
278 C: ConcurrencyStrategy,
279{
280 #[must_use]
281 pub const fn event_store(&self) -> &S {
282 &self.store
283 }
284
285 pub fn build_projection<P>(&self) -> ProjectionBuilder<'_, S, P>
286 where
287 P: Projection<Id = S::Id>,
288 {
289 ProjectionBuilder::new(&self.store)
290 }
291
292 #[must_use]
293 pub fn with_snapshots<SS>(self, snapshots: SS) -> Repository<S, C, Snapshots<SS>>
294 where
295 SS: SnapshotStore<Id = S::Id, Position = S::Position>,
296 {
297 Repository {
298 store: self.store,
299 snapshots: Snapshots(snapshots),
300 _concurrency: PhantomData,
301 }
302 }
303}
304
305impl<S, C> Repository<S, C, crate::snapshot::NoSnapshots<S::Id, S::Position>>
306where
307 S: EventStore,
308 C: ConcurrencyStrategy,
309{
310 pub const fn aggregate_builder<A>(&self) -> AggregateBuilder<'_, Self, A>
311 where
312 A: Aggregate<Id = S::Id>,
313 {
314 AggregateBuilder::new(self)
315 }
316
317 pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
324 where
325 A: Aggregate<Id = S::Id>,
326 A::Event: ProjectionEvent,
327 {
328 Ok(self.load_aggregate::<A>(id).await?.aggregate)
329 }
330
331 async fn load_aggregate<A>(
332 &self,
333 id: &S::Id,
334 ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
335 where
336 A: Aggregate<Id = S::Id>,
337 A::Event: ProjectionEvent,
338 {
339 let filters = aggregate_event_filters::<S, A::Event>(A::KIND, id, None);
340
341 let events = self
342 .store
343 .load_events(&filters)
344 .await
345 .map_err(ProjectionError::Store)?;
346
347 let codec = self.store.codec();
348 let mut aggregate = A::default();
349 let version = apply_stored_events::<A, S>(&mut aggregate, codec, &events)
350 .map_err(ProjectionError::EventDecode)?;
351
352 Ok(LoadedAggregate {
353 aggregate,
354 version,
355 events_since_snapshot: events.len() as u64,
356 })
357 }
358}
359
360impl<S> Repository<S, Unchecked>
361where
362 S: EventStore,
363{
364 pub async fn execute_command<A, Cmd>(
373 &mut self,
374 id: &S::Id,
375 command: &Cmd,
376 metadata: &S::Metadata,
377 ) -> UncheckedCommandResult<A, S>
378 where
379 A: Aggregate<Id = S::Id> + Handle<Cmd>,
380 A::Event: ProjectionEvent + SerializableEvent,
381 Cmd: Sync,
382 S::Metadata: Clone,
383 {
384 let LoadedAggregate { aggregate, .. } = self
385 .load_aggregate::<A>(id)
386 .await
387 .map_err(CommandError::Projection)?;
388
389 let new_events =
390 Handle::<Cmd>::handle(&aggregate, command).map_err(CommandError::Aggregate)?;
391
392 if new_events.is_empty() {
393 return Ok(());
394 }
395
396 drop(aggregate);
397
398 let mut tx = self.store.begin::<Unchecked>(A::KIND, id.clone(), None);
399 for event in new_events {
400 tx.append(event, metadata.clone())
401 .map_err(CommandError::Codec)?;
402 }
403 tx.commit().await.map_err(CommandError::Store)?;
404 Ok(())
405 }
406}
407
408impl<S> Repository<S, Optimistic>
409where
410 S: EventStore,
411{
412 pub async fn execute_command<A, Cmd>(
421 &mut self,
422 id: &S::Id,
423 command: &Cmd,
424 metadata: &S::Metadata,
425 ) -> OptimisticCommandResult<A, S>
426 where
427 A: Aggregate<Id = S::Id> + Handle<Cmd>,
428 A::Event: ProjectionEvent + SerializableEvent,
429 Cmd: Sync,
430 S::Metadata: Clone,
431 {
432 let LoadedAggregate {
433 aggregate, version, ..
434 } = self
435 .load_aggregate::<A>(id)
436 .await
437 .map_err(OptimisticCommandError::Projection)?;
438
439 let new_events = Handle::<Cmd>::handle(&aggregate, command)
440 .map_err(OptimisticCommandError::Aggregate)?;
441
442 if new_events.is_empty() {
443 return Ok(());
444 }
445
446 drop(aggregate);
447
448 let mut tx = self.store.begin::<Optimistic>(A::KIND, id.clone(), version);
449 for event in new_events {
450 tx.append(event, metadata.clone())
451 .map_err(OptimisticCommandError::Codec)?;
452 }
453
454 if let Err(e) = tx.commit().await {
455 match e {
456 AppendError::Conflict(c) => return Err(OptimisticCommandError::Concurrency(c)),
457 AppendError::Store(s) => return Err(OptimisticCommandError::Store(s)),
458 }
459 }
460
461 Ok(())
462 }
463
464 pub async fn execute_with_retry<A, Cmd>(
471 &mut self,
472 id: &S::Id,
473 command: &Cmd,
474 metadata: &S::Metadata,
475 max_retries: usize,
476 ) -> RetryResult<A, S>
477 where
478 A: Aggregate<Id = S::Id> + Handle<Cmd>,
479 A::Event: ProjectionEvent + SerializableEvent,
480 Cmd: Sync,
481 S::Metadata: Clone,
482 {
483 for attempt in 1..=max_retries {
484 match self.execute_command::<A, Cmd>(id, command, metadata).await {
485 Ok(()) => return Ok(attempt),
486 Err(OptimisticCommandError::Concurrency(_)) => {}
487 Err(e) => return Err(e),
488 }
489 }
490
491 self.execute_command::<A, Cmd>(id, command, metadata)
492 .await
493 .map(|()| max_retries + 1)
494 }
495}
496
497impl<S, SS, C> Repository<S, C, Snapshots<SS>>
498where
499 S: EventStore,
500 SS: SnapshotStore<Id = S::Id, Position = S::Position>,
501 C: ConcurrencyStrategy,
502{
503 #[must_use]
504 pub const fn snapshot_store(&self) -> &SS {
505 &self.snapshots.0
506 }
507
508 pub const fn aggregate_builder<A>(&self) -> AggregateBuilder<'_, Self, A>
509 where
510 A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
511 {
512 AggregateBuilder::new(self)
513 }
514
515 pub async fn load<A>(&self, id: &S::Id) -> Result<A, LoadError<S>>
523 where
524 A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
525 A::Event: ProjectionEvent,
526 {
527 Ok(self.load_aggregate::<A>(id).await?.aggregate)
528 }
529
530 async fn load_aggregate<A>(
531 &self,
532 id: &S::Id,
533 ) -> Result<LoadedAggregate<A, S::Position>, LoadError<S>>
534 where
535 A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
536 A::Event: ProjectionEvent,
537 {
538 let codec = self.store.codec();
539
540 let snapshot_result = self
541 .snapshots
542 .0
543 .load(A::KIND, id)
544 .await
545 .inspect_err(|e| {
546 tracing::error!(
547 error = %e,
548 "failed to load snapshot, falling back to full replay"
549 );
550 })
551 .ok()
552 .flatten();
553
554 let (mut aggregate, snapshot_position) = if let Some(snapshot) = snapshot_result {
555 let restored: A = codec
556 .deserialize(&snapshot.data)
557 .map_err(ProjectionError::SnapshotDeserialize)?;
558 (restored, Some(snapshot.position))
559 } else {
560 (A::default(), None)
561 };
562
563 let filters = aggregate_event_filters::<S, A::Event>(A::KIND, id, snapshot_position);
564
565 let events = self
566 .store
567 .load_events(&filters)
568 .await
569 .map_err(ProjectionError::Store)?;
570
571 let last_event_position = apply_stored_events::<A, S>(&mut aggregate, codec, &events)
572 .map_err(ProjectionError::EventDecode)?;
573
574 let version = last_event_position.or(snapshot_position);
575
576 Ok(LoadedAggregate {
577 aggregate,
578 version,
579 events_since_snapshot: events.len() as u64,
580 })
581 }
582}
583
584impl<S, SS> Repository<S, Unchecked, Snapshots<SS>>
585where
586 S: EventStore,
587 SS: SnapshotStore<Id = S::Id, Position = S::Position>,
588{
589 pub async fn execute_command<A, Cmd>(
604 &mut self,
605 id: &S::Id,
606 command: &Cmd,
607 metadata: &S::Metadata,
608 ) -> UncheckedSnapshotCommandResult<A, S, SS>
609 where
610 A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
611 A::Event: ProjectionEvent + SerializableEvent,
612 Cmd: Sync,
613 S::Metadata: Clone,
614 {
615 let LoadedAggregate {
616 aggregate,
617 events_since_snapshot,
618 ..
619 } = self
620 .load_aggregate::<A>(id)
621 .await
622 .map_err(SnapshotCommandError::Projection)?;
623
624 let new_events =
625 Handle::<Cmd>::handle(&aggregate, command).map_err(SnapshotCommandError::Aggregate)?;
626
627 if new_events.is_empty() {
628 return Ok(());
629 }
630
631 let total_events_since_snapshot = events_since_snapshot + new_events.len() as u64;
632
633 let mut aggregate = aggregate;
634 for event in &new_events {
635 aggregate.apply(event);
636 }
637
638 let mut tx = self.store.begin::<Unchecked>(A::KIND, id.clone(), None);
639 for event in new_events {
640 tx.append(event, metadata.clone())
641 .map_err(SnapshotCommandError::Codec)?;
642 }
643 tx.commit().await.map_err(SnapshotCommandError::Store)?;
644
645 let new_position = self
646 .store
647 .stream_version(A::KIND, id)
648 .await
649 .map_err(SnapshotCommandError::Store)?
650 .expect("stream should have events after append");
651
652 let codec = self.store.codec().clone();
653 let offer_result =
654 self.snapshots
655 .0
656 .offer_snapshot(A::KIND, id, total_events_since_snapshot, move || {
657 Ok(Snapshot {
658 position: new_position,
659 data: codec.serialize(&aggregate)?,
660 })
661 });
662
663 match offer_result.await {
664 Ok(SnapshotOffer::Declined | SnapshotOffer::Stored) => {}
665 Err(OfferSnapshotError::Create(e)) => return Err(SnapshotCommandError::Codec(e)),
666 Err(OfferSnapshotError::Snapshot(e)) => return Err(SnapshotCommandError::Snapshot(e)),
667 }
668
669 Ok(())
670 }
671}
672
673impl<S, SS> Repository<S, Optimistic, Snapshots<SS>>
674where
675 S: EventStore,
676 SS: SnapshotStore<Id = S::Id, Position = S::Position>,
677{
678 pub async fn execute_command<A, Cmd>(
694 &mut self,
695 id: &S::Id,
696 command: &Cmd,
697 metadata: &S::Metadata,
698 ) -> OptimisticSnapshotCommandResult<A, S, SS>
699 where
700 A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
701 A::Event: ProjectionEvent + SerializableEvent,
702 Cmd: Sync,
703 S::Metadata: Clone,
704 {
705 let LoadedAggregate {
706 aggregate,
707 version,
708 events_since_snapshot,
709 } = self
710 .load_aggregate::<A>(id)
711 .await
712 .map_err(OptimisticSnapshotCommandError::Projection)?;
713
714 let new_events = Handle::<Cmd>::handle(&aggregate, command)
715 .map_err(OptimisticSnapshotCommandError::Aggregate)?;
716
717 if new_events.is_empty() {
718 return Ok(());
719 }
720
721 let total_events_since_snapshot = events_since_snapshot + new_events.len() as u64;
722
723 let mut aggregate = aggregate;
724 for event in &new_events {
725 aggregate.apply(event);
726 }
727
728 let mut tx = self.store.begin::<Optimistic>(A::KIND, id.clone(), version);
729 for event in new_events {
730 tx.append(event, metadata.clone())
731 .map_err(OptimisticSnapshotCommandError::Codec)?;
732 }
733
734 if let Err(e) = tx.commit().await {
735 match e {
736 AppendError::Conflict(c) => {
737 return Err(OptimisticSnapshotCommandError::Concurrency(c));
738 }
739 AppendError::Store(s) => return Err(OptimisticSnapshotCommandError::Store(s)),
740 }
741 }
742
743 let new_position = self
744 .store
745 .stream_version(A::KIND, id)
746 .await
747 .map_err(OptimisticSnapshotCommandError::Store)?
748 .expect("stream should have events after append");
749
750 let codec = self.store.codec().clone();
751 let offer_result =
752 self.snapshots
753 .0
754 .offer_snapshot(A::KIND, id, total_events_since_snapshot, move || {
755 Ok(Snapshot {
756 position: new_position,
757 data: codec.serialize(&aggregate)?,
758 })
759 });
760
761 match offer_result.await {
762 Ok(SnapshotOffer::Declined | SnapshotOffer::Stored) => {}
763 Err(OfferSnapshotError::Create(e)) => {
764 return Err(OptimisticSnapshotCommandError::Codec(e));
765 }
766 Err(OfferSnapshotError::Snapshot(e)) => {
767 return Err(OptimisticSnapshotCommandError::Snapshot(e));
768 }
769 }
770
771 Ok(())
772 }
773
774 pub async fn execute_with_retry<A, Cmd>(
781 &mut self,
782 id: &S::Id,
783 command: &Cmd,
784 metadata: &S::Metadata,
785 max_retries: usize,
786 ) -> SnapshotRetryResult<A, S, SS>
787 where
788 A: Aggregate<Id = S::Id> + Handle<Cmd> + Serialize + DeserializeOwned,
789 A::Event: ProjectionEvent + SerializableEvent,
790 Cmd: Sync,
791 S::Metadata: Clone,
792 {
793 for attempt in 1..=max_retries {
794 match self.execute_command::<A, Cmd>(id, command, metadata).await {
795 Ok(()) => return Ok(attempt),
796 Err(OptimisticSnapshotCommandError::Concurrency(_)) => {}
797 Err(e) => return Err(e),
798 }
799 }
800
801 self.execute_command::<A, Cmd>(id, command, metadata)
802 .await
803 .map(|()| max_retries + 1)
804 }
805}
806
807#[cfg(test)]
808mod tests {
809 use std::{error::Error, io};
810
811 use super::*;
812
813 #[test]
814 fn command_error_display_mentions_aggregate() {
815 let error: CommandError<String, io::Error, io::Error> =
816 CommandError::Aggregate("invalid state".to_string());
817 let msg = error.to_string();
818 assert!(msg.contains("aggregate rejected command"));
819 assert!(error.source().is_none());
820 }
821
822 #[test]
823 fn command_error_store_has_source() {
824 let error: CommandError<String, io::Error, io::Error> =
825 CommandError::Store(io::Error::other("store error"));
826 assert!(error.source().is_some());
827 }
828
829 #[test]
830 fn optimistic_command_error_concurrency_mentions_conflict() {
831 let conflict = ConcurrencyConflict {
832 expected: Some(1u64),
833 actual: Some(2u64),
834 };
835 let error: OptimisticCommandError<String, u64, io::Error, io::Error> =
836 OptimisticCommandError::Concurrency(conflict);
837 let msg = error.to_string();
838 assert!(msg.contains("concurrency conflict"));
839 assert!(error.source().is_none());
840 }
841}