atomr_distributed_data/
durable.rs1use std::collections::HashSet;
21use std::fs::{self, File, OpenOptions};
22use std::io::{self, Read, Write};
23use std::path::PathBuf;
24use std::sync::Mutex;
25
/// Byte-blob storage addressed by string keys.
///
/// The supertrait bounds (`Send + Sync + 'static`) let a store be shared across threads.
/// This file provides two implementations: `NoopDurableStore` and `FileDurableStore`.
pub trait DurableStore: Send + Sync + 'static {
    /// Stores `blob` under `key`.
    fn persist(&self, key: &str, blob: &[u8]) -> io::Result<()>;

    /// Records `key` without a payload.
    ///
    /// The provided implementation forwards to [`persist`](Self::persist) with an empty blob;
    /// implementors may override it.
    fn persist_marker(&self, key: &str) -> io::Result<()> {
        let no_payload: &[u8] = &[];
        self.persist(key, no_payload)
    }

    /// Removes the entry recorded for `key`.
    fn delete_marker(&self, key: &str) -> io::Result<()>;

    /// Fetches the blob recorded for `key`; `Ok(None)` is the "no entry" outcome.
    fn load(&self, key: &str) -> io::Result<Option<Vec<u8>>>;

    /// Lists the keys the store knows about.
    fn keys(&self) -> io::Result<Vec<String>>;
}
50
51pub struct NoopDurableStore;
53
54impl DurableStore for NoopDurableStore {
55 fn persist(&self, _key: &str, _blob: &[u8]) -> io::Result<()> {
56 Ok(())
57 }
58 fn delete_marker(&self, _key: &str) -> io::Result<()> {
59 Ok(())
60 }
61 fn load(&self, _key: &str) -> io::Result<Option<Vec<u8>>> {
62 Ok(None)
63 }
64 fn keys(&self) -> io::Result<Vec<String>> {
65 Ok(Vec::new())
66 }
67}
68
69pub struct FileDurableStore {
71 dir: PathBuf,
72 keys: Mutex<HashSet<String>>,
73}
74
75impl FileDurableStore {
76 pub fn open(dir: impl Into<PathBuf>) -> io::Result<Self> {
77 let dir = dir.into();
78 fs::create_dir_all(&dir)?;
79 let mut keys = HashSet::new();
80 for entry in fs::read_dir(&dir)? {
81 let entry = entry?;
82 if let Some(name) = entry.file_name().to_str() {
83 if let Some(stripped) = name.strip_suffix(".bin") {
84 keys.insert(unsanitize(stripped));
85 }
86 }
87 }
88 Ok(Self { dir, keys: Mutex::new(keys) })
89 }
90
91 pub fn tmp() -> io::Result<Self> {
93 let mut dir = std::env::temp_dir();
94 dir.push(format!("atomr-ddata-{}", std::process::id()));
95 dir.push(uuid_like());
96 Self::open(dir)
97 }
98
99 pub fn contains(&self, key: &str) -> bool {
100 self.keys.lock().unwrap().contains(key)
101 }
102
103 fn path_for(&self, key: &str) -> PathBuf {
104 self.dir.join(format!("{}.bin", sanitize(key)))
105 }
106}
107
108impl DurableStore for FileDurableStore {
109 fn persist(&self, key: &str, blob: &[u8]) -> io::Result<()> {
110 let path = self.path_for(key);
111 let mut f = OpenOptions::new().create(true).truncate(true).write(true).open(&path)?;
112 f.write_all(blob)?;
113 f.sync_data()?;
114 self.keys.lock().unwrap().insert(key.to_string());
115 Ok(())
116 }
117 fn delete_marker(&self, key: &str) -> io::Result<()> {
118 let path = self.path_for(key);
119 if path.exists() {
120 fs::remove_file(path)?;
121 }
122 self.keys.lock().unwrap().remove(key);
123 Ok(())
124 }
125 fn load(&self, key: &str) -> io::Result<Option<Vec<u8>>> {
126 let path = self.path_for(key);
127 if !path.exists() {
128 return Ok(None);
129 }
130 let mut buf = Vec::new();
131 File::open(path)?.read_to_end(&mut buf)?;
132 Ok(Some(buf))
133 }
134 fn keys(&self) -> io::Result<Vec<String>> {
135 let mut v: Vec<String> = self.keys.lock().unwrap().iter().cloned().collect();
136 v.sort();
137 Ok(v)
138 }
139}
140
/// Encodes `key` as a file-name-safe string that [`unsanitize`] can reverse.
///
/// ASCII letters, digits, `-` and `_` are copied through unchanged. Every other byte of
/// the key's UTF-8 encoding becomes `%XX` (two uppercase hex digits). Because the mapping
/// is one-to-one, distinct keys never share a file name, unlike replacing every
/// disallowed character with `_`, which sends `"a/b"`, `"a.b"` and `"a_b"` to one file.
///
/// Keys made only of `[A-Za-z0-9_-]` keep the same name they had under `_` substitution;
/// keys containing other characters map to different names than they did under it.
fn sanitize(key: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(key.len());
    for byte in key.bytes() {
        if byte.is_ascii_alphanumeric() || byte == b'-' || byte == b'_' {
            out.push(char::from(byte));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(byte >> 4)]));
            out.push(char::from(HEX[usize::from(byte & 0x0f)]));
        }
    }
    out
}

/// Decodes a name produced by [`sanitize`] back into the original key.
///
/// Each `%XX` with two hex digits is turned back into one byte. A `%` that is not
/// followed by two hex digits is kept literally, and if the decoded bytes are not valid
/// UTF-8 the input is returned unchanged, so names that `sanitize` did not produce
/// still yield a key instead of an error.
fn unsanitize(name: &str) -> String {
    /// Value of one ASCII hex digit, or `None` for any other byte.
    fn hex_val(digit: u8) -> Option<u8> {
        match digit {
            b'0'..=b'9' => Some(digit - b'0'),
            b'a'..=b'f' => Some(digit - b'a' + 10),
            b'A'..=b'F' => Some(digit - b'A' + 10),
            _ => None,
        }
    }

    let mut rest = name.as_bytes();
    let mut decoded = Vec::with_capacity(rest.len());
    while let Some((&byte, tail)) = rest.split_first() {
        if byte == b'%' {
            if let [hi, lo, after @ ..] = tail {
                if let (Some(high), Some(low)) = (hex_val(*hi), hex_val(*lo)) {
                    decoded.push((high << 4) | low);
                    rest = after;
                    continue;
                }
            }
        }
        decoded.push(byte);
        rest = tail;
    }
    String::from_utf8(decoded).unwrap_or_else(|_| name.to_string())
}
148
/// Returns a 32-hex-digit identifier that is unique within this process.
///
/// The value combines the wall-clock time in nanoseconds since the Unix epoch (shifted
/// left by 32 bits) with a process-wide sequence number in the low 32 bits, so two calls
/// that observe the same clock reading still produce different strings. This is not an
/// RFC 4122 UUID.
///
/// A clock set before the epoch contributes zero for the time part rather than panicking.
fn uuid_like() -> String {
    use std::sync::atomic::{AtomicU32, Ordering};
    use std::time::{SystemTime, UNIX_EPOCH};

    static SEQUENCE: AtomicU32 = AtomicU32::new(0);

    let nanos = SystemTime::now().duration_since(UNIX_EPOCH).unwrap_or_default().as_nanos();
    // Only atomicity of the increment matters here; no other memory is synchronised.
    let seq = SEQUENCE.fetch_add(1, Ordering::Relaxed);
    let id = (nanos << 32) | u128::from(seq);
    format!("{id:032x}")
}
154
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a per-process scratch path for `name` and clears whatever is already there.
    fn dir(name: &str) -> PathBuf {
        let leaf = format!("atomr-ddata-test-{}-{}", std::process::id(), name);
        let scratch = std::env::temp_dir().join(leaf);
        // Best effort: a failure here (e.g. the path does not exist) is ignored.
        let _ = fs::remove_dir_all(&scratch);
        scratch
    }

    /// A persisted blob is tracked by `contains` and comes back unchanged from `load`.
    #[test]
    fn file_durable_persist_then_load() {
        let store = FileDurableStore::open(dir("p1")).unwrap();
        store.persist("k", b"hello").unwrap();
        assert!(store.contains("k"));
        let loaded = store.load("k").unwrap();
        assert_eq!(loaded.as_deref(), Some(&b"hello"[..]));
    }

    /// After `delete_marker` the key is neither tracked nor loadable.
    #[test]
    fn file_durable_delete_removes_file() {
        let store = FileDurableStore::open(dir("p2")).unwrap();
        store.persist("k", b"x").unwrap();
        store.delete_marker("k").unwrap();
        assert!(!store.contains("k"));
        assert_eq!(store.load("k").unwrap(), None);
    }

    /// `keys` reports exactly the persisted keys.
    #[test]
    fn file_durable_keys_listing_is_stable() {
        let store = FileDurableStore::open(dir("p3")).unwrap();
        store.persist("a", b"1").unwrap();
        store.persist("b", b"2").unwrap();
        let mut listed = store.keys().unwrap();
        listed.sort();
        assert_eq!(listed, ["a", "b"]);
    }

    /// The no-op store accepts writes but never returns anything.
    #[test]
    fn noop_store_loads_nothing() {
        let store = NoopDurableStore;
        store.persist("k", b"x").unwrap();
        assert_eq!(store.load("k").unwrap(), None);
        assert_eq!(store.keys().unwrap(), Vec::<String>::new());
    }
}