sourcery_core/
aggregate.rs

1//! Command-side domain primitives.
2//!
3//! This module defines the building blocks for aggregates: state reconstruction
4//! (`Apply`), command handling (`Handle`), and loading (`AggregateBuilder`).
5//! The `#[derive(Aggregate)]` macro lives here to keep domain ergonomics in one
6//! spot.
7
8use std::marker::PhantomData;
9
10use serde::{Serialize, de::DeserializeOwned};
11
12use crate::{
13    codec::{Codec, ProjectionEvent},
14    concurrency::ConcurrencyStrategy,
15    projection::ProjectionError,
16    repository::{Repository, Snapshots},
17    snapshot::{NoSnapshots, SnapshotStore},
18    store::EventStore,
19};
20
21/// Command-side entities that produce domain events.
22///
23/// Aggregates rebuild their state from events (`Apply<E>`) and validate
24/// commands via [`Handle<C>`]. The derive macro generates the event enum and
25/// plumbing automatically, while keeping your state struct focused on domain
26/// behaviour.
27///
28/// Aggregates are domain objects and do not require serialization by default.
29///
30/// If you enable snapshots (via `Repository::with_snapshots`), the aggregate
31/// state must be serializable (`Serialize + DeserializeOwned`).
32// ANCHOR: aggregate_trait
33pub trait Aggregate: Default + Sized {
34    /// Aggregate type identifier used by the event store.
35    ///
36    /// This is combined with the aggregate ID to create stream identifiers.
37    /// Use lowercase, kebab-case for consistency: `"product"`,
38    /// `"user-account"`, etc.
39    const KIND: &'static str;
40
41    type Event;
42    type Error;
43    type Id;
44
45    /// Apply an event to update aggregate state.
46    ///
47    /// This is called during event replay to rebuild aggregate state from
48    /// history.
49    ///
50    /// When using `#[derive(Aggregate)]`, this dispatches to your `Apply<E>`
51    /// implementations. For hand-written aggregates, implement this
52    /// directly with a match expression.
53    fn apply(&mut self, event: &Self::Event);
54}
55// ANCHOR_END: aggregate_trait
56
57/// Mutate an aggregate with a domain event.
58///
59/// `Apply<E>` is called while the repository rebuilds aggregate state, keeping
60/// the domain logic focused on pure events rather than persistence concerns.
61///
62/// ```ignore
63/// #[derive(Default)]
64/// struct Account {
65///     balance: i64,
66/// }
67///
68/// impl Apply<FundsDeposited> for Account {
69///     fn apply(&mut self, event: &FundsDeposited) {
70///         self.balance += event.amount;
71///     }
72/// }
73/// ```
74// ANCHOR: apply_trait
75pub trait Apply<E> {
76    fn apply(&mut self, event: &E);
77}
78// ANCHOR_END: apply_trait
79
80/// Entry point for command handling.
81///
82/// Each command type gets its own implementation, letting the aggregate express
83/// validation logic in a strongly typed way.
84///
85/// ```ignore
86/// impl Handle<DepositFunds> for Account {
87///     fn handle(&self, command: &DepositFunds) -> Result<Vec<Self::Event>, Self::Error> {
88///         if command.amount <= 0 {
89///             return Err("amount must be positive".into());
90///         }
91///         Ok(vec![FundsDeposited { amount: command.amount }.into()])
92///     }
93/// }
94/// ```
95// ANCHOR: handle_trait
96pub trait Handle<C>: Aggregate {
97    /// Handle a command and produce events.
98    ///
99    /// # Errors
100    ///
101    /// Returns `Self::Error` if the command is invalid for the current
102    /// aggregate state.
103    fn handle(&self, command: &C) -> Result<Vec<Self::Event>, Self::Error>;
104}
105// ANCHOR_END: handle_trait
106
107/// Builder for loading aggregates by ID.
108pub struct AggregateBuilder<'a, R, A> {
109    pub(super) repository: &'a R,
110    pub(super) _phantom: PhantomData<fn() -> A>,
111}
112
113impl<'a, R, A> AggregateBuilder<'a, R, A> {
114    pub(super) const fn new(repository: &'a R) -> Self {
115        Self {
116            repository,
117            _phantom: PhantomData,
118        }
119    }
120}
121
122impl<S, C, A> AggregateBuilder<'_, Repository<S, C, NoSnapshots<S::Id, S::Position>>, A>
123where
124    S: EventStore,
125    C: ConcurrencyStrategy,
126    A: Aggregate<Id = S::Id>,
127{
128    /// Load the aggregate instance by replaying events (no snapshots).
129    ///
130    /// The event kinds to load are automatically determined from the
131    /// aggregate's event type via `ProjectionEvent::EVENT_KINDS`.
132    ///
133    /// # Errors
134    ///
135    /// Returns an error if the store fails to load events or if events cannot
136    /// be deserialized.
137    pub async fn load(
138        self,
139        id: &S::Id,
140    ) -> Result<A, ProjectionError<S::Error, <S::Codec as Codec>::Error>>
141    where
142        A::Event: ProjectionEvent,
143    {
144        self.repository.load::<A>(id).await
145    }
146}
147
148impl<S, SS, C, A> AggregateBuilder<'_, Repository<S, C, Snapshots<SS>>, A>
149where
150    S: EventStore,
151    SS: SnapshotStore<Id = S::Id, Position = S::Position>,
152    C: ConcurrencyStrategy,
153    A: Aggregate<Id = S::Id> + Serialize + DeserializeOwned,
154{
155    /// Load the aggregate instance using snapshots when available.
156    ///
157    /// The event kinds to load are automatically determined from the
158    /// aggregate's event type via `ProjectionEvent::EVENT_KINDS`.
159    ///
160    /// # Errors
161    ///
162    /// Returns an error if events cannot be deserialized or a stored snapshot
163    /// cannot be deserialized (which indicates snapshot data corruption).
164    pub async fn load(
165        self,
166        id: &S::Id,
167    ) -> Result<A, ProjectionError<S::Error, <S::Codec as Codec>::Error>>
168    where
169        A::Event: ProjectionEvent,
170    {
171        self.repository.load::<A>(id).await
172    }
173}