sourcery_core/snapshot.rs
1//! Snapshot support for optimised aggregate loading.
2//!
3//! Snapshots persist aggregate state at a point in time, reducing the number of
4//! events that need to be replayed when loading an aggregate. This module
5//! provides:
6//!
7//! - [`Snapshot`] - Point-in-time aggregate state
8//! - [`SnapshotStore`] - Trait for snapshot persistence with policy
9//! - [`NoSnapshots`] - No-op implementation (use `Repository` instead if you
10//! want no snapshots)
11//! - [`inmemory`] - In-memory reference implementation with configurable policy
12
13use std::convert::Infallible;
14
15use serde::{Serialize, de::DeserializeOwned};
16
17pub mod inmemory;
18/// Point-in-time snapshot of aggregate state.
19///
20/// The `position` field indicates the event stream position when this snapshot
21/// was taken. When loading an aggregate, only events after this position need
22/// to be replayed.
23///
24/// Schema evolution is handled at the serialisation layer (e.g., via
25/// `serde_evolve`), so no version field is needed here.
26///
27/// # Type Parameters
28///
29/// - `Pos`: The position type used by the event store (e.g., `u64`, `i64`,
30/// etc.)
31/// - `Data`: The snapshot payload type.
32#[derive(Clone, Debug)]
33pub struct Snapshot<Pos, Data> {
34 /// Event position when this snapshot was taken.
35 pub position: Pos,
36 /// Snapshot payload.
37 pub data: Data,
38}
39
40/// Trait for snapshot persistence with built-in policy.
41///
42/// Implementations decide both *how* to store snapshots and *when* to store
43/// them. The repository calls [`offer_snapshot`](SnapshotStore::offer_snapshot)
44/// after each successful command execution to decide whether to create and
45/// persist a new snapshot.
46///
47/// # Example Implementations
48///
49/// - Always save: useful for aggregates with expensive replay
50/// - Every N events: balance between storage and replay cost
51/// - Never save: read-only replicas that only load snapshots created elsewhere
52// ANCHOR: snapshot_store_trait
53pub trait SnapshotStore<Id: Sync>: Send + Sync {
54 /// Position type for tracking snapshot positions.
55 ///
56 /// Must match the `EventStore::Position` type used in the same repository.
57 type Position: Send + Sync;
58
59 /// Error type for snapshot operations.
60 type Error: std::error::Error + Send + Sync + 'static;
61
62 /// Load the most recent snapshot for an aggregate.
63 ///
64 /// Returns `Ok(None)` if no snapshot exists.
65 ///
66 /// # Errors
67 ///
68 /// Returns an error if the underlying storage fails.
69 fn load<T>(
70 &self,
71 kind: &str,
72 id: &Id,
73 ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
74 where
75 T: DeserializeOwned;
76
77 /// Whether to store a snapshot, with lazy snapshot creation.
78 ///
79 /// The repository calls this after successfully appending new events,
80 /// passing `events_since_last_snapshot` and a `create_snapshot`
81 /// callback. Implementations may decline without invoking
82 /// `create_snapshot`, avoiding unnecessary snapshot creation cost
83 /// (serialisation, extra I/O, etc.).
84 ///
85 /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
86 /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
87 /// snapshot was stored.
88 ///
89 /// # Errors
90 ///
91 /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
92 /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
93 fn offer_snapshot<CE, T, Create>(
94 &self,
95 kind: &str,
96 id: &Id,
97 events_since_last_snapshot: u64,
98 create_snapshot: Create,
99 ) -> impl std::future::Future<
100 Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
101 > + Send
102 where
103 CE: std::error::Error + Send + Sync + 'static,
104 T: Serialize,
105 Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
106}
107// ANCHOR_END: snapshot_store_trait
108
109/// Result of offering a snapshot to a store.
110#[derive(Clone, Copy, Debug, Eq, PartialEq)]
111pub enum SnapshotOffer {
112 /// The snapshot store declined to store a snapshot.
113 Declined,
114 /// The snapshot store stored the snapshot.
115 Stored,
116}
117
118/// Error returned by [`SnapshotStore::offer_snapshot`].
119#[derive(Debug, thiserror::Error)]
120pub enum OfferSnapshotError<SnapshotError, CreateError>
121where
122 SnapshotError: std::error::Error + 'static,
123 CreateError: std::error::Error + 'static,
124{
125 /// Snapshot creation failed (e.g., serialisation, extra I/O, etc.).
126 #[error("failed to create snapshot: {0}")]
127 Create(#[source] CreateError),
128 /// Snapshot persistence failed.
129 #[error("snapshot operation failed: {0}")]
130 Snapshot(#[source] SnapshotError),
131}
132
133/// No-op snapshot store for backwards compatibility.
134///
135/// This implementation:
136/// - Always returns `None` from `load()`
137/// - Silently discards all offered snapshots
138///
139/// Use this as the default when snapshots are not needed.
140///
141/// Generic over `Pos` to match the `EventStore` position type.
142#[derive(Clone, Debug, Default)]
143pub struct NoSnapshots<Pos>(std::marker::PhantomData<Pos>);
144
145impl<Pos> NoSnapshots<Pos> {
146 /// Create a new no-op snapshot store.
147 #[must_use]
148 pub const fn new() -> Self {
149 Self(std::marker::PhantomData)
150 }
151}
152
153impl<Id, Pos> SnapshotStore<Id> for NoSnapshots<Pos>
154where
155 Id: Send + Sync,
156 Pos: Send + Sync,
157{
158 type Error = Infallible;
159 type Position = Pos;
160
161 async fn load<T>(&self, _kind: &str, _id: &Id) -> Result<Option<Snapshot<Pos, T>>, Self::Error>
162 where
163 T: DeserializeOwned,
164 {
165 Ok(None)
166 }
167
168 async fn offer_snapshot<CE, T, Create>(
169 &self,
170 _kind: &str,
171 _id: &Id,
172 _events_since_last_snapshot: u64,
173 _create_snapshot: Create,
174 ) -> Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>
175 where
176 CE: std::error::Error + Send + Sync + 'static,
177 T: Serialize,
178 Create: FnOnce() -> Result<Snapshot<Pos, T>, CE>,
179 {
180 Ok(SnapshotOffer::Declined)
181 }
182}
183
184#[cfg(test)]
185mod tests {
186 use std::{error::Error, io};
187
188 use super::*;
189
190 #[tokio::test]
191 async fn no_snapshots_load_returns_none() {
192 let store = NoSnapshots::<u64>::new();
193 let result: Option<Snapshot<u64, String>> = store.load("test", &"id").await.unwrap();
194 assert!(result.is_none());
195 }
196
197 #[tokio::test]
198 async fn no_snapshots_offer_declines() {
199 let store = NoSnapshots::<u64>::new();
200 let result = store
201 .offer_snapshot::<io::Error, _, _>("test", &"id", 100, || {
202 Ok(Snapshot {
203 position: 1,
204 data: "data",
205 })
206 })
207 .await
208 .unwrap();
209
210 assert_eq!(result, SnapshotOffer::Declined);
211 }
212
213 #[test]
214 fn offer_error_create_displays_source() {
215 let err: OfferSnapshotError<io::Error, io::Error> =
216 OfferSnapshotError::Create(io::Error::other("create failed"));
217 let msg = err.to_string();
218 assert!(msg.contains("failed to create snapshot"));
219 assert!(err.source().is_some());
220 }
221
222 #[test]
223 fn offer_error_snapshot_displays_source() {
224 let err: OfferSnapshotError<io::Error, io::Error> =
225 OfferSnapshotError::Snapshot(io::Error::other("snapshot failed"));
226 let msg = err.to_string();
227 assert!(msg.contains("snapshot operation failed"));
228 assert!(err.source().is_some());
229 }
230}