use std::sync::Arc;
#[cfg(any(test, feature = "testing"))]
use std::collections::HashMap;
#[cfg(any(test, feature = "testing"))]
use tokio::sync::RwLock;
use serde_json::Value;
use crate::{error::EngineError, ids::StreamId};
/// A captured copy of one stream's state, serializable through serde.
///
/// Field order is significant for the derived `Debug` and serde output, so it
/// is kept as declared.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Snapshot {
    /// Identifier of the stream the snapshot was taken for; the in-memory
    /// store in this file uses it as the lookup key.
    pub stream_id: StreamId,
    /// Position within the stream associated with this snapshot.
    /// NOTE(review): presumably the sequence number of the last event folded
    /// into `state` — confirm against the code that produces snapshots.
    pub sequence_number: u64,
    /// Version tag for the layout of `state`.
    /// NOTE(review): presumably compared on load to detect outdated
    /// snapshots — confirm against the consumer.
    pub state_schema_version: u32,
    /// The captured state as an arbitrary JSON value.
    pub state: Value,
}
impl Snapshot {
#[must_use]
pub fn new(
stream_id: StreamId,
sequence_number: u64,
state_schema_version: u32,
state: Value,
) -> Self {
Self {
stream_id,
sequence_number,
state_schema_version,
state,
}
}
#[must_use]
pub fn should_take(event_count: u64, last_snapshot_at: u64, interval: u64) -> bool {
if interval == 0 {
return false;
}
event_count > 0 && event_count.saturating_sub(last_snapshot_at) >= interval
}
}
/// Storage backend for [`Snapshot`] values.
///
/// Implementors must be `Send + Sync`.
// NOTE(review): the `async_fn_in_trait` lint is silenced here. That lint warns
// that callers cannot name or add auto-trait bounds (such as `Send`) to the
// futures returned by these methods — confirm no caller needs to spawn them
// generically.
#[allow(async_fn_in_trait)]
pub trait SnapshotStore: Send + Sync {
    /// Stores `snapshot`.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] when the backend fails to store it.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError>;

    /// Fetches the snapshot held for `stream_id`.
    ///
    /// The implementations in this file return `Ok(None)` when nothing is
    /// held for the stream.
    ///
    /// # Errors
    ///
    /// Returns an [`EngineError`] when the backend fails to read.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError>;
}
/// Allows an `Arc`-wrapped store to be used wherever a [`SnapshotStore`] is
/// expected; every call is forwarded unchanged to the wrapped store.
impl<S: SnapshotStore> SnapshotStore for Arc<S> {
    /// Forwards to the wrapped store's `save`.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
        // Dereference down to `S` explicitly so the call resolves to the
        // inner implementation rather than to this one.
        (**self).save(snapshot).await
    }

    /// Forwards to the wrapped store's `load`.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        (**self).load(stream_id).await
    }
}
/// Unit-struct snapshot store that holds no data.
///
/// Its [`SnapshotStore`] implementation in this file accepts every `save`
/// without keeping anything and answers every `load` with `Ok(None)`.
///
/// NOTE(review): this type is compiled unconditionally, but its
/// `SnapshotStore` impl is gated behind `cfg(any(test, feature = "testing"))`.
/// In other builds the type exists without implementing the trait — confirm
/// whether the type should be gated as well or the impl ungated.
#[derive(Debug, Clone, Copy, Default)]
#[must_use = "NoopSnapshotStore discards all snapshots silently — use a persistent SnapshotStore in production"]
pub struct NoopSnapshotStore;
/// Discard-everything implementation, compiled only for tests or when the
/// `testing` feature is enabled.
///
/// NOTE(review): `NoopSnapshotStore` itself is not behind this cfg gate —
/// confirm the mismatch is intentional.
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for NoopSnapshotStore {
    /// Accepts the snapshot and drops it; always succeeds.
    async fn save(&self, _snapshot: &Snapshot) -> Result<(), EngineError> {
        Ok(())
    }

    /// Always reports that no snapshot exists; never fails.
    async fn load(&self, _stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        Ok(None)
    }
}
/// Snapshot store backed by a map held in memory, compiled only for tests or
/// when the `testing` feature is enabled.
///
/// The map sits behind an `Arc`, so a clone of the store is a second handle
/// onto the same data rather than an independent copy.
#[cfg(any(test, feature = "testing"))]
#[derive(Debug, Default, Clone)]
pub struct InMemorySnapshotStore {
    /// One snapshot per stream, guarded by an async read/write lock.
    inner: Arc<RwLock<HashMap<StreamId, Snapshot>>>,
}
#[cfg(any(test, feature = "testing"))]
impl InMemorySnapshotStore {
    /// Creates a store that holds no snapshots.
    #[must_use]
    pub fn new() -> Self {
        Self {
            inner: Arc::default(),
        }
    }

    /// Reports whether the store currently holds no snapshot for any stream.
    ///
    /// Waits for a read lock on the underlying map.
    pub async fn is_empty(&self) -> bool {
        let table = self.inner.read().await;
        table.is_empty()
    }
}
#[cfg(any(test, feature = "testing"))]
impl SnapshotStore for InMemorySnapshotStore {
    /// Stores a copy of `snapshot` under its `stream_id`.
    ///
    /// Any snapshot already held for that stream is replaced, whatever the
    /// sequence numbers involved. This implementation never returns `Err`.
    async fn save(&self, snapshot: &Snapshot) -> Result<(), EngineError> {
        let key = snapshot.stream_id.clone();
        let stored = snapshot.clone();
        let mut table = self.inner.write().await;
        table.insert(key, stored);
        Ok(())
    }

    /// Returns a copy of the snapshot held for `stream_id`, or `Ok(None)` when
    /// there is none. This implementation never returns `Err`.
    async fn load(&self, stream_id: &StreamId) -> Result<Option<Snapshot>, EngineError> {
        let table = self.inner.read().await;
        let found = table.get(stream_id).cloned();
        Ok(found)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks `should_take` against a table of
    /// `(event_count, last_snapshot_at, expected)` rows at an interval of 100.
    #[test]
    fn should_take_at_exact_multiples() {
        const INTERVAL: u64 = 100;
        let cases: [(u64, u64, bool); 8] = [
            // No snapshot taken so far.
            (0, 0, false),
            (99, 0, false),
            (100, 0, true),
            (101, 0, true),
            (200, 0, true),
            // Last snapshot taken at event 100.
            (101, 100, false),
            (200, 100, true),
            (250, 100, true),
        ];
        for (event_count, last_snapshot_at, expected) in cases {
            assert_eq!(
                Snapshot::should_take(event_count, last_snapshot_at, INTERVAL),
                expected,
                "event_count={event_count}, last_snapshot_at={last_snapshot_at}"
            );
        }
    }

    /// With an interval of 1, each event past the last snapshot is due.
    #[test]
    fn should_take_interval_one() {
        assert!((1u64..=5).all(|i| Snapshot::should_take(i, i - 1, 1)));
    }

    /// The no-op store never has anything to load.
    #[tokio::test]
    async fn noop_store_always_returns_none() {
        let store = NoopSnapshotStore;
        let stream = StreamId::new("process/test");
        let loaded = store.load(&stream).await.unwrap();
        assert!(loaded.is_none());
    }

    /// Saving into the no-op store reports success.
    #[tokio::test]
    async fn noop_store_save_succeeds() {
        let store = NoopSnapshotStore;
        let state = serde_json::json!({"status": "Active"});
        let snap = Snapshot::new(StreamId::new("process/test"), 10, 1, state);
        let outcome = store.save(&snap).await;
        assert!(outcome.is_ok());
    }

    /// A saved snapshot is returned with the same field values.
    #[tokio::test]
    async fn in_memory_store_round_trip() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");
        let original = Snapshot::new(stream.clone(), 5, 1, serde_json::json!({"x": 1}));
        store.save(&original).await.unwrap();

        let Snapshot {
            sequence_number,
            state_schema_version,
            state,
            ..
        } = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(sequence_number, 5);
        assert_eq!(state_schema_version, 1);
        assert_eq!(state, serde_json::json!({"x": 1}));
    }

    /// A second save for the same stream replaces the first.
    #[tokio::test]
    async fn in_memory_store_overwrite_keeps_latest() {
        let store = InMemorySnapshotStore::new();
        let stream = StreamId::new("process/abc");
        for seq in [5u64, 10] {
            let snap = Snapshot::new(stream.clone(), seq, 1, serde_json::json!({"seq": seq}));
            store.save(&snap).await.unwrap();
        }

        let loaded = store
            .load(&stream)
            .await
            .unwrap()
            .expect("snapshot must exist");
        assert_eq!(
            loaded.sequence_number, 10,
            "second save must overwrite first"
        );
    }

    /// Saving under one stream does not make a snapshot visible under another.
    #[tokio::test]
    async fn in_memory_store_separate_streams_isolated() {
        let store = InMemorySnapshotStore::new();
        let stream1 = StreamId::new("process/aaa");
        let stream2 = StreamId::new("process/bbb");
        let snap = Snapshot::new(stream1.clone(), 3, 1, serde_json::json!(null));
        store.save(&snap).await.unwrap();

        let other = store.load(&stream2).await.unwrap();
        assert!(
            other.is_none(),
            "unrelated stream must not return stream1's snapshot"
        );
    }

    /// A new store is empty and stops being empty after the first save.
    #[tokio::test]
    async fn in_memory_store_is_empty_initially() {
        let store = InMemorySnapshotStore::new();
        assert!(store.is_empty().await);

        let stream = StreamId::new("process/test");
        let snap = Snapshot::new(stream, 1, 1, serde_json::json!({}));
        store.save(&snap).await.unwrap();
        assert!(!store.is_empty().await);
    }

    /// A cloned store sees what was written through the original handle.
    #[tokio::test]
    async fn in_memory_store_clone_shares_data() {
        let store1 = InMemorySnapshotStore::new();
        let store2 = store1.clone();
        let stream = StreamId::new("process/shared");
        let snap = Snapshot::new(stream.clone(), 7, 1, serde_json::json!({"y": 2}));
        store1.save(&snap).await.unwrap();

        let loaded = store2
            .load(&stream)
            .await
            .unwrap()
            .expect("clone must see the same snapshot");
        assert_eq!(loaded.sequence_number, 7);
    }
}