dir_lock/
lib.rs

1//! This is `dir-lock`, a library crate providing the type [`DirLock`], which is a simple file-system-based mutex.
2
3use {
4    std::{
5        io,
6        mem::forget,
7        num::ParseIntError,
8        path::{
9            Path,
10            PathBuf,
11        },
12        sync::Arc,
13        thread,
14        time::Duration,
15    },
16    sysinfo::{
17        Pid,
18        ProcessRefreshKind,
19        ProcessesToUpdate,
20    },
21    thiserror::Error,
22    tokio::{
23        fs,
24        time::sleep,
25    },
26};
27
28/// A simple file-system-based mutex.
29///
30/// When constructing a value of this type, a directory is created at the specified path.
31/// If a directory already exists, the constructor waits until it's removed.
32/// Dropping a `DirLock` removes the corresponding directory.
33/// Since creating a directory if it does not exist is an atomic operation on most operating systems,
34/// this can be used as a quick-and-dirty cross-process mutex.
35///
36/// To guard against processes exiting without properly removing the lock, a file containing the current process ID is created inside the lock.
37/// If no process with that ID exists, another process may claim the lock for itself.
38/// If the file does not exist, the constructor waits until it does (or until the directory is removed).
39///
40/// Of course, this is still not completely fail-proof since the user or other processes could mess with the lock directory.
41///
42/// This type is a RAII lock guard, but unlocking a directory lock uses I/O and can error, so it is recommended to call [`drop_async`](Self::drop_async).
43#[must_use = "should call the drop_async method to unlock"]
44pub struct DirLock(PathBuf);
45
46/// An error that can occur when locking or unlocking a [`DirLock`].
47#[derive(Debug, Error, Clone)]
48#[allow(missing_docs)]
49pub enum Error {
50    #[error("I/O error{}: {}", if let Some(path) = .1 { format!(" at {}", path.display()) } else { String::default() }, .0)] Io(#[source] Arc<io::Error>, Option<PathBuf>),
51    #[error(transparent)] ParseInt(#[from] ParseIntError),
52}
53
54trait IoResultExt {
55    type T;
56
57    fn at(self, path: impl AsRef<Path>) -> Self::T;
58}
59
60impl IoResultExt for io::Error {
61    type T = Error;
62
63    fn at(self, path: impl AsRef<Path>) -> Error {
64        Error::Io(Arc::new(self), Some(path.as_ref().to_owned()))
65    }
66}
67
68impl<T, E: IoResultExt> IoResultExt for Result<T, E> {
69    type T = Result<T, E::T>;
70
71    fn at(self, path: impl AsRef<Path>) -> Result<T, E::T> {
72        self.map_err(|e| e.at(path))
73    }
74}
75
76impl DirLock {
77    /// Acquires a directory lock at the given path, without blocking the thread.
78    ///
79    /// See the type-level docs for details.
80    pub async fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
81        let path = path.as_ref().to_owned();
82        loop {
83            match fs::create_dir(&path).await {
84                Ok(()) => {
85                    let pidfile = path.join("pid");
86                    fs::write(&pidfile, format!("{}\n", std::process::id())).await.at(pidfile)?;
87                    return Ok(Self(path))
88                }
89                Err(e) => match e.kind() {
90                    io::ErrorKind::AlreadyExists => {
91                        let pidfile = path.join("pid");
92                        if match fs::read_to_string(&pidfile).await {
93                            Ok(buf) => {
94                                !buf.is_empty() // assume pidfile is still being written if empty //TODO check timestamp
95                                && !pid_exists(buf.trim().parse()?)
96                            }
97                            Err(e) => if e.kind() == io::ErrorKind::NotFound {
98                                false
99                            } else {
100                                return Err(e.at(path.join("pid")))
101                            },
102                        } {
103                            clean_up_path(&path).await?;
104                        }
105                        sleep(Duration::from_secs(1)).await;
106                        continue
107                    }
108                    _ => return Err(e.at(path)),
109                },
110            }
111        }
112    }
113
114    /// Blocks the current thread until the lock can be established.
115    pub fn new_sync(path: &impl AsRef<Path>) -> Result<Self, Error> {
116        let path = path.as_ref().to_owned();
117        loop {
118            match std::fs::create_dir(&path) {
119                Ok(()) => {
120                    let pidfile = path.join("pid");
121                    std::fs::write(&pidfile, format!("{}\n", std::process::id())).at(pidfile)?;
122                    return Ok(Self(path))
123                }
124                Err(e) => match e.kind() {
125                    io::ErrorKind::AlreadyExists => {
126                        let pidfile = path.join("pid");
127                        if match std::fs::read_to_string(&pidfile) {
128                            Ok(buf) => {
129                                !buf.is_empty() // assume pidfile is still being written if empty //TODO check timestamp
130                                && !pid_exists(buf.trim().parse()?)
131                            }
132                            Err(e) => if e.kind() == io::ErrorKind::NotFound {
133                                false
134                            } else {
135                                return Err(e.at(path.join("pid")))
136                            },
137                        } {
138                            clean_up_path_sync(&path)?;
139                        }
140                        thread::sleep(Duration::from_secs(1));
141                        continue
142                    }
143                    _ => return Err(e.at(path)),
144                },
145            }
146        }
147    }
148
149    /// Return the contained Path.
150    pub fn path(&self) -> &Path {
151        self.0.as_path()
152    }
153
154    /// Unlocks this lock without blocking the thread.
155    pub async fn drop_async(self) -> Result<(), Error> {
156        self.clean_up().await?;
157        forget(self);
158        Ok(())
159    }
160
161    async fn clean_up(&self) -> Result<(), Error> {
162        clean_up_path(&self.0).await
163    }
164
165    fn clean_up_sync(&self) -> Result<(), Error> {
166        clean_up_path_sync(&self.0)
167    }
168}
169
170impl Drop for DirLock {
171    /// Unlocks this lock, blocking the current thread while doing so.
172    ///
173    /// # Panics
174    ///
175    /// Unlocking a directory lock involves I/O. If an error occurs, this method will panic.
176    /// It is recommended to use [`drop_async`](Self::drop_async) instead, which returns the error.
177    fn drop(&mut self) {
178        self.clean_up_sync().expect("failed to clean up dir lock");
179    }
180}
181
182async fn clean_up_path(path: &Path) -> Result<(), Error> {
183    if let Err(e) = fs::remove_file(path.join("pid")).await {
184        if e.kind() != io::ErrorKind::NotFound {
185            return Err(e.at(path.join("pid")));
186        }
187    }
188    if let Err(e) = fs::remove_dir(path).await {
189        if e.kind() != io::ErrorKind::NotFound {
190            return Err(e.at(path))
191        }
192    }
193    Ok(())
194}
195
196fn clean_up_path_sync(path: &Path) -> Result<(), Error> {
197    if let Err(e) = std::fs::remove_file(path.join("pid")) {
198        if e.kind() != io::ErrorKind::NotFound {
199            return Err(e.at(path.join("pid")))
200        }
201    }
202    if let Err(e) = std::fs::remove_dir(path) {
203        if e.kind() != io::ErrorKind::NotFound {
204            return Err(e.at(path))
205        }
206    }
207    Ok(())
208}
209
210fn pid_exists(pid: Pid) -> bool {
211    let mut system = sysinfo::System::default();
212    system.refresh_processes_specifics(ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::default()) > 0
213}