nodedb_wal/segment/
atomic_io.rs1use std::fs;
22use std::io::Write;
23use std::path::Path;
24
25use crate::error::{Result, WalError};
26
27pub fn fsync_directory(dir: &Path) -> Result<()> {
35 let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
36 dir_file.sync_all().map_err(WalError::Io)?;
37 Ok(())
38}
39
40pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
52 let parent = dst.parent().ok_or_else(|| {
53 WalError::Io(std::io::Error::new(
54 std::io::ErrorKind::InvalidInput,
55 "atomic_write_fsync: dst has no parent directory",
56 ))
57 })?;
58
59 {
60 let mut f = fs::File::create(tmp).map_err(WalError::Io)?;
61 f.write_all(bytes).map_err(WalError::Io)?;
62 f.sync_data().map_err(WalError::Io)?;
63 }
64
65 fs::rename(tmp, dst).map_err(WalError::Io)?;
66 fsync_directory(parent)?;
67 Ok(())
68}
69
70pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
77 let parent = live.parent().ok_or_else(|| {
78 WalError::Io(std::io::Error::new(
79 std::io::ErrorKind::InvalidInput,
80 "atomic_swap_dirs_fsync: live has no parent directory",
81 ))
82 })?;
83
84 fs::rename(live, backup).map_err(WalError::Io)?;
85 fs::rename(staged, live).map_err(WalError::Io)?;
86 fsync_directory(parent)?;
87 Ok(())
88}
89
90pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
100 let file = fs::File::open(path).map_err(WalError::Io)?;
101 let len = file.metadata().map_err(WalError::Io)?.len();
102 let bytes = fs::read(path).map_err(WalError::Io)?;
103
104 #[cfg(unix)]
105 {
106 use std::os::unix::io::AsRawFd as _;
107 let ret = unsafe {
110 libc::posix_fadvise(
111 file.as_raw_fd(),
112 0,
113 len as libc::off_t,
114 libc::POSIX_FADV_DONTNEED,
115 )
116 };
117 if ret != 0 {
118 tracing::debug!(
119 path = %path.display(),
120 ret,
121 "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
122 );
123 }
124 }
125 #[cfg(not(unix))]
126 {
127 let _ = len;
128 }
129
130 Ok(bytes)
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// A single write publishes the payload under `dst` and leaves no
    /// staging file behind.
    #[test]
    fn atomic_write_fsync_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let (dst, tmp) = (root.join("payload.ckpt"), root.join("payload.ckpt.tmp"));

        atomic_write_fsync(&tmp, &dst, b"hello world").unwrap();

        assert!(!tmp.exists(), "tmp must be renamed away");
        let on_disk = fs::read(&dst).unwrap();
        assert_eq!(on_disk, b"hello world");
    }

    /// A second write through the same paths replaces the first payload.
    #[test]
    fn atomic_write_fsync_overwrites() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let (dst, tmp) = (root.join("payload.ckpt"), root.join("payload.ckpt.tmp"));

        atomic_write_fsync(&tmp, &dst, b"v1").unwrap();
        atomic_write_fsync(&tmp, &dst, b"v2").unwrap();

        let on_disk = fs::read(&dst).unwrap();
        assert_eq!(on_disk, b"v2");
    }

    /// After the swap, `live` holds the staged contents, `backup` holds the
    /// previous live contents, and `staged` no longer exists.
    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let live = root.join("live");
        let backup = root.join("backup");
        let staged = root.join("staged");

        // Populate the current and the incoming directory with a marker each.
        fs::create_dir(&live).unwrap();
        fs::write(live.join("marker"), b"old").unwrap();
        fs::create_dir(&staged).unwrap();
        fs::write(staged.join("marker"), b"new").unwrap();

        atomic_swap_dirs_fsync(&live, &backup, &staged).unwrap();

        let live_marker = fs::read(live.join("marker")).unwrap();
        assert_eq!(live_marker, b"new");
        let backup_marker = fs::read(backup.join("marker")).unwrap();
        assert_eq!(backup_marker, b"old");
        assert!(!staged.exists());
    }

    /// The reader returns exactly the bytes that were written to the file.
    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let scratch = tempfile::tempdir().unwrap();
        let ckpt = scratch.path().join("ckpt");
        fs::write(&ckpt, b"checkpoint bytes").unwrap();

        let loaded = read_checkpoint_dontneed(&ckpt).unwrap();

        assert_eq!(loaded, b"checkpoint bytes");
    }
}