1use std::fs::{File, OpenOptions};
22use std::io::{self, BufWriter, Write};
23use std::path::{Path, PathBuf};
24
25pub fn atomic_write(path: &Path, bytes: &[u8]) -> io::Result<()> {
27 atomic_write_with(path, |writer| writer.write_all(bytes))
28}
29
30pub fn atomic_write_with<F>(path: &Path, write_fn: F) -> io::Result<()>
38where
39 F: FnOnce(&mut BufWriter<File>) -> io::Result<()>,
40{
41 let tmp = TempFile::create(path)?;
42 let result = write_and_finalize(&tmp, write_fn);
43 if let Err(err) = result {
44 let _ = std::fs::remove_file(&tmp.path);
45 return Err(err);
46 }
47 if let Err(err) = std::fs::rename(&tmp.path, path) {
48 let _ = std::fs::remove_file(&tmp.path);
49 return Err(err);
50 }
51 sync_parent_dir(path);
52 Ok(())
53}
54
55fn write_and_finalize<F>(tmp: &TempFile, write_fn: F) -> io::Result<()>
56where
57 F: FnOnce(&mut BufWriter<File>) -> io::Result<()>,
58{
59 let file = tmp.file.try_clone()?;
60 let mut buf = BufWriter::new(file);
61 write_fn(&mut buf)?;
62 buf.flush()?;
63 let inner = buf.into_inner().map_err(|err| err.into_error())?;
64 inner.sync_all()?;
65 Ok(())
66}
67
68fn sync_parent_dir(path: &Path) {
69 if let Some(parent) = path.parent() {
70 if parent.as_os_str().is_empty() {
71 return;
72 }
73 if let Ok(dir) = OpenOptions::new().read(true).open(parent) {
74 let _ = dir.sync_all();
75 }
76 }
77}
78
79struct TempFile {
82 path: PathBuf,
83 file: File,
84}
85
86impl TempFile {
87 fn create(target: &Path) -> io::Result<Self> {
88 let parent = target.parent().ok_or_else(|| {
89 io::Error::new(
90 io::ErrorKind::InvalidInput,
91 format!(
92 "atomic_io: destination '{}' has no parent directory",
93 target.display()
94 ),
95 )
96 })?;
97 if !parent.as_os_str().is_empty() {
98 std::fs::create_dir_all(parent)?;
99 }
100 let file_name = target
101 .file_name()
102 .and_then(|value| value.to_str())
103 .unwrap_or("file");
104 let tmp_path = if parent.as_os_str().is_empty() {
105 PathBuf::from(format!(".{file_name}.{}.tmp", uuid::Uuid::now_v7()))
106 } else {
107 parent.join(format!(".{file_name}.{}.tmp", uuid::Uuid::now_v7()))
108 };
109 let file = OpenOptions::new()
110 .create_new(true)
111 .write(true)
112 .open(&tmp_path)?;
113 Ok(Self {
114 path: tmp_path,
115 file,
116 })
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123
124 #[test]
125 fn writes_bytes_atomically() {
126 let dir = tempfile::tempdir().unwrap();
127 let path = dir.path().join("state.json");
128 atomic_write(&path, b"hello").unwrap();
129 assert_eq!(std::fs::read(&path).unwrap(), b"hello");
130 }
131
132 #[test]
133 fn overwrites_existing_file() {
134 let dir = tempfile::tempdir().unwrap();
135 let path = dir.path().join("state.json");
136 std::fs::write(&path, b"old").unwrap();
137 atomic_write(&path, b"new").unwrap();
138 assert_eq!(std::fs::read(&path).unwrap(), b"new");
139 }
140
141 #[test]
142 fn creates_missing_parent_dirs() {
143 let dir = tempfile::tempdir().unwrap();
144 let path = dir.path().join("a/b/c/state.json");
145 atomic_write(&path, b"deep").unwrap();
146 assert_eq!(std::fs::read(&path).unwrap(), b"deep");
147 }
148
149 #[test]
150 fn streaming_writer_finalizes_atomically() {
151 let dir = tempfile::tempdir().unwrap();
152 let path = dir.path().join("log.jsonl");
153 atomic_write_with(&path, |writer| {
154 writeln!(writer, "first")?;
155 writeln!(writer, "second")?;
156 Ok(())
157 })
158 .unwrap();
159 let read = std::fs::read_to_string(&path).unwrap();
160 assert_eq!(read, "first\nsecond\n");
161 }
162
163 #[test]
164 fn streaming_writer_cleans_up_on_error() {
165 let dir = tempfile::tempdir().unwrap();
166 let path = dir.path().join("state.json");
167 let err = atomic_write_with(&path, |_| Err(io::Error::other("nope"))).unwrap_err();
168 assert_eq!(err.to_string(), "nope");
169 assert!(!path.exists(), "destination should not exist after failure");
170 let leftover: Vec<_> = std::fs::read_dir(dir.path())
172 .unwrap()
173 .filter_map(Result::ok)
174 .filter(|entry| entry.file_name().to_string_lossy().ends_with(".tmp"))
175 .collect();
176 assert!(
177 leftover.is_empty(),
178 "tmp file should be cleaned up on error"
179 );
180 }
181
182 #[test]
183 fn concurrent_writers_do_not_collide() {
184 let dir = tempfile::tempdir().unwrap();
185 let path = std::sync::Arc::new(dir.path().join("state.json"));
186 let mut handles = Vec::new();
187 for i in 0..16 {
188 let path = std::sync::Arc::clone(&path);
189 handles.push(std::thread::spawn(move || {
190 let payload = format!("writer-{i}");
191 atomic_write(&path, payload.as_bytes()).unwrap();
192 }));
193 }
194 for handle in handles {
195 handle.join().unwrap();
196 }
197 let final_contents = std::fs::read_to_string(&*path).unwrap();
200 assert!(
201 final_contents.starts_with("writer-") && final_contents.len() <= "writer-15".len(),
202 "unexpected final contents: {final_contents:?}"
203 );
204 }
205}