Skip to main content

mako_engine/
snapshot.rs

1//! Snapshotting support — a performance optimisation for long event streams.
2//!
3//! Snapshots are **never the source of truth**. They are always rebuildable
4//! by replaying the full event stream. The engine must function correctly if
5//! all snapshots are discarded.
6//!
7//! # Policy
8//!
9//! A common policy is to take a snapshot every N events (e.g. N = 100).
10//! Use [`Snapshot::should_take`] to check whether enough events have
11//! accumulated since the last snapshot.
12//!
13//! ```rust
14//! use mako_engine::snapshot::Snapshot;
15//!
16//! // Take a snapshot every 100 events.
17//! // second argument is the sequence_number of the last snapshot (0 = none).
18//! assert!(!Snapshot::should_take(99,  0, 100));
19//! assert!( Snapshot::should_take(100, 0, 100));
20//! // Non-exact counts still trigger once the threshold is exceeded:
21//! assert!( Snapshot::should_take(101, 0, 100));
22//! // After a snapshot at seq 100, next triggers at 200+:
23//! assert!(!Snapshot::should_take(101, 100, 100));
24//! assert!( Snapshot::should_take(200, 100, 100));
25//! // interval = 0 disables snapshotting (never returns true):
26//! assert!(!Snapshot::should_take(1000, 0, 0));
27//! ```
28//!
29//! # Using the store
30//!
31//! ```rust,ignore
32//! use mako_engine::snapshot::{Snapshot, SnapshotStore};
33//!
34//! // After executing a command:
35//! let event_count = process.event_count().await?;
36//! let last_snap   = snap_store.load(process.stream_id()).await?
37//!     .map_or(0, |s| s.sequence_number);
38//! if Snapshot::should_take(event_count, last_snap, 100) {
39//!     let state   = process.state().await?;
40//!     let payload = serde_json::to_value(&state)?;
41//!     let snap    = Snapshot::new(process.stream_id().clone(), event_count, 1, payload);
42//!     snap_store.save(&snap).await?;
43//! }
44//! ```
45
46use std::sync::Arc;
47
48#[cfg(any(test, feature = "testing"))]
49use std::collections::HashMap;
50#[cfg(any(test, feature = "testing"))]
51use tokio::sync::RwLock;
52
53use serde_json::Value;
54
55use crate::{error::EngineError, ids::StreamId};
56
57// ── Snapshot ──────────────────────────────────────────────────────────────────
58
59/// A point-in-time snapshot of an aggregate's state.
60///
61/// A snapshot carries the serialized state at a specific `sequence_number`.
62/// During state reconstruction, the engine loads the snapshot (if any) and
63/// then replays only the events that arrived after it.
64///
65/// ## Schema versioning
66///
67/// `state_schema_version` mirrors `EventEnvelope::schema_version`. Increment
68/// it when the serialized state layout changes incompatibly. The engine must
69/// discard snapshots whose schema version it does not recognise.
70#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
71pub struct Snapshot {
72    /// The stream this snapshot covers.
73    pub stream_id: StreamId,
74
75    /// The sequence number of the last event incorporated into this snapshot.
76    ///
77    /// Events with a higher sequence number must still be replayed on top.
78    pub sequence_number: u64,
79
80    /// Schema version of the serialized `state` payload.
81    pub state_schema_version: u32,
82
83    /// The serialized aggregate state at `sequence_number`.
84    ///
85    /// Stored as [`serde_json::Value`] so the engine layer remains
86    /// domain-agnostic; the domain crate deserializes it into the concrete
87    /// `Workflow::State` type.
88    pub state: Value,
89}
90
91impl Snapshot {
92    /// Construct a new snapshot.
93    #[must_use]
94    pub fn new(
95        stream_id: StreamId,
96        sequence_number: u64,
97        state_schema_version: u32,
98        state: Value,
99    ) -> Self {
100        Self {
101            stream_id,
102            sequence_number,
103            state_schema_version,
104            state,
105        }
106    }
107
108    /// Return `true` when a snapshot should be taken.
109    ///
110    /// Returns `true` when `event_count - last_snapshot_at >= interval`,
111    /// i.e. at least `interval` new events have accumulated since the last
112    /// snapshot. This avoids the exact-multiple trap: if snapshotting is
113    /// skipped at count 100, it still triggers at 101, 102, etc. until a
114    /// snapshot is taken.
115    ///
116    /// Set `last_snapshot_at` to `0` when no snapshot has ever been taken.
117    ///
118    /// Returns `false` when `interval` is `0` (snapshotting disabled).
119    ///
120    /// # Example
121    ///
122    /// ```
123    /// use mako_engine::snapshot::Snapshot;
124    ///
125    /// // First snapshot: no prior snapshot (last_snapshot_at = 0).
126    /// assert!(!Snapshot::should_take(99,  0, 100));
127    /// assert!( Snapshot::should_take(100, 0, 100));
128    /// assert!( Snapshot::should_take(101, 0, 100)); // non-exact still triggers
129    ///
130    /// // After a snapshot at seq 100, next triggers at 200+.
131    /// assert!(!Snapshot::should_take(101, 100, 100));
132    /// assert!( Snapshot::should_take(200, 100, 100));
133    /// assert!( Snapshot::should_take(201, 100, 100));
134    ///
135    /// // interval = 0 disables snapshotting.
136    /// assert!(!Snapshot::should_take(1000, 0, 0));
137    /// ```
138    #[must_use]
139    pub fn should_take(event_count: u64, last_snapshot_at: u64, interval: u64) -> bool {
140        if interval == 0 {
141            return false;
142        }
143        event_count > 0 && event_count.saturating_sub(last_snapshot_at) >= interval
144    }
145}
146
147// ── SnapshotStore ─────────────────────────────────────────────────────────────
148
149/// Storage contract for aggregate snapshots.
150///
151/// Implementations live in separate backend crates
152/// (`mako-event-store-slatedb`, `mako-event-store-redb`, etc.).
153/// [`NoopSnapshotStore`] is provided here for tests and deployments that do not
154/// need snapshotting.
155///
156/// ## Contract
157///
158/// - `save` must persist the snapshot durably before returning `Ok`.
159/// - `load` must return the **most recent** snapshot for `stream_id`, or
160///   `None` if no snapshot exists.
161/// - A snapshot must never be returned for a `stream_id` if its
162///   `state_schema_version` is not recognised by the caller — implementations
163///   are encouraged to store schema version in a queryable column/key so
164///   callers can filter by it.
165///
166/// ## Blanket `Arc` implementation
167///
168/// `Arc<S>` implements `SnapshotStore` whenever `S: SnapshotStore`, so
169/// `Process<W, Arc<MyEventStore>>` can accept `Arc<MySnapshotStore>` without
170/// any extra wrapper.
171#[allow(async_fn_in_trait)]
172pub trait SnapshotStore: Send + Sync {
173    /// Persist `snapshot`, replacing any previous snapshot for the same stream.
174    ///
175    /// # Errors
176    ///
177    /// Returns [`EngineError::Snapshot`] on storage failure.
178    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError>;
179
180    /// Load the most recent snapshot for `stream_id`.
181    ///
182    /// Returns `None` when no snapshot exists (full replay required).
183    ///
184    /// # Errors
185    ///
186    /// Returns [`EngineError::Snapshot`] on storage failure.
187    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError>;
188}
189
190// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
191
192impl<S: SnapshotStore> SnapshotStore for Arc<S> {
193    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
194        self.as_ref().save(snapshot).await
195    }
196
197    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
198        self.as_ref().load(stream_id).await
199    }
200}
201
202// ── NoopSnapshotStore ─────────────────────────────────────────────────────────
203
/// A do-nothing [`SnapshotStore`]: saves are accepted and thrown away, loads
/// always come back empty.
///
/// Since `load` always yields `None`, every state read falls back to a full
/// replay of the event stream.
///
/// # ⚠️ Data loss warning
///
/// `NoopSnapshotStore` **silently drops every snapshot it is given**.
/// Processes built on it replay the whole stream on each state read. It is
/// not meant for production — bind a [`SlateDbStore::as_snapshot_store()`]
/// instead.
///
/// The [`SnapshotStore`] implementation for this type only exists when the
/// `testing` feature is enabled or under `#[cfg(test)]`. Production binaries
/// must call [`EngineBuilder::with_snapshot_store`] with a durable backend.
///
/// [`SlateDbStore::as_snapshot_store()`]: crate::store_slatedb::SlateDbStore::as_snapshot_store
/// [`EngineBuilder::with_snapshot_store`]: crate::builder::EngineBuilder::with_snapshot_store
#[must_use = "NoopSnapshotStore discards all snapshots silently — use a persistent SnapshotStore in production"]
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopSnapshotStore;
223
/// Test-only [`SnapshotStore`] implementation: accepts every snapshot and
/// keeps none of them.
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for NoopSnapshotStore {
    /// Accept the snapshot and drop it; always succeeds.
    async fn save(&self, _discarded: &Snapshot) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always report that no snapshot exists, whatever the stream.
    async fn load(&self, _ignored: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        Ok(None)
    }
}
234
235// ── InMemorySnapshotStore ─────────────────────────────────────────────────────
236
/// A [`SnapshotStore`] backed by a map in memory, intended for tests and
/// local development.
///
/// Holds **one snapshot per stream** — the one saved most recently. The map
/// sits behind an `Arc`, so clones are handles onto the same data: a snapshot
/// saved through one clone is visible through all the others.
///
/// Pair it with [`Process::take_snapshot`] and
/// [`Process::state_with_snapshot`] to exercise snapshot-accelerated replay
/// without an external storage backend.
///
/// Compiled only under `#[cfg(test)]` or when the `testing` feature is on.
///
/// [`Process::take_snapshot`]: crate::process::Process::take_snapshot
/// [`Process::state_with_snapshot`]: crate::process::Process::state_with_snapshot
#[cfg(any(test, feature = "testing"))]
#[derive(Clone, Debug, Default)]
pub struct InMemorySnapshotStore {
    // Keyed by stream; the value is the last snapshot saved for that stream.
    inner: Arc<RwLock<HashMap<StreamId, Snapshot>>>,
}
255
#[cfg(any(test, feature = "testing"))]
impl InMemorySnapshotStore {
    /// Build a store that holds no snapshots yet.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Report whether the store currently holds no snapshot for any stream.
    pub async fn is_empty(&self) -> bool {
        let snapshots = self.inner.read().await;
        snapshots.is_empty()
    }
}
269
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for InMemorySnapshotStore {
    /// Store a copy of `snapshot` under its stream id, replacing whatever was
    /// stored for that stream before. Never fails.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
        let mut snapshots = self.inner.write().await;
        snapshots.insert(snapshot.stream_id.clone(), snapshot.clone());
        Ok(())
    }

    /// Return a copy of the snapshot stored for `stream_id`, or `None` when
    /// the stream has none. Never fails.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        let snapshots = self.inner.read().await;
        Ok(snapshots.get(stream_id).cloned())
    }
}
284
#[cfg(test)]
mod tests {
    use super::*;

    // ── Snapshot::should_take ─────────────────────────────────────────────────

    #[test]
    fn should_take_at_exact_multiples() {
        // No previous snapshot (last_snapshot_at = 0).
        assert!(!Snapshot::should_take(0, 0, 100));
        assert!(!Snapshot::should_take(99, 0, 100));
        assert!(Snapshot::should_take(100, 0, 100));
        // Non-exact still triggers ("at-least" semantics):
        assert!(Snapshot::should_take(101, 0, 100));
        assert!(Snapshot::should_take(200, 0, 100));
        // After snapshot at 100, next triggers at 200+:
        assert!(!Snapshot::should_take(101, 100, 100));
        assert!(Snapshot::should_take(200, 100, 100));
        assert!(Snapshot::should_take(250, 100, 100));
    }

    #[test]
    fn should_take_interval_one() {
        // Every event triggers a snapshot (last_snapshot_at = prev count).
        for i in 1u64..=5 {
            assert!(Snapshot::should_take(i, i - 1, 1));
        }
    }

    #[test]
    fn should_take_never_fires_when_interval_is_zero() {
        // interval = 0 means snapshotting is disabled, whatever the counts.
        assert!(!Snapshot::should_take(0, 0, 0));
        assert!(!Snapshot::should_take(1000, 0, 0));
        assert!(!Snapshot::should_take(1000, 500, 0));
    }

    #[test]
    fn should_take_saturates_when_last_snapshot_is_ahead() {
        // A last_snapshot_at beyond event_count counts as zero new events
        // rather than underflowing.
        assert!(!Snapshot::should_take(5, 10, 1));
        assert!(!Snapshot::should_take(0, u64::MAX, 1));
        // Extremes of the range still compare correctly.
        assert!(Snapshot::should_take(u64::MAX, 0, u64::MAX));
    }

    // ── NoopSnapshotStore ─────────────────────────────────────────────────────

    #[tokio::test]
    async fn noop_store_always_returns_none() {
        let store = NoopSnapshotStore;
        let stream = StreamId::new("process/test");
        assert!(store.load(&stream).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn noop_store_save_succeeds() {
        let store = NoopSnapshotStore;
        let snap = Snapshot::new(
            StreamId::new("process/test"),
            10,
            1,
            serde_json::json!({"status": "Active"}),
        );
        assert!(store.save(&snap).await.is_ok());
    }

    // ── InMemorySnapshotStore ─────────────────────────────────────────────────

    #[tokio::test]
    async fn in_memory_store_round_trip() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");
        let snap = Snapshot::new(stream.clone(), 5, 1, serde_json::json!({"x": 1}));

        store.save(&snap).await.unwrap();

        let loaded = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(loaded.sequence_number, 5);
        assert_eq!(loaded.state_schema_version, 1);
        assert_eq!(loaded.state, serde_json::json!({"x": 1}));
    }

    #[tokio::test]
    async fn in_memory_store_overwrite_keeps_latest() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");

        store
            .save(&Snapshot::new(
                stream.clone(),
                5,
                1,
                serde_json::json!({"seq": 5}),
            ))
            .await
            .unwrap();
        store
            .save(&Snapshot::new(
                stream.clone(),
                10,
                1,
                serde_json::json!({"seq": 10}),
            ))
            .await
            .unwrap();

        let loaded = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(
            loaded.sequence_number, 10,
            "second save must overwrite first"
        );
    }

    #[tokio::test]
    async fn in_memory_store_separate_streams_isolated() {
        let store = InMemorySnapshotStore::new();
        let stream1 = StreamId::new("process/aaa");
        let stream2 = StreamId::new("process/bbb");

        store
            .save(&Snapshot::new(
                stream1.clone(),
                3,
                1,
                serde_json::json!(null),
            ))
            .await
            .unwrap();

        assert!(
            store.load(&stream2).await.unwrap().is_none(),
            "unrelated stream must not return stream1's snapshot"
        );
    }

    #[tokio::test]
    async fn in_memory_store_is_empty_initially() {
        let store = InMemorySnapshotStore::new();
        assert!(store.is_empty().await);

        let stream = StreamId::new("process/test");
        store
            .save(&Snapshot::new(stream, 1, 1, serde_json::json!({})))
            .await
            .unwrap();
        assert!(!store.is_empty().await);
    }

    #[tokio::test]
    async fn in_memory_store_clone_shares_data() {
        let store1 = InMemorySnapshotStore::new();
        let store2 = store1.clone();
        let stream = StreamId::new("process/shared");

        store1
            .save(&Snapshot::new(
                stream.clone(),
                7,
                1,
                serde_json::json!({"y": 2}),
            ))
            .await
            .unwrap();

        let loaded = store2
            .load(&stream)
            .await
            .unwrap()
            .expect("clone must see the same snapshot");
        assert_eq!(loaded.sequence_number, 7);
    }

    // ── Arc<S> blanket impl ───────────────────────────────────────────────────

    #[tokio::test]
    async fn arc_wrapped_store_delegates_to_inner() {
        let inner = InMemorySnapshotStore::new();
        // The clone shares storage with `inner`, so writes made through the
        // `Arc` must be observable on `inner` directly.
        let shared = Arc::new(inner.clone());
        let stream = StreamId::new("process/arc");

        shared
            .save(&Snapshot::new(
                stream.clone(),
                3,
                1,
                serde_json::json!({"z": 3}),
            ))
            .await
            .unwrap();

        let direct = inner
            .load(&stream)
            .await
            .unwrap()
            .expect("save through Arc must reach the inner store");
        assert_eq!(direct.sequence_number, 3);

        let via_arc = shared
            .load(&stream)
            .await
            .unwrap()
            .expect("load through Arc must reach the inner store");
        assert_eq!(via_arc.sequence_number, 3);
    }
}