sourcery_core/snapshot.rs
1//! Snapshot support for optimized aggregate loading.
2//!
3//! Snapshots persist aggregate state at a point in time, reducing the number of
4//! events that need to be replayed when loading an aggregate. This module
5//! provides:
6//!
7//! - [`Snapshot`] - Point-in-time aggregate state
8//! - [`SnapshotStore`] - Trait for snapshot persistence with policy
9//! - [`NoSnapshots`] - No-op implementation (use `Repository` instead if you
10//! want no snapshots)
11//! - [`InMemorySnapshotStore`] - Reference implementation with configurable
12//! policy
13
14use std::{collections::HashMap, convert::Infallible, future::Future};
15
16use crate::store::StreamKey;
17
/// Serialized aggregate state captured at a specific event-stream position.
///
/// `position` records where in the event stream the snapshot was taken, so a
/// loader only has to replay the events that come after it.
///
/// There is deliberately no version field: schema evolution is left to the
/// serialization layer (for example `serde_evolve`).
///
/// # Type Parameters
///
/// - `Pos`: the event store's position type (e.g. `u64`, `i64`).
#[derive(Debug, Clone)]
pub struct Snapshot<Pos> {
    /// Stream position at which the state was captured.
    pub position: Pos,
    /// The aggregate state, already serialized to bytes.
    pub data: Vec<u8>,
}
38
39/// Trait for snapshot persistence with built-in policy.
40///
41/// Implementations decide both *how* to store snapshots and *when* to store
42/// them. The repository calls [`offer_snapshot`](SnapshotStore::offer_snapshot)
43/// after each successful command execution to decide whether to create and
44/// persist a new snapshot.
45///
46/// # Example Implementations
47///
48/// - Always save: useful for aggregates with expensive replay
49/// - Every N events: balance between storage and replay cost
50/// - Never save: read-only replicas that only load snapshots created elsewhere
51// ANCHOR: snapshot_store_trait
52pub trait SnapshotStore: Send + Sync {
53 /// Aggregate identifier type.
54 ///
55 /// Must match the `EventStore::Id` type used in the same repository.
56 type Id: Send + Sync + 'static;
57
58 /// Position type for tracking snapshot positions.
59 ///
60 /// Must match the `EventStore::Position` type used in the same repository.
61 type Position: Send + Sync + 'static;
62
63 /// Error type for snapshot operations.
64 type Error: std::error::Error + Send + Sync + 'static;
65
66 /// Load the most recent snapshot for an aggregate.
67 ///
68 /// Returns `Ok(None)` if no snapshot exists.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if the underlying storage fails.
73 fn load<'a>(
74 &'a self,
75 aggregate_kind: &'a str,
76 aggregate_id: &'a Self::Id,
77 ) -> impl Future<Output = Result<Option<Snapshot<Self::Position>>, Self::Error>> + Send + 'a;
78
79 /// Whether to store a snapshot, with lazy snapshot creation.
80 ///
81 /// The repository calls this after successfully appending new events,
82 /// passing `events_since_last_snapshot` and a `create_snapshot`
83 /// callback. Implementations may decline without invoking
84 /// `create_snapshot`, avoiding unnecessary snapshot creation cost
85 /// (serialization, extra I/O, etc.).
86 ///
87 /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
88 /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
89 /// snapshot was stored.
90 ///
91 /// # Errors
92 ///
93 /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
94 /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
95 fn offer_snapshot<'a, CE, Create>(
96 &'a mut self,
97 aggregate_kind: &'a str,
98 aggregate_id: &'a Self::Id,
99 events_since_last_snapshot: u64,
100 create_snapshot: Create,
101 ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
102 where
103 CE: std::error::Error + Send + Sync + 'static,
104 Create: FnOnce() -> Result<Snapshot<Self::Position>, CE> + 'a;
105}
106// ANCHOR_END: snapshot_store_trait
107
/// Outcome of [`SnapshotStore::offer_snapshot`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SnapshotOffer {
    /// Nothing was written: the store chose not to snapshot.
    Declined,
    /// A snapshot was written to the store.
    Stored,
}
116
117/// Error returned by [`SnapshotStore::offer_snapshot`].
118#[derive(Debug, thiserror::Error)]
119pub enum OfferSnapshotError<SnapshotError, CreateError>
120where
121 SnapshotError: std::error::Error + 'static,
122 CreateError: std::error::Error + 'static,
123{
124 /// Snapshot creation failed (e.g., serialization, extra I/O, etc.).
125 #[error("failed to create snapshot: {0}")]
126 Create(#[source] CreateError),
127 /// Snapshot persistence failed.
128 #[error("snapshot operation failed: {0}")]
129 Snapshot(#[source] SnapshotError),
130}
131
/// No-op snapshot store for backwards compatibility.
///
/// This implementation:
/// - Always returns `None` from `load()`
/// - Silently discards all offered snapshots
///
/// Use this as the default when snapshots are not needed.
///
/// Generic over `Id` and `Pos` to match the `EventStore` types.
///
/// `Clone`, `Debug` and `Default` are implemented by hand instead of derived.
/// `#[derive]` would require `Id` and `Pos` to implement those traits too, but
/// the store only holds a `PhantomData` and never a value of either type.
pub struct NoSnapshots<Id, Pos>(std::marker::PhantomData<(Id, Pos)>);

impl<Id, Pos> NoSnapshots<Id, Pos> {
    /// Create a new no-op snapshot store.
    #[must_use]
    pub const fn new() -> Self {
        Self(std::marker::PhantomData)
    }
}

impl<Id, Pos> Clone for NoSnapshots<Id, Pos> {
    fn clone(&self) -> Self {
        Self::new()
    }
}

impl<Id, Pos> std::fmt::Debug for NoSnapshots<Id, Pos> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Same text `#[derive(Debug)]` produces for this tuple struct.
        f.debug_tuple("NoSnapshots").field(&self.0).finish()
    }
}

impl<Id, Pos> Default for NoSnapshots<Id, Pos> {
    fn default() -> Self {
        Self::new()
    }
}
151
152impl<Id, Pos> SnapshotStore for NoSnapshots<Id, Pos>
153where
154 Id: Send + Sync + 'static,
155 Pos: Send + Sync + 'static,
156{
157 type Error = Infallible;
158 type Id = Id;
159 type Position = Pos;
160
161 fn load<'a>(
162 &'a self,
163 _aggregate_kind: &'a str,
164 _aggregate_id: &'a Self::Id,
165 ) -> impl Future<Output = Result<Option<Snapshot<Pos>>, Self::Error>> + Send + 'a {
166 std::future::ready(Ok(None))
167 }
168
169 fn offer_snapshot<'a, CE, Create>(
170 &'a mut self,
171 _aggregate_kind: &'a str,
172 _aggregate_id: &'a Self::Id,
173 _events_since_last_snapshot: u64,
174 _create_snapshot: Create,
175 ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
176 where
177 CE: std::error::Error + Send + Sync + 'static,
178 Create: FnOnce() -> Result<Snapshot<Pos>, CE> + 'a,
179 {
180 std::future::ready(Ok(SnapshotOffer::Declined))
181 }
182}
183
/// Rule deciding when the in-memory snapshot store writes a snapshot.
///
/// # Choosing a Policy
///
/// The right policy depends on the aggregate's characteristics:
///
/// | Policy | Best For | Trade-off |
/// |--------|----------|-----------|
/// | `Always` | Expensive replay, low write volume | Storage cost per command |
/// | `EveryNEvents(n)` | Most use cases | Balanced storage vs replay |
/// | `Never` | Read replicas, external snapshot management | Full replay every load |
///
/// ## `Always`
///
/// Snapshots after every command. A good fit when:
/// - Event replay is computationally expensive
/// - Aggregates accumulate many events (100+)
/// - Read latency matters more than write overhead
/// - Write volume is relatively low
///
/// ## `EveryNEvents(n)`
///
/// Snapshots once at least `n` events have built up since the last snapshot.
/// Recommended for most use cases:
/// - Start with `n = 50-100` and tune based on profiling
/// - Balances storage cost against replay time
/// - Works well for aggregates with moderate event counts
///
/// ## `Never`
///
/// Never snapshots. A good fit when:
/// - Running a read replica that consumes snapshots created elsewhere
/// - Aggregates are short-lived (few events per instance)
/// - Snapshots are managed by an external process
/// - Testing without snapshot overhead
#[derive(Debug, Clone)]
enum SnapshotPolicy {
    /// Snapshot after every command.
    Always,
    /// Snapshot once at least this many events have accumulated.
    EveryNEvents(u64),
    /// Never snapshot (load-only mode).
    Never,
}

impl SnapshotPolicy {
    /// Returns `true` when `events_since` (events appended since the last
    /// snapshot) warrants a new snapshot under this policy.
    const fn should_snapshot(&self, events_since: u64) -> bool {
        match *self {
            Self::Never => false,
            Self::Always => true,
            Self::EveryNEvents(threshold) => threshold <= events_since,
        }
    }
}
237
238/// In-memory snapshot store with configurable policy.
239///
240/// This is a reference implementation suitable for testing and development.
241/// Production systems should implement [`SnapshotStore`] with durable storage.
242///
243/// Generic over `Id` and `Pos` to match the `EventStore` types.
244///
245/// # Example
246///
247/// ```ignore
248/// use sourcery::{Repository, InMemoryEventStore, InMemorySnapshotStore, JsonCodec};
249///
250/// let repo = Repository::new(InMemoryEventStore::new(JsonCodec))
251/// .with_snapshots(InMemorySnapshotStore::every(100));
252/// ```
253#[derive(Clone, Debug)]
254pub struct InMemorySnapshotStore<Id, Pos> {
255 snapshots: HashMap<StreamKey<Id>, Snapshot<Pos>>,
256 policy: SnapshotPolicy,
257}
258
259impl<Id, Pos> InMemorySnapshotStore<Id, Pos> {
260 /// Create a snapshot store that saves after every command.
261 ///
262 /// Best for aggregates with expensive replay or many events.
263 /// See the policy guidelines above for choosing an appropriate cadence.
264 #[must_use]
265 pub fn always() -> Self {
266 Self {
267 snapshots: HashMap::new(),
268 policy: SnapshotPolicy::Always,
269 }
270 }
271
272 /// Create a snapshot store that saves every N events.
273 ///
274 /// Recommended for most use cases. Start with `n = 50-100` and tune
275 /// based on your aggregate's replay cost.
276 /// See the policy guidelines above for choosing a policy.
277 #[must_use]
278 pub fn every(n: u64) -> Self {
279 Self {
280 snapshots: HashMap::new(),
281 policy: SnapshotPolicy::EveryNEvents(n),
282 }
283 }
284
285 /// Create a snapshot store that never saves (load-only).
286 ///
287 /// Use for read replicas, short-lived aggregates, or when managing
288 /// snapshots externally. See the policy guidelines above for when this
289 /// fits.
290 #[must_use]
291 pub fn never() -> Self {
292 Self {
293 snapshots: HashMap::new(),
294 policy: SnapshotPolicy::Never,
295 }
296 }
297}
298
299impl<Id, Pos> Default for InMemorySnapshotStore<Id, Pos> {
300 fn default() -> Self {
301 Self::always()
302 }
303}
304
305impl<Id, Pos> SnapshotStore for InMemorySnapshotStore<Id, Pos>
306where
307 Id: Clone + Eq + std::hash::Hash + Send + Sync + 'static,
308 Pos: Clone + Send + Sync + 'static,
309{
310 type Error = Infallible;
311 type Id = Id;
312 type Position = Pos;
313
314 #[tracing::instrument(skip(self, aggregate_id))]
315 fn load<'a>(
316 &'a self,
317 aggregate_kind: &'a str,
318 aggregate_id: &'a Self::Id,
319 ) -> impl Future<Output = Result<Option<Snapshot<Pos>>, Self::Error>> + Send + 'a {
320 let key = StreamKey::new(aggregate_kind, aggregate_id.clone());
321 let snapshot = self.snapshots.get(&key).cloned();
322 tracing::trace!(found = snapshot.is_some(), "snapshot lookup");
323 std::future::ready(Ok(snapshot))
324 }
325
326 #[tracing::instrument(skip(self, aggregate_id, create_snapshot))]
327 fn offer_snapshot<'a, CE, Create>(
328 &'a mut self,
329 aggregate_kind: &'a str,
330 aggregate_id: &'a Self::Id,
331 events_since_last_snapshot: u64,
332 create_snapshot: Create,
333 ) -> impl Future<Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>> + Send + 'a
334 where
335 CE: std::error::Error + Send + Sync + 'static,
336 Create: FnOnce() -> Result<Snapshot<Pos>, CE> + 'a,
337 {
338 if !self.policy.should_snapshot(events_since_last_snapshot) {
339 return std::future::ready(Ok(SnapshotOffer::Declined));
340 }
341
342 let snapshot = match create_snapshot() {
343 Ok(snapshot) => snapshot,
344 Err(e) => return std::future::ready(Err(OfferSnapshotError::Create(e))),
345 };
346
347 let key = StreamKey::new(aggregate_kind, aggregate_id.clone());
348 self.snapshots.insert(key, snapshot);
349 tracing::debug!(events_since_last_snapshot, "snapshot saved");
350 std::future::ready(Ok(SnapshotOffer::Stored))
351 }
352}