Skip to main content

durability/
storage.rs

1//! Storage abstraction for durability.
2//!
3//! Vocabulary note:
4//! - Some durability primitives require **atomicity** (e.g. atomic rename/write) and
5//!   **integrity** (checksums, framing).
6//! - Stable-storage **durability** (survives power loss after reporting success)
7//!   additionally requires explicit `fsync`/`sync_all` barriers and sometimes
8//!   parent-directory sync after renames.
9
10use crate::error::{PersistenceError, PersistenceResult};
11use std::io::{Read, Write};
12use std::path::PathBuf;
13
14/// Make the **data** of `path` durable (`fdatasync`).
15///
16/// Uses `sync_data()` (fdatasync) rather than `sync_all()` (fsync). For
17/// append-only logs, this is correct and faster: fdatasync skips unnecessary
18/// metadata updates (mtime, atime) that don't affect crash recovery.
19///
20/// Requires `Directory::file_path()`. Returns `NotSupported` if unavailable.
21pub fn sync_file<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
22    let Some(p) = dir.file_path(path) else {
23        return Err(PersistenceError::NotSupported(
24            "sync_file requires Directory::file_path()".into(),
25        ));
26    };
27    let f = std::fs::OpenOptions::new().read(true).open(&p)?;
28    f.sync_data()?;
29    Ok(())
30}
31
32/// Attempt to `fsync`/`sync_all` the parent directory of `path`.
33///
34/// This is the commonly-missed step needed to make *names* durable:
35/// - durable file creation
36/// - durable atomic rename
37///
38/// Notes:
39/// - On some platforms/filesystems, syncing the directory is required for the rename/create to
40///   survive power loss even after syncing the file itself.
41/// - This requires `Directory::file_path()`. If unavailable, returns `NotSupported`.
42pub fn sync_parent_dir<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
43    let Some(p) = dir.file_path(path) else {
44        return Err(PersistenceError::NotSupported(
45            "sync_parent_dir requires Directory::file_path()".into(),
46        ));
47    };
48    let Some(parent) = p.parent() else {
49        return Err(PersistenceError::InvalidConfig(format!(
50            "path has no parent directory: {p:?}"
51        )));
52    };
53    let f = std::fs::File::open(parent)?;
54    // Best-effort: if this fails due to platform-specific directory open semantics,
55    // surface the error to the caller; stable storage requires an explicit decision.
56    f.sync_all()?;
57    Ok(())
58}
59
/// Policy for when writers call `Write::flush()`.
///
/// Vocabulary note:
/// - `flush()` is not a stable-storage durability guarantee on most filesystems; it is best
///   treated as an IO boundary (push to OS / underlying writer).
/// - Stable-storage durability requires explicit `sync_all`/`fsync` barriers. This enum only
///   describes `flush()` cadence; those barriers are requested separately through
///   [`Directory::durable_sync_file`] / [`Directory::durable_sync_parent_dir`] (or the free
///   functions [`sync_file`] / [`sync_parent_dir`]), which need `Directory::file_path()` to
///   return `Some`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushPolicy {
    /// Call `flush()` after each logical append operation.
    PerAppend,
    /// Call `flush()` every N logical append operations.
    ///
    /// `n=1` is equivalent to `PerAppend`. `n=0` is treated as `PerAppend`.
    EveryN(usize),
    /// Call `flush()` when the specified duration has elapsed since the last flush.
    ///
    /// Checked lazily on each `append()` call. If no appends arrive, no flush occurs.
    /// For background flushing independent of write activity, use an external timer
    /// calling [`WalWriter::flush`](crate::walog::WalWriter::flush) directly.
    Interval(std::time::Duration),
    /// Do not call `flush()` implicitly; callers may flush explicitly (if supported by the backend).
    Manual,
}
84
85/// Trait for directory-like storage backends.
86pub trait Directory: Send + Sync {
87    /// Create a new file for writing (overwriting if it exists).
88    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
89    /// Open an existing file for reading.
90    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>>;
91    /// Return whether a path exists.
92    fn exists(&self, path: &str) -> bool;
93    /// Delete a file or directory (directories recursively).
94    fn delete(&self, path: &str) -> PersistenceResult<()>;
95    /// Atomically rename/move a file.
96    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()>;
97    /// Create a directory (and parents if needed).
98    fn create_dir_all(&self, path: &str) -> PersistenceResult<()>;
99    /// List entries in a directory.
100    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>>;
101    /// Open a file for appending (creating it if missing).
102    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
103    /// Atomically write bytes to a path.
104    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()>;
105    /// Optional filesystem path for backends that support it.
106    fn file_path(&self, path: &str) -> Option<PathBuf>;
107
108    // -- Stable-storage durability helpers (default: delegate to free functions) --
109    //
110    // These require `file_path()` to return `Some`. Non-filesystem backends
111    // get `NotSupported` from the defaults, which is correct -- stable-storage
112    // durability is meaningless without a real filesystem.
113
114    /// Attempt to make the file at `path` durable on stable storage.
115    ///
116    /// Default: delegates to [`sync_file`].
117    /// Returns `NotSupported` if `file_path()` returns `None`.
118    fn durable_sync_file(&self, path: &str) -> PersistenceResult<()> {
119        sync_file(self, path)
120    }
121
122    /// Attempt to make the *name* of `path` durable (sync the parent directory).
123    ///
124    /// Default: delegates to [`sync_parent_dir`].
125    /// Returns `NotSupported` if `file_path()` returns `None`.
126    fn durable_sync_parent_dir(&self, path: &str) -> PersistenceResult<()> {
127        sync_parent_dir(self, path)
128    }
129
130    /// Atomically rename and then sync the destination parent directory.
131    ///
132    /// Returns `NotSupported` if `file_path()` returns `None`.
133    fn atomic_rename_durable(&self, from: &str, to: &str) -> PersistenceResult<()> {
134        let from_path = match self.file_path(from) {
135            Some(p) => p,
136            None => {
137                return Err(PersistenceError::NotSupported(
138                    "atomic_rename_durable requires Directory::file_path()".into(),
139                ));
140            }
141        };
142        let to_path = match self.file_path(to) {
143            Some(p) => p,
144            None => {
145                return Err(PersistenceError::NotSupported(
146                    "atomic_rename_durable requires Directory::file_path()".into(),
147                ));
148            }
149        };
150
151        self.atomic_rename(from, to)?;
152        let from_parent = from_path.parent();
153        let to_parent = to_path.parent();
154        if from_parent != to_parent {
155            self.durable_sync_parent_dir(from)?;
156        }
157        self.durable_sync_parent_dir(to)?;
158        Ok(())
159    }
160
161    /// Atomically write bytes to `path` with explicit durability barriers.
162    ///
163    /// Stronger than [`Directory::atomic_write`]: writes temp, syncs temp,
164    /// renames, syncs parent directory.
165    ///
166    /// Returns `NotSupported` if `file_path()` returns `None`.
167    fn atomic_write_durable(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
168        if self.file_path(path).is_none() {
169            return Err(PersistenceError::NotSupported(
170                "atomic_write_durable requires Directory::file_path()".into(),
171            ));
172        }
173
174        let tmp = format!("{path}.tmp");
175        if let Err(e) = (|| -> PersistenceResult<()> {
176            let mut w = self.create_file(&tmp)?;
177            w.write_all(data)?;
178            w.flush()?;
179            Ok(())
180        })() {
181            let _ = self.delete(&tmp);
182            return Err(e);
183        }
184
185        if let Err(e) = self.durable_sync_file(&tmp) {
186            let _ = self.delete(&tmp);
187            return Err(e);
188        }
189
190        if let Err(e) = self.atomic_rename_durable(&tmp, path) {
191            let _ = self.delete(&tmp);
192            return Err(e);
193        }
194
195        Ok(())
196    }
197}
198
/// Filesystem-backed [`Directory`] rooted at a local path.
///
/// Relative paths given to the `Directory` methods are joined onto `root` after
/// validation that rejects `..`, absolute, and prefix components.
#[derive(Debug, Clone)]
pub struct FsDirectory {
    /// Base directory that every relative path is resolved against.
    root: PathBuf,
}
203
204impl FsDirectory {
205    /// Create (or open) a filesystem directory backend rooted at `root`.
206    pub fn new(root: impl Into<PathBuf>) -> PersistenceResult<Self> {
207        let root = root.into();
208        std::fs::create_dir_all(&root)?;
209        Ok(Self { root })
210    }
211
212    /// Create a filesystem directory backend wrapped in `Arc<dyn Directory>`.
213    pub fn arc(
214        root: impl Into<std::path::PathBuf>,
215    ) -> PersistenceResult<std::sync::Arc<dyn Directory>> {
216        Ok(std::sync::Arc::new(Self::new(root)?))
217    }
218
219    fn resolve_path(&self, path: &str) -> PersistenceResult<PathBuf> {
220        // Reject path traversal: `..`, absolute paths, and prefix components.
221        for component in std::path::Path::new(path).components() {
222            match component {
223                std::path::Component::ParentDir
224                | std::path::Component::RootDir
225                | std::path::Component::Prefix(_) => {
226                    return Err(PersistenceError::InvalidConfig(format!(
227                        "path must not contain '..', absolute, or prefix components: {path}"
228                    )));
229                }
230                _ => {}
231            }
232        }
233        Ok(self.root.join(path))
234    }
235}
236
237impl Directory for FsDirectory {
238    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
239        let full_path = self.resolve_path(path)?;
240        if let Some(parent) = full_path.parent() {
241            std::fs::create_dir_all(parent)?;
242        }
243        Ok(Box::new(std::fs::File::create(full_path)?))
244    }
245
246    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
247        let full_path = self.resolve_path(path)?;
248        if !full_path.exists() {
249            return Err(PersistenceError::NotFound(full_path.display().to_string()));
250        }
251        Ok(Box::new(std::fs::File::open(full_path)?))
252    }
253
254    fn exists(&self, path: &str) -> bool {
255        self.resolve_path(path).map(|p| p.exists()).unwrap_or(false)
256    }
257
258    fn delete(&self, path: &str) -> PersistenceResult<()> {
259        let full_path = self.resolve_path(path)?;
260        if full_path.is_dir() {
261            std::fs::remove_dir_all(full_path)?;
262        } else if full_path.exists() {
263            std::fs::remove_file(full_path)?;
264        }
265        Ok(())
266    }
267
268    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
269        let from_path = self.resolve_path(from)?;
270        let to_path = self.resolve_path(to)?;
271        if let Some(parent) = to_path.parent() {
272            std::fs::create_dir_all(parent)?;
273        }
274        std::fs::rename(from_path, to_path)?;
275        Ok(())
276    }
277
278    fn create_dir_all(&self, path: &str) -> PersistenceResult<()> {
279        std::fs::create_dir_all(self.resolve_path(path)?)?;
280        Ok(())
281    }
282
283    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
284        let full_path = self.resolve_path(path)?;
285        if !full_path.exists() {
286            return Ok(Vec::new());
287        }
288        let entries = std::fs::read_dir(full_path)?;
289        let mut out = Vec::new();
290        for entry in entries {
291            let entry = entry?;
292            out.push(entry.file_name().to_string_lossy().to_string());
293        }
294        out.sort();
295        Ok(out)
296    }
297
298    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
299        let full_path = self.resolve_path(path)?;
300        if let Some(parent) = full_path.parent() {
301            std::fs::create_dir_all(parent)?;
302        }
303        let file = std::fs::OpenOptions::new()
304            .create(true)
305            .append(true)
306            .open(full_path)?;
307        Ok(Box::new(file))
308    }
309
310    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
311        let temp_path = format!("{path}.tmp");
312        let full_temp_path = self.resolve_path(&temp_path)?;
313        if let Some(parent) = full_temp_path.parent() {
314            std::fs::create_dir_all(parent)?;
315        }
316
317        if let Err(e) = (|| -> PersistenceResult<()> {
318            let mut temp_file = std::fs::File::create(&full_temp_path)?;
319            temp_file.write_all(data)?;
320            temp_file.sync_all()?;
321            Ok(())
322        })() {
323            let _ = std::fs::remove_file(&full_temp_path);
324            return Err(e);
325        }
326
327        let full_path = self.resolve_path(path)?;
328        if let Err(e) = std::fs::rename(&full_temp_path, &full_path) {
329            let _ = std::fs::remove_file(&full_temp_path);
330            return Err(e.into());
331        }
332
333        if let Some(parent) = full_path.parent() {
334            let parent_file = std::fs::File::open(parent)?;
335            parent_file.sync_all()?;
336        }
337        Ok(())
338    }
339
340    fn file_path(&self, path: &str) -> Option<PathBuf> {
341        self.resolve_path(path).ok()
342    }
343}
344
/// In-memory `Directory` used for tests.
///
/// Files live in a map behind an `Arc`, so clones of a `MemoryDirectory` share the
/// same contents.
#[derive(Clone, Debug, Default)]
pub struct MemoryDirectory {
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
}
350
351impl MemoryDirectory {
352    /// Create an empty in-memory directory.
353    pub fn new() -> Self {
354        Self::default()
355    }
356
357    /// Create an empty in-memory directory wrapped in `Arc<dyn Directory>`.
358    pub fn arc() -> std::sync::Arc<dyn Directory> {
359        std::sync::Arc::new(Self::new())
360    }
361}
362
363impl Directory for MemoryDirectory {
364    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
365        // Overwrite semantics: clear the file eagerly, then append in-place.
366        self.files
367            .write()
368            .map_err(|_| PersistenceError::LockFailed {
369                resource: "memory directory".to_string(),
370                reason: "lock poisoned".to_string(),
371            })?
372            .insert(path.to_string(), Vec::new());
373
374        Ok(Box::new(MemoryInPlaceWriter {
375            files: self.files.clone(),
376            path: path.to_string(),
377        }))
378    }
379
380    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
381        let files = self
382            .files
383            .read()
384            .map_err(|_| PersistenceError::LockFailed {
385                resource: "memory directory".to_string(),
386                reason: "lock poisoned".to_string(),
387            })?;
388        let data = files
389            .get(path)
390            .ok_or_else(|| PersistenceError::NotFound(path.to_string()))?
391            .clone();
392        Ok(Box::new(std::io::Cursor::new(data)))
393    }
394
395    fn exists(&self, path: &str) -> bool {
396        self.files
397            .read()
398            .map(|f| f.contains_key(path))
399            .unwrap_or(false)
400    }
401
402    fn delete(&self, path: &str) -> PersistenceResult<()> {
403        let mut files = self
404            .files
405            .write()
406            .map_err(|_| PersistenceError::LockFailed {
407                resource: "memory directory".to_string(),
408                reason: "lock poisoned".to_string(),
409            })?;
410        files.remove(path);
411        // Also remove children (simulate remove_dir_all).
412        let prefix = format!("{path}/");
413        files.retain(|k, _| !k.starts_with(&prefix));
414        Ok(())
415    }
416
417    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
418        let mut files = self
419            .files
420            .write()
421            .map_err(|_| PersistenceError::LockFailed {
422                resource: "memory directory".to_string(),
423                reason: "lock poisoned".to_string(),
424            })?;
425        let data = files
426            .remove(from)
427            .ok_or_else(|| PersistenceError::NotFound(from.to_string()))?;
428        files.insert(to.to_string(), data);
429        Ok(())
430    }
431
432    fn create_dir_all(&self, _path: &str) -> PersistenceResult<()> {
433        Ok(())
434    }
435
436    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
437        let files = self
438            .files
439            .read()
440            .map_err(|_| PersistenceError::LockFailed {
441                resource: "memory directory".to_string(),
442                reason: "lock poisoned".to_string(),
443            })?;
444        let prefix = if path.is_empty() {
445            "".to_string()
446        } else {
447            format!("{path}/")
448        };
449        let result: std::collections::BTreeSet<String> = files
450            .keys()
451            .filter(|k| k.starts_with(&prefix))
452            .filter_map(|k| {
453                let rest = k.strip_prefix(&prefix).unwrap_or(k);
454                let first_component = rest.split('/').next().unwrap_or(rest);
455                if first_component.is_empty() {
456                    None
457                } else {
458                    Some(first_component.to_string())
459                }
460            })
461            .collect();
462        Ok(result.into_iter().collect())
463    }
464
465    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
466        // Ensure the file exists, then append in-place.
467        {
468            let mut files = self
469                .files
470                .write()
471                .map_err(|_| PersistenceError::LockFailed {
472                    resource: "memory directory".to_string(),
473                    reason: "lock poisoned".to_string(),
474                })?;
475            files.entry(path.to_string()).or_insert_with(Vec::new);
476        }
477        Ok(Box::new(MemoryInPlaceWriter {
478            files: self.files.clone(),
479            path: path.to_string(),
480        }))
481    }
482
483    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
484        let mut files = self
485            .files
486            .write()
487            .map_err(|_| PersistenceError::LockFailed {
488                resource: "memory directory".to_string(),
489                reason: "lock poisoned".to_string(),
490            })?;
491        files.insert(path.to_string(), data.to_vec());
492        Ok(())
493    }
494
495    fn file_path(&self, _path: &str) -> Option<PathBuf> {
496        None
497    }
498}
499
/// `Write` handle that appends straight into a `MemoryDirectory`'s shared file map.
///
/// Each `write` takes the map's write lock and extends the entry for `path`. Bytes are
/// therefore in the map as soon as `write` returns.
struct MemoryInPlaceWriter {
    /// File map shared with the `MemoryDirectory` that created this writer.
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
    /// Key of the file this writer appends to.
    path: String,
}

impl Write for MemoryInPlaceWriter {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let mut map = self
            .files
            .write()
            .map_err(|_| std::io::Error::other("lock poisoned"))?;
        if let Some(contents) = map.get_mut(&self.path) {
            contents.extend_from_slice(buf);
        } else {
            // The entry vanished (e.g. it was deleted) while this writer was alive:
            // recreate it holding just this chunk.
            map.insert(self.path.clone(), buf.to_vec());
        }
        Ok(buf.len())
    }

    /// Nothing is buffered locally, so there is nothing to push.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}