Skip to main content

nodedb_wal/segment/
atomic_io.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! Durable atomic file / directory operations for checkpoint-class writes.
4//!
5//! The tmp-file + rename pattern is atomic only if both the file data and
6//! the containing directory entry reach stable storage in the correct order.
7//! On ext4 / XFS the rename metadata op can reach disk before the data pages
8//! backing the tmp file — a power loss between the write and the next
9//! checkpoint then leaves a correctly-named file containing zeros.
10//!
11//! [`atomic_write_fsync`] is the single helper all checkpoint-class writers
12//! go through so the ordering (`write → sync_data → rename → fsync_dir`) is
13//! enforced in one place. [`atomic_swap_dirs_fsync`] does the same for
14//! directory-level swaps (rename old-dir → backup, rename new-dir → old-dir).
15//!
16//! [`read_checkpoint_dontneed`] pairs with the write helper on the read side:
17//! checkpoint bytes are consumed once (deserialized into the in-memory index)
18//! and then superseded. Leaving them in the page cache wastes memory needed
19//! by hot workloads.
20
21use std::fs;
22use std::io::Write;
23use std::os::unix::io::AsRawFd;
24use std::path::Path;
25
26use crate::error::{Result, WalError};
27
28/// Fsync a directory to ensure file creation/deletion metadata is durable.
29///
30/// On ext4/XFS, creating or deleting a file writes the file data to disk
31/// but the directory entry may only be in the page cache. A power loss
32/// before the directory entry is persisted causes the file to "disappear"
33/// on reboot. Calling fsync on the directory fd ensures the metadata
34/// (filename, inode pointer) is on stable storage.
35pub fn fsync_directory(dir: &Path) -> Result<()> {
36    let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
37    dir_file.sync_all().map_err(WalError::Io)?;
38    Ok(())
39}
40
41/// Atomically write `bytes` to `dst` via a `tmp` file with full durability.
42///
43/// Order of operations (must not change):
44/// 1. Create / truncate `tmp` and write `bytes`.
45/// 2. `sync_data()` on `tmp` — forces file data pages to stable storage.
46/// 3. `rename(tmp, dst)` — atomic on POSIX filesystems.
47/// 4. `fsync_directory(parent)` — forces the directory entry durable so the
48///    new name survives power loss.
49///
50/// `tmp` and `dst` MUST be in the same directory; otherwise rename is not
51/// atomic and the parent fsync won't cover both entries.
52pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
53    let parent = dst.parent().ok_or_else(|| {
54        WalError::Io(std::io::Error::new(
55            std::io::ErrorKind::InvalidInput,
56            "atomic_write_fsync: dst has no parent directory",
57        ))
58    })?;
59
60    {
61        let mut f = fs::File::create(tmp).map_err(WalError::Io)?;
62        f.write_all(bytes).map_err(WalError::Io)?;
63        f.sync_data().map_err(WalError::Io)?;
64    }
65
66    fs::rename(tmp, dst).map_err(WalError::Io)?;
67    fsync_directory(parent)?;
68    Ok(())
69}
70
71/// Atomically swap a directory: `rename(live, backup); rename(staged, live)`,
72/// fsyncing the parent directory once both renames have completed.
73///
74/// `live`, `backup`, and `staged` MUST share the same parent directory. The
75/// caller is responsible for removing the backup directory once the new
76/// state is proven good — this helper does not delete anything.
77pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
78    let parent = live.parent().ok_or_else(|| {
79        WalError::Io(std::io::Error::new(
80            std::io::ErrorKind::InvalidInput,
81            "atomic_swap_dirs_fsync: live has no parent directory",
82        ))
83    })?;
84
85    fs::rename(live, backup).map_err(WalError::Io)?;
86    fs::rename(staged, live).map_err(WalError::Io)?;
87    fsync_directory(parent)?;
88    Ok(())
89}
90
91/// Read a checkpoint file and advise the kernel to drop its pages from the
92/// page cache.
93///
94/// Checkpoint files are consumed exactly once per process lifetime (loaded
95/// into the in-memory index and then superseded). `posix_fadvise(DONTNEED)`
96/// after read frees the page-cache memory for hot workloads.
97///
98/// On non-Unix targets the advise call is skipped and this degrades to a
99/// plain read.
100pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
101    let file = fs::File::open(path).map_err(WalError::Io)?;
102    let len = file.metadata().map_err(WalError::Io)?.len();
103    let bytes = fs::read(path).map_err(WalError::Io)?;
104
105    #[cfg(unix)]
106    {
107        // Safe: `file` owns the fd for the duration of the call; len fits in
108        // off_t on all supported platforms (checkpoint files are << i64::MAX).
109        let ret = unsafe {
110            libc::posix_fadvise(
111                file.as_raw_fd(),
112                0,
113                len as libc::off_t,
114                libc::POSIX_FADV_DONTNEED,
115            )
116        };
117        if ret != 0 {
118            tracing::debug!(
119                path = %path.display(),
120                ret,
121                "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
122            );
123        }
124    }
125    #[cfg(not(unix))]
126    {
127        let _ = len;
128    }
129
130    Ok(bytes)
131}
132
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::PathBuf;

    /// Scratch `(tmp, dst)` checkpoint paths inside `root`.
    fn ckpt_paths(root: &Path) -> (PathBuf, PathBuf) {
        (root.join("payload.ckpt.tmp"), root.join("payload.ckpt"))
    }

    #[test]
    fn atomic_write_fsync_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let (tmp, dst) = ckpt_paths(scratch.path());

        atomic_write_fsync(&tmp, &dst, b"hello world").unwrap();

        assert!(!tmp.exists(), "tmp must be renamed away");
        assert_eq!(fs::read(&dst).unwrap(), b"hello world");
    }

    #[test]
    fn atomic_write_fsync_overwrites() {
        let scratch = tempfile::tempdir().unwrap();
        let (tmp, dst) = ckpt_paths(scratch.path());

        // Second write must replace the first in place.
        for payload in [b"v1", b"v2"] {
            atomic_write_fsync(&tmp, &dst, payload).unwrap();
        }

        assert_eq!(fs::read(&dst).unwrap(), b"v2");
    }

    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        fn marker(dir: &Path) -> Vec<u8> {
            fs::read(dir.join("marker")).unwrap()
        }

        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let [live, backup, staged] = ["live", "backup", "staged"].map(|name| root.join(name));

        for (dir, contents) in [(&live, b"old"), (&staged, b"new")] {
            fs::create_dir(dir).unwrap();
            fs::write(dir.join("marker"), contents).unwrap();
        }

        atomic_swap_dirs_fsync(&live, &backup, &staged).unwrap();

        assert_eq!(marker(&live), b"new");
        assert_eq!(marker(&backup), b"old");
        assert!(!staged.exists());
    }

    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let scratch = tempfile::tempdir().unwrap();
        let ckpt = scratch.path().join("ckpt");
        let expected: &[u8] = b"checkpoint bytes";
        fs::write(&ckpt, expected).unwrap();

        assert_eq!(read_checkpoint_dontneed(&ckpt).unwrap(), expected);
    }
}