Skip to main content

objects/
lock.rs

// SPDX-License-Identifier: Apache-2.0
//! Repository locking for concurrent access.
//!
//! [`RepoLock`] guarantees three invariants:
//! - **Cross-process** exclusion via `flock(2)` on a lock file.
//! - **Cross-thread, same-process** exclusion: two threads never both hold the
//!   write lock.
//! - **Same-thread reentrancy**: the owning thread may re-acquire the write lock
//!   through the blocking `write()` any number of times without blocking
//!   (`try_write()` is deliberately non-reentrant).
//!
//! The reentrancy invariant matters because `flock(2)` locks attach to the open
//! file description, not to the process: a single thread that opens the lock
//! file twice and calls `flock` on the second descriptor blocks forever on its
//! own first lock. The canonical write lock is taken at the top of an import and
//! then re-taken by downstream writers on the same thread, so a non-reentrant
//! primitive would self-deadlock. We therefore take the `flock` once, on the
//! outermost acquisition, and gate intra-process access through a per-lock-path
//! registry.
18
19use std::{
20    collections::HashMap,
21    fs::File,
22    io,
23    path::{Path, PathBuf},
24    sync::{Arc, Condvar, Mutex, MutexGuard, OnceLock},
25    thread::{self, ThreadId},
26};
27
28use fs2::FileExt;
29use thiserror::Error;
30
31#[derive(Debug, Error)]
32pub enum LockError {
33    #[error("failed to acquire lock: {0}")]
34    Acquire(#[source] io::Error),
35    #[error("lock file not accessible: {0}")]
36    Io(#[source] io::Error),
37}
38
39pub type Result<T> = std::result::Result<T, LockError>;
40
/// Intra-process bookkeeping for one lock path, guarded by `Entry::gate`.
///
/// `owner` names the thread that holds the write lock (if any), `depth` counts
/// how many nested write acquisitions that thread has outstanding, and `flock`
/// keeps the open lock file whose OS-level lock provides cross-process
/// exclusion. Dropping that `File` releases the cross-process lock.
#[derive(Default)]
struct GateState {
    owner: Option<ThreadId>,
    depth: usize,
    flock: Option<File>,
}

/// Per-lock-path gate: a mutex over [`GateState`] plus the condition variable
/// that blocked writers sleep on until the current owner lets go.
struct Entry {
    gate: Mutex<GateState>,
    cv: Condvar,
}

impl Entry {
    /// Builds an unowned gate: no owner, depth zero, no lock file held.
    fn new() -> Self {
        let gate = Mutex::new(GateState::default());
        Entry {
            gate,
            cv: Condvar::new(),
        }
    }
}
66
67/// Process-global registry of per-lock-path gates, keyed by the canonical lock
68/// path. Entries are created on first use and never removed (one small entry per
69/// distinct lock path per process lifetime).
70static REGISTRY: OnceLock<Mutex<HashMap<PathBuf, Arc<Entry>>>> = OnceLock::new();
71
72fn registry() -> &'static Mutex<HashMap<PathBuf, Arc<Entry>>> {
73    REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
74}
75
76fn entry_for(key: PathBuf) -> Arc<Entry> {
77    let mut map = registry().lock().unwrap_or_else(|e| e.into_inner());
78    Arc::clone(map.entry(key).or_insert_with(|| Arc::new(Entry::new())))
79}
80
81fn lock_gate(entry: &Entry) -> MutexGuard<'_, GateState> {
82    entry.gate.lock().unwrap_or_else(|e| e.into_inner())
83}
84
85pub struct ReadLockGuard {
86    // `None` when this is a no-op guard: the current thread already holds the
87    // write lock, whose exclusive flock subsumes a shared read.
88    _file: Option<File>,
89}
90
91impl Drop for ReadLockGuard {
92    fn drop(&mut self) {
93        if let Some(file) = &self._file {
94            let _ = file.unlock();
95        }
96    }
97}
98
99pub struct WriteLockGuard {
100    entry: Arc<Entry>,
101}
102
103impl Drop for WriteLockGuard {
104    fn drop(&mut self) {
105        let mut state = lock_gate(&self.entry);
106        if state.depth > 0 {
107            state.depth -= 1;
108        }
109        if state.depth == 0 {
110            state.owner = None;
111            // Dropping the File releases the cross-process flock.
112            state.flock = None;
113            self.entry.cv.notify_one();
114        }
115    }
116}
117
118pub struct RepoLock {
119    lock_path: PathBuf,
120}
121
122impl RepoLock {
123    pub fn new(repo_root: &Path) -> Self {
124        let lock_path = repo_root.join(".heddle/locks/repo.lock");
125        Self { lock_path }
126    }
127
128    pub fn at(lock_path: PathBuf) -> Self {
129        Self { lock_path }
130    }
131
132    pub fn read(&self) -> Result<ReadLockGuard> {
133        self.ensure_lock_dir()?;
134        let entry = entry_for(self.registry_key());
135
136        // If the current thread already holds the write lock, the exclusive
137        // flock covers this read; hand back a no-op guard so a same-thread
138        // read-under-write cannot deadlock against our own flock.
139        {
140            let state = lock_gate(&entry);
141            if state.owner == Some(thread::current().id()) {
142                return Ok(ReadLockGuard { _file: None });
143            }
144        }
145
146        let file = self.open_lock_file()?;
147        file.lock_shared().map_err(LockError::Acquire)?;
148        Ok(ReadLockGuard { _file: Some(file) })
149    }
150
151    pub fn write(&self) -> Result<WriteLockGuard> {
152        self.ensure_lock_dir()?;
153        let entry = entry_for(self.registry_key());
154        let tid = thread::current().id();
155        let mut state = lock_gate(&entry);
156        loop {
157            match state.owner {
158                Some(owner) if owner == tid => {
159                    state.depth += 1;
160                    return Ok(WriteLockGuard {
161                        entry: Arc::clone(&entry),
162                    });
163                }
164                None => {
165                    // Acquire the cross-process flock once for the outermost
166                    // holder. Holding the gate across this blocking call is
167                    // intentional: other local threads must block here until we
168                    // either win the flock or fail.
169                    let file = self.open_lock_file()?;
170                    file.lock_exclusive().map_err(LockError::Acquire)?;
171                    state.owner = Some(tid);
172                    state.depth = 1;
173                    state.flock = Some(file);
174                    return Ok(WriteLockGuard {
175                        entry: Arc::clone(&entry),
176                    });
177                }
178                Some(_) => {
179                    state = entry.cv.wait(state).unwrap_or_else(|e| e.into_inner());
180                }
181            }
182        }
183    }
184
185    pub fn try_read(&self) -> Result<Option<ReadLockGuard>> {
186        self.ensure_lock_dir()?;
187        let file = self.open_lock_file()?;
188
189        match file.try_lock_shared() {
190            Ok(()) => Ok(Some(ReadLockGuard { _file: Some(file) })),
191            Err(_) => Ok(None),
192        }
193    }
194
195    pub fn try_write(&self) -> Result<Option<WriteLockGuard>> {
196        self.ensure_lock_dir()?;
197        let entry = entry_for(self.registry_key());
198        let mut state = lock_gate(&entry);
199        // Non-blocking acquisition is NON-reentrant: a `try_write` while the lock
200        // is held — by ANY thread, including this one — reports contention
201        // (`None`). Reentrancy exists only to keep the BLOCKING `write()` from
202        // self-deadlocking on its own `flock`; a `try_*` can never deadlock, so a
203        // caller that uses it to detect contention (e.g. the undo/redo
204        // serialization lock, heddle#355) must see "held" regardless of holder.
205        match state.owner {
206            Some(_) => Ok(None),
207            None => {
208                let file = self.open_lock_file()?;
209                match file.try_lock_exclusive() {
210                    Ok(()) => {
211                        state.owner = Some(thread::current().id());
212                        state.depth = 1;
213                        state.flock = Some(file);
214                        Ok(Some(WriteLockGuard {
215                            entry: Arc::clone(&entry),
216                        }))
217                    }
218                    Err(_) => Ok(None),
219                }
220            }
221        }
222    }
223
224    fn ensure_lock_dir(&self) -> Result<()> {
225        if let Some(parent) = self.lock_path.parent() {
226            std::fs::create_dir_all(parent).map_err(LockError::Io)?;
227        }
228        Ok(())
229    }
230
231    /// Stable registry key for this lock path. The lock file itself may not exist
232    /// yet, so canonicalize the (already-created) parent directory and re-join
233    /// the filename rather than the whole path.
234    fn registry_key(&self) -> PathBuf {
235        match self.lock_path.parent() {
236            Some(parent) => {
237                let canon_parent = parent
238                    .canonicalize()
239                    .unwrap_or_else(|_| parent.to_path_buf());
240                match self.lock_path.file_name() {
241                    Some(name) => canon_parent.join(name),
242                    None => canon_parent,
243                }
244            }
245            None => self.lock_path.clone(),
246        }
247    }
248
249    fn open_lock_file(&self) -> Result<File> {
250        File::create(&self.lock_path).map_err(LockError::Io)
251    }
252}
253
254pub trait RepositoryLockExt {
255    fn locker(&self) -> RepoLock;
256}
257
#[cfg(test)]
mod tests {
    use std::{
        sync::{mpsc, Arc},
        thread,
        time::Duration,
    };

    use tempfile::TempDir;

    use super::*;

    /// A fresh temp repository plus a lock on it. The `TempDir` is returned so
    /// the caller keeps the directory alive for the whole test.
    fn fresh_lock() -> (TempDir, RepoLock) {
        let dir = TempDir::new().unwrap();
        let lock = RepoLock::new(dir.path());
        (dir, lock)
    }

    /// Like `fresh_lock`, but wrapped in an `Arc` for sharing across threads.
    fn fresh_shared_lock() -> (TempDir, Arc<RepoLock>) {
        let (dir, lock) = fresh_lock();
        (dir, Arc::new(lock))
    }

    #[test]
    fn test_read_lock_acquired() {
        let (_dir, lock) = fresh_lock();
        let guard = lock.read().unwrap();
        assert!(std::mem::size_of_val(&guard) > 0);
    }

    #[test]
    fn test_write_lock_acquired() {
        let (_dir, lock) = fresh_lock();
        let guard = lock.write().unwrap();
        assert!(std::mem::size_of_val(&guard) > 0);
    }

    #[test]
    fn test_multiple_readers() {
        let (_dir, lock) = fresh_shared_lock();

        // Spawn every reader before joining any, so their holds overlap.
        let readers: Vec<_> = (0..10)
            .map(|_| {
                let lock = Arc::clone(&lock);
                thread::spawn(move || {
                    let _guard = lock.read().unwrap();
                    thread::sleep(Duration::from_millis(10));
                })
            })
            .collect();

        for reader in readers {
            reader.join().unwrap();
        }
    }

    #[test]
    fn test_writer_excludes_reader() {
        let (_dir, lock) = fresh_lock();

        let _write_guard = lock.write().unwrap();
        let read_result = lock.try_read().unwrap();
        assert!(read_result.is_none(), "Reader should be blocked by writer");
    }

    #[test]
    fn test_reader_excludes_writer() {
        let (_dir, lock) = fresh_lock();

        let _read_guard = lock.read().unwrap();
        let write_result = lock.try_write().unwrap();
        assert!(write_result.is_none(), "Writer should be blocked by reader");
    }

    #[test]
    fn test_lock_released_on_drop() {
        let (_dir, lock) = fresh_lock();

        drop(lock.write().unwrap());
        let _guard2 = lock.read().unwrap();
    }

    /// The owning thread may re-take the write lock without blocking on its own
    /// flock — the regression that self-deadlocked the canonical import lock.
    #[test]
    fn same_thread_write_is_reentrant() {
        let (_dir, lock) = fresh_lock();

        let _outer = lock.write().unwrap();
        let _inner = lock.write().unwrap();
        // Getting here without hanging is the assertion; the harness timeout is
        // the backstop on regression.
    }

    /// A read taken by the thread that already holds the write lock must not
    /// block against its own exclusive flock.
    #[test]
    fn same_thread_read_under_write_does_not_deadlock() {
        let (_dir, lock) = fresh_lock();

        let _w = lock.write().unwrap();
        let _r = lock.read().unwrap();
    }

    /// Reentrancy is strictly per-thread: while one thread holds the write lock,
    /// a different thread is excluded.
    #[test]
    fn distinct_threads_still_exclude() {
        let (_dir, lock) = fresh_shared_lock();

        let (acquired_tx, acquired_rx) = mpsc::channel::<()>();
        let (release_tx, release_rx) = mpsc::channel::<()>();
        let holder = {
            let lock = Arc::clone(&lock);
            thread::spawn(move || {
                let _guard = lock.write().unwrap();
                acquired_tx.send(()).unwrap();
                release_rx.recv().unwrap();
            })
        };

        acquired_rx.recv().unwrap();
        assert!(
            lock.try_write().unwrap().is_none(),
            "a second thread must not acquire the write lock"
        );

        release_tx.send(()).unwrap();
        holder.join().unwrap();

        assert!(
            lock.try_write().unwrap().is_some(),
            "write lock is available once the owning thread releases"
        );
    }

    /// A reentrant (depth > 1) hold keeps the lock until the OUTERMOST guard
    /// drops; other threads stay excluded across the inner drops.
    #[test]
    fn reentrant_release_keeps_lock_until_outermost_drop() {
        let (_dir, lock) = fresh_shared_lock();

        // Probes from a separate thread; `true` means that thread got the lock.
        let other_thread_acquires = |lock: &Arc<RepoLock>| {
            let lock = Arc::clone(lock);
            thread::spawn(move || lock.try_write().unwrap().is_some())
                .join()
                .unwrap()
        };

        let outer = lock.write().unwrap();
        let inner = lock.write().unwrap();

        assert!(!other_thread_acquires(&lock), "excluded while held at depth 2");
        drop(inner);
        assert!(!other_thread_acquires(&lock), "still excluded while held at depth 1");
        drop(outer);
        assert!(
            other_thread_acquires(&lock),
            "available after the outermost guard drops"
        );
    }

    /// `try_write` is intentionally NON-reentrant: even the thread that already
    /// holds the write lock gets `None`, not a nested guard. Reentrancy exists
    /// only so the BLOCKING `write()` can't self-deadlock on its own `flock`; a
    /// non-blocking `try_*` can never deadlock, and callers use it to DETECT
    /// contention (the undo/redo serialization lock, heddle#355), so it must
    /// report "held" regardless of holder. Do NOT "fix" this to mirror
    /// `write()`'s reentrancy.
    #[test]
    fn try_write_is_non_reentrant_even_for_owner() {
        let (_dir, lock) = fresh_lock();

        let _held = lock.write().unwrap();
        assert!(
            lock.try_write().unwrap().is_none(),
            "try_write must report contention even for the lock's own owner thread"
        );
    }
}