1use crate::error::{PersistenceError, PersistenceResult};
11use std::io::{Read, Write};
12use std::path::PathBuf;
13
14pub fn sync_file<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
22 let Some(p) = dir.file_path(path) else {
23 return Err(PersistenceError::NotSupported(
24 "sync_file requires Directory::file_path()".into(),
25 ));
26 };
27 let f = std::fs::OpenOptions::new().read(true).open(&p)?;
28 f.sync_data()?;
29 Ok(())
30}
31
32pub fn sync_parent_dir<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
43 let Some(p) = dir.file_path(path) else {
44 return Err(PersistenceError::NotSupported(
45 "sync_parent_dir requires Directory::file_path()".into(),
46 ));
47 };
48 let Some(parent) = p.parent() else {
49 return Err(PersistenceError::InvalidConfig(format!(
50 "path has no parent directory: {p:?}"
51 )));
52 };
53 let f = std::fs::File::open(parent)?;
54 f.sync_all()?;
57 Ok(())
58}
59
60#[derive(Debug, Clone, Copy, PartialEq)]
68pub enum FlushPolicy {
69 PerAppend,
71 EveryN(usize),
75 Interval(std::time::Duration),
81 Manual,
83}
84
85pub trait Directory: Send + Sync {
87 fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
89 fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>>;
91 fn exists(&self, path: &str) -> bool;
93 fn delete(&self, path: &str) -> PersistenceResult<()>;
95 fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()>;
97 fn create_dir_all(&self, path: &str) -> PersistenceResult<()>;
99 fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>>;
101 fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
103 fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()>;
105 fn file_path(&self, path: &str) -> Option<PathBuf>;
107
108 fn durable_sync_file(&self, path: &str) -> PersistenceResult<()> {
119 sync_file(self, path)
120 }
121
122 fn durable_sync_parent_dir(&self, path: &str) -> PersistenceResult<()> {
127 sync_parent_dir(self, path)
128 }
129
130 fn atomic_rename_durable(&self, from: &str, to: &str) -> PersistenceResult<()> {
134 let from_path = match self.file_path(from) {
135 Some(p) => p,
136 None => {
137 return Err(PersistenceError::NotSupported(
138 "atomic_rename_durable requires Directory::file_path()".into(),
139 ));
140 }
141 };
142 let to_path = match self.file_path(to) {
143 Some(p) => p,
144 None => {
145 return Err(PersistenceError::NotSupported(
146 "atomic_rename_durable requires Directory::file_path()".into(),
147 ));
148 }
149 };
150
151 self.atomic_rename(from, to)?;
152 let from_parent = from_path.parent();
153 let to_parent = to_path.parent();
154 if from_parent != to_parent {
155 self.durable_sync_parent_dir(from)?;
156 }
157 self.durable_sync_parent_dir(to)?;
158 Ok(())
159 }
160
161 fn atomic_write_durable(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
168 if self.file_path(path).is_none() {
169 return Err(PersistenceError::NotSupported(
170 "atomic_write_durable requires Directory::file_path()".into(),
171 ));
172 }
173
174 let tmp = format!("{path}.tmp");
175 if let Err(e) = (|| -> PersistenceResult<()> {
176 let mut w = self.create_file(&tmp)?;
177 w.write_all(data)?;
178 w.flush()?;
179 Ok(())
180 })() {
181 let _ = self.delete(&tmp);
182 return Err(e);
183 }
184
185 if let Err(e) = self.durable_sync_file(&tmp) {
186 let _ = self.delete(&tmp);
187 return Err(e);
188 }
189
190 if let Err(e) = self.atomic_rename_durable(&tmp, path) {
191 let _ = self.delete(&tmp);
192 return Err(e);
193 }
194
195 Ok(())
196 }
197}
198
199pub struct FsDirectory {
201 root: PathBuf,
202}
203
204impl FsDirectory {
205 pub fn new(root: impl Into<PathBuf>) -> PersistenceResult<Self> {
207 let root = root.into();
208 std::fs::create_dir_all(&root)?;
209 Ok(Self { root })
210 }
211
212 pub fn arc(
214 root: impl Into<std::path::PathBuf>,
215 ) -> PersistenceResult<std::sync::Arc<dyn Directory>> {
216 Ok(std::sync::Arc::new(Self::new(root)?))
217 }
218
219 fn resolve_path(&self, path: &str) -> PersistenceResult<PathBuf> {
220 for component in std::path::Path::new(path).components() {
222 match component {
223 std::path::Component::ParentDir
224 | std::path::Component::RootDir
225 | std::path::Component::Prefix(_) => {
226 return Err(PersistenceError::InvalidConfig(format!(
227 "path must not contain '..', absolute, or prefix components: {path}"
228 )));
229 }
230 _ => {}
231 }
232 }
233 Ok(self.root.join(path))
234 }
235}
236
237impl Directory for FsDirectory {
238 fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
239 let full_path = self.resolve_path(path)?;
240 if let Some(parent) = full_path.parent() {
241 std::fs::create_dir_all(parent)?;
242 }
243 Ok(Box::new(std::fs::File::create(full_path)?))
244 }
245
246 fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
247 let full_path = self.resolve_path(path)?;
248 if !full_path.exists() {
249 return Err(PersistenceError::NotFound(full_path.display().to_string()));
250 }
251 Ok(Box::new(std::fs::File::open(full_path)?))
252 }
253
254 fn exists(&self, path: &str) -> bool {
255 self.resolve_path(path).map(|p| p.exists()).unwrap_or(false)
256 }
257
258 fn delete(&self, path: &str) -> PersistenceResult<()> {
259 let full_path = self.resolve_path(path)?;
260 if full_path.is_dir() {
261 std::fs::remove_dir_all(full_path)?;
262 } else if full_path.exists() {
263 std::fs::remove_file(full_path)?;
264 }
265 Ok(())
266 }
267
268 fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
269 let from_path = self.resolve_path(from)?;
270 let to_path = self.resolve_path(to)?;
271 if let Some(parent) = to_path.parent() {
272 std::fs::create_dir_all(parent)?;
273 }
274 std::fs::rename(from_path, to_path)?;
275 Ok(())
276 }
277
278 fn create_dir_all(&self, path: &str) -> PersistenceResult<()> {
279 std::fs::create_dir_all(self.resolve_path(path)?)?;
280 Ok(())
281 }
282
283 fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
284 let full_path = self.resolve_path(path)?;
285 if !full_path.exists() {
286 return Ok(Vec::new());
287 }
288 let entries = std::fs::read_dir(full_path)?;
289 let mut out = Vec::new();
290 for entry in entries {
291 let entry = entry?;
292 out.push(entry.file_name().to_string_lossy().to_string());
293 }
294 out.sort();
295 Ok(out)
296 }
297
298 fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
299 let full_path = self.resolve_path(path)?;
300 if let Some(parent) = full_path.parent() {
301 std::fs::create_dir_all(parent)?;
302 }
303 let file = std::fs::OpenOptions::new()
304 .create(true)
305 .append(true)
306 .open(full_path)?;
307 Ok(Box::new(file))
308 }
309
310 fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
311 let temp_path = format!("{path}.tmp");
312 let full_temp_path = self.resolve_path(&temp_path)?;
313 if let Some(parent) = full_temp_path.parent() {
314 std::fs::create_dir_all(parent)?;
315 }
316
317 if let Err(e) = (|| -> PersistenceResult<()> {
318 let mut temp_file = std::fs::File::create(&full_temp_path)?;
319 temp_file.write_all(data)?;
320 temp_file.sync_all()?;
321 Ok(())
322 })() {
323 let _ = std::fs::remove_file(&full_temp_path);
324 return Err(e);
325 }
326
327 let full_path = self.resolve_path(path)?;
328 if let Err(e) = std::fs::rename(&full_temp_path, &full_path) {
329 let _ = std::fs::remove_file(&full_temp_path);
330 return Err(e.into());
331 }
332
333 if let Some(parent) = full_path.parent() {
334 let parent_file = std::fs::File::open(parent)?;
335 parent_file.sync_all()?;
336 }
337 Ok(())
338 }
339
340 fn file_path(&self, path: &str) -> Option<PathBuf> {
341 self.resolve_path(path).ok()
342 }
343}
344
345#[derive(Clone, Default)]
347pub struct MemoryDirectory {
348 files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
349}
350
351impl MemoryDirectory {
352 pub fn new() -> Self {
354 Self::default()
355 }
356
357 pub fn arc() -> std::sync::Arc<dyn Directory> {
359 std::sync::Arc::new(Self::new())
360 }
361}
362
363impl Directory for MemoryDirectory {
364 fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
365 self.files
367 .write()
368 .map_err(|_| PersistenceError::LockFailed {
369 resource: "memory directory".to_string(),
370 reason: "lock poisoned".to_string(),
371 })?
372 .insert(path.to_string(), Vec::new());
373
374 Ok(Box::new(MemoryInPlaceWriter {
375 files: self.files.clone(),
376 path: path.to_string(),
377 }))
378 }
379
380 fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
381 let files = self
382 .files
383 .read()
384 .map_err(|_| PersistenceError::LockFailed {
385 resource: "memory directory".to_string(),
386 reason: "lock poisoned".to_string(),
387 })?;
388 let data = files
389 .get(path)
390 .ok_or_else(|| PersistenceError::NotFound(path.to_string()))?
391 .clone();
392 Ok(Box::new(std::io::Cursor::new(data)))
393 }
394
395 fn exists(&self, path: &str) -> bool {
396 self.files
397 .read()
398 .map(|f| f.contains_key(path))
399 .unwrap_or(false)
400 }
401
402 fn delete(&self, path: &str) -> PersistenceResult<()> {
403 let mut files = self
404 .files
405 .write()
406 .map_err(|_| PersistenceError::LockFailed {
407 resource: "memory directory".to_string(),
408 reason: "lock poisoned".to_string(),
409 })?;
410 files.remove(path);
411 let prefix = format!("{path}/");
413 files.retain(|k, _| !k.starts_with(&prefix));
414 Ok(())
415 }
416
417 fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
418 let mut files = self
419 .files
420 .write()
421 .map_err(|_| PersistenceError::LockFailed {
422 resource: "memory directory".to_string(),
423 reason: "lock poisoned".to_string(),
424 })?;
425 let data = files
426 .remove(from)
427 .ok_or_else(|| PersistenceError::NotFound(from.to_string()))?;
428 files.insert(to.to_string(), data);
429 Ok(())
430 }
431
432 fn create_dir_all(&self, _path: &str) -> PersistenceResult<()> {
433 Ok(())
434 }
435
436 fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
437 let files = self
438 .files
439 .read()
440 .map_err(|_| PersistenceError::LockFailed {
441 resource: "memory directory".to_string(),
442 reason: "lock poisoned".to_string(),
443 })?;
444 let prefix = if path.is_empty() {
445 "".to_string()
446 } else {
447 format!("{path}/")
448 };
449 let result: std::collections::BTreeSet<String> = files
450 .keys()
451 .filter(|k| k.starts_with(&prefix))
452 .filter_map(|k| {
453 let rest = k.strip_prefix(&prefix).unwrap_or(k);
454 let first_component = rest.split('/').next().unwrap_or(rest);
455 if first_component.is_empty() {
456 None
457 } else {
458 Some(first_component.to_string())
459 }
460 })
461 .collect();
462 Ok(result.into_iter().collect())
463 }
464
465 fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
466 {
468 let mut files = self
469 .files
470 .write()
471 .map_err(|_| PersistenceError::LockFailed {
472 resource: "memory directory".to_string(),
473 reason: "lock poisoned".to_string(),
474 })?;
475 files.entry(path.to_string()).or_insert_with(Vec::new);
476 }
477 Ok(Box::new(MemoryInPlaceWriter {
478 files: self.files.clone(),
479 path: path.to_string(),
480 }))
481 }
482
483 fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
484 let mut files = self
485 .files
486 .write()
487 .map_err(|_| PersistenceError::LockFailed {
488 resource: "memory directory".to_string(),
489 reason: "lock poisoned".to_string(),
490 })?;
491 files.insert(path.to_string(), data.to_vec());
492 Ok(())
493 }
494
495 fn file_path(&self, _path: &str) -> Option<PathBuf> {
496 None
497 }
498}
499
500struct MemoryInPlaceWriter {
501 files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
502 path: String,
503}
504
505impl Write for MemoryInPlaceWriter {
506 fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
507 let mut files = self
508 .files
509 .write()
510 .map_err(|_| std::io::Error::other("lock poisoned"))?;
511 let entry = files.entry(self.path.clone()).or_insert_with(Vec::new);
512 entry.extend_from_slice(buf);
513 Ok(buf.len())
514 }
515
516 fn flush(&mut self) -> std::io::Result<()> {
517 Ok(())
518 }
519}