Skip to main content

sourcery_core/
snapshot.rs

//! Snapshot support for optimised aggregate loading.
//!
//! Snapshots persist aggregate state at a point in time, reducing the number of
//! events that need to be replayed when loading an aggregate. This module
//! provides:
//!
//! - [`Snapshot`] - Point-in-time aggregate state
//! - [`SnapshotStore`] - Trait for snapshot persistence with policy
//! - [`NoSnapshots`] - No-op implementation (use `Repository` instead if you
//!   want no snapshots)
//! - [`inmemory`] - In-memory reference implementation with configurable policy
12
13use std::convert::Infallible;
14
15use serde::{Serialize, de::DeserializeOwned};
16
17pub mod inmemory;
/// Aggregate state captured at a specific point in the event stream.
///
/// `position` records where in the event stream the snapshot was taken, so a
/// loader only has to replay the events that come after it.
///
/// No version field is carried here: schema evolution is expected to be
/// handled by the serialisation layer (e.g., via `serde_evolve`).
///
/// # Type Parameters
///
/// - `Pos`: Position type used by the event store (e.g., `u64`, `i64`, etc.)
/// - `Data`: Type of the snapshot payload.
#[derive(Clone, Debug)]
pub struct Snapshot<Pos, Data> {
    /// Position in the event stream at which the snapshot was taken.
    pub position: Pos,
    /// The captured payload.
    pub data: Data,
}
39
40/// Trait for snapshot persistence with built-in policy.
41///
42/// Implementations decide both *how* to store snapshots and *when* to store
43/// them. The repository calls [`offer_snapshot`](SnapshotStore::offer_snapshot)
44/// after each successful command execution to decide whether to create and
45/// persist a new snapshot.
46///
47/// # Example Implementations
48///
49/// - Always save: useful for aggregates with expensive replay
50/// - Every N events: balance between storage and replay cost
51/// - Never save: read-only replicas that only load snapshots created elsewhere
52// ANCHOR: snapshot_store_trait
53pub trait SnapshotStore<Id: Sync>: Send + Sync {
54    /// Position type for tracking snapshot positions.
55    ///
56    /// Must match the `EventStore::Position` type used in the same repository.
57    type Position: Send + Sync;
58
59    /// Error type for snapshot operations.
60    type Error: std::error::Error + Send + Sync + 'static;
61
62    /// Load the most recent snapshot for an aggregate.
63    ///
64    /// Returns `Ok(None)` if no snapshot exists.
65    ///
66    /// # Errors
67    ///
68    /// Returns an error if the underlying storage fails.
69    fn load<T>(
70        &self,
71        kind: &str,
72        id: &Id,
73    ) -> impl std::future::Future<Output = Result<Option<Snapshot<Self::Position, T>>, Self::Error>> + Send
74    where
75        T: DeserializeOwned;
76
77    /// Whether to store a snapshot, with lazy snapshot creation.
78    ///
79    /// The repository calls this after successfully appending new events,
80    /// passing `events_since_last_snapshot` and a `create_snapshot`
81    /// callback. Implementations may decline without invoking
82    /// `create_snapshot`, avoiding unnecessary snapshot creation cost
83    /// (serialisation, extra I/O, etc.).
84    ///
85    /// Returning [`SnapshotOffer::Stored`] indicates that the snapshot was
86    /// persisted. Returning [`SnapshotOffer::Declined`] indicates that no
87    /// snapshot was stored.
88    ///
89    /// # Errors
90    ///
91    /// Returns [`OfferSnapshotError::Create`] if `create_snapshot` fails.
92    /// Returns [`OfferSnapshotError::Snapshot`] if persistence fails.
93    fn offer_snapshot<CE, T, Create>(
94        &self,
95        kind: &str,
96        id: &Id,
97        events_since_last_snapshot: u64,
98        create_snapshot: Create,
99    ) -> impl std::future::Future<
100        Output = Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>,
101    > + Send
102    where
103        CE: std::error::Error + Send + Sync + 'static,
104        T: Serialize,
105        Create: FnOnce() -> Result<Snapshot<Self::Position, T>, CE> + Send;
106}
107// ANCHOR_END: snapshot_store_trait
108
/// Outcome reported by a store after it has been offered a snapshot.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum SnapshotOffer {
    /// No snapshot was stored: the store turned the offer down.
    Declined,
    /// The offered snapshot was stored.
    Stored,
}
117
118/// Error returned by [`SnapshotStore::offer_snapshot`].
119#[derive(Debug, thiserror::Error)]
120pub enum OfferSnapshotError<SnapshotError, CreateError>
121where
122    SnapshotError: std::error::Error + 'static,
123    CreateError: std::error::Error + 'static,
124{
125    /// Snapshot creation failed (e.g., serialisation, extra I/O, etc.).
126    #[error("failed to create snapshot: {0}")]
127    Create(#[source] CreateError),
128    /// Snapshot persistence failed.
129    #[error("snapshot operation failed: {0}")]
130    Snapshot(#[source] SnapshotError),
131}
132
/// No-op snapshot store for backwards compatibility.
///
/// This implementation:
/// - Always returns `None` from `load()`
/// - Silently discards all offered snapshots
///
/// Use this as the default when snapshots are not needed.
///
/// Generic over `Pos` to match the `EventStore` position type. `Pos` is only
/// a type-level marker (no value of it is ever stored), so `Clone`, `Debug`
/// and `Default` are implemented by hand below: deriving them would add
/// `Pos: Clone` / `Pos: Debug` / `Pos: Default` bounds that the marker does
/// not need.
pub struct NoSnapshots<Pos>(std::marker::PhantomData<Pos>);

impl<Pos> NoSnapshots<Pos> {
    /// Create a new no-op snapshot store.
    #[must_use]
    pub const fn new() -> Self {
        Self(std::marker::PhantomData)
    }
}

// Hand-written so the impl holds for every `Pos`, not just `Pos: Clone`.
impl<Pos> Clone for NoSnapshots<Pos> {
    fn clone(&self) -> Self {
        Self::new()
    }
}

// Hand-written so the impl holds for every `Pos`, not just `Pos: Default`.
impl<Pos> Default for NoSnapshots<Pos> {
    fn default() -> Self {
        Self::new()
    }
}

// Hand-written so the impl holds for every `Pos`, not just `Pos: Debug`.
// Produces the same output as the derived tuple-struct formatting.
impl<Pos> std::fmt::Debug for NoSnapshots<Pos> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_tuple("NoSnapshots").field(&self.0).finish()
    }
}
152
153impl<Id, Pos> SnapshotStore<Id> for NoSnapshots<Pos>
154where
155    Id: Send + Sync,
156    Pos: Send + Sync,
157{
158    type Error = Infallible;
159    type Position = Pos;
160
161    async fn load<T>(&self, _kind: &str, _id: &Id) -> Result<Option<Snapshot<Pos, T>>, Self::Error>
162    where
163        T: DeserializeOwned,
164    {
165        Ok(None)
166    }
167
168    async fn offer_snapshot<CE, T, Create>(
169        &self,
170        _kind: &str,
171        _id: &Id,
172        _events_since_last_snapshot: u64,
173        _create_snapshot: Create,
174    ) -> Result<SnapshotOffer, OfferSnapshotError<Self::Error, CE>>
175    where
176        CE: std::error::Error + Send + Sync + 'static,
177        T: Serialize,
178        Create: FnOnce() -> Result<Snapshot<Pos, T>, CE>,
179    {
180        Ok(SnapshotOffer::Declined)
181    }
182}
183
#[cfg(test)]
mod tests {
    use std::{error::Error, io};

    use super::*;

    /// Loading from the no-op store never yields a snapshot.
    #[tokio::test]
    async fn no_snapshots_load_returns_none() {
        let store = NoSnapshots::<u64>::new();
        let loaded: Option<Snapshot<u64, String>> = store.load("test", &"id").await.unwrap();
        assert!(loaded.is_none());
    }

    /// Offering a snapshot to the no-op store is always declined.
    #[tokio::test]
    async fn no_snapshots_offer_declines() {
        let store = NoSnapshots::<u64>::new();
        let outcome = store
            .offer_snapshot::<io::Error, _, _>("test", &"id", 100, || {
                Ok(Snapshot {
                    position: 1,
                    data: "data",
                })
            })
            .await;

        assert_eq!(outcome.unwrap(), SnapshotOffer::Declined);
    }

    /// The `Create` variant renders its message and exposes a source.
    #[test]
    fn offer_error_create_displays_source() {
        let err = OfferSnapshotError::<io::Error, io::Error>::Create(io::Error::other(
            "create failed",
        ));
        assert!(err.to_string().contains("failed to create snapshot"));
        assert!(err.source().is_some());
    }

    /// The `Snapshot` variant renders its message and exposes a source.
    #[test]
    fn offer_error_snapshot_displays_source() {
        let err = OfferSnapshotError::<io::Error, io::Error>::Snapshot(io::Error::other(
            "snapshot failed",
        ));
        assert!(err.to_string().contains("snapshot operation failed"));
        assert!(err.source().is_some());
    }
}