use std::fs;
use std::io::Write;
use std::os::unix::io::AsRawFd;
use std::path::Path;
use crate::error::{Result, WalError};
/// Flushes the directory at `dir` to stable storage.
///
/// Opens the directory read-only and calls `sync_all` on the handle.
///
/// An empty path is treated as the current directory (`.`). `Path::parent`
/// returns `Some("")` for a bare relative name such as `wal.ckpt`, and
/// opening `""` fails, so callers that forward `path.parent()` unchanged
/// would otherwise get a spurious error after their rename already happened.
///
/// # Errors
///
/// Returns [`WalError::Io`] if the directory cannot be opened or synced.
pub fn fsync_directory(dir: &Path) -> Result<()> {
    // Map the "no directory component" case onto the working directory.
    let target = if dir.as_os_str().is_empty() {
        Path::new(".")
    } else {
        dir
    };
    let handle = fs::File::open(target).map_err(WalError::Io)?;
    handle.sync_all().map_err(WalError::Io)?;
    Ok(())
}
/// Writes `bytes` to `dst` by staging them in `tmp` and renaming into place.
///
/// Steps, in order:
/// 1. create (or truncate) `tmp` and write `bytes` into it,
/// 2. `sync_data` the temp file,
/// 3. rename `tmp` over `dst`,
/// 4. fsync the parent directory of `dst` via [`fsync_directory`].
///
/// If writing, syncing, or renaming fails after `tmp` was created, the
/// partially written temp file is removed (best effort) so failed attempts
/// do not accumulate stale files on disk.
///
/// # Errors
///
/// Returns [`WalError::Io`] with `ErrorKind::InvalidInput` when `dst` has no
/// parent, or with the underlying I/O error from any of the steps above.
pub fn atomic_write_fsync(tmp: &Path, dst: &Path, bytes: &[u8]) -> Result<()> {
    let parent = dst.parent().ok_or_else(|| {
        WalError::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "atomic_write_fsync: dst has no parent directory",
        ))
    })?;

    // A failure to create `tmp` is returned as-is: nothing of ours exists on
    // disk yet, so there is nothing to clean up.
    let mut file = fs::File::create(tmp).map_err(WalError::Io)?;
    let staged = file.write_all(bytes).and_then(|()| file.sync_data());
    // Close the handle before the temp file is renamed or unlinked.
    drop(file);

    if let Err(err) = staged.and_then(|()| fs::rename(tmp, dst)) {
        // Best-effort cleanup; the original error is the one worth reporting.
        let _ = fs::remove_file(tmp);
        return Err(WalError::Io(err));
    }

    // The rename only becomes durable once the directory entry is flushed.
    fsync_directory(parent)?;
    Ok(())
}
/// Replaces the `live` directory with `staged`, keeping the old one as `backup`.
///
/// Renames `live` to `backup`, then `staged` to `live`, and finally fsyncs
/// the parent directory of `live` via [`fsync_directory`].
///
/// If the second rename fails, the function tries to rename `backup` back to
/// `live` so that a failed swap does not leave the caller without a live
/// directory. The rollback is best effort; the error from the failed rename
/// is what gets returned.
///
/// # Errors
///
/// Returns [`WalError::Io`] with `ErrorKind::InvalidInput` when `live` has no
/// parent, or with the underlying I/O error from a rename or the directory
/// fsync.
pub fn atomic_swap_dirs_fsync(live: &Path, backup: &Path, staged: &Path) -> Result<()> {
    let parent = live.parent().ok_or_else(|| {
        WalError::Io(std::io::Error::new(
            std::io::ErrorKind::InvalidInput,
            "atomic_swap_dirs_fsync: live has no parent directory",
        ))
    })?;

    fs::rename(live, backup).map_err(WalError::Io)?;

    if let Err(err) = fs::rename(staged, live) {
        // `live` was already moved aside; put it back rather than leaving
        // the caller with no live directory at all.
        let _ = fs::rename(backup, live);
        return Err(WalError::Io(err));
    }

    fsync_directory(parent)?;
    Ok(())
}
/// Reads the whole file at `path` and then hints that its cached pages can be dropped.
///
/// The file is opened once; the same handle is used for the read and, on
/// Unix, for the `posix_fadvise(POSIX_FADV_DONTNEED)` call, so the advice
/// always targets the file that was actually read even if `path` is replaced
/// concurrently.
///
/// The advice is best effort: a nonzero return is logged at debug level and
/// does not fail the read.
///
/// # Errors
///
/// Returns [`WalError::Io`] if the file cannot be opened or read.
pub fn read_checkpoint_dontneed(path: &Path) -> Result<Vec<u8>> {
    let mut file = fs::File::open(path).map_err(WalError::Io)?;
    let mut bytes = Vec::new();
    // Fully qualified so the block does not need `std::io::Read` in scope.
    std::io::Read::read_to_end(&mut file, &mut bytes).map_err(WalError::Io)?;

    // NOTE(review): `cfg(unix)` also matches targets where `libc` may not
    // expose `posix_fadvise` (e.g. macOS) — confirm the supported platforms.
    #[cfg(unix)]
    {
        // SAFETY: `file` is still open here, so the raw descriptor is valid
        // for the duration of the call; `posix_fadvise` takes only plain
        // integer arguments and does not access any memory we own.
        let ret = unsafe {
            // offset 0 with len 0 covers the file from the start to its end.
            libc::posix_fadvise(file.as_raw_fd(), 0, 0, libc::POSIX_FADV_DONTNEED)
        };
        // `posix_fadvise` reports failure through its return value.
        if ret != 0 {
            tracing::debug!(
                path = %path.display(),
                ret,
                "posix_fadvise(DONTNEED) returned nonzero — checkpoint bytes may stay in page cache"
            );
        }
    }

    Ok(bytes)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A first write lands at the destination and leaves no temp file behind.
    #[test]
    fn atomic_write_fsync_roundtrip() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let target = root.join("payload.ckpt");
        let staging = root.join("payload.ckpt.tmp");

        atomic_write_fsync(&staging, &target, b"hello world").unwrap();

        assert!(!staging.exists(), "tmp must be renamed away");
        let on_disk = fs::read(&target).unwrap();
        assert_eq!(on_disk, b"hello world");
    }

    /// A second write through the same temp path replaces the earlier payload.
    #[test]
    fn atomic_write_fsync_overwrites() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let target = root.join("payload.ckpt");
        let staging = root.join("payload.ckpt.tmp");

        for payload in [b"v1", b"v2"] {
            atomic_write_fsync(&staging, &target, payload).unwrap();
        }

        let on_disk = fs::read(&target).unwrap();
        assert_eq!(on_disk, b"v2");
    }

    /// After a swap, `live` holds the staged contents, `backup` holds the old
    /// contents, and `staged` no longer exists.
    #[test]
    fn atomic_swap_dirs_fsync_swaps() {
        let scratch = tempfile::tempdir().unwrap();
        let root = scratch.path();
        let live = root.join("live");
        let backup = root.join("backup");
        let staged = root.join("staged");

        // Populate both directories with a marker that identifies them.
        for (dir, contents) in [(&live, b"old"), (&staged, b"new")] {
            fs::create_dir(dir).unwrap();
            fs::write(dir.join("marker"), contents).unwrap();
        }

        atomic_swap_dirs_fsync(&live, &backup, &staged).unwrap();

        let live_marker = fs::read(live.join("marker")).unwrap();
        let backup_marker = fs::read(backup.join("marker")).unwrap();
        assert_eq!(live_marker, b"new");
        assert_eq!(backup_marker, b"old");
        assert!(!staged.exists());
    }

    /// The checkpoint reader returns exactly the bytes stored in the file.
    #[test]
    fn read_checkpoint_dontneed_returns_bytes() {
        let scratch = tempfile::tempdir().unwrap();
        let ckpt_path = scratch.path().join("ckpt");
        fs::write(&ckpt_path, b"checkpoint bytes").unwrap();

        let loaded = read_checkpoint_dontneed(&ckpt_path).unwrap();

        assert_eq!(loaded, b"checkpoint bytes");
    }
}