Skip to main content

atomr_persistence/
snapshot.rs

1//! Snapshot store plugin. akka.net: `SnapshotStore`, `MemorySnapshotStore`.
2
3use std::collections::HashMap;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use parking_lot::RwLock;
8
9#[derive(Debug, Clone)]
10pub struct SnapshotMetadata {
11    pub persistence_id: String,
12    pub sequence_nr: u64,
13    pub timestamp: u64,
14}
15
16#[async_trait]
17pub trait SnapshotStore: Send + Sync + 'static {
18    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>);
19    async fn load(&self, persistence_id: &str) -> Option<(SnapshotMetadata, Vec<u8>)>;
20    async fn delete(&self, persistence_id: &str, to_sequence_nr: u64);
21}
22
23type SnapshotEntries = HashMap<String, Vec<(SnapshotMetadata, Vec<u8>)>>;
24
25#[derive(Default)]
26pub struct InMemorySnapshotStore {
27    snapshots: RwLock<SnapshotEntries>,
28}
29
30impl InMemorySnapshotStore {
31    pub fn new() -> Arc<Self> {
32        Arc::new(Self::default())
33    }
34}
35
36#[async_trait]
37impl SnapshotStore for InMemorySnapshotStore {
38    async fn save(&self, meta: SnapshotMetadata, payload: Vec<u8>) {
39        self.snapshots.write().entry(meta.persistence_id.clone()).or_default().push((meta, payload));
40    }
41
42    async fn load(&self, pid: &str) -> Option<(SnapshotMetadata, Vec<u8>)> {
43        self.snapshots.read().get(pid).and_then(|v| v.last()).cloned()
44    }
45
46    async fn delete(&self, pid: &str, to_sequence_nr: u64) {
47        if let Some(v) = self.snapshots.write().get_mut(pid) {
48            v.retain(|(m, _)| m.sequence_nr > to_sequence_nr);
49        }
50    }
51}