uv_fs/
locked_file.rs

1use std::fmt::Display;
2use std::path::{Path, PathBuf};
3use std::sync::LazyLock;
4use std::time::Duration;
5use std::{env, io};
6
7use thiserror::Error;
8use tracing::{debug, error, info, trace, warn};
9
10use uv_static::EnvVars;
11
12use crate::{Simplified, is_known_already_locked_error};
13
14/// Parsed value of `UV_LOCK_TIMEOUT`, with a default of 5 min.
15static LOCK_TIMEOUT: LazyLock<Duration> = LazyLock::new(|| {
16    let default_timeout = Duration::from_secs(300);
17    let Some(lock_timeout) = env::var_os(EnvVars::UV_LOCK_TIMEOUT) else {
18        return default_timeout;
19    };
20
21    if let Some(lock_timeout) = lock_timeout
22        .to_str()
23        .and_then(|lock_timeout| lock_timeout.parse::<u64>().ok())
24    {
25        Duration::from_secs(lock_timeout)
26    } else {
27        warn!(
28            "Could not parse value of {} as integer: {:?}",
29            EnvVars::UV_LOCK_TIMEOUT,
30            lock_timeout
31        );
32        default_timeout
33    }
34});
35
/// Errors that can occur while opening, creating, or locking a lock file.
#[derive(Debug, Error)]
pub enum LockedFileError {
    /// The lock could not be acquired before the configured timeout elapsed.
    #[error(
        "Timeout ({}s) when waiting for lock on `{}` at `{}`, is another uv process running? You can set `{}` to increase the timeout.",
        timeout.as_secs(),
        resource,
        path.user_display(),
        EnvVars::UV_LOCK_TIMEOUT
    )]
    Timeout {
        /// The timeout that was exceeded; reported in whole seconds in the message.
        timeout: Duration,
        /// Display name of the resource the lock was requested for.
        resource: String,
        /// Path of the lock file.
        path: PathBuf,
    },
    /// The blocking lock call itself returned an I/O error.
    #[error(
        "Could not acquire lock for `{}` at `{}`",
        resource,
        path.user_display()
    )]
    Lock {
        /// Display name of the resource the lock was requested for.
        resource: String,
        /// Path of the lock file.
        path: PathBuf,
        /// The underlying I/O error reported by the lock call.
        #[source]
        source: io::Error,
    },
    /// Any other I/O error, forwarded without additional context.
    #[error(transparent)]
    Io(#[from] io::Error),
    /// A tokio task could not be joined; only available with the `tokio` feature.
    #[error(transparent)]
    #[cfg(feature = "tokio")]
    JoinError(#[from] tokio::task::JoinError),
}
67
68impl LockedFileError {
69    pub fn as_io_error(&self) -> Option<&io::Error> {
70        match self {
71            Self::Timeout { .. } => None,
72            #[cfg(feature = "tokio")]
73            Self::JoinError(_) => None,
74            Self::Lock { source, .. } => Some(source),
75            Self::Io(err) => Some(err),
76        }
77    }
78}
79
/// Whether to acquire a shared (read) lock or exclusive (write) lock.
///
/// The type is a plain, fieldless enum, so it is `Copy` and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LockedFileMode {
    /// Request a shared (read) lock.
    Shared,
    /// Request an exclusive (write) lock.
    Exclusive,
}
86
87impl LockedFileMode {
88    /// Try to lock the file and return an error if the lock is already acquired by another process
89    /// and cannot be acquired immediately.
90    fn try_lock(self, file: &fs_err::File) -> Result<(), std::fs::TryLockError> {
91        match self {
92            Self::Exclusive => file.try_lock()?,
93            Self::Shared => file.try_lock_shared()?,
94        }
95        Ok(())
96    }
97
98    /// Lock the file, blocking until the lock becomes available if necessary.
99    fn lock(self, file: &fs_err::File) -> Result<(), io::Error> {
100        match self {
101            Self::Exclusive => file.lock()?,
102            Self::Shared => file.lock_shared()?,
103        }
104        Ok(())
105    }
106}
107
108impl Display for LockedFileMode {
109    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
110        match self {
111            Self::Shared => write!(f, "shared"),
112            Self::Exclusive => write!(f, "exclusive"),
113        }
114    }
115}
116
/// A file lock that is automatically released when dropped.
///
/// Wraps the open [`fs_err::File`] on which the lock is held. The type is only available
/// with the `tokio` feature and is `#[must_use]`, since discarding the value immediately
/// drops it and thereby releases the lock.
#[cfg(feature = "tokio")]
#[derive(Debug)]
#[must_use]
pub struct LockedFile(fs_err::File);
122
#[cfg(feature = "tokio")]
impl LockedFile {
    /// Inner implementation for [`LockedFile::acquire`].
    ///
    /// First attempts a non-blocking lock. If that fails, falls back to a blocking lock that
    /// is bounded by [`LOCK_TIMEOUT`]. Both lock calls run on tokio's blocking thread pool.
    ///
    /// # Errors
    ///
    /// - [`LockedFileError::Timeout`] if the blocking lock does not complete in time.
    /// - [`LockedFileError::Lock`] if the blocking lock call returns an I/O error.
    /// - [`LockedFileError::JoinError`] if a blocking task cannot be joined.
    async fn lock_file(
        file: fs_err::File,
        mode: LockedFileMode,
        resource: &str,
    ) -> Result<Self, LockedFileError> {
        trace!(
            "Checking lock for `{resource}` at `{}`",
            file.path().user_display()
        );
        // If there's no contention, return directly. The file is moved into the blocking task
        // and handed back together with the result so it can be reused below.
        let try_lock_task = tokio::task::spawn_blocking(move || (mode.try_lock(&file), file));
        let file = match try_lock_task.await? {
            (Ok(()), file) => {
                debug!("Acquired {mode} lock for `{resource}`");
                return Ok(Self(file));
            }
            (Err(err), file) => {
                // Log error code and enum kind to help debugging more exotic failures.
                if !is_known_already_locked_error(&err) {
                    debug!("Try lock {mode} error: {err:?}");
                }
                file
            }
        };

        // If there's lock contention, wait and break deadlocks with a timeout if necessary.
        info!(
            "Waiting to acquire {mode} lock for `{resource}` at `{}`",
            file.path().user_display(),
        );
        // The file is about to move into the blocking task, so keep the path for error messages.
        let path = file.path().to_path_buf();
        let lock_task = tokio::task::spawn_blocking(move || (mode.lock(&file), file));
        let (result, file) = match tokio::time::timeout(*LOCK_TIMEOUT, lock_task).await {
            Ok(joined) => joined?,
            // On timeout the join handle is dropped; this function does not cancel the
            // blocking task itself.
            Err(_elapsed) => {
                return Err(LockedFileError::Timeout {
                    timeout: *LOCK_TIMEOUT,
                    resource: resource.to_string(),
                    path,
                });
            }
        };
        // Not an fs_err method, we need to build our own path context
        result.map_err(|err| LockedFileError::Lock {
            resource: resource.to_string(),
            path,
            source: err,
        })?;

        debug!("Acquired {mode} lock for `{resource}`");
        Ok(Self(file))
    }

    /// Inner implementation for [`LockedFile::acquire_no_wait`].
    ///
    /// Makes a single non-blocking lock attempt and returns [`None`] if it fails for any
    /// reason. Failures other than "already locked" are logged at debug level.
    fn lock_file_no_wait(file: fs_err::File, mode: LockedFileMode, resource: &str) -> Option<Self> {
        trace!(
            "Checking lock for `{resource}` at `{}`",
            file.path().user_display()
        );
        if let Err(err) = mode.try_lock(&file) {
            // Log error code and enum kind to help debugging more exotic failures.
            if !is_known_already_locked_error(&err) {
                debug!("Try lock {mode} error: {err:?}");
            }
            debug!("Lock is busy for `{resource}`");
            return None;
        }

        debug!("Acquired {mode} lock for `{resource}`");
        Some(Self(file))
    }

    /// Acquire a cross-process lock for a resource using a file at the provided path.
    ///
    /// The lock file is opened, or created if it cannot be opened. The call then waits for
    /// the lock, up to the timeout configured through `UV_LOCK_TIMEOUT`.
    ///
    /// # Errors
    ///
    /// Returns [`LockedFileError::Io`] if the lock file cannot be opened or created, and
    /// otherwise any error produced while taking the lock (timeout, lock failure, or task
    /// join failure).
    pub async fn acquire(
        path: impl AsRef<Path>,
        mode: LockedFileMode,
        resource: impl Display,
    ) -> Result<Self, LockedFileError> {
        let file = Self::create(path)?;
        let resource = resource.to_string();
        Self::lock_file(file, mode, &resource).await
    }

    /// Acquire a cross-process lock for a resource using a file at the provided path
    ///
    /// Unlike [`LockedFile::acquire`] this function will not wait for the lock to become available.
    ///
    /// If the lock is not immediately available, [`None`] is returned. [`None`] is also
    /// returned if the lock file cannot be opened or created; that error is logged at debug
    /// level so the cause is not lost.
    pub fn acquire_no_wait(
        path: impl AsRef<Path>,
        mode: LockedFileMode,
        resource: impl Display,
    ) -> Option<Self> {
        let path = path.as_ref();
        let file = match Self::create(path) {
            Ok(file) => file,
            Err(err) => {
                // The signature cannot carry the error, so at least make it diagnosable.
                debug!(
                    "Failed to open lock file at `{}`: {err}",
                    path.user_display()
                );
                return None;
            }
        };
        let resource = resource.to_string();
        Self::lock_file_no_wait(file, mode, &resource)
    }

    /// Open an already existing lock file for reading and writing, without creating it.
    #[cfg(unix)]
    fn open_existing(path: &Path) -> io::Result<fs_err::File> {
        fs_err::OpenOptions::new().read(true).write(true).open(path)
    }

    /// Open the lock file at `path`, creating it with mode `0o666` if it cannot be opened.
    ///
    /// Creation goes through a temporary file that is persisted to `path` without
    /// overwriting an existing file. If another file appears at `path` in the meantime,
    /// that file is opened instead.
    #[cfg(unix)]
    fn create(path: impl AsRef<Path>) -> Result<fs_err::File, std::io::Error> {
        use std::os::unix::fs::PermissionsExt;
        use tempfile::NamedTempFile;

        let path = path.as_ref();

        // If path already exists, return it. Any failure to open falls through to creation.
        if let Ok(file) = Self::open_existing(path) {
            return Ok(file);
        }

        // Otherwise, create a temporary file with 666 permissions. We must set
        // permissions _after_ creating the file, to override the `umask`.
        let file = match path.parent() {
            Some(parent) => NamedTempFile::new_in(parent)?,
            None => NamedTempFile::new()?,
        };
        if let Err(err) = file
            .as_file()
            .set_permissions(std::fs::Permissions::from_mode(0o666))
        {
            warn!("Failed to set permissions on temporary file: {err}");
        }

        // Try to move the file to path, but if path exists now, just open path
        match file.persist_noclobber(path) {
            Ok(file) => Ok(fs_err::File::from_parts(file, path)),
            Err(err) if err.error.kind() == std::io::ErrorKind::AlreadyExists => {
                Self::open_existing(path)
            }
            Err(err) => Err(err.error),
        }
    }

    /// Open the lock file at `path` for reading and writing, creating it if it does not exist.
    #[cfg(not(unix))]
    fn create(path: impl AsRef<Path>) -> std::io::Result<fs_err::File> {
        fs_err::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .open(path.as_ref())
    }
}
277
#[cfg(feature = "tokio")]
impl Drop for LockedFile {
    /// Release the lock on the wrapped file.
    ///
    /// `drop` cannot return an error, so a failed unlock is reported through the error log
    /// instead; a successful unlock is logged at debug level.
    fn drop(&mut self) {
        match self.0.unlock() {
            Ok(()) => debug!("Released lock at `{}`", self.0.path().display()),
            Err(err) => error!(
                "Failed to unlock resource at `{}`; program may be stuck: {err}",
                self.0.path().display()
            ),
        }
    }
}
291}