mako_engine/snapshot.rs
1//! Snapshotting support — a performance optimisation for long event streams.
2//!
3//! Snapshots are **never the source of truth**. They are always rebuildable
4//! by replaying the full event stream. The engine must function correctly if
5//! all snapshots are discarded.
6//!
7//! # Policy
8//!
9//! A common policy is to take a snapshot every N events (e.g. N = 100).
10//! Use [`Snapshot::should_take`] to check whether enough events have
11//! accumulated since the last snapshot.
12//!
13//! ```rust
14//! use mako_engine::snapshot::Snapshot;
15//!
16//! // Take a snapshot every 100 events.
17//! // second argument is the sequence_number of the last snapshot (0 = none).
18//! assert!(!Snapshot::should_take(99, 0, 100));
19//! assert!( Snapshot::should_take(100, 0, 100));
20//! // Non-exact counts still trigger once the threshold is exceeded:
21//! assert!( Snapshot::should_take(101, 0, 100));
22//! // After a snapshot at seq 100, next triggers at 200+:
23//! assert!(!Snapshot::should_take(101, 100, 100));
24//! assert!( Snapshot::should_take(200, 100, 100));
25//! // interval = 0 disables snapshotting (never returns true):
26//! assert!(!Snapshot::should_take(1000, 0, 0));
27//! ```
28//!
29//! # Using the store
30//!
31//! ```rust,ignore
32//! use mako_engine::snapshot::{Snapshot, SnapshotStore};
33//!
34//! // After executing a command:
35//! let event_count = process.event_count().await?;
36//! let last_snap = snap_store.load(process.stream_id()).await?
37//! .map_or(0, |s| s.sequence_number);
38//! if Snapshot::should_take(event_count, last_snap, 100) {
39//! let state = process.state().await?;
40//! let payload = serde_json::to_value(&state)?;
41//! let snap = Snapshot::new(process.stream_id().clone(), event_count, 1, payload);
42//! snap_store.save(&snap).await?;
43//! }
44//! ```
45
46use std::sync::Arc;
47
48#[cfg(any(test, feature = "testing"))]
49use std::collections::HashMap;
50#[cfg(any(test, feature = "testing"))]
51use tokio::sync::RwLock;
52
53use serde_json::Value;
54
55use crate::{error::EngineError, ids::StreamId};
56
57// ── Snapshot ──────────────────────────────────────────────────────────────────
58
/// A point-in-time snapshot of an aggregate's state.
///
/// A snapshot carries the serialized state at a specific `sequence_number`.
/// During state reconstruction, the engine loads the snapshot (if any) and
/// then replays only the events that arrived after it.
///
/// The type derives `serde::Serialize` and `serde::Deserialize`, so a whole
/// snapshot (metadata plus payload) can itself be serialized.
///
/// ## Schema versioning
///
/// `state_schema_version` mirrors `EventEnvelope::schema_version`. Increment
/// it when the serialized state layout changes incompatibly. The engine must
/// discard snapshots whose schema version it does not recognise.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
    /// The stream this snapshot covers.
    pub stream_id: StreamId,

    /// The sequence number of the last event incorporated into this snapshot.
    ///
    /// Events with a higher sequence number must still be replayed on top.
    ///
    /// When this value is passed to [`Snapshot::should_take`] as
    /// `last_snapshot_at`, keep in mind that `0` is that function's marker
    /// for "no snapshot has been taken yet".
    pub sequence_number: u64,

    /// Schema version of the serialized `state` payload.
    pub state_schema_version: u32,

    /// The serialized aggregate state at `sequence_number`.
    ///
    /// Stored as [`serde_json::Value`] so the engine layer remains
    /// domain-agnostic; the domain crate deserializes it into the concrete
    /// `Workflow::State` type.
    pub state: Value,
}
90
91impl Snapshot {
92 /// Construct a new snapshot.
93 #[must_use]
94 pub fn new(
95 stream_id: StreamId,
96 sequence_number: u64,
97 state_schema_version: u32,
98 state: Value,
99 ) -> Self {
100 Self {
101 stream_id,
102 sequence_number,
103 state_schema_version,
104 state,
105 }
106 }
107
108 /// Return `true` when a snapshot should be taken.
109 ///
110 /// Returns `true` when `event_count - last_snapshot_at >= interval`,
111 /// i.e. at least `interval` new events have accumulated since the last
112 /// snapshot. This avoids the exact-multiple trap: if snapshotting is
113 /// skipped at count 100, it still triggers at 101, 102, etc. until a
114 /// snapshot is taken.
115 ///
116 /// Set `last_snapshot_at` to `0` when no snapshot has ever been taken.
117 ///
118 /// Returns `false` when `interval` is `0` (snapshotting disabled).
119 ///
120 /// # Example
121 ///
122 /// ```
123 /// use mako_engine::snapshot::Snapshot;
124 ///
125 /// // First snapshot: no prior snapshot (last_snapshot_at = 0).
126 /// assert!(!Snapshot::should_take(99, 0, 100));
127 /// assert!( Snapshot::should_take(100, 0, 100));
128 /// assert!( Snapshot::should_take(101, 0, 100)); // non-exact still triggers
129 ///
130 /// // After a snapshot at seq 100, next triggers at 200+.
131 /// assert!(!Snapshot::should_take(101, 100, 100));
132 /// assert!( Snapshot::should_take(200, 100, 100));
133 /// assert!( Snapshot::should_take(201, 100, 100));
134 ///
135 /// // interval = 0 disables snapshotting.
136 /// assert!(!Snapshot::should_take(1000, 0, 0));
137 /// ```
138 #[must_use]
139 pub fn should_take(event_count: u64, last_snapshot_at: u64, interval: u64) -> bool {
140 if interval == 0 {
141 return false;
142 }
143 event_count > 0 && event_count.saturating_sub(last_snapshot_at) >= interval
144 }
145}
146
147// ── SnapshotStore ─────────────────────────────────────────────────────────────
148
/// Storage contract for aggregate snapshots.
///
/// Implementations live in separate backend crates
/// (`mako-event-store-slatedb`, `mako-event-store-redb`, etc.).
/// This module additionally provides [`NoopSnapshotStore`] and
/// `InMemorySnapshotStore` for tests; their `SnapshotStore` implementations
/// are only compiled under `#[cfg(test)]` or with the `testing` feature.
///
/// ## Contract
///
/// - `save` must persist the snapshot durably before returning `Ok`.
/// - `load` must return the **most recent** snapshot for `stream_id`, or
///   `None` if no snapshot exists.
/// - A snapshot must never be returned for a `stream_id` if its
///   `state_schema_version` is not recognised by the caller — implementations
///   are encouraged to store schema version in a queryable column/key so
///   callers can filter by it.
///
/// ## Blanket `Arc` implementation
///
/// `Arc<S>` implements `SnapshotStore` whenever `S: SnapshotStore`, so
/// `Process<W, Arc<MyEventStore>>` can accept `Arc<MySnapshotStore>` without
/// any extra wrapper.
// NOTE(review): `load` receives only a `stream_id`, so an implementation has
// no way to learn which schema versions the caller recognises; as written the
// schema-version rule above has to be enforced by the caller — confirm.
//
// `async fn` in a public trait triggers the `async_fn_in_trait` lint because
// callers cannot add bounds such as `Send` to the returned futures. The lint
// is silenced here; presumably that limitation is acceptable for this trait's
// users — TODO confirm.
#[allow(async_fn_in_trait)]
pub trait SnapshotStore: Send + Sync {
    /// Persist `snapshot`, replacing any previous snapshot for the same stream.
    ///
    /// The stream is identified by `snapshot.stream_id`.
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Snapshot`] on storage failure.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError>;

    /// Load the most recent snapshot for `stream_id`.
    ///
    /// Returns `None` when no snapshot exists (full replay required).
    ///
    /// # Errors
    ///
    /// Returns [`EngineError::Snapshot`] on storage failure.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError>;
}
189
190// ── Arc<S> blanket impl ───────────────────────────────────────────────────────
191
192impl<S: SnapshotStore> SnapshotStore for Arc<S> {
193 async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
194 self.as_ref().save(snapshot).await
195 }
196
197 async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
198 self.as_ref().load(stream_id).await
199 }
200}
201
202// ── NoopSnapshotStore ─────────────────────────────────────────────────────────
203
/// A [`SnapshotStore`] that never persists anything.
///
/// Every `load` returns `None` (full replay); every `save` succeeds silently.
///
/// # ⚠️ Data loss warning
///
/// `NoopSnapshotStore` **discards every snapshot silently**. Processes built
/// with this store perform full event replay on every state read. Do not use
/// in production — bind a [`SlateDbStore::as_snapshot_store()`] instead.
///
/// The type itself is always compiled, but its [`SnapshotStore`]
/// implementation is only compiled when the `testing` feature is enabled or
/// inside `#[cfg(test)]`. Production binaries must call
/// [`EngineBuilder::with_snapshot_store`] with a durable backend.
///
/// [`SlateDbStore::as_snapshot_store()`]: crate::store_slatedb::SlateDbStore::as_snapshot_store
/// [`EngineBuilder::with_snapshot_store`]: crate::builder::EngineBuilder::with_snapshot_store
// NOTE(review): the `SnapshotStore` docs in this module say backend
// implementations live in separate crates, yet the link above targets
// `crate::store_slatedb` — verify that this intra-doc link resolves.
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopSnapshotStore discards all snapshots silently — use a persistent SnapshotStore in production"]
pub struct NoopSnapshotStore;
223
// Gated on purpose: without `cfg(test)` or the `testing` feature,
// `NoopSnapshotStore` does not implement `SnapshotStore` at all.
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for NoopSnapshotStore {
    /// Accept the snapshot and drop it; nothing is stored and the call
    /// always returns `Ok(())`.
    async fn save(&self, _snapshot: &Snapshot) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always report that no snapshot exists (`Ok(None)`), whatever the
    /// stream.
    async fn load(&self, _stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        Ok(None)
    }
}
234
235// ── InMemorySnapshotStore ─────────────────────────────────────────────────────
236
/// An in-memory [`SnapshotStore`] for tests and development.
///
/// Stores the **most recent snapshot per stream**. Cloning shares the
/// underlying data via `Arc` — all clones see the same snapshots.
/// Nothing is written anywhere else: the snapshots live only in the shared
/// map held by this value and its clones.
///
/// Use this with [`Process::take_snapshot`] and
/// [`Process::state_with_snapshot`] to verify snapshot-accelerated replay
/// without depending on an external storage backend.
///
/// Only available in `#[cfg(test)]` or with the `testing` feature enabled.
///
/// [`Process::take_snapshot`]: crate::process::Process::take_snapshot
/// [`Process::state_with_snapshot`]: crate::process::Process::state_with_snapshot
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemorySnapshotStore {
    /// One snapshot per stream, keyed by `StreamId`, guarded by tokio's
    /// async `RwLock`; the surrounding `Arc` is what lets clones share it.
    inner: Arc<RwLock<HashMap<StreamId, Snapshot>>>,
}
255
#[cfg(any(test, feature = "testing"))]
impl InMemorySnapshotStore {
    /// Build a store that holds no snapshots yet.
    ///
    /// Equivalent to [`Default::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Report whether the store currently holds zero snapshots.
    ///
    /// Acquires the read lock for the duration of the check.
    pub async fn is_empty(&self) -> bool {
        let snapshots = self.inner.read().await;
        snapshots.is_empty()
    }
}
269
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for InMemorySnapshotStore {
    /// Store a copy of `snapshot` under its `stream_id`, replacing whatever
    /// was held for that stream before. This implementation never fails.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
        // Take the write lock first, then clone, as a single critical section.
        let mut snapshots = self.inner.write().await;
        let key = snapshot.stream_id.clone();
        snapshots.insert(key, snapshot.clone());
        Ok(())
    }

    /// Return a copy of the snapshot held for `stream_id`, or `None` if the
    /// stream has none. This implementation never fails.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        let snapshots = self.inner.read().await;
        let found = snapshots.get(stream_id).cloned();
        Ok(found)
    }
}
284
// Unit tests for the snapshot policy helper and the two in-crate stores.
#[cfg(test)]
mod tests {
    use super::*;

    // ── Snapshot::should_take ────────────────────────────────────────────────

    #[test]
    fn should_take_at_exact_multiples() {
        // No previous snapshot (last_snapshot_at = 0).
        assert!(!Snapshot::should_take(0, 0, 100));
        assert!(!Snapshot::should_take(99, 0, 100));
        assert!(Snapshot::should_take(100, 0, 100));
        // Non-exact still triggers ("at-least" semantics):
        assert!(Snapshot::should_take(101, 0, 100));
        assert!(Snapshot::should_take(200, 0, 100));
        // After snapshot at 100, next triggers at 200+:
        assert!(!Snapshot::should_take(101, 100, 100));
        assert!(Snapshot::should_take(200, 100, 100));
        assert!(Snapshot::should_take(250, 100, 100));
    }

    #[test]
    fn should_take_interval_one() {
        // Every event triggers a snapshot (last_snapshot_at = prev count).
        for i in 1u64..=5 {
            assert!(Snapshot::should_take(i, i - 1, 1));
        }
    }

    #[test]
    fn should_take_zero_interval_never_triggers() {
        // interval = 0 is the documented "snapshotting disabled" setting.
        assert!(!Snapshot::should_take(0, 0, 0));
        assert!(!Snapshot::should_take(1000, 0, 0));
        assert!(!Snapshot::should_take(u64::MAX, 0, 0));
    }

    #[test]
    fn should_take_saturates_when_last_snapshot_is_ahead() {
        // A last_snapshot_at beyond event_count must not underflow; it simply
        // counts as "no new events".
        assert!(!Snapshot::should_take(5, 10, 1));
        assert!(!Snapshot::should_take(0, u64::MAX, 1));
    }

    // ── NoopSnapshotStore ────────────────────────────────────────────────────

    #[tokio::test]
    async fn noop_store_always_returns_none() {
        let store = NoopSnapshotStore;
        let stream = StreamId::new("process/test");
        assert!(store.load(&stream).await.unwrap().is_none());
    }

    #[tokio::test]
    async fn noop_store_save_succeeds() {
        let store = NoopSnapshotStore;
        let snap = Snapshot::new(
            StreamId::new("process/test"),
            10,
            1,
            serde_json::json!({"status": "Active"}),
        );
        assert!(store.save(&snap).await.is_ok());
    }

    // ── InMemorySnapshotStore ─────────────────────────────────────────────────

    #[tokio::test]
    async fn in_memory_store_round_trip() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");
        let snap = Snapshot::new(stream.clone(), 5, 1, serde_json::json!({"x": 1}));

        store.save(&snap).await.unwrap();

        let loaded = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(loaded.sequence_number, 5);
        assert_eq!(loaded.state_schema_version, 1);
        assert_eq!(loaded.state, serde_json::json!({"x": 1}));
    }

    #[tokio::test]
    async fn in_memory_store_overwrite_keeps_latest() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");

        store
            .save(&Snapshot::new(
                stream.clone(),
                5,
                1,
                serde_json::json!({"seq": 5}),
            ))
            .await
            .unwrap();
        store
            .save(&Snapshot::new(
                stream.clone(),
                10,
                1,
                serde_json::json!({"seq": 10}),
            ))
            .await
            .unwrap();

        let loaded = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(
            loaded.sequence_number, 10,
            "second save must overwrite first"
        );
    }

    #[tokio::test]
    async fn in_memory_store_separate_streams_isolated() {
        let store = InMemorySnapshotStore::new();
        let stream1 = StreamId::new("process/aaa");
        let stream2 = StreamId::new("process/bbb");

        store
            .save(&Snapshot::new(
                stream1.clone(),
                3,
                1,
                serde_json::json!(null),
            ))
            .await
            .unwrap();

        assert!(
            store.load(&stream2).await.unwrap().is_none(),
            "unrelated stream must not return stream1's snapshot"
        );
    }

    #[tokio::test]
    async fn in_memory_store_is_empty_initially() {
        let store = InMemorySnapshotStore::new();
        assert!(store.is_empty().await);

        let stream = StreamId::new("process/test");
        store
            .save(&Snapshot::new(stream, 1, 1, serde_json::json!({})))
            .await
            .unwrap();
        assert!(!store.is_empty().await);
    }

    #[tokio::test]
    async fn in_memory_store_clone_shares_data() {
        let store1 = InMemorySnapshotStore::new();
        let store2 = store1.clone();
        let stream = StreamId::new("process/shared");

        store1
            .save(&Snapshot::new(
                stream.clone(),
                7,
                1,
                serde_json::json!({"y": 2}),
            ))
            .await
            .unwrap();

        let loaded = store2
            .load(&stream)
            .await
            .unwrap()
            .expect("clone must see the same snapshot");
        assert_eq!(loaded.sequence_number, 7);
    }

    // ── Arc<S> blanket impl ──────────────────────────────────────────────────

    #[tokio::test]
    async fn arc_wrapped_store_delegates_to_inner() {
        let inner = InMemorySnapshotStore::new();
        let shared = Arc::new(inner.clone());
        let stream = StreamId::new("process/arc");

        // Call through the trait with `Self = Arc<InMemorySnapshotStore>` so
        // the blanket implementation is the one being exercised.
        SnapshotStore::save(
            &shared,
            &Snapshot::new(stream.clone(), 3, 1, serde_json::json!({"z": 3})),
        )
        .await
        .unwrap();

        // The write must be visible on the underlying store …
        let direct = inner
            .load(&stream)
            .await
            .unwrap()
            .expect("save through Arc must reach the inner store");
        assert_eq!(direct.sequence_number, 3);

        // … and readable back through the Arc handle as well.
        let via_arc = SnapshotStore::load(&shared, &stream)
            .await
            .unwrap()
            .expect("load through Arc must see the snapshot");
        assert_eq!(via_arc.state, serde_json::json!({"z": 3}));
    }
}
444}