lean_ctx/core/
path_locks.rs1use std::collections::HashMap;
20use std::sync::{Arc, Mutex};
21
/// Registry size threshold: once the lock registry holds more than this many
/// entries, `per_file_lock` prunes entries that no caller currently holds.
const MAX_ENTRIES: usize = 500;
24
25pub fn per_file_lock(path: &str) -> Arc<Mutex<()>> {
30 static FILE_LOCKS: std::sync::OnceLock<Mutex<HashMap<String, Arc<Mutex<()>>>>> =
31 std::sync::OnceLock::new();
32 let map = FILE_LOCKS.get_or_init(|| Mutex::new(HashMap::new()));
33 let mut map = map.lock().unwrap_or_else(|poisoned| {
34 tracing::warn!("path_locks registry poisoned; recovering");
35 poisoned.into_inner()
36 });
37
38 if map.len() > MAX_ENTRIES {
41 map.retain(|_, v| Arc::strong_count(v) > 1);
42 }
43
44 map.entry(path.to_string())
45 .or_insert_with(|| Arc::new(Mutex::new(())))
46 .clone()
47}
48
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Barrier;

    /// Number of worker threads used by the concurrency tests.
    const WORKERS: usize = 8;

    /// Spawns one thread per entry in `paths`, releases them together through
    /// a barrier, and has each hold the lock for its path for 5 ms.
    ///
    /// Returns the largest number of threads observed holding a lock at the
    /// same time.
    fn peak_holders(paths: Vec<String>) -> usize {
        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));
        let start = Arc::new(Barrier::new(paths.len()));

        // `collect` spawns every thread before any join; joining lazily would
        // stall on the barrier.
        let workers: Vec<_> = paths
            .into_iter()
            .map(|path| {
                let active = Arc::clone(&active);
                let peak = Arc::clone(&peak);
                let start = Arc::clone(&start);
                std::thread::spawn(move || {
                    start.wait();
                    let lock = per_file_lock(&path);
                    let _guard = lock.lock().unwrap();
                    let now_holding = active.fetch_add(1, Ordering::SeqCst) + 1;
                    peak.fetch_max(now_holding, Ordering::SeqCst);
                    std::thread::sleep(std::time::Duration::from_millis(5));
                    active.fetch_sub(1, Ordering::SeqCst);
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        peak.load(Ordering::SeqCst)
    }

    #[test]
    fn same_path_returns_same_mutex() {
        let path = "/tmp/path_locks_same.txt";
        let first = per_file_lock(path);
        let second = per_file_lock(path);
        assert!(Arc::ptr_eq(&first, &second));
    }

    #[test]
    fn different_paths_return_different_mutexes() {
        let left = per_file_lock("/tmp/path_locks_a.txt");
        let right = per_file_lock("/tmp/path_locks_b.txt");
        assert!(!Arc::ptr_eq(&left, &right));
    }

    #[test]
    fn serializes_concurrent_access_to_same_path() {
        let shared = String::from("/tmp/path_locks_serialize.txt");
        let paths = vec![shared; WORKERS];
        assert_eq!(
            peak_holders(paths),
            1,
            "per-file lock must serialize same-path access"
        );
    }

    #[test]
    fn allows_parallel_access_to_different_paths() {
        let paths: Vec<String> = (0..WORKERS)
            .map(|i| format!("/tmp/path_locks_parallel_{i}.txt"))
            .collect();
        assert!(
            peak_holders(paths) > 1,
            "different paths must be allowed to run in parallel"
        );
    }
}