nodedb_wal/segment/
atomic_io.rs1use std::fs;
22use std::io::Write;
23use std::os::unix::io::AsRawFd;
24use std::path::Path;
25
26use crate::error::{Result, WalError};
27
28pub fn fsync_directory(dir: &Path) -> Result<()> {
36 let dir_file = fs::File::open(dir).map_err(WalError::Io)?;
37 dir_file.sync_all().map_err(WalError::Io)?;
38 Ok(())
39}
40
41pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
53 let parent = dst.parent().ok_or_else(|| {
54 WalError::Io(std::io::Error::new(
55 std::io::ErrorKind::InvalidInput,
56 "atomic_write_fsync: dst has no parent directory",
57 ))
58 })?;
59
60 {
61 let mut f = fs::File::create(tmp).map_err(WalError::Io)?;
62 f.write_all(bytes).map_err(WalError::Io)?;
63 f.sync_data().map_err(WalError::Io)?;
64 }
65
66 fs::rename(tmp, dst).map_err(WalError::Io)?;
67 fsync_directory(parent)?;
68 Ok(())
69}
70
71pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
78 let parent = live.parent().ok_or_else(|| {
79 WalError::Io(std::io::Error::new(
80 std::io::ErrorKind::InvalidInput,
81 "atomic_swap_dirs_fsync: live has no parent directory",
82 ))
83 })?;
84
85 fs::rename(live, backup).map_err(WalError::Io)?;
86 fs::rename(staged, live).map_err(WalError::Io)?;
87 fsync_directory(parent)?;
88 Ok(())
89}
90
91pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
101 let file = fs::File::open(path).map_err(WalError::Io)?;
102 let len = file.metadata().map_err(WalError::Io)?.len();
103 let bytes = fs::read(path).map_err(WalError::Io)?;
104
105 #[cfg(unix)]
106 {
107 let ret = unsafe {
110 libc::posix_fadvise(
111 file.as_raw_fd(),
112 0,
113 len as libc::off_t,
114 libc::POSIX_FADV_DONTNEED,
115 )
116 };
117 if ret != 0 {
118 tracing::debug!(
119 path = %path.display(),
120 ret,
121 "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
122 );
123 }
124 }
125 #[cfg(not(unix))]
126 {
127 let _ = len;
128 }
129
130 Ok(bytes)
131}
132
#[cfg(test)]
mod tests {
    use super::*;

    /// Creates `dir` and drops a `marker` file with `contents` inside it.
    fn seed_dir(dir: &Path, contents: &[u8]) {
        fs::create_dir(dir).unwrap();
        fs::write(dir.join("marker"), contents).unwrap();
    }

    /// A single write lands at `dst` and leaves no temp file behind.
    #[test]
    fn atomic_write_fsync_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let target = root.join("payload.ckpt");
        let staging = root.join("payload.ckpt.tmp");

        atomic_write_fsync(&staging, &target, b"hello world").unwrap();

        assert!(!staging.exists(), "tmp must be renamed away");
        assert_eq!(fs::read(&target).unwrap(), b"hello world");
    }

    /// A second write through the same paths replaces the first payload.
    #[test]
    fn atomic_write_fsync_overwrites() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let target = root.join("payload.ckpt");
        let staging = root.join("payload.ckpt.tmp");

        for version in [b"v1", b"v2"] {
            atomic_write_fsync(&staging, &target, version).unwrap();
        }

        assert_eq!(fs::read(&target).unwrap(), b"v2");
    }

    /// After the swap, `live` holds the staged contents, `backup` holds the
    /// previous contents, and `staged` no longer exists.
    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let live = root.join("live");
        let backup = root.join("backup");
        let staged = root.join("staged");

        seed_dir(&live, b"old");
        seed_dir(&staged, b"new");

        atomic_swap_dirs_fsync(&live, &backup, &staged).unwrap();

        assert_eq!(fs::read(live.join("marker")).unwrap(), b"new");
        assert_eq!(fs::read(backup.join("marker")).unwrap(), b"old");
        assert!(!staged.exists());
    }

    /// The reader returns exactly the bytes that were written to the file.
    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let scratch = tempfile::tempdir().unwrap();
        let ckpt_path = scratch.path().join("ckpt");
        fs::write(&ckpt_path, b"checkpoint bytes").unwrap();

        let loaded = read_checkpoint_dontneed(&ckpt_path).unwrap();

        assert_eq!(loaded, b"checkpoint bytes");
    }
}