Skip to main content

nodedb_wal/segment/
atomic_io.rs

//! Durable atomic file / directory operations for checkpoint-class writes.
//!
//! The tmp-file + rename pattern is atomic only if both the file data and
//! the containing directory entry reach stable storage in the correct order.
//! On ext4 / XFS the rename metadata op can reach disk before the data pages
//! backing the tmp file — a power loss between the write and the next
//! checkpoint then leaves a correctly-named file containing zeros.
//!
//! [`atomic_write_fsync`] is the single helper all checkpoint-class writers
//! go through so the ordering (`write → sync_data → rename → fsync_dir`) is
//! enforced in one place. [`atomic_swap_dirs_fsync`] does the same for
//! directory-level swaps (rename old-dir → backup, rename new-dir → old-dir).
//!
//! [`read_checkpoint_dontneed`] pairs with the write helper on the read side:
//! checkpoint bytes are consumed once (deserialized into the in-memory index)
//! and then superseded. Leaving them in the page cache wastes memory needed
//! by hot workloads.
18
19use std::fs;
20use std::io::Write;
21use std::os::unix::io::AsRawFd;
22use std::path::Path;
23
24use crate::error::{Result, WalError};
25
26/// Fsync a directory to ensure file creation/deletion metadata is durable.
27///
28/// On ext4/XFS, creating or deleting a file writes the file data to disk
29/// but the directory entry may only be in the page cache. A power loss
30/// before the directory entry is persisted causes the file to "disappear"
31/// on reboot. Calling fsync on the directory fd ensures the metadata
32/// (filename, inode pointer) is on stable storage.
33pub fn fsync_directory(dir: &Path) -> Result<()> {
34    let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
35    dir_file.sync_all().map_err(WalError::Io)?;
36    Ok(())
37}
38
39/// Atomically write `bytes` to `dst` via a `tmp` file with full durability.
40///
41/// Order of operations (must not change):
42/// 1. Create / truncate `tmp` and write `bytes`.
43/// 2. `sync_data()` on `tmp` — forces file data pages to stable storage.
44/// 3. `rename(tmp, dst)` — atomic on POSIX filesystems.
45/// 4. `fsync_directory(parent)` — forces the directory entry durable so the
46///    new name survives power loss.
47///
48/// `tmp` and `dst` MUST be in the same directory; otherwise rename is not
49/// atomic and the parent fsync won't cover both entries.
50pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
51    let parent = dst.parent().ok_or_else(|| {
52        WalError::Io(std::io::Error::new(
53            std::io::ErrorKind::InvalidInput,
54            "atomic_write_fsync: dst has no parent directory",
55        ))
56    })?;
57
58    {
59        let mut f = fs::File::create(tmp).map_err(WalError::Io)?;
60        f.write_all(bytes).map_err(WalError::Io)?;
61        f.sync_data().map_err(WalError::Io)?;
62    }
63
64    fs::rename(tmp, dst).map_err(WalError::Io)?;
65    fsync_directory(parent)?;
66    Ok(())
67}
68
69/// Atomically swap a directory: `rename(live, backup); rename(staged, live)`,
70/// fsyncing the parent directory once both renames have completed.
71///
72/// `live`, `backup`, and `staged` MUST share the same parent directory. The
73/// caller is responsible for removing the backup directory once the new
74/// state is proven good — this helper does not delete anything.
75pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
76    let parent = live.parent().ok_or_else(|| {
77        WalError::Io(std::io::Error::new(
78            std::io::ErrorKind::InvalidInput,
79            "atomic_swap_dirs_fsync: live has no parent directory",
80        ))
81    })?;
82
83    fs::rename(live, backup).map_err(WalError::Io)?;
84    fs::rename(staged, live).map_err(WalError::Io)?;
85    fsync_directory(parent)?;
86    Ok(())
87}
88
89/// Read a checkpoint file and advise the kernel to drop its pages from the
90/// page cache.
91///
92/// Checkpoint files are consumed exactly once per process lifetime (loaded
93/// into the in-memory index and then superseded). `posix_fadvise(DONTNEED)`
94/// after read frees the page-cache memory for hot workloads.
95///
96/// On non-Unix targets the advise call is skipped and this degrades to a
97/// plain read.
98pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
99    let file = fs::File::open(path).map_err(WalError::Io)?;
100    let len = file.metadata().map_err(WalError::Io)?.len();
101    let bytes = fs::read(path).map_err(WalError::Io)?;
102
103    #[cfg(unix)]
104    {
105        // Safe: `file` owns the fd for the duration of the call; len fits in
106        // off_t on all supported platforms (checkpoint files are << i64::MAX).
107        let ret = unsafe {
108            libc::posix_fadvise(
109                file.as_raw_fd(),
110                0,
111                len as libc::off_t,
112                libc::POSIX_FADV_DONTNEED,
113            )
114        };
115        if ret != 0 {
116            tracing::debug!(
117                path = %path.display(),
118                ret,
119                "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
120            );
121        }
122    }
123    #[cfg(not(unix))]
124    {
125        let _ = len;
126    }
127
128    Ok(bytes)
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh write lands at the destination and leaves no tmp file behind.
    #[test]
    fn atomic_write_fsync_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let final_path = root.join("payload.ckpt");
        let tmp_path = root.join("payload.ckpt.tmp");

        atomic_write_fsync(&tmp_path, &final_path, b"hello world").unwrap();

        assert!(!tmp_path.exists(), "tmp must be renamed away");
        let on_disk = fs::read(&final_path).unwrap();
        assert_eq!(on_disk, b"hello world");
    }

    /// A second write through the same tmp/dst pair replaces the first.
    #[test]
    fn atomic_write_fsync_overwrites() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let final_path = root.join("payload.ckpt");
        let tmp_path = root.join("payload.ckpt.tmp");

        for payload in [&b"v1"[..], &b"v2"[..]] {
            atomic_write_fsync(&tmp_path, &final_path, payload).unwrap();
        }

        let on_disk = fs::read(&final_path).unwrap();
        assert_eq!(on_disk, b"v2");
    }

    /// After a swap, `live` holds the staged content, `backup` holds the old
    /// content, and `staged` no longer exists.
    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let live_dir = root.join("live");
        let backup_dir = root.join("backup");
        let staged_dir = root.join("staged");

        // Seed both directories with a marker that identifies their origin.
        for (dir, marker) in [(&live_dir, &b"old"[..]), (&staged_dir, &b"new"[..])] {
            fs::create_dir(dir).unwrap();
            fs::write(dir.join("marker"), marker).unwrap();
        }

        atomic_swap_dirs_fsync(&live_dir, &backup_dir, &staged_dir).unwrap();

        let live_marker = fs::read(live_dir.join("marker")).unwrap();
        let backup_marker = fs::read(backup_dir.join("marker")).unwrap();
        assert_eq!(live_marker, b"new");
        assert_eq!(backup_marker, b"old");
        assert!(!staged_dir.exists());
    }

    /// The read helper returns exactly the bytes stored in the file.
    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let scratch = tempfile::tempdir().unwrap();
        let ckpt_path = scratch.path().join("ckpt");
        fs::write(&ckpt_path, b"checkpoint bytes").unwrap();

        let loaded = read_checkpoint_dontneed(&ckpt_path).unwrap();

        assert_eq!(loaded, b"checkpoint bytes");
    }
}