sourcery_core/aggregate.rs
1//! Command-side domain primitives.
2//!
3//! This module defines the building blocks for aggregates: state reconstruction
4//! (`Apply`), command handling (`Handle`), and loading (`AggregateBuilder`).
5//! The `#[derive(Aggregate)]` macro lives here to keep domain ergonomics in one
6//! spot.
7
8use std::marker::PhantomData;
9
10use serde::{Serialize, de::DeserializeOwned};
11
12use crate::{
13 codec::{Codec, ProjectionEvent},
14 concurrency::ConcurrencyStrategy,
15 projection::ProjectionError,
16 repository::{Repository, Snapshots},
17 snapshot::{NoSnapshots, SnapshotStore},
18 store::EventStore,
19};
20
21/// Command-side entities that produce domain events.
22///
23/// Aggregates rebuild their state from events (`Apply<E>`) and validate
24/// commands via [`Handle<C>`]. The derive macro generates the event enum and
25/// plumbing automatically, while keeping your state struct focused on domain
26/// behaviour.
27///
28/// Aggregates are domain objects and do not require serialization by default.
29///
30/// If you enable snapshots (via `Repository::with_snapshots`), the aggregate
31/// state must be serializable (`Serialize + DeserializeOwned`).
32// ANCHOR: aggregate_trait
33pub trait Aggregate: Default + Sized {
34 /// Aggregate type identifier used by the event store.
35 ///
36 /// This is combined with the aggregate ID to create stream identifiers.
37 /// Use lowercase, kebab-case for consistency: `"product"`,
38 /// `"user-account"`, etc.
39 const KIND: &'static str;
40
41 type Event;
42 type Error;
43 type Id;
44
45 /// Apply an event to update aggregate state.
46 ///
47 /// This is called during event replay to rebuild aggregate state from
48 /// history.
49 ///
50 /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
51 /// implementations. For hand-written aggregates, implement this
52 /// directly with a match expression.
53 fn apply(&mut self, event: &Self::Event);
54}
55// ANCHOR_END: aggregate_trait
56
57/// Mutate an aggregate with a domain event.
58///
59/// `Apply<E>` is called while the repository rebuilds aggregate state, keeping
60/// the domain logic focused on pure events rather than persistence concerns.
61///
62/// ```ignore
63/// #[derive(Default)]
64/// struct Account {
65/// balance: i64,
66/// }
67///
68/// impl Apply<FundsDeposited> for Account {
69/// fn apply(&mut self, event: &FundsDeposited) {
70/// self.balance += event.amount;
71/// }
72/// }
73/// ```
74// ANCHOR: apply_trait
75pub trait Apply<E> {
76 fn apply(&mut self, event: &E);
77}
78// ANCHOR_END: apply_trait
79
80/// Entry point for command handling.
81///
82/// Each command type gets its own implementation, letting the aggregate express
83/// validation logic in a strongly typed way.
84///
85/// ```ignore
86/// impl Handle<DepositFunds> for Account {
87/// fn handle(&self, command: &DepositFunds) -> Result<Vec<Self::Event>, Self::Error> {
88/// if command.amount <= 0 {
89/// return Err("amount must be positive".into());
90/// }
91/// Ok(vec![FundsDeposited { amount: command.amount }.into()])
92/// }
93/// }
94/// ```
95// ANCHOR: handle_trait
96pub trait Handle<C>: Aggregate {
97 /// Handle a command and produce events.
98 ///
99 /// # Errors
100 ///
101 /// Returns `Self::Error` if the command is invalid for the current
102 /// aggregate state.
103 fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
104}
105// ANCHOR_END: handle_trait
106
107/// Builder for loading aggregates by ID.
108pub struct AggregateBuilder<'a, R, A> {
109 pub(super) repository: &'a R,
110 pub(super) _phantom: PhantomData<fn() -> A>,
111}
112
113impl<'a, R, A> AggregateBuilder<'a, R, A> {
114 pub(super) const fn new(repository: &'a R) -> Self {
115 Self {
116 repository,
117 _phantom: PhantomData,
118 }
119 }
120}
121
122impl<S, C, A> AggregateBuilder<'_, Repository<S, C, NoSnapshots<S::Id, S::Position>>, A>
123where
124 S: EventStore,
125 C: ConcurrencyStrategy,
126 A: Aggregate<Id = S::Id>,
127{
128 /// Load the aggregate instance by replaying events (no snapshots).
129 ///
130 /// The event kinds to load are automatically determined from the
131 /// aggregate's event type via `ProjectionEvent::EVENT_KINDS`.
132 ///
133 /// # Errors
134 ///
135 /// Returns an error if the store fails to load events or if events cannot
136 /// be deserialized.
137 pub async fn load(
138 self,
139 id: &S::Id,
140 ) -> Result<A, ProjectionError<S::Error, <S::Codec as Codec>::Error>>
141 where
142 A::Event: ProjectionEvent,
143 {
144 self.repository.load::<A>(id).await
145 }
146}
147
148impl<S, SS, C, A> AggregateBuilder<'_, Repository<S, C, Snapshots<SS>>, A>
149where
150 S: EventStore,
151 SS: SnapshotStore<Id = S::Id, Position = S::Position>,
152 C: ConcurrencyStrategy,
153 A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
154{
155 /// Load the aggregate instance using snapshots when available.
156 ///
157 /// The event kinds to load are automatically determined from the
158 /// aggregate's event type via `ProjectionEvent::EVENT_KINDS`.
159 ///
160 /// # Errors
161 ///
162 /// Returns an error if events cannot be deserialized or a stored snapshot
163 /// cannot be deserialized (which indicates snapshot data corruption).
164 pub async fn load(
165 self,
166 id: &S::Id,
167 ) -> Result<A, ProjectionError<S::Error, <S::Codec as Codec>::Error>>
168 where
169 A::Event: ProjectionEvent,
170 {
171 self.repository.load::<A>(id).await
172 }
173}