Skip to main content

lean_ctx/core/
path_locks.rs

1//! Process-wide per-path advisory locks.
2//!
3//! These in-process mutexes serialize concurrent operations on the *same* file
4//! path while letting operations on *different* paths run fully in parallel.
5//!
6//! Why this exists: tools like `ctx_read` and `ctx_edit` would otherwise contend
7//! on the single global cache write-lock for the entire duration of their disk
8//! I/O. When several agents (or sub-agents) hammer files concurrently, that
9//! global lock becomes a bottleneck and edits can time out waiting for it (see
10//! issue #320). A per-path lock keeps the contention scoped to the one file that
11//! actually needs serialization, so unrelated reads/edits never block each other.
12//!
13//! Lock ordering (see `LOCK_ORDERING.md`, L17): the inner registry mutex is held
14//! only long enough to clone the per-path `Arc<Mutex<()>>`, then released before
15//! the per-path lock itself is acquired. Never hold the registry mutex across the
16//! per-path lock, and never acquire a per-path lock while holding the global
17//! cache write-lock.
18
19use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21
22/// Upper bound on retained lock entries before we garbage-collect unused ones.
23const MAX_ENTRIES: usize = 500;
24
25/// Returns the shared advisory lock for `path`, creating it on first use.
26///
27/// The same path always yields the same `Arc<Mutex<()>>`, so callers across
28/// threads serialize on it. Different paths yield independent mutexes.
29pub fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
30    static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
31        std::sync::OnceLock::new();
32    let map = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
33    let mut map = map.lock().unwrap_or_else(|poisoned| {
34        tracing::warn!("path_locks registry poisoned; recovering");
35        poisoned.into_inner()
36    });
37
38    // Bounded growth: drop entries no one else is holding a reference to. The
39    // `> 1` check keeps any lock that is currently in use by another caller.
40    if map.len() > MAX_ENTRIES {
41        map.retain(|_, v| Arc::strong_count(v) > 1);
42    }
43
44    map.entry(path.to_string())
45        .or_insert_with(|| Arc::new(Mutex::new(())))
46        .clone()
47}
48
49#[cfg(test)]
50mod tests {
51    use super::*;
52    use std::sync::atomic::{AtomicUsize, Ordering};
53    use std::sync::Barrier;
54
55    #[test]
56    fn same_path_returns_same_mutex() {
57        let a1 = per_file_lock("/tmp/path_locks_same.txt");
58        let a2 = per_file_lock("/tmp/path_locks_same.txt");
59        assert!(Arc::ptr_eq(&a1, &a2));
60    }
61
62    #[test]
63    fn different_paths_return_different_mutexes() {
64        let a = per_file_lock("/tmp/path_locks_a.txt");
65        let b = per_file_lock("/tmp/path_locks_b.txt");
66        assert!(!Arc::ptr_eq(&a, &b));
67    }
68
69    #[test]
70    fn serializes_concurrent_access_to_same_path() {
71        let counter = Arc::new(AtomicUsize::new(0));
72        let max_concurrent = Arc::new(AtomicUsize::new(0));
73        let barrier = Arc::new(Barrier::new(8));
74        let path = "/tmp/path_locks_serialize.txt";
75        let mut handles = Vec::new();
76        for _ in 0..8 {
77            let counter = Arc::clone(&counter);
78            let max_concurrent = Arc::clone(&max_concurrent);
79            let barrier = Arc::clone(&barrier);
80            handles.push(std::thread::spawn(move || {
81                barrier.wait();
82                let lock = per_file_lock(path);
83                let _guard = lock.lock().unwrap();
84                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
85                max_concurrent.fetch_max(active, Ordering::SeqCst);
86                std::thread::sleep(std::time::Duration::from_millis(5));
87                counter.fetch_sub(1, Ordering::SeqCst);
88            }));
89        }
90        for h in handles {
91            h.join().unwrap();
92        }
93        assert_eq!(
94            max_concurrent.load(Ordering::SeqCst),
95            1,
96            "per-file lock must serialize same-path access"
97        );
98    }
99
100    #[test]
101    fn allows_parallel_access_to_different_paths() {
102        let counter = Arc::new(AtomicUsize::new(0));
103        let max_concurrent = Arc::new(AtomicUsize::new(0));
104        let barrier = Arc::new(Barrier::new(8));
105        let mut handles = Vec::new();
106        for i in 0..8 {
107            let counter = Arc::clone(&counter);
108            let max_concurrent = Arc::clone(&max_concurrent);
109            let barrier = Arc::clone(&barrier);
110            handles.push(std::thread::spawn(move || {
111                let path = format!("/tmp/path_locks_parallel_{i}.txt");
112                barrier.wait();
113                let lock = per_file_lock(&path);
114                let _guard = lock.lock().unwrap();
115                let active = counter.fetch_add(1, Ordering::SeqCst) + 1;
116                max_concurrent.fetch_max(active, Ordering::SeqCst);
117                std::thread::sleep(std::time::Duration::from_millis(5));
118                counter.fetch_sub(1, Ordering::SeqCst);
119            }));
120        }
121        for h in handles {
122            h.join().unwrap();
123        }
124        assert!(
125            max_concurrent.load(Ordering::SeqCst) > 1,
126            "different paths must be allowed to run in parallel"
127        );
128    }
129}