durability 0.6.4

Crash-consistent persistence primitives: directory abstraction, generic WAL, checkpoints, and recovery.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
//! Storage abstraction for durability.
//!
//! Vocabulary note:
//! - Some durability primitives require **atomicity** (e.g. atomic rename/write) and
//!   **integrity** (checksums, framing).
//! - Stable-storage **durability** (survives power loss after reporting success)
//!   additionally requires explicit `fsync`/`sync_all` barriers and sometimes
//!   parent-directory sync after renames.

use crate::error::{PersistenceError, PersistenceResult};
use std::io::{Read, Write};
use std::path::PathBuf;

/// Make the **data** of `path` durable (`fdatasync`).
///
/// Uses `sync_data()` (fdatasync) rather than `sync_all()` (fsync). For
/// append-only logs, this is correct and faster: fdatasync skips unnecessary
/// metadata updates (mtime, atime) that don't affect crash recovery.
///
/// Requires `Directory::file_path()`. Returns `NotSupported` if unavailable.
pub fn sync_file<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
    let Some(p) = dir.file_path(path) else {
        return Err(PersistenceError::NotSupported(
            "sync_file requires Directory::file_path()".into(),
        ));
    };
    let f = std::fs::OpenOptions::new().read(true).open(&p)?;
    f.sync_data()?;
    Ok(())
}

/// Attempt to `fsync`/`sync_all` the parent directory of `path`.
///
/// This is the commonly-missed step needed to make *names* durable:
/// - durable file creation
/// - durable atomic rename
///
/// Notes:
/// - On some platforms/filesystems, syncing the directory is required for the rename/create to
///   survive power loss even after syncing the file itself.
/// - This requires `Directory::file_path()`. If unavailable, returns `NotSupported`.
pub fn sync_parent_dir<D: Directory + ?Sized>(dir: &D, path: &str) -> PersistenceResult<()> {
    let Some(p) = dir.file_path(path) else {
        return Err(PersistenceError::NotSupported(
            "sync_parent_dir requires Directory::file_path()".into(),
        ));
    };
    sync_parent_of_path(&p)
}

/// Attempt to `fsync`/`sync_all` the parent directory of a raw filesystem path.
///
/// Same semantics as [`sync_parent_dir`], but does not require a [`Directory`]
/// instance. Useful for callers that operate on raw `&Path` values and don't
/// otherwise need the `Directory` abstraction (e.g. a JSON or postcard
/// serializer that takes a path argument and inlines its own temp+rename).
///
/// A bare relative name such as `"wal.log"` has an *empty* parent according to
/// `Path::parent`; that case is treated as the current directory (`"."`)
/// instead of trying to open `""`, which would fail with "not found" even
/// though the containing directory exists.
///
/// Notes:
/// - On some platforms/filesystems, syncing the directory is required for the
///   rename/create to survive power loss even after syncing the file itself
///   (XFS, some ext4 configurations, overlayfs).
/// - Some platforms (notably Windows) don't allow opening a directory as a
///   `File` and will return an `io::Error`; the caller decides whether to
///   surface that or treat it as best-effort. ext4 with `auto_da_alloc` syncs
///   implicitly.
///
/// # Errors
///
/// - `InvalidConfig` when `path` has no parent at all (a root, a prefix, or
///   the empty path).
/// - Any I/O error raised while opening or syncing the parent directory.
pub fn sync_parent_of_path(path: &std::path::Path) -> PersistenceResult<()> {
    let Some(parent) = path.parent() else {
        return Err(PersistenceError::InvalidConfig(format!(
            "path has no parent directory: {path:?}"
        )));
    };
    // `Path::new("file").parent()` is `Some("")`: the file lives in the
    // current directory, so that is the directory whose entry must be synced.
    let parent = if parent.as_os_str().is_empty() {
        std::path::Path::new(".")
    } else {
        parent
    };
    let f = std::fs::File::open(parent)?;
    f.sync_all()?;
    Ok(())
}

/// Policy for when writers call `Write::flush()`.
///
/// Vocabulary note:
/// - `flush()` is not a stable-storage durability guarantee on most filesystems; it is best
///   treated as an IO boundary (push to OS / underlying writer).
/// - Stable-storage durability requires explicit `sync_all`/`fsync` barriers. No variant of
///   this policy issues them; use [`Directory::durable_sync_file`],
///   [`Directory::atomic_rename_durable`] or [`Directory::atomic_write_durable`] when a
///   write must survive power loss.
///
/// The type is `Copy` and implements `Eq`/`Hash`, so it can be compared exactly and used
/// as a map or set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum FlushPolicy {
    /// Call `flush()` after each logical append operation.
    PerAppend,
    /// Call `flush()` every N logical append operations.
    ///
    /// `n=1` is equivalent to `PerAppend`. `n=0` is treated as `PerAppend`.
    EveryN(usize),
    /// Call `flush()` when the specified duration has elapsed since the last flush.
    ///
    /// Checked lazily on each `append()` call. If no appends arrive, no flush occurs.
    /// For background flushing independent of write activity, use an external timer
    /// calling [`WalWriter::flush`](crate::walog::WalWriter::flush) directly.
    Interval(std::time::Duration),
    /// Do not call `flush()` implicitly; callers may flush explicitly (if supported by the backend).
    Manual,
}

/// Storage backend that behaves like a directory of named files.
///
/// Paths are backend-relative `&str` keys. The required methods cover plain
/// file operations; the provided `*_durable` methods layer explicit
/// stable-storage barriers on top and only work when [`Directory::file_path`]
/// yields a real filesystem path.
pub trait Directory: Send + Sync {
    /// Create a file for writing, replacing any existing file at `path`.
    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
    /// Open the existing file at `path` for reading.
    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>>;
    /// Report whether `path` exists.
    fn exists(&self, path: &str) -> bool;
    /// Remove a file or a directory (directories are removed recursively).
    fn delete(&self, path: &str) -> PersistenceResult<()>;
    /// Atomically rename/move the file `from` to `to`.
    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()>;
    /// Create the directory `path`, including any missing parents.
    fn create_dir_all(&self, path: &str) -> PersistenceResult<()>;
    /// List the entries of the directory `path`.
    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>>;
    /// Open `path` for appending, creating the file when it is missing.
    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>>;
    /// Atomically write `data` to `path`.
    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()>;
    /// Filesystem path behind `path`, for backends that have one.
    fn file_path(&self, path: &str) -> Option<PathBuf>;

    // -- Stable-storage durability helpers (default: delegate to free functions) --
    //
    // All of these need `file_path()` to return `Some`. Backends without a
    // real filesystem get `NotSupported` from the defaults, which is the
    // intended answer: there is nothing to sync.

    /// Attempt to make the contents of the file at `path` durable.
    ///
    /// Default: forwards to [`sync_file`].
    /// Returns `NotSupported` if `file_path()` returns `None`.
    fn durable_sync_file(&self, path: &str) -> PersistenceResult<()> {
        sync_file(self, path)
    }

    /// Attempt to make the *name* of `path` durable by syncing its parent directory.
    ///
    /// Default: forwards to [`sync_parent_dir`].
    /// Returns `NotSupported` if `file_path()` returns `None`.
    fn durable_sync_parent_dir(&self, path: &str) -> PersistenceResult<()> {
        sync_parent_dir(self, path)
    }

    /// Rename `from` to `to` atomically, then sync the affected parent directories.
    ///
    /// The destination's parent is always synced. The source's parent is synced
    /// first when it differs from the destination's.
    ///
    /// Returns `NotSupported` if `file_path()` returns `None` for either path;
    /// in that case no rename is attempted.
    fn atomic_rename_durable(&self, from: &str, to: &str) -> PersistenceResult<()> {
        let unsupported = || {
            PersistenceError::NotSupported(
                "atomic_rename_durable requires Directory::file_path()".into(),
            )
        };
        let Some(src) = self.file_path(from) else {
            return Err(unsupported());
        };
        let Some(dst) = self.file_path(to) else {
            return Err(unsupported());
        };

        self.atomic_rename(from, to)?;

        // A cross-directory move changes two directories; both entries need a barrier.
        if src.parent() != dst.parent() {
            self.durable_sync_parent_dir(from)?;
        }
        self.durable_sync_parent_dir(to)
    }

    /// Atomically write `data` to `path` with explicit durability barriers.
    ///
    /// Stronger than [`Directory::atomic_write`]: the bytes go to `"{path}.tmp"`,
    /// that temp file is synced, it is renamed over `path`, and the parent
    /// directory is synced. If any step fails, a best-effort delete of the temp
    /// file is attempted and the original error is returned.
    ///
    /// Returns `NotSupported` if `file_path()` returns `None`.
    fn atomic_write_durable(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
        if self.file_path(path).is_none() {
            return Err(PersistenceError::NotSupported(
                "atomic_write_durable requires Directory::file_path()".into(),
            ));
        }

        let tmp = format!("{path}.tmp");
        let outcome = (|| -> PersistenceResult<()> {
            {
                let mut writer = self.create_file(&tmp)?;
                writer.write_all(data)?;
                writer.flush()?;
                // The writer is dropped here, before the temp file is synced.
            }
            self.durable_sync_file(&tmp)?;
            self.atomic_rename_durable(&tmp, path)
        })();

        if outcome.is_err() {
            // Best-effort cleanup; the error worth reporting is the one above.
            let _ = self.delete(&tmp);
        }
        outcome
    }
}

/// Filesystem-backed `Directory` rooted at a local path.
pub struct FsDirectory {
    root: PathBuf,
}

impl FsDirectory {
    /// Create (or open) a filesystem directory backend rooted at `root`.
    pub fn new(root: impl Into<PathBuf>) -> PersistenceResult<Self> {
        let root = root.into();
        std::fs::create_dir_all(&root)?;
        Ok(Self { root })
    }

    /// Create a filesystem directory backend wrapped in `Arc<dyn Directory>`.
    pub fn arc(
        root: impl Into<std::path::PathBuf>,
    ) -> PersistenceResult<std::sync::Arc<dyn Directory>> {
        Ok(std::sync::Arc::new(Self::new(root)?))
    }

    fn resolve_path(&self, path: &str) -> PersistenceResult<PathBuf> {
        // Reject path traversal: `..`, absolute paths, and prefix components.
        for component in std::path::Path::new(path).components() {
            match component {
                std::path::Component::ParentDir
                | std::path::Component::RootDir
                | std::path::Component::Prefix(_) => {
                    return Err(PersistenceError::InvalidConfig(format!(
                        "path must not contain '..', absolute, or prefix components: {path}"
                    )));
                }
                _ => {}
            }
        }
        Ok(self.root.join(path))
    }
}

impl Directory for FsDirectory {
    /// Create (or truncate) the file at `path`, creating missing parent directories first.
    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        let full_path = self.resolve_path(path)?;
        if let Some(parent) = full_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        Ok(Box::new(std::fs::File::create(full_path)?))
    }

    /// Open the file at `path` for reading.
    ///
    /// A missing file is reported as `NotFound` carrying the resolved path.
    /// The decision is taken from the `open` call itself rather than from a
    /// separate existence probe, so there is no window between check and open
    /// and other I/O failures (e.g. permissions) are not mislabelled as
    /// "not found".
    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
        let full_path = self.resolve_path(path)?;
        match std::fs::File::open(&full_path) {
            Ok(file) => Ok(Box::new(file)),
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                Err(PersistenceError::NotFound(full_path.display().to_string()))
            }
            Err(e) => Err(e.into()),
        }
    }

    /// Report whether `path` exists; paths rejected by validation count as absent.
    fn exists(&self, path: &str) -> bool {
        self.resolve_path(path).map(|p| p.exists()).unwrap_or(false)
    }

    /// Remove `path`: directories recursively, files directly, missing paths are a no-op.
    fn delete(&self, path: &str) -> PersistenceResult<()> {
        let full_path = self.resolve_path(path)?;
        if full_path.is_dir() {
            std::fs::remove_dir_all(full_path)?;
        } else if full_path.exists() {
            std::fs::remove_file(full_path)?;
        }
        Ok(())
    }

    /// Rename `from` to `to`, creating the destination's parent directories first.
    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
        let from_path = self.resolve_path(from)?;
        let to_path = self.resolve_path(to)?;
        if let Some(parent) = to_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        std::fs::rename(from_path, to_path)?;
        Ok(())
    }

    /// Create the directory `path` and any missing parents.
    fn create_dir_all(&self, path: &str) -> PersistenceResult<()> {
        std::fs::create_dir_all(self.resolve_path(path)?)?;
        Ok(())
    }

    /// List entry names of the directory `path`, sorted; a missing directory yields an empty list.
    ///
    /// Names that are not valid UTF-8 are converted lossily.
    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
        let full_path = self.resolve_path(path)?;
        if !full_path.exists() {
            return Ok(Vec::new());
        }
        let mut out = Vec::new();
        for entry in std::fs::read_dir(full_path)? {
            out.push(entry?.file_name().to_string_lossy().into_owned());
        }
        out.sort();
        Ok(out)
    }

    /// Open `path` in append mode, creating the file and its parent directories if needed.
    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        let full_path = self.resolve_path(path)?;
        if let Some(parent) = full_path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(full_path)?;
        Ok(Box::new(file))
    }

    /// Atomically replace `path` with `data`.
    ///
    /// Steps: write `"{path}.tmp"`, `sync_all` it, rename it over `path`, then
    /// `sync_all` the parent directory. The temp file is removed (best effort)
    /// when writing or renaming fails.
    ///
    /// The destination is validated *before* anything is written: a rejected
    /// path such as `".."` would otherwise still produce a valid-looking temp
    /// name (`"...tmp"`) and leave that file behind.
    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
        let full_path = self.resolve_path(path)?;
        let full_temp_path = self.resolve_path(&format!("{path}.tmp"))?;
        if let Some(parent) = full_temp_path.parent() {
            std::fs::create_dir_all(parent)?;
        }

        if let Err(e) = (|| -> PersistenceResult<()> {
            let mut temp_file = std::fs::File::create(&full_temp_path)?;
            temp_file.write_all(data)?;
            temp_file.sync_all()?;
            Ok(())
        })() {
            let _ = std::fs::remove_file(&full_temp_path);
            return Err(e);
        }

        if let Err(e) = std::fs::rename(&full_temp_path, &full_path) {
            let _ = std::fs::remove_file(&full_temp_path);
            return Err(e.into());
        }

        if let Some(parent) = full_path.parent() {
            // With an empty root, a top-level file has the empty path as its
            // parent; the directory that actually holds it is the cwd.
            let parent = if parent.as_os_str().is_empty() {
                std::path::Path::new(".")
            } else {
                parent
            };
            std::fs::File::open(parent)?.sync_all()?;
        }
        Ok(())
    }

    /// Resolved filesystem path for `path`, or `None` when validation rejects it.
    fn file_path(&self, path: &str) -> Option<PathBuf> {
        self.resolve_path(path).ok()
    }
}

/// In-memory `Directory` used for tests.
///
/// Files live in a map from path to bytes behind an `Arc<RwLock<..>>`, so a
/// clone is another handle onto the *same* set of files.
#[derive(Clone, Default)]
pub struct MemoryDirectory {
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
}

/// Compact `Debug` output: reports how many files are stored rather than
/// dumping every byte. `file_count` is `None` when the lock is poisoned.
impl std::fmt::Debug for MemoryDirectory {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let file_count = self.files.read().map(|files| files.len()).ok();
        f.debug_struct("MemoryDirectory")
            .field("file_count", &file_count)
            .finish()
    }
}

impl MemoryDirectory {
    /// Create an empty in-memory directory.
    pub fn new() -> Self {
        Self::default()
    }

    /// Create an empty in-memory directory wrapped in `Arc<dyn Directory>`.
    pub fn arc() -> std::sync::Arc<dyn Directory> {
        std::sync::Arc::new(Self::new())
    }
}

impl Directory for MemoryDirectory {
    fn create_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        // Overwrite semantics: clear the file eagerly, then append in-place.
        self.files
            .write()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?
            .insert(path.to_string(), Vec::new());

        Ok(Box::new(MemoryInPlaceWriter {
            files: self.files.clone(),
            path: path.to_string(),
        }))
    }

    fn open_file(&self, path: &str) -> PersistenceResult<Box<dyn Read + Send>> {
        let files = self
            .files
            .read()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?;
        let data = files
            .get(path)
            .ok_or_else(|| PersistenceError::NotFound(path.to_string()))?
            .clone();
        Ok(Box::new(std::io::Cursor::new(data)))
    }

    fn exists(&self, path: &str) -> bool {
        self.files
            .read()
            .map(|f| f.contains_key(path))
            .unwrap_or(false)
    }

    fn delete(&self, path: &str) -> PersistenceResult<()> {
        let mut files = self
            .files
            .write()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?;
        files.remove(path);
        // Also remove children (simulate remove_dir_all).
        let prefix = format!("{path}/");
        files.retain(|k, _| !k.starts_with(&prefix));
        Ok(())
    }

    fn atomic_rename(&self, from: &str, to: &str) -> PersistenceResult<()> {
        let mut files = self
            .files
            .write()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?;
        let data = files
            .remove(from)
            .ok_or_else(|| PersistenceError::NotFound(from.to_string()))?;
        files.insert(to.to_string(), data);
        Ok(())
    }

    fn create_dir_all(&self, _path: &str) -> PersistenceResult<()> {
        Ok(())
    }

    fn list_dir(&self, path: &str) -> PersistenceResult<Vec<String>> {
        let files = self
            .files
            .read()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?;
        let prefix = if path.is_empty() {
            "".to_string()
        } else {
            format!("{path}/")
        };
        let result: std::collections::BTreeSet<String> = files
            .keys()
            .filter(|k| k.starts_with(&prefix))
            .filter_map(|k| {
                let rest = k.strip_prefix(&prefix).unwrap_or(k);
                let first_component = rest.split('/').next().unwrap_or(rest);
                if first_component.is_empty() {
                    None
                } else {
                    Some(first_component.to_string())
                }
            })
            .collect();
        Ok(result.into_iter().collect())
    }

    fn append_file(&self, path: &str) -> PersistenceResult<Box<dyn Write + Send>> {
        // Ensure the file exists, then append in-place.
        {
            let mut files = self
                .files
                .write()
                .map_err(|_| PersistenceError::LockFailed {
                    resource: "memory directory".to_string(),
                    reason: "lock poisoned".to_string(),
                })?;
            files.entry(path.to_string()).or_insert_with(Vec::new);
        }
        Ok(Box::new(MemoryInPlaceWriter {
            files: self.files.clone(),
            path: path.to_string(),
        }))
    }

    fn atomic_write(&self, path: &str, data: &[u8]) -> PersistenceResult<()> {
        let mut files = self
            .files
            .write()
            .map_err(|_| PersistenceError::LockFailed {
                resource: "memory directory".to_string(),
                reason: "lock poisoned".to_string(),
            })?;
        files.insert(path.to_string(), data.to_vec());
        Ok(())
    }

    fn file_path(&self, _path: &str) -> Option<PathBuf> {
        None
    }
}

/// `Write` handle that appends directly into one entry of a shared in-memory file map.
struct MemoryInPlaceWriter {
    /// Shared path-to-bytes map this writer appends into.
    files: std::sync::Arc<std::sync::RwLock<std::collections::HashMap<String, Vec<u8>>>>,
    /// Key of the entry that receives the written bytes.
    path: String,
}

impl Write for MemoryInPlaceWriter {
    /// Append `buf` to the entry for `self.path`, creating the entry if it is missing.
    ///
    /// Always consumes the whole buffer. Fails only when the map's lock is poisoned.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let Ok(mut map) = self.files.write() else {
            return Err(std::io::Error::other("lock poisoned"));
        };
        match map.get_mut(&self.path) {
            Some(bytes) => bytes.extend_from_slice(buf),
            None => {
                // The entry may have been removed since the writer was created.
                map.insert(self.path.clone(), buf.to_vec());
            }
        }
        Ok(buf.len())
    }

    /// Nothing is buffered in the writer, so there is nothing to flush.
    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Syncing the parent of a file that sits in an existing temp directory
    /// succeeds. This is the case atomic-rename callers rely on.
    #[cfg(unix)]
    #[test]
    fn sync_parent_of_path_existing_parent_succeeds() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let file = scratch.path().join("some_file.txt");
        std::fs::write(&file, b"hi").expect("write");

        sync_parent_of_path(&file).expect("sync_parent_of_path on existing parent");
    }

    /// When the parent directory does not exist, opening it fails and the
    /// error is returned to the caller, so a caller-side `?` propagates it.
    #[cfg(unix)]
    #[test]
    fn sync_parent_of_path_missing_parent_errors() {
        let scratch = tempfile::tempdir().expect("tempdir");
        let orphan = scratch
            .path()
            .join("definitely-not-a-real-subdir/file.txt");

        let outcome = sync_parent_of_path(&orphan);
        assert!(outcome.is_err());
    }

    /// A path with no parent at all (here `/`) is reported as `InvalidConfig`,
    /// not as a wrapped `io::Error`. Covers the branch where `parent()` is `None`.
    #[test]
    fn sync_parent_of_path_root_errors_with_invalid_config() {
        let other = sync_parent_of_path(std::path::Path::new("/"));
        assert!(
            matches!(other, Err(PersistenceError::InvalidConfig(_))),
            "expected InvalidConfig for root path, got {other:?}"
        );
    }
}