dir_lock/
lib.rs

1//! This is `dir-lock`, a library crate providing the type [`DirLock`], which is a simple file-system-based mutex.
2//!
3//! # Features
4//!
5//! The following feature is enabled by default:
6//!
7//! * `tokio`: Uses the [`tokio`](https://docs.rs/tokio) runtime for async operations.
8//!
9//! The following features can be enabled via Cargo:
10//!
11//! * `async-std`: Uses the [`async-std`](https://docs.rs/async-std) runtime for async operations. The `tokio` feature should be disabled when using this feature.
12
13use {
14    std::{
15        io,
16        mem::forget,
17        num::ParseIntError,
18        path::{
19            Path,
20            PathBuf,
21        },
22        sync::Arc,
23        thread,
24        time::Duration,
25    },
26    sysinfo::{
27        Pid,
28        ProcessRefreshKind,
29        ProcessesToUpdate,
30    },
31    thiserror::Error,
32};
33#[cfg(feature = "async-std")] use async_std::{
34    fs,
35    task::sleep,
36};
37#[cfg(feature = "tokio")] use tokio::{
38    fs,
39    time::sleep,
40};
41
/// A simple file-system-based mutex.
///
/// When constructing a value of this type, a directory is created at the specified path.
/// If a directory already exists, the constructor waits until it's removed.
/// Dropping a `DirLock` removes the corresponding directory.
/// Since creating a directory if it does not exist is an atomic operation on most operating systems,
/// this can be used as a quick-and-dirty cross-process mutex.
///
/// To guard against processes exiting without properly removing the lock, a file named `pid` containing the current process ID is created inside the lock directory.
/// If no process with that ID exists, another process may claim the lock for itself.
/// If the file does not exist, the constructor waits until it does (or until the directory is removed).
///
/// Of course, this is still not completely fail-proof since the user or other processes could mess with the lock directory.
///
/// This type is a RAII lock guard, but unlocking a directory lock uses I/O and can error, so it is recommended to call [`drop_async`](Self::drop_async).
///
/// The wrapped [`PathBuf`] is the path of the lock directory; it is shown by the [`Debug`](std::fmt::Debug) output.
#[derive(Debug)]
#[must_use = "should call the drop_async method to unlock"]
pub struct DirLock(PathBuf);
59
60/// An error that can occur when locking or unlocking a [`DirLock`].
61#[derive(Debug, Error, Clone)]
62#[allow(missing_docs)]
63pub enum Error {
64    #[error("I/O error{}: {}", if let Some(path) = .1 { format!(" at {}", path.display()) } else { String::default() }, .0)] Io(#[source] Arc<io::Error>, Option<PathBuf>),
65    #[error(transparent)] ParseInt(#[from] ParseIntError),
66}
67
/// Private extension trait for tagging I/O failures with the file system path they occurred at.
///
/// It is implemented for [`io::Error`] itself and for any `Result` whose error type implements it,
/// so `.at(path)` can be chained directly onto the result of a file system call.
trait IoResultExt {
    /// The value produced by [`at`](Self::at): the path-annotated counterpart of `Self`.
    type T;

    /// Consumes `self` and attaches `path` to the error it is or contains.
    fn at(self, path: impl AsRef<Path>) -> Self::T;
}
73
74impl IoResultExt for io::Error {
75    type T = Error;
76
77    fn at(self, path: impl AsRef<Path>) -> Error {
78        Error::Io(Arc::new(self), Some(path.as_ref().to_owned()))
79    }
80}
81
82impl<T, E: IoResultExt> IoResultExt for Result<T, E> {
83    type T = Result<T, E::T>;
84
85    fn at(self, path: impl AsRef<Path>) -> Result<T, E::T> {
86        self.map_err(|e| e.at(path))
87    }
88}
89
90impl DirLock {
91    /// Acquires a directory lock at the given path, without blocking the thread.
92    ///
93    /// See the type-level docs for details.
94    pub async fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
95        let path = path.as_ref().to_owned();
96        loop {
97            match fs::create_dir(&path).await {
98                Ok(()) => {
99                    let pidfile = path.join("pid");
100                    fs::write(&pidfile, format!("{}\n", std::process::id())).await.at(pidfile)?;
101                    return Ok(Self(path))
102                }
103                Err(e) => match e.kind() {
104                    io::ErrorKind::AlreadyExists => {
105                        let pidfile = path.join("pid");
106                        if match fs::read_to_string(&pidfile).await {
107                            Ok(buf) => {
108                                !buf.is_empty() // assume pidfile is still being written if empty //TODO check timestamp
109                                && !pid_exists(buf.trim().parse()?)
110                            }
111                            Err(e) => if e.kind() == io::ErrorKind::NotFound {
112                                false
113                            } else {
114                                return Err(e.at(path.join("pid")))
115                            },
116                        } {
117                            clean_up_path(&path).await?;
118                        }
119                        sleep(Duration::from_secs(1)).await;
120                        continue
121                    }
122                    _ => return Err(e.at(path)),
123                },
124            }
125        }
126    }
127
128    /// Blocks the current thread until the lock can be established.
129    pub fn new_sync(path: &impl AsRef<Path>) -> Result<Self, Error> {
130        let path = path.as_ref().to_owned();
131        loop {
132            match std::fs::create_dir(&path) {
133                Ok(()) => {
134                    let pidfile = path.join("pid");
135                    std::fs::write(&pidfile, format!("{}\n", std::process::id())).at(pidfile)?;
136                    return Ok(Self(path))
137                }
138                Err(e) => match e.kind() {
139                    io::ErrorKind::AlreadyExists => {
140                        let pidfile = path.join("pid");
141                        if match std::fs::read_to_string(&pidfile) {
142                            Ok(buf) => {
143                                !buf.is_empty() // assume pidfile is still being written if empty //TODO check timestamp
144                                && !pid_exists(buf.trim().parse()?)
145                            }
146                            Err(e) => if e.kind() == io::ErrorKind::NotFound {
147                                false
148                            } else {
149                                return Err(e.at(path.join("pid")))
150                            },
151                        } {
152                            clean_up_path_sync(&path)?;
153                        }
154                        thread::sleep(Duration::from_secs(1));
155                        continue
156                    }
157                    _ => return Err(e.at(path)),
158                },
159            }
160        }
161    }
162
163    /// Unlocks this lock without blocking the thread.
164    pub async fn drop_async(self) -> Result<(), Error> {
165        self.clean_up().await?;
166        forget(self);
167        Ok(())
168    }
169
170    async fn clean_up(&self) -> Result<(), Error> {
171        clean_up_path(&self.0).await
172    }
173
174    fn clean_up_sync(&self) -> Result<(), Error> {
175        clean_up_path_sync(&self.0)
176    }
177}
178
179impl Drop for DirLock {
180    /// Unlocks this lock, blocking the current thread while doing so.
181    ///
182    /// # Panics
183    ///
184    /// Unlocking a directory lock involves I/O. If an error occurs, this method will panic.
185    /// It is recommended to use [`drop_async`](Self::drop_async) instead, which returns the error.
186    fn drop(&mut self) {
187        self.clean_up_sync().expect("failed to clean up dir lock");
188    }
189}
190
191async fn clean_up_path(path: &Path) -> Result<(), Error> {
192    if let Err(e) = fs::remove_file(path.join("pid")).await {
193        if e.kind() != io::ErrorKind::NotFound {
194            return Err(e.at(path.join("pid")));
195        }
196    }
197    if let Err(e) = fs::remove_dir(path).await {
198        if e.kind() != io::ErrorKind::NotFound {
199            return Err(e.at(path))
200        }
201    }
202    Ok(())
203}
204
205fn clean_up_path_sync(path: &Path) -> Result<(), Error> {
206    if let Err(e) = std::fs::remove_file(path.join("pid")) {
207        if e.kind() != io::ErrorKind::NotFound {
208            return Err(e.at(path.join("pid")))
209        }
210    }
211    if let Err(e) = std::fs::remove_dir(path) {
212        if e.kind() != io::ErrorKind::NotFound {
213            return Err(e.at(path))
214        }
215    }
216    Ok(())
217}
218
219fn pid_exists(pid: Pid) -> bool {
220    let mut system = sysinfo::System::default();
221    system.refresh_processes_specifics(ProcessesToUpdate::Some(&[pid]), true, ProcessRefreshKind::default()) > 0
222}