Skip to main content

atomr_distributed_data/
durable.rs

1//! Durable storage backend for the Replicator. Phase 8.F.
2//!
3//! akka.net pairs the Replicator with a `DurableStore` that flushes
4//! CRDT state to disk so a node can rejoin the cluster without losing
5//! local writes. Backends in upstream are LMDB / H2 / SQLite.
6//!
7//! For atomr we ship two reference impls:
8//!
9//! * [`NoopDurableStore`] — the default; everything stays in memory.
//! * [`FileDurableStore`] — one file per key under a directory, suitable
//!   for tests and small workloads. Each key lives at
//!   `<dir>/<sanitized-key>.bin` and holds the raw bytes of its latest
//!   blob (no text encoding), which keeps external dependencies at zero.
14//!
15//! Heavier-weight backends (`redb`, `lmdb`) plug in by implementing
16//! [`DurableStore`] in a separate crate; the trait surface intentionally
17//! mirrors `Replicator::update` / `delete` so the actor (Phase 8.E) can
18//! call into it without knowing the on-disk layout.
19
20use std::collections::HashSet;
21use std::fs::{self, File, OpenOptions};
22use std::io::{self, Read, Write};
23use std::path::PathBuf;
24use std::sync::Mutex;
25
/// Backing store that lets the Replicator keep CRDT state across restarts.
///
/// Every method is synchronous so it can be invoked from any context,
/// including the replicator actor's own task. Implementations should do
/// little work per call, or hand heavy lifting off to a worker thread.
pub trait DurableStore: Send + Sync + 'static {
    /// Record that `key` was written, together with `blob`, the serialized
    /// CRDT snapshot. The store treats `blob` as opaque bytes and is free
    /// to de-duplicate repeated writes.
    fn persist(&self, key: &str, blob: &[u8]) -> io::Result<()>;

    /// Record `key` with an empty payload.
    ///
    /// The default forwards to [`persist`](Self::persist) with zero bytes.
    /// The replicator actor uses this when it only holds a type-erased
    /// value; real payloads normally arrive through a typed adapter that
    /// calls `persist` directly.
    fn persist_marker(&self, key: &str) -> io::Result<()> {
        let no_payload: &[u8] = b"";
        self.persist(key, no_payload)
    }

    /// Drop `key` from the store.
    fn delete_marker(&self, key: &str) -> io::Result<()>;

    /// Fetch the stored snapshot for `key`, or `Ok(None)` when it is unknown.
    fn load(&self, key: &str) -> io::Result<Option<Vec<u8>>>;

    /// List every key the store currently holds.
    fn keys(&self) -> io::Result<Vec<String>>;
}
50
51/// In-memory no-op implementation. Used when durability is disabled.
52pub struct NoopDurableStore;
53
54impl DurableStore for NoopDurableStore {
55    fn persist(&self, _key: &str, _blob: &[u8]) -> io::Result<()> {
56        Ok(())
57    }
58    fn delete_marker(&self, _key: &str) -> io::Result<()> {
59        Ok(())
60    }
61    fn load(&self, _key: &str) -> io::Result<Option<Vec<u8>>> {
62        Ok(None)
63    }
64    fn keys(&self) -> io::Result<Vec<String>> {
65        Ok(Vec::new())
66    }
67}
68
69/// Append-only file-backed store. Keys live as `<dir>/<sanitized>.bin`.
70pub struct FileDurableStore {
71    dir: PathBuf,
72    keys: Mutex<HashSet<String>>,
73}
74
75impl FileDurableStore {
76    pub fn open(dir: impl Into<PathBuf>) -> io::Result<Self> {
77        let dir = dir.into();
78        fs::create_dir_all(&dir)?;
79        let mut keys = HashSet::new();
80        for entry in fs::read_dir(&dir)? {
81            let entry = entry?;
82            if let Some(name) = entry.file_name().to_str() {
83                if let Some(stripped) = name.strip_suffix(".bin") {
84                    keys.insert(unsanitize(stripped));
85                }
86            }
87        }
88        Ok(Self { dir, keys: Mutex::new(keys) })
89    }
90
91    /// Convenience for tests: a fresh per-test temporary directory.
92    pub fn tmp() -> io::Result<Self> {
93        let mut dir = std::env::temp_dir();
94        dir.push(format!("atomr-ddata-{}", std::process::id()));
95        dir.push(uuid_like());
96        Self::open(dir)
97    }
98
99    pub fn contains(&self, key: &str) -> bool {
100        self.keys.lock().unwrap().contains(key)
101    }
102
103    fn path_for(&self, key: &str) -> PathBuf {
104        self.dir.join(format!("{}.bin", sanitize(key)))
105    }
106}
107
108impl DurableStore for FileDurableStore {
109    fn persist(&self, key: &str, blob: &[u8]) -> io::Result<()> {
110        let path = self.path_for(key);
111        let mut f = OpenOptions::new().create(true).truncate(true).write(true).open(&path)?;
112        f.write_all(blob)?;
113        f.sync_data()?;
114        self.keys.lock().unwrap().insert(key.to_string());
115        Ok(())
116    }
117    fn delete_marker(&self, key: &str) -> io::Result<()> {
118        let path = self.path_for(key);
119        if path.exists() {
120            fs::remove_file(path)?;
121        }
122        self.keys.lock().unwrap().remove(key);
123        Ok(())
124    }
125    fn load(&self, key: &str) -> io::Result<Option<Vec<u8>>> {
126        let path = self.path_for(key);
127        if !path.exists() {
128            return Ok(None);
129        }
130        let mut buf = Vec::new();
131        File::open(path)?.read_to_end(&mut buf)?;
132        Ok(Some(buf))
133    }
134    fn keys(&self) -> io::Result<Vec<String>> {
135        let mut v: Vec<String> = self.keys.lock().unwrap().iter().cloned().collect();
136        v.sort();
137        Ok(v)
138    }
139}
140
/// Encode `key` as a file-name stem that is reversible via [`unsanitize`].
///
/// ASCII alphanumerics, `-` and `_` are kept verbatim, so stems written by
/// the previous lossy scheme (which only ever emitted those characters)
/// still decode to the same key. Every other byte of the key's UTF-8
/// encoding, including `/`, `.` and `%` itself, becomes `%XX` (uppercase
/// hex).
///
/// Distinct keys therefore land in distinct files. The old scheme mapped
/// `a/b`, `a.b` and `a_b` all to `a_b`. The output can never contain a path
/// separator or `..`.
///
/// NOTE(review): escaping can up to triple a key's length. File systems
/// commonly cap a single name at 255 bytes, so very long keys may fail to
/// persist.
fn sanitize(key: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";
    let mut out = String::with_capacity(key.len());
    for &b in key.as_bytes() {
        if b.is_ascii_alphanumeric() || b == b'-' || b == b'_' {
            out.push(char::from(b));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(b >> 4)]));
            out.push(char::from(HEX[usize::from(b & 0x0F)]));
        }
    }
    out
}

/// Invert [`sanitize`] by decoding every `%XX` escape back to its byte.
///
/// Malformed escapes are kept literally, and bytes that do not form valid
/// UTF-8 are decoded lossily. A stray file dropped into the store directory
/// therefore still yields a key rather than an error.
fn unsanitize(name: &str) -> String {
    /// Value of one ASCII hex digit (either case).
    fn hex_digit(c: u8) -> Option<u8> {
        char::from(c).to_digit(16).and_then(|d| u8::try_from(d).ok())
    }

    let bytes = name.as_bytes();
    let mut decoded = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        let escaped = match bytes.get(i..i + 3) {
            Some(&[b'%', hi, lo]) => hex_digit(hi).zip(hex_digit(lo)).map(|(h, l)| (h << 4) | l),
            _ => None,
        };
        match escaped {
            Some(byte) => {
                decoded.push(byte);
                i += 3;
            }
            None => {
                decoded.push(bytes[i]);
                i += 1;
            }
        }
    }
    String::from_utf8(decoded).unwrap_or_else(|e| String::from_utf8_lossy(e.as_bytes()).into_owned())
}
148
/// A hex name that is unique within this process.
///
/// Combines wall-clock nanoseconds with a process-wide counter. The clock
/// alone can repeat when two calls land within its resolution, for example
/// tests calling [`FileDurableStore::tmp`] in parallel, which would make two
/// "fresh" stores share one directory. The counter guarantees distinct
/// values, so a clock reading before the Unix epoch is tolerated
/// (it becomes 0) instead of panicking.
fn uuid_like() -> String {
    use std::sync::atomic::{AtomicU64, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static COUNTER: AtomicU64 = AtomicU64::new(0);
    // Only uniqueness matters, which `fetch_add` guarantees under any ordering.
    let seq = COUNTER.fetch_add(1, Ordering::Relaxed);
    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
    format!("{nanos:032x}{seq:016x}")
}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// A per-process, per-test scratch directory, removed before use so
    /// each run starts empty.
    fn dir(name: &str) -> PathBuf {
        let mut d = std::env::temp_dir();
        d.push(format!("atomr-ddata-test-{}-{}", std::process::id(), name));
        let _ = fs::remove_dir_all(&d);
        d
    }

    #[test]
    fn file_durable_persist_then_load() {
        let s = FileDurableStore::open(dir("p1")).unwrap();
        s.persist("k", b"hello").unwrap();
        assert!(s.contains("k"));
        let v = s.load("k").unwrap().unwrap();
        assert_eq!(v, b"hello");
    }

    #[test]
    fn file_durable_persist_overwrites_previous_blob() {
        let s = FileDurableStore::open(dir("p4")).unwrap();
        s.persist("k", b"first, longer value").unwrap();
        s.persist("k", b"second").unwrap();
        assert_eq!(s.load("k").unwrap().unwrap(), b"second");
        assert_eq!(s.keys().unwrap(), vec!["k"]);
    }

    #[test]
    fn file_durable_delete_removes_file() {
        let s = FileDurableStore::open(dir("p2")).unwrap();
        s.persist("k", b"x").unwrap();
        s.delete_marker("k").unwrap();
        assert!(!s.contains("k"));
        assert!(s.load("k").unwrap().is_none());
    }

    #[test]
    fn file_durable_delete_of_missing_key_is_ok() {
        let s = FileDurableStore::open(dir("p5")).unwrap();
        s.delete_marker("never-written").unwrap();
        assert!(s.keys().unwrap().is_empty());
    }

    #[test]
    fn file_durable_keys_listing_is_stable() {
        let s = FileDurableStore::open(dir("p3")).unwrap();
        // Insert out of order and do NOT re-sort here: `keys()` itself must
        // return ascending order, which is what this test pins.
        s.persist("b", b"2").unwrap();
        s.persist("a", b"1").unwrap();
        assert_eq!(s.keys().unwrap(), vec!["a", "b"]);
    }

    #[test]
    fn file_durable_reopen_recovers_keys_and_blobs() {
        let d = dir("p6");
        {
            let s = FileDurableStore::open(d.clone()).unwrap();
            s.persist("alpha", b"1").unwrap();
            s.persist("beta", b"2").unwrap();
            s.delete_marker("beta").unwrap();
        }
        let s = FileDurableStore::open(d).unwrap();
        assert_eq!(s.keys().unwrap(), vec!["alpha"]);
        assert_eq!(s.load("alpha").unwrap().unwrap(), b"1");
        assert!(s.load("beta").unwrap().is_none());
    }

    #[test]
    fn noop_store_loads_nothing() {
        let s = NoopDurableStore;
        s.persist("k", b"x").unwrap();
        assert!(s.load("k").unwrap().is_none());
        assert!(s.keys().unwrap().is_empty());
    }
}