sourcery_core/
snapshot.rs

1//! Snapshot support for optimized aggregate loading.
2//!
3//! Snapshots persist aggregate state at a point in time, reducing the number of
4//! events that need to be replayed when loading an aggregate. This module
5//! provides:
6//!
7//! - [`Snapshot`] - Point-in-time aggregate state
8//! - [`SnapshotStore`] - Trait for snapshot persistence with policy
9//! - [`NoSnapshots`] - No-op implementation (use `Repository` instead if you
10//!   want no snapshots)
11//! - [`InMemorySnapshotStore`] - Reference implementation with configurable
12//!   policy
13
14use std::{collections::HashMap, convert::Infallible, future::Future};
15
16use crate::store::StreamKey;
17
/// An aggregate's serialized state captured at a known stream position.
///
/// `position` records where in the event stream the snapshot was taken, so
/// loading an aggregate only requires replaying the events that come after
/// it.
///
/// No version field is carried here: schema evolution is expected to be
/// handled by the serialization layer (e.g., via `serde_evolve`).
///
/// # Type Parameters
///
/// - `Pos`: The event store's position type (e.g., `u64`, `i64`, etc.)
#[derive(Debug, Clone)]
pub struct Snapshot<Pos> {
    /// Stream position at which the snapshot was captured.
    pub position: Pos,
    /// Aggregate state in serialized form.
    pub data: Vec<u8>,
}
38
39/// Trait for snapshot persistence with built-in policy.
40///
41/// Implementations decide both *how* to store snapshots and *when* to store
42/// them. The repository calls [`offer_snapshot`](SnapshotStore::offer_snapshot)
43/// after each successful command execution to decide whether to create and
44/// persist a new snapshot.
45///
46/// # Example Implementations
47///
48/// - Always save: useful for aggregates with expensive replay
49/// - Every N events: balance between storage and replay cost
50/// - Never save: read-only replicas that only load snapshots created elsewhere
51// ANCHOR: snapshot_store_trait
52pub trait SnapshotStore: Send + Sync {
53    /// Aggregate identifier type.
54    ///
55    /// Must match the `EventStore::Id` type used in the same repository.
56    type Id: Send + Sync + 'static;
57
58    /// Position type for tracking snapshot positions.
59    ///
60    /// Must match the `EventStore::Position` type used in the same repository.
61    type Position: Send + Sync + 'static;
62
63    /// Error type for snapshot operations.
64    type Error: std::error::Error + Send + Sync + 'static;
65
66    /// Load the most recent snapshot for an aggregate.
67    ///
68    /// Returns `Ok(None)` if no snapshot exists.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if the underlying storage fails.
73    fn load<'a>(
74        &'a self,
75        aggregate_kind: &'a str,
76        aggregate_id: &'a Self::Id,
77    ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;
78
79    /// Whether to store a snapshot, with lazy snapshot creation.
80    ///
81    /// The repository calls this after successfully appending new events,
82    /// passing `events_since_last_snapshot` and a `create_snapshot`
83    /// callback. Implementations may decline without invoking
84    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
85    /// (serialization, extra I/O, etc.).
86    ///
87    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
88    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
89    /// snapshot was stored.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
94    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
95    fn offer_snapshot<'a, CE, Create>(
96        &'a mut self,
97        aggregate_kind: &'a str,
98        aggregate_id: &'a Self::Id,
99        events_since_last_snapshot: u64,
100        create_snapshot: Create,
101    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
102    where
103        CE: std::error::Error + Send + Sync + 'static,
104        Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
105}
106// ANCHOR_END: snapshot_store_trait
107
/// Outcome reported by a snapshot store after it was offered a snapshot.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotOffer {
    /// The store turned the offer down; no snapshot was stored.
    Declined,
    /// The store accepted the offer and stored the snapshot.
    Stored,
}
116
117/// Error returned by [`SnapshotStore::offer_snapshot`].
118#[derive(Debug, thiserror::Error)]
119pub enum OfferSnapshotError<SnapshotError, CreateError>
120where
121    SnapshotError: std::error::Error + 'static,
122    CreateError: std::error::Error + 'static,
123{
124    /// Snapshot creation failed (e.g., serialization, extra I/O, etc.).
125    #[error("failed to create snapshot: {0}")]
126    Create(#[source] CreateError),
127    /// Snapshot persistence failed.
128    #[error("snapshot operation failed: {0}")]
129    Snapshot(#[source] SnapshotError),
130}
131
/// No-op snapshot store.
///
/// This implementation:
/// - Always returns `None` from `load()`
/// - Silently discards all offered snapshots
///
/// Use this as the default when snapshots are not needed.
///
/// Generic over `Id` and `Pos` to match the `EventStore` types. Both
/// parameters appear only inside a `PhantomData` marker, so `Clone`, `Debug`
/// and `Default` are implemented by hand without placing any bounds on them.
pub struct NoSnapshots<Id, Pos>(std::marker::PhantomData<(Id, Pos)>);

// `#[derive(Clone, Debug, Default)]` would add `Id: Trait, Pos: Trait` bounds
// to each generated impl, even though no `Id` or `Pos` value is ever stored.
// That would make e.g. `NoSnapshots::<Id, Pos>::default()` unusable for id
// types that do not implement `Default`. The manual impls below lift that
// restriction while producing the same values and the same `Debug` output.

impl<Id, Pos> Clone for NoSnapshots<Id, Pos> {
    fn clone(&self) -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Id, Pos> std::fmt::Debug for NoSnapshots<Id, Pos> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same shape the derive produces for a one-field tuple struct.
        f.debug_tuple("NoSnapshots").field(&self.0).finish()
    }
}

impl<Id, Pos> Default for NoSnapshots<Id, Pos> {
    fn default() -> Self {
        Self(std::marker::PhantomData)
    }
}
143
144impl<Id, Pos> NoSnapshots<Id, Pos> {
145    /// Create a new no-op snapshot store.
146    #[must_use]
147    pub const fn new() -> Self {
148        Self(std::marker::PhantomData)
149    }
150}
151
152impl<Id, Pos> SnapshotStore for NoSnapshots<Id, Pos>
153where
154    Id: Send + Sync + 'static,
155    Pos: Send + Sync + 'static,
156{
157    type Error = Infallible;
158    type Id = Id;
159    type Position = Pos;
160
161    fn load<'a>(
162        &'a self,
163        _aggregate_kind: &'a str,
164        _aggregate_id: &'a Self::Id,
165    ) -> impl Future<Output = Result<Option<Snapshot<Pos>>, Self::Error>> + Send + 'a {
166        std::future::ready(Ok(None))
167    }
168
169    fn offer_snapshot<'a, CE, Create>(
170        &'a mut self,
171        _aggregate_kind: &'a str,
172        _aggregate_id: &'a Self::Id,
173        _events_since_last_snapshot: u64,
174        _create_snapshot: Create,
175    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
176    where
177        CE: std::error::Error + Send + Sync + 'static,
178        Create: FnOnce() -> Result<Snapshot<Pos>, CE> + 'a,
179    {
180        std::future::ready(Ok(SnapshotOffer::Declined))
181    }
182}
183
/// Rule that decides when a snapshot gets created.
///
/// # Choosing a Policy
///
/// Which policy fits depends on how your aggregate behaves:
///
/// | Policy | Best For | Trade-off |
/// |--------|----------|-----------|
/// | `Always` | Expensive replay, low write volume | Storage cost per command |
/// | `EveryNEvents(n)` | Most use cases | Balanced storage vs replay |
/// | `Never` | Read replicas, external snapshot management | Full replay every load |
///
/// ## `Always`
///
/// Snapshots after every command. A good fit when:
/// - Replaying events is computationally expensive
/// - Aggregates accumulate many events (100+)
/// - Read latency matters more than write overhead
/// - Write volume is relatively low
///
/// ## `EveryNEvents(n)`
///
/// Snapshots once at least `n` events have accumulated since the last
/// snapshot. Recommended for most use cases.
/// - Start with `n = 50-100` and tune based on profiling
/// - Trades storage cost against replay time
/// - Suits aggregates with moderate event counts
///
/// ## `Never`
///
/// Creates no snapshots at all. Use when:
/// - Running a read replica that consumes snapshots created elsewhere
/// - Aggregates are short-lived (few events per instance)
/// - Snapshots are managed by an external process
/// - Testing without snapshot overhead
#[derive(Debug, Clone)]
enum SnapshotPolicy {
    /// Snapshot after every command.
    Always,
    /// Snapshot every N events.
    EveryNEvents(u64),
    /// Load-only mode: no snapshot is ever created.
    Never,
}

impl SnapshotPolicy {
    /// Decide whether a snapshot is due, given that `events_since` events
    /// have been appended since the last snapshot.
    ///
    /// `Always` and `Never` ignore the count; `EveryNEvents(n)` answers
    /// `true` once the count has reached `n`.
    const fn should_snapshot(&self, events_since: u64) -> bool {
        match *self {
            Self::Never => false,
            Self::Always => true,
            Self::EveryNEvents(interval) => interval <= events_since,
        }
    }
}
237
238/// In-memory snapshot store with configurable policy.
239///
240/// This is a reference implementation suitable for testing and development.
241/// Production systems should implement [`SnapshotStore`] with durable storage.
242///
243/// Generic over `Id` and `Pos` to match the `EventStore` types.
244///
245/// # Example
246///
247/// ```ignore
248/// use sourcery::{Repository, InMemoryEventStore, InMemorySnapshotStore, JsonCodec};
249///
250/// let repo = Repository::new(InMemoryEventStore::new(JsonCodec))
251///     .with_snapshots(InMemorySnapshotStore::every(100));
252/// ```
253#[derive(Clone, Debug)]
254pub struct InMemorySnapshotStore<Id, Pos> {
255    snapshots: HashMap<StreamKey<Id>, Snapshot<Pos>>,
256    policy: SnapshotPolicy,
257}
258
259impl<Id, Pos> InMemorySnapshotStore<Id, Pos> {
260    /// Create a snapshot store that saves after every command.
261    ///
262    /// Best for aggregates with expensive replay or many events.
263    /// See the policy guidelines above for choosing an appropriate cadence.
264    #[must_use]
265    pub fn always() -> Self {
266        Self {
267            snapshots: HashMap::new(),
268            policy: SnapshotPolicy::Always,
269        }
270    }
271
272    /// Create a snapshot store that saves every N events.
273    ///
274    /// Recommended for most use cases. Start with `n = 50-100` and tune
275    /// based on your aggregate's replay cost.
276    /// See the policy guidelines above for choosing a policy.
277    #[must_use]
278    pub fn every(n: u64) -> Self {
279        Self {
280            snapshots: HashMap::new(),
281            policy: SnapshotPolicy::EveryNEvents(n),
282        }
283    }
284
285    /// Create a snapshot store that never saves (load-only).
286    ///
287    /// Use for read replicas, short-lived aggregates, or when managing
288    /// snapshots externally. See the policy guidelines above for when this
289    /// fits.
290    #[must_use]
291    pub fn never() -> Self {
292        Self {
293            snapshots: HashMap::new(),
294            policy: SnapshotPolicy::Never,
295        }
296    }
297}
298
299impl<Id, Pos> Default for InMemorySnapshotStore<Id, Pos> {
300    fn default() -> Self {
301        Self::always()
302    }
303}
304
305impl<Id, Pos> SnapshotStore for InMemorySnapshotStore<Id, Pos>
306where
307    Id: Clone + Eq + std::hash::Hash + Send + Sync + 'static,
308    Pos: Clone + Send + Sync + 'static,
309{
310    type Error = Infallible;
311    type Id = Id;
312    type Position = Pos;
313
314    #[tracing::instrument(skip(self, aggregate_id))]
315    fn load<'a>(
316        &'a self,
317        aggregate_kind: &'a str,
318        aggregate_id: &'a Self::Id,
319    ) -> impl Future<Output = Result<Option<Snapshot<Pos>>, Self::Error>> + Send + 'a {
320        let key = StreamKey::new(aggregate_kind, aggregate_id.clone());
321        let snapshot = self.snapshots.get(&key).cloned();
322        tracing::trace!(found = snapshot.is_some(), "snapshot lookup");
323        std::future::ready(Ok(snapshot))
324    }
325
326    #[tracing::instrument(skip(self, aggregate_id, create_snapshot))]
327    fn offer_snapshot<'a, CE, Create>(
328        &'a mut self,
329        aggregate_kind: &'a str,
330        aggregate_id: &'a Self::Id,
331        events_since_last_snapshot: u64,
332        create_snapshot: Create,
333    ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
334    where
335        CE: std::error::Error + Send + Sync + 'static,
336        Create: FnOnce() -> Result<Snapshot<Pos>, CE> + 'a,
337    {
338        if !self.policy.should_snapshot(events_since_last_snapshot) {
339            return std::future::ready(Ok(SnapshotOffer::Declined));
340        }
341
342        let snapshot = match create_snapshot() {
343            Ok(snapshot) => snapshot,
344            Err(e) => return std::future::ready(Err(OfferSnapshotError::Create(e))),
345        };
346
347        let key = StreamKey::new(aggregate_kind, aggregate_id.clone());
348        self.snapshots.insert(key, snapshot);
349        tracing::debug!(events_since_last_snapshot, "snapshot saved");
350        std::future::ready(Ok(SnapshotOffer::Stored))
351    }
352}