atomicfile/
lib.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::fs;
9use std::fs::File;
10#[cfg(unix)]
11use std::fs::Permissions;
12use std::io;
13#[cfg(unix)]
14use std::os::unix::fs::PermissionsExt;
15use std::path::Path;
16use std::path::PathBuf;
17use std::time::Duration;
18
19use tempfile::NamedTempFile;
20
21/// Create a temp file and then rename it into the specified path to
22/// achieve atomicity. The temp file is created in the same directory
23/// as path to ensure the rename is not cross filesystem. If fysnc is
24/// true, the file will be fsynced before and after renaming, and the
25/// directory will by fsynced after renaming.
26///
27/// mode_perms is required but does nothing on windows. mode_perms is
28/// not automatically umasked.
29///
30/// The renamed file is returned. Any further data written to the file
31/// will not be atomic since the file is already visibile to readers.
32///
33/// Note that the rename operation will fail on windows if the
34/// destination file exists and is open.
35pub fn atomic_write(
36    path: &Path,
37    #[allow(dead_code)] mode_perms: u32,
38    fsync: bool,
39    op: impl FnOnce(&mut File) -> io::Result<()>,
40) -> io::Result<File> {
41    let mut af = AtomicFile::open(path, mode_perms, fsync)?;
42    op(af.as_file())?;
43    af.save()
44}
45
46/// State to wait for change to a path.
47pub struct Wait<'a> {
48    path: &'a Path,
49    meta: Option<fs::Metadata>,
50}
51
52impl<'a> Wait<'a> {
53    /// Construct from a path for change detection. This reads the
54    /// file stats immediately. If you also need to read the file
55    /// content to double check whether to "wait" or not, call
56    /// this before reading the file content to avoid races.
57    pub fn from_path(path: &'a Path) -> io::Result<Self> {
58        let meta = match path.symlink_metadata() {
59            Ok(m) => Some(m),
60            Err(e) if e.kind() == io::ErrorKind::NotFound => None,
61            Err(e) => return Err(e),
62        };
63        Ok(Self { path, meta })
64    }
65
66    /// Wait for change on the given `path`. The `path` is expected to be
67    /// atomically updated (ex. `inode` should change on Linux).
68    ///
69    /// If `path` does not exist, wait for it to be created.
70    pub fn wait_for_change(&mut self) -> io::Result<()> {
71        #[cfg(unix)]
72        use std::os::unix::fs::MetadataExt;
73        #[cfg(windows)]
74        use std::os::windows::fs::MetadataExt;
75        tracing::debug!("waiting for atomic change: {}", self.path.display());
76        let mut new_wait;
77        'wait_loop: loop {
78            new_wait = Self::from_path(self.path)?;
79            match (&self.meta, new_wait.meta.as_ref()) {
80                (None, None) => {}
81                (Some(_), None) | (None, Some(_)) => {
82                    tracing::trace!(" waited: existence changed");
83                    break 'wait_loop;
84                }
85                (Some(new), Some(old)) => {
86                    #[cfg(unix)]
87                    if new.ino() != old.ino() {
88                        tracing::trace!(" waited: inode changed");
89                        break 'wait_loop;
90                    }
91                    // Consider using `file_index`, similar to `ino` once stabilized:
92                    // https://github.com/rust-lang/rust/issues/63010
93                    #[cfg(windows)]
94                    if new.last_write_time() != old.last_write_time()
95                        || new.creation_time() != old.creation_time()
96                    {
97                        tracing::trace!(" waited: mtime changed");
98                        break 'wait_loop;
99                    }
100                }
101            }
102            std::thread::sleep(Duration::from_millis(100));
103        }
104        self.meta = new_wait.meta;
105        Ok(())
106    }
107}
108
109pub struct AtomicFile {
110    file: NamedTempFile,
111    path: PathBuf,
112    dir: PathBuf,
113    fsync: bool,
114}
115
116impl AtomicFile {
117    pub fn open(
118        path: &Path,
119        #[allow(unused_variables)] mode_perms: u32,
120        fsync: bool,
121    ) -> io::Result<Self> {
122        let dir = match path.parent() {
123            Some(dir) => dir,
124            None => return Err(io::Error::from(io::ErrorKind::InvalidInput)),
125        };
126
127        #[allow(unused_mut)]
128        let mut temp = NamedTempFile::new_in(dir)?;
129
130        #[cfg(unix)]
131        {
132            let f = temp.as_file_mut();
133            f.set_permissions(Permissions::from_mode(mode_perms))?;
134        }
135
136        Ok(Self {
137            file: temp,
138            path: path.to_path_buf(),
139            dir: dir.to_path_buf(),
140            fsync,
141        })
142    }
143
144    pub fn as_file(&mut self) -> &mut File {
145        self.file.as_file_mut()
146    }
147
148    pub fn save(self) -> io::Result<File> {
149        #[allow(unused_variables)]
150        let (mut temp, path, dir, fsync) = (self.file, self.path, self.dir, self.fsync);
151        let f = temp.as_file_mut();
152
153        if fsync {
154            f.sync_data()?;
155        }
156
157        let max_retries = if cfg!(windows) { 5u16 } else { 0 };
158        let mut retry = 0;
159        loop {
160            match temp.persist(&path) {
161                Ok(persisted) => {
162                    if fsync {
163                        persisted.sync_all()?;
164
165                        // Also sync the directory on Unix.
166                        // Windows does not support syncing a directory.
167                        #[cfg(unix)]
168                        {
169                            if let Ok(opened) = fs::OpenOptions::new().read(true).open(dir) {
170                                let _ = opened.sync_all();
171                            }
172                        }
173                    }
174
175                    break Ok(persisted);
176                }
177                Err(e) => {
178                    if retry == max_retries || e.error.kind() != io::ErrorKind::PermissionDenied {
179                        break Err(e.error);
180                    }
181
182                    // Windows fails with "Access Denied" if destination file is open.
183                    // Retry a few times.
184                    tracing::info!(
185                        retry,
186                        ?path,
187                        "atomic_write rename failed with EPERM. Will retry.",
188                    );
189                    std::thread::sleep(std::time::Duration::from_millis(1 << retry));
190                    temp = e.file;
191
192                    retry += 1;
193                }
194            }
195        }
196    }
197}
198
#[cfg(test)]
mod tests {
    use std::io::Write;
    #[cfg(unix)]
    use std::os::unix::prelude::MetadataExt;
    use std::sync::mpsc;

    use tempfile::tempdir;

    use super::*;

    /// `atomic_write` produces the expected content, leaves no temp file
    /// behind, and (on Unix) applies the requested mode bits.
    #[test]
    fn test_atomic_write() -> io::Result<()> {
        let scratch = tempdir()?;
        let target = scratch.path().join("foo");

        atomic_write(&target, 0o640, false, |file| file.write_all(b"sushi"))?;

        // Sanity check that we wrote contents and the temp file is gone.
        let contents = std::fs::read_to_string(&target)?;
        assert_eq!("sushi", contents);
        let entry_count = std::fs::read_dir(scratch.path())?.count();
        assert_eq!(1, entry_count);

        // Make sure we can set the mode perms on unix.
        #[cfg(unix)]
        {
            let mode = std::fs::File::open(&target)?.metadata()?.mode();
            assert_eq!(0o640, 0o777 & mode);
        }

        Ok(())
    }

    /// A background waiter reports, in order, the creation, replacement,
    /// and removal of the watched path.
    #[test]
    fn test_wait_for_change() -> io::Result<()> {
        let scratch = tempdir()?;
        let watched = scratch.path().join("f");

        let (notify, events) = mpsc::channel::<i32>();

        let waiter_path = watched.clone();
        std::thread::spawn(move || {
            let mut wait = Wait::from_path(&waiter_path).unwrap();
            // One notification per detected change: 101, 102, 103.
            for signal in 101..=103 {
                wait.wait_for_change().unwrap();
                notify.send(signal).unwrap();
            }
        });

        // Nothing changed yet.
        std::thread::sleep(Duration::from_millis(110));
        assert!(events.try_recv().is_err());

        // Create.
        atomic_write(&watched, 0o640, false, |_| Ok(()))?;
        assert_eq!(events.recv().unwrap(), 101);

        // Rewrite.
        atomic_write(&watched, 0o640, false, |_| Ok(()))?;
        assert_eq!(events.recv().unwrap(), 102);

        // Delete.
        std::fs::remove_file(&watched)?;
        assert_eq!(events.recv().unwrap(), 103);

        Ok(())
    }
}