Skip to main content

harn_vm/
atomic_io.rs

1//! Atomic file write helpers.
2//!
3//! All persistent on-disk state in Harn (workflow mailboxes, run records,
4//! event logs, lockfiles, package manifests, ...) should use these helpers
5//! rather than `std::fs::write` so that concurrent readers and abrupt
6//! process termination cannot observe a half-written file.
7//!
8//! The pattern is:
9//!
10//! 1. Create the parent directory if needed.
11//! 2. Write to a sibling `.<name>.<uuid>.tmp` file.
12//! 3. `fsync` the temp file.
13//! 4. `rename` the temp file over the destination (atomic on POSIX, atomic
14//!    overwrite on Windows since Rust 1.5+).
15//! 5. Best-effort `fsync` the parent directory so the rename survives a
16//!    power loss on filesystems that decouple the dirent from the inode.
17//!
18//! On any failure between (2) and (4), the temp file is removed so that
19//! repeated retries don't leak `.tmp` siblings.
20
21use std::fs::{File, OpenOptions};
22use std::io::{self, BufWriter, Write};
23use std::path::{Path, PathBuf};
24
25/// Atomically write `bytes` to `path`.
26pub fn atomic_write(path: &Path, bytes: &[u8]) -> io::Result<()> {
27    atomic_write_with(path, |writer| writer.write_all(bytes))
28}
29
30/// Atomically write the destination at `path` by streaming through a
31/// `BufWriter`. The closure runs against a buffered writer over a sibling
32/// temp file. On success, the buffer is flushed, the file is `fsync`'d, and
33/// the temp file is renamed over `path`.
34///
35/// Use this for line-by-line or chunked writes (e.g. JSONL compaction).
36/// For a one-shot byte write, prefer [`atomic_write`].
37pub fn atomic_write_with<F>(path: &Path, write_fn: F) -> io::Result<()>
38where
39    F: FnOnce(&mut BufWriter<File>) -> io::Result<()>,
40{
41    let tmp = TempFile::create(path)?;
42    let result = write_and_finalize(&tmp, write_fn);
43    if let Err(err) = result {
44        let _ = std::fs::remove_file(&tmp.path);
45        return Err(err);
46    }
47    if let Err(err) = std::fs::rename(&tmp.path, path) {
48        let _ = std::fs::remove_file(&tmp.path);
49        return Err(err);
50    }
51    sync_parent_dir(path);
52    Ok(())
53}
54
55fn write_and_finalize<F>(tmp: &TempFile, write_fn: F) -> io::Result<()>
56where
57    F: FnOnce(&mut BufWriter<File>) -> io::Result<()>,
58{
59    let file = tmp.file.try_clone()?;
60    let mut buf = BufWriter::new(file);
61    write_fn(&mut buf)?;
62    buf.flush()?;
63    let inner = buf.into_inner().map_err(|err| err.into_error())?;
64    inner.sync_all()?;
65    Ok(())
66}
67
/// Best-effort `fsync` of the directory that contains `path`.
///
/// Every failure is ignored: if the directory cannot be opened or synced the
/// function simply returns. Paths for which `Path::parent` returns `None`
/// are skipped.
fn sync_parent_dir(path: &Path) {
    let Some(parent) = path.parent() else {
        return;
    };
    // `Path::parent` yields an empty path for a bare relative file name such
    // as `state.json`. That file lives in the current directory, so sync `.`
    // rather than skipping the step.
    let dir_path = if parent.as_os_str().is_empty() {
        Path::new(".")
    } else {
        parent
    };
    if let Ok(dir) = OpenOptions::new().read(true).open(dir_path) {
        let _ = dir.sync_all();
    }
}
78
79/// Owns the temp file path + handle so callers can rely on RAII for
80/// cleanup if they bail out mid-write.
81struct TempFile {
82    path: PathBuf,
83    file: File,
84}
85
86impl TempFile {
87    fn create(target: &Path) -> io::Result<Self> {
88        let parent = target.parent().ok_or_else(|| {
89            io::Error::new(
90                io::ErrorKind::InvalidInput,
91                format!(
92                    "atomic_io: destination '{}' has no parent directory",
93                    target.display()
94                ),
95            )
96        })?;
97        if !parent.as_os_str().is_empty() {
98            std::fs::create_dir_all(parent)?;
99        }
100        let file_name = target
101            .file_name()
102            .and_then(|value| value.to_str())
103            .unwrap_or("file");
104        let tmp_path = if parent.as_os_str().is_empty() {
105            PathBuf::from(format!(".{file_name}.{}.tmp", uuid::Uuid::now_v7()))
106        } else {
107            parent.join(format!(".{file_name}.{}.tmp", uuid::Uuid::now_v7()))
108        };
109        let file = OpenOptions::new()
110            .create_new(true)
111            .write(true)
112            .open(&tmp_path)?;
113        Ok(Self {
114            path: tmp_path,
115            file,
116        })
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// A one-shot write to a fresh path produces exactly the given bytes.
    #[test]
    fn writes_bytes_atomically() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.json");
        atomic_write(&path, b"hello").unwrap();
        assert_eq!(std::fs::read(&path).unwrap(), b"hello");
    }

    /// Writing to an existing path replaces its previous contents.
    #[test]
    fn overwrites_existing_file() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.json");
        std::fs::write(&path, b"old").unwrap();
        atomic_write(&path, b"new").unwrap();
        assert_eq!(std::fs::read(&path).unwrap(), b"new");
    }

    /// Missing intermediate directories are created on demand.
    #[test]
    fn creates_missing_parent_dirs() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("a/b/c/state.json");
        atomic_write(&path, b"deep").unwrap();
        assert_eq!(std::fs::read(&path).unwrap(), b"deep");
    }

    /// Chunked writes through the closure end up in the destination in order.
    #[test]
    fn streaming_writer_finalizes_atomically() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("log.jsonl");
        atomic_write_with(&path, |writer| {
            writeln!(writer, "first")?;
            writeln!(writer, "second")?;
            Ok(())
        })
        .unwrap();
        let read = std::fs::read_to_string(&path).unwrap();
        assert_eq!(read, "first\nsecond\n");
    }

    /// A failing closure surfaces its error, creates no destination file and
    /// leaves no temp file behind.
    #[test]
    fn streaming_writer_cleans_up_on_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("state.json");
        let err = atomic_write_with(&path, |_| Err(io::Error::other("nope"))).unwrap_err();
        assert_eq!(err.to_string(), "nope");
        assert!(!path.exists(), "destination should not exist after failure");
        // No leftover .tmp siblings.
        let leftover: Vec<_> = std::fs::read_dir(dir.path())
            .unwrap()
            .filter_map(Result::ok)
            .filter(|entry| entry.file_name().to_string_lossy().ends_with(".tmp"))
            .collect();
        assert!(
            leftover.is_empty(),
            "tmp file should be cleaned up on error"
        );
    }

    /// Many threads writing the same destination never produce a torn file.
    #[test]
    fn concurrent_writers_do_not_collide() {
        let dir = tempfile::tempdir().unwrap();
        let path = std::sync::Arc::new(dir.path().join("state.json"));
        let mut handles = Vec::new();
        for i in 0..16 {
            let path = std::sync::Arc::clone(&path);
            handles.push(std::thread::spawn(move || {
                let payload = format!("writer-{i}");
                atomic_write(&path, payload.as_bytes()).unwrap();
            }));
        }
        for handle in handles {
            handle.join().unwrap();
        }
        // The final contents must match exactly one of the writers — never a
        // truncated or interleaved value. Compare against the full set of
        // payloads: a prefix/length check would let a torn write such as
        // "writer-" slip through.
        let final_contents = std::fs::read_to_string(&*path).unwrap();
        let expected: Vec<String> = (0..16).map(|i| format!("writer-{i}")).collect();
        assert!(
            expected.contains(&final_contents),
            "unexpected final contents: {final_contents:?}"
        );
    }
}
205}