// nodedb_wal/segment/atomic_io.rs
use std::fs;
use std::io::Read;
use std::io::Write;
// Only unix targets have `std::os::unix`; the sole user is the `#[cfg(unix)]` fadvise call.
#[cfg(unix)]
use std::os::unix::io::AsRawFd;
use std::path::Path;

use crate::error::{Result, WalError};
25
26pub fn fsync_directory(dir: &Path) -> Result<()> {
34 let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
35 dir_file.sync_all().map_err(WalError::Io)?;
36 Ok(())
37}
38
39pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
51 let parent = dst.parent().ok_or_else(|| {
52 WalError::Io(std::io::Error::new(
53 std::io::ErrorKind::InvalidInput,
54 "atomic_write_fsync: dst has no parent directory",
55 ))
56 })?;
57
58 {
59 let mut f = fs::File::create(tmp).map_err(WalError::Io)?;
60 f.write_all(bytes).map_err(WalError::Io)?;
61 f.sync_data().map_err(WalError::Io)?;
62 }
63
64 fs::rename(tmp, dst).map_err(WalError::Io)?;
65 fsync_directory(parent)?;
66 Ok(())
67}
68
69pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
76 let parent = live.parent().ok_or_else(|| {
77 WalError::Io(std::io::Error::new(
78 std::io::ErrorKind::InvalidInput,
79 "atomic_swap_dirs_fsync: live has no parent directory",
80 ))
81 })?;
82
83 fs::rename(live, backup).map_err(WalError::Io)?;
84 fs::rename(staged, live).map_err(WalError::Io)?;
85 fsync_directory(parent)?;
86 Ok(())
87}
88
89pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
99 let file = fs::File::open(path).map_err(WalError::Io)?;
100 let len = file.metadata().map_err(WalError::Io)?.len();
101 let bytes = fs::read(path).map_err(WalError::Io)?;
102
103 #[cfg(unix)]
104 {
105 let ret = unsafe {
108 libc::posix_fadvise(
109 file.as_raw_fd(),
110 0,
111 len as libc::off_t,
112 libc::POSIX_FADV_DONTNEED,
113 )
114 };
115 if ret != 0 {
116 tracing::debug!(
117 path = %path.display(),
118 ret,
119 "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
120 );
121 }
122 }
123 #[cfg(not(unix))]
124 {
125 let _ = len;
126 }
127
128 Ok(bytes)
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh write lands at `dst` and consumes the staging file.
    #[test]
    fn atomic_write_fsync_roundtrip() {
        let dir = tempfile::tempdir().unwrap();
        let dst = dir.path().join("payload.ckpt");
        let tmp = dir.path().join("payload.ckpt.tmp");

        atomic_write_fsync(&tmp, &dst, b"hello world").unwrap();
        assert!(!tmp.exists(), "tmp must be renamed away");
        assert_eq!(fs::read(&dst).unwrap(), b"hello world");
    }

    /// A second write through the same staging path replaces the first payload.
    #[test]
    fn atomic_write_fsync_overwrites() {
        let dir = tempfile::tempdir().unwrap();
        let dst = dir.path().join("payload.ckpt");
        let tmp = dir.path().join("payload.ckpt.tmp");

        atomic_write_fsync(&tmp, &dst, b"v1").unwrap();
        atomic_write_fsync(&tmp, &dst, b"v2").unwrap();
        assert!(!tmp.exists(), "tmp must be renamed away");
        assert_eq!(fs::read(&dst).unwrap(), b"v2");
    }

    /// After the swap, `live` holds the staged content and `backup` the old one.
    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        let dir = tempfile::tempdir().unwrap();
        let live = dir.path().join("live");
        let backup = dir.path().join("backup");
        let staged = dir.path().join("staged");

        fs::create_dir(&live).unwrap();
        fs::write(live.join("marker"), b"old").unwrap();
        fs::create_dir(&staged).unwrap();
        fs::write(staged.join("marker"), b"new").unwrap();

        atomic_swap_dirs_fsync(&live, &backup, &staged).unwrap();

        assert_eq!(fs::read(live.join("marker")).unwrap(), b"new");
        assert_eq!(fs::read(backup.join("marker")).unwrap(), b"old");
        assert!(!staged.exists());
    }

    /// The cache hint must not affect the bytes handed back to the caller.
    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("ckpt");
        fs::write(&path, b"checkpoint bytes").unwrap();

        let bytes = read_checkpoint_dontneed(&path).unwrap();
        assert_eq!(bytes, b"checkpoint bytes");
    }

    /// A zero-length checkpoint reads back as an empty buffer.
    #[test]
    fn read_checkpoint_dontneed_empty_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("empty.ckpt");
        fs::write(&path, b"").unwrap();

        let bytes = read_checkpoint_dontneed(&path).unwrap();
        assert!(bytes.is_empty());
    }

    /// A missing checkpoint surfaces as an error rather than a panic.
    #[test]
    fn read_checkpoint_dontneed_missing_file_is_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("absent.ckpt");

        assert!(read_checkpoint_dontneed(&path).is_err());
    }
}