1use std::fs;
9use std::fs::File;
10#[cfg(unix)]
11use std::fs::Permissions;
12use std::io;
13#[cfg(unix)]
14use std::os::unix::fs::PermissionsExt;
15use std::path::Path;
16use std::path::PathBuf;
17use std::time::Duration;
18
19use tempfile::NamedTempFile;
20
21pub fn atomic_write(
36 path: &Path,
37 #[allow(dead_code)] mode_perms: u32,
38 fsync: bool,
39 op: impl FnOnce(&mut File) -> io::Result<()>,
40) -> io::Result<File> {
41 let mut af = AtomicFile::open(path, mode_perms, fsync)?;
42 op(af.as_file())?;
43 af.save()
44}
45
46pub struct Wait<'a> {
48 path: &'a Path,
49 meta: Option<fs::Metadata>,
50}
51
52impl<'a> Wait<'a> {
53 pub fn from_path(path: &'a Path) -> io::Result<Self> {
58 let meta = match path.symlink_metadata() {
59 Ok(m) => Some(m),
60 Err(e) if e.kind() == io::ErrorKind::NotFound => None,
61 Err(e) => return Err(e),
62 };
63 Ok(Self { path, meta })
64 }
65
66 pub fn wait_for_change(&mut self) -> io::Result<()> {
71 #[cfg(unix)]
72 use std::os::unix::fs::MetadataExt;
73 #[cfg(windows)]
74 use std::os::windows::fs::MetadataExt;
75 tracing::debug!("waiting for atomic change: {}", self.path.display());
76 let mut new_wait;
77 'wait_loop: loop {
78 new_wait = Self::from_path(self.path)?;
79 match (&self.meta, new_wait.meta.as_ref()) {
80 (None, None) => {}
81 (Some(_), None) | (None, Some(_)) => {
82 tracing::trace!(" waited: existence changed");
83 break 'wait_loop;
84 }
85 (Some(new), Some(old)) => {
86 #[cfg(unix)]
87 if new.ino() != old.ino() {
88 tracing::trace!(" waited: inode changed");
89 break 'wait_loop;
90 }
91 #[cfg(windows)]
94 if new.last_write_time() != old.last_write_time()
95 || new.creation_time() != old.creation_time()
96 {
97 tracing::trace!(" waited: mtime changed");
98 break 'wait_loop;
99 }
100 }
101 }
102 std::thread::sleep(Duration::from_millis(100));
103 }
104 self.meta = new_wait.meta;
105 Ok(())
106 }
107}
108
109pub struct AtomicFile {
110 file: NamedTempFile,
111 path: PathBuf,
112 dir: PathBuf,
113 fsync: bool,
114}
115
116impl AtomicFile {
117 pub fn open(
118 path: &Path,
119 #[allow(unused_variables)] mode_perms: u32,
120 fsync: bool,
121 ) -> io::Result<Self> {
122 let dir = match path.parent() {
123 Some(dir) => dir,
124 None => return Err(io::Error::from(io::ErrorKind::InvalidInput)),
125 };
126
127 #[allow(unused_mut)]
128 let mut temp = NamedTempFile::new_in(dir)?;
129
130 #[cfg(unix)]
131 {
132 let f = temp.as_file_mut();
133 f.set_permissions(Permissions::from_mode(mode_perms))?;
134 }
135
136 Ok(Self {
137 file: temp,
138 path: path.to_path_buf(),
139 dir: dir.to_path_buf(),
140 fsync,
141 })
142 }
143
144 pub fn as_file(&mut self) -> &mut File {
145 self.file.as_file_mut()
146 }
147
148 pub fn save(self) -> io::Result<File> {
149 #[allow(unused_variables)]
150 let (mut temp, path, dir, fsync) = (self.file, self.path, self.dir, self.fsync);
151 let f = temp.as_file_mut();
152
153 if fsync {
154 f.sync_data()?;
155 }
156
157 let max_retries = if cfg!(windows) { 5u16 } else { 0 };
158 let mut retry = 0;
159 loop {
160 match temp.persist(&path) {
161 Ok(persisted) => {
162 if fsync {
163 persisted.sync_all()?;
164
165 #[cfg(unix)]
168 {
169 if let Ok(opened) = fs::OpenOptions::new().read(true).open(dir) {
170 let _ = opened.sync_all();
171 }
172 }
173 }
174
175 break Ok(persisted);
176 }
177 Err(e) => {
178 if retry == max_retries || e.error.kind() != io::ErrorKind::PermissionDenied {
179 break Err(e.error);
180 }
181
182 tracing::info!(
185 retry,
186 ?path,
187 "atomic_write rename failed with EPERM. Will retry.",
188 );
189 std::thread::sleep(std::time::Duration::from_millis(1 << retry));
190 temp = e.file;
191
192 retry += 1;
193 }
194 }
195 }
196 }
197}
198
199#[cfg(test)]
200mod tests {
201 use std::io::Write;
202 #[cfg(unix)]
203 use std::os::unix::prelude::MetadataExt;
204 use std::sync::mpsc;
205
206 use tempfile::tempdir;
207
208 use super::*;
209
210 #[test]
211 fn test_atomic_write() -> io::Result<()> {
212 let td = tempdir()?;
213
214 let foo_path = td.path().join("foo");
215 atomic_write(&foo_path, 0o640, false, |f| {
216 f.write_all(b"sushi")?;
217 Ok(())
218 })?;
219
220 assert_eq!("sushi", std::fs::read_to_string(&foo_path)?);
222 assert_eq!(1, std::fs::read_dir(td.path())?.count());
223
224 #[cfg(unix)]
226 assert_eq!(
227 0o640,
228 0o777 & std::fs::File::open(&foo_path)?.metadata()?.mode()
229 );
230
231 Ok(())
232 }
233
234 #[test]
235 fn test_wait_for_change() -> io::Result<()> {
236 let dir = tempdir()?;
237 let path = dir.path().join("f");
238
239 let (tx, rx) = mpsc::channel::<i32>();
240
241 std::thread::spawn({
242 let path = path.clone();
243 move || {
244 let mut wait = Wait::from_path(&path).unwrap();
245
246 wait.wait_for_change().unwrap();
247 tx.send(101).unwrap();
248
249 wait.wait_for_change().unwrap();
250 tx.send(102).unwrap();
251
252 wait.wait_for_change().unwrap();
253 tx.send(103).unwrap();
254 }
255 });
256
257 std::thread::sleep(Duration::from_millis(110));
259 assert!(rx.try_recv().is_err());
260
261 atomic_write(&path, 0o640, false, |_| Ok(()))?;
263 assert_eq!(rx.recv().unwrap(), 101);
264
265 atomic_write(&path, 0o640, false, |_| Ok(()))?;
267 assert_eq!(rx.recv().unwrap(), 102);
268
269 std::fs::remove_file(&path)?;
271 assert_eq!(rx.recv().unwrap(), 103);
272
273 Ok(())
274 }
275}