Skip to main content

pg_embedded_setup_unpriv/cache/
lock.rs

//! Cross-process file locking for cache coordination.
//!
//! Provides exclusive and shared locks to coordinate binary downloads across
//! parallel test runners. On Unix systems, uses `flock(2)` for advisory locking.
//! On non-Unix platforms, no real lock is taken, so concurrent runners may race.
6
7use camino::Utf8Path;
8use std::fs::{File, OpenOptions};
9use std::io;
10
11#[cfg(unix)]
12use std::os::unix::io::AsRawFd;
13
/// Name of the subdirectory, created inside the cache directory, that holds
/// the per-version `<version>.lock` files.
const LOCKS_SUBDIR: &str = ".locks";
16
/// Guard that holds a file lock until dropped.
///
/// The guard owns the open lock file. The lock is automatically released
/// when the guard goes out of scope and the file handle is closed, so the
/// guard must be bound to a named variable for as long as the lock is needed.
#[must_use = "the lock is released as soon as the guard is dropped"]
#[derive(Debug)]
pub struct CacheLock {
    /// Open handle to the lock file. Never read; kept only so that the
    /// handle (and with it the lock) lives exactly as long as the guard.
    _file: File,
}
24
25impl CacheLock {
26    /// Acquires an exclusive lock for a specific version.
27    ///
28    /// Use exclusive locks when downloading or populating the cache to prevent
29    /// concurrent writes.
30    ///
31    /// # Errors
32    ///
33    /// Returns an error if the lock file cannot be created or the lock cannot
34    /// be acquired.
35    ///
36    /// # Examples
37    ///
38    /// ```no_run
39    /// use camino::Utf8Path;
40    /// use pg_embedded_setup_unpriv::cache::CacheLock;
41    ///
42    /// let cache_dir = Utf8Path::new("/tmp/pg-cache");
43    /// let _lock = CacheLock::acquire_exclusive(cache_dir, "17.4.0")?;
44    /// // Exclusive access to version 17.4.0 cache entry
45    /// # Ok::<(), std::io::Error>(())
46    /// ```
47    pub fn acquire_exclusive(cache_dir: &Utf8Path, version: &str) -> io::Result<Self> {
48        Self::acquire(cache_dir, version, LockType::Exclusive)
49    }
50
51    /// Acquires a shared lock for a specific version.
52    ///
53    /// Use shared locks when reading from the cache to allow concurrent reads
54    /// whilst blocking writes.
55    ///
56    /// # Errors
57    ///
58    /// Returns an error if the lock file cannot be created or the lock cannot
59    /// be acquired.
60    ///
61    /// # Examples
62    ///
63    /// ```no_run
64    /// use camino::Utf8Path;
65    /// use pg_embedded_setup_unpriv::cache::CacheLock;
66    ///
67    /// let cache_dir = Utf8Path::new("/tmp/pg-cache");
68    /// let _lock = CacheLock::acquire_shared(cache_dir, "17.4.0")?;
69    /// // Shared access to version 17.4.0 cache entry
70    /// # Ok::<(), std::io::Error>(())
71    /// ```
72    pub fn acquire_shared(cache_dir: &Utf8Path, version: &str) -> io::Result<Self> {
73        Self::acquire(cache_dir, version, LockType::Shared)
74    }
75
76    /// Acquires a lock with the specified type.
77    #[cfg(unix)]
78    fn acquire(cache_dir: &Utf8Path, version: &str, lock_type: LockType) -> io::Result<Self> {
79        validate_version(version)?;
80        let locks_dir = cache_dir.join(LOCKS_SUBDIR);
81        std::fs::create_dir_all(&locks_dir)?;
82
83        let lock_path = locks_dir.join(format!("{version}.lock"));
84        let file = OpenOptions::new()
85            .read(true)
86            .write(true)
87            .create(true)
88            .truncate(false)
89            .open(&lock_path)?;
90
91        let flock_arg = match lock_type {
92            LockType::Exclusive => libc::LOCK_EX,
93            LockType::Shared => libc::LOCK_SH,
94        };
95
96        // SAFETY: The file descriptor obtained from `file.as_raw_fd()` is valid
97        // because `file` was opened via `OpenOptions::open` and remains owned by
98        // this scope until after the `flock` call completes. No other code moves
99        // or closes the descriptor while this block runs.
100        //
101        // Retry loop handles EINTR, which can occur when the process receives a
102        // signal while blocked on flock.
103        loop {
104            let result = unsafe { libc::flock(file.as_raw_fd(), flock_arg) };
105            if result == 0 {
106                break;
107            }
108            let err = io::Error::last_os_error();
109            if err.kind() != io::ErrorKind::Interrupted {
110                return Err(err);
111            }
112            // EINTR: signal interrupted syscall, retry.
113        }
114
115        Ok(Self { _file: file })
116    }
117
118    /// No-op lock acquisition on non-Unix platforms.
119    #[cfg(not(unix))]
120    fn acquire(_cache_dir: &Utf8Path, version: &str, _lock_type: LockType) -> io::Result<Self> {
121        validate_version(version)?;
122        // Cross-process locking not supported; return a dummy lock.
123        // Concurrent tests may race on non-Unix platforms.
124        // Create a temporary file without external dependencies.
125        let temp_path = std::env::temp_dir().join(format!("pg-cache-lock-{}.tmp", version));
126        let file = OpenOptions::new()
127            .read(true)
128            .write(true)
129            .create(true)
130            .truncate(true)
131            .open(&temp_path)?;
132        // Attempt cleanup; ignore errors as temp files are ephemeral.
133        drop(std::fs::remove_file(&temp_path));
134        Ok(Self { _file: file })
135    }
136}
137
/// Selects which kind of lock `CacheLock::acquire` requests.
#[derive(Clone, Copy, Debug)]
enum LockType {
    /// Single-holder lock, requested by writers.
    Exclusive,
    /// Multi-holder lock, requested by readers.
    Shared,
}
146
/// Validates that a version string is exactly one normal path component.
///
/// Rejects empty strings, versions containing path separators (including
/// trailing ones such as `"17.4.0/"`), `.` / `..` references, and absolute
/// paths, any of which could make the derived lock path point somewhere other
/// than a file directly inside the locks directory.
///
/// # Errors
///
/// Returns [`io::ErrorKind::InvalidInput`] when `version` is not a single
/// path component.
fn validate_version(version: &str) -> io::Result<()> {
    use std::ffi::OsStr;
    use std::path::Component;

    let mut components = std::path::Path::new(version).components();
    match (components.next(), components.next()) {
        // `components()` silently normalises away trailing separators and
        // trailing `.` segments, so `"foo/"` and `"foo/."` both yield the lone
        // component `foo`. Requiring the component to equal the whole input
        // rejects those forms as well.
        (Some(Component::Normal(name)), None) if name == OsStr::new(version) => Ok(()),
        _ => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            "version must be a single path component",
        )),
    }
}
163
#[cfg(test)]
mod tests {
    use super::*;
    use rstest::{fixture, rstest};
    use tempfile::TempDir;

    /// Fixture providing a temporary cache directory as a UTF-8 path.
    ///
    /// The `TempDir` is returned alongside the path so the directory stays
    /// alive for the duration of the test.
    #[fixture]
    fn cache_fixture() -> (TempDir, camino::Utf8PathBuf) {
        let temp = tempfile::tempdir().expect("tempdir");
        let cache_dir =
            camino::Utf8PathBuf::from_path_buf(temp.path().to_path_buf()).expect("utf8 path");
        (temp, cache_dir)
    }

    // Only the Unix implementation creates `<cache>/.locks/<version>.lock`;
    // the non-Unix fallback opens a throwaway file in the system temp
    // directory instead, so this assertion is meaningful on Unix only.
    #[cfg(unix)]
    #[rstest]
    #[case::exclusive("17.4.0", true)]
    #[case::shared("16.3.0", false)]
    fn acquire_lock_creates_lock_file(
        cache_fixture: (TempDir, camino::Utf8PathBuf),
        #[case] version: &str,
        #[case] exclusive: bool,
    ) {
        let (temp, cache_dir) = cache_fixture;
        let _lock = if exclusive {
            CacheLock::acquire_exclusive(&cache_dir, version).expect("acquire lock")
        } else {
            CacheLock::acquire_shared(&cache_dir, version).expect("acquire lock")
        };

        let lock_path = temp
            .path()
            .join(LOCKS_SUBDIR)
            .join(format!("{version}.lock"));
        assert!(lock_path.exists(), "lock file should be created");
    }

    #[rstest]
    fn multiple_shared_locks_can_coexist(cache_fixture: (TempDir, camino::Utf8PathBuf)) {
        let (_temp, cache_dir) = cache_fixture;

        let lock1 = CacheLock::acquire_shared(&cache_dir, "17.4.0").expect("acquire lock 1");
        let lock2 = CacheLock::acquire_shared(&cache_dir, "17.4.0").expect("acquire lock 2");

        // Both locks should be held successfully
        drop(lock1);
        drop(lock2);
    }

    #[rstest]
    fn different_versions_have_separate_locks(cache_fixture: (TempDir, camino::Utf8PathBuf)) {
        let (_temp, cache_dir) = cache_fixture;

        let lock1 = CacheLock::acquire_exclusive(&cache_dir, "17.4.0").expect("acquire lock 1");
        let lock2 = CacheLock::acquire_exclusive(&cache_dir, "16.3.0").expect("acquire lock 2");

        // Different versions should not block each other
        drop(lock1);
        drop(lock2);
    }

    // Each case is checked against both `acquire_exclusive` and
    // `acquire_shared` in the body, so one case per input is sufficient.
    #[rstest]
    #[case::empty("")]
    #[case::current_dir(".")]
    #[case::parent_dir("..")]
    #[case::path_separator("foo/bar")]
    #[case::parent_in_path("../17.4.0")]
    #[case::absolute_path("/etc/passwd")]
    fn acquire_rejects_invalid_version_strings(
        cache_fixture: (TempDir, camino::Utf8PathBuf),
        #[case] invalid_version: &str,
    ) {
        let (_temp, cache_dir) = cache_fixture;

        let exclusive_err = CacheLock::acquire_exclusive(&cache_dir, invalid_version)
            .expect_err("acquire_exclusive should reject invalid version");
        assert_eq!(
            exclusive_err.kind(),
            io::ErrorKind::InvalidInput,
            "error kind should be InvalidInput for: {invalid_version}"
        );

        let shared_err = CacheLock::acquire_shared(&cache_dir, invalid_version)
            .expect_err("acquire_shared should reject invalid version");
        assert_eq!(
            shared_err.kind(),
            io::ErrorKind::InvalidInput,
            "error kind should be InvalidInput for: {invalid_version}"
        );
    }
}