rab/builtin/
file_mutation_queue.rs1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, LazyLock, Mutex};
4use tokio::sync::Notify;
5
6static FILE_QUEUES: LazyLock<Mutex<HashMap<String, Arc<Notify>>>> =
9 LazyLock::new(|| Mutex::new(HashMap::new()));
10
/// Builds the queue key for `path`: a lexically normalized path string that
/// uses `/` as its only separator.
///
/// Relative paths are joined onto `cwd`; absolute paths are used as given.
/// `.` components are dropped and `..` components cancel the directory name
/// before them, so different spellings of one location (`a.txt`, `./a.txt`,
/// `sub/../a.txt`, or the absolute form) all yield the same key. Backslashes
/// are rewritten to `/` for absolute and relative inputs alike.
///
/// The normalization is purely textual: the filesystem is never consulted, so
/// symlinks are not resolved and the file does not have to exist.
fn resolve_path(path: &str, cwd: &Path) -> String {
    let requested = Path::new(path);
    let absolute = if requested.is_absolute() {
        requested.to_path_buf()
    } else {
        cwd.join(requested)
    };

    let mut normalized = std::path::PathBuf::new();
    for component in absolute.components() {
        match component {
            std::path::Component::CurDir => {}
            std::path::Component::ParentDir => {
                // Decide what `..` means from what has been kept so far.
                let (poppable, at_root) = match normalized.components().next_back() {
                    Some(std::path::Component::Normal(_)) => (true, false),
                    Some(std::path::Component::RootDir | std::path::Component::Prefix(_)) => {
                        (false, true)
                    }
                    _ => (false, false),
                };
                if poppable {
                    normalized.pop();
                } else if !at_root {
                    // Nothing to cancel (only possible when `cwd` is relative):
                    // keep the `..` so the key still identifies the location.
                    normalized.push("..");
                }
                // At the root, `..` stays at the root and is dropped.
            }
            other => normalized.push(other.as_os_str()),
        }
    }

    normalized.to_string_lossy().replace('\\', "/")
}
22
23pub async fn with_file_mutation_queue<T, E, F, Fut>(
38 file_path: &str,
39 cwd: &Path,
40 f: F,
41) -> Result<T, E>
42where
43 F: FnOnce() -> Fut,
44 Fut: std::future::Future<Output = Result<T, E>>,
45{
46 let key = resolve_path(file_path, cwd);
47
48 let our_notify = Arc::new(Notify::new());
51 let prev_notify = {
52 let mut queues = FILE_QUEUES.lock().unwrap();
53 queues.insert(key.clone(), our_notify.clone())
54 };
55
56 if let Some(prev) = &prev_notify {
58 prev.notified().await;
59 }
60
61 let result = f().await;
63
64 our_notify.notify_one();
68
69 let mut queues = FILE_QUEUES.lock().unwrap();
71 if let Some(current) = queues.get(&key)
72 && Arc::ptr_eq(current, &our_notify)
73 {
74 queues.remove(&key);
76 }
77 result
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::{Duration, Instant};

    /// A single operation on a file nobody else is using runs its closure.
    #[tokio::test]
    async fn runs_without_previous() {
        let mut ran = false;
        with_file_mutation_queue("/tmp/test_file_1.txt", Path::new("/tmp"), || async {
            ran = true;
            Ok::<_, String>(42)
        })
        .await
        .unwrap();
        assert!(ran);
    }

    /// Ten tasks targeting one file never overlap: the number of closures in
    /// flight is tracked and its observed maximum must stay at one.
    #[tokio::test]
    async fn serializes_concurrent_access() {
        let cwd = Path::new("/tmp");
        let active = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let active = Arc::clone(&active);
                let peak = Arc::clone(&peak);
                tokio::spawn(async move {
                    with_file_mutation_queue("/tmp/test_serial.txt", cwd, || async {
                        let in_flight = active.fetch_add(1, Ordering::SeqCst) + 1;
                        let earlier_peak = peak.fetch_max(in_flight, Ordering::SeqCst);
                        // Stay inside the critical section long enough for an
                        // overlapping task to show up in the counter.
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        active.fetch_sub(1, Ordering::SeqCst);
                        if in_flight > 1 && earlier_peak >= 1 {
                            panic!("concurrent access detected: v={}", in_flight);
                        }
                        Ok::<_, String>(())
                    })
                    .await
                    .unwrap();
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        assert_eq!(peak.load(Ordering::SeqCst), 1);
    }

    /// Five 50 ms operations on five distinct files must finish well before
    /// the 250 ms they would need if they were queued behind each other.
    #[tokio::test]
    async fn different_files_run_in_parallel() {
        let cwd = Path::new("/tmp");
        let started = Instant::now();

        let tasks: Vec<_> = (0..5)
            .map(|i| {
                tokio::spawn(async move {
                    with_file_mutation_queue(&format!("/tmp/parallel_{}.txt", i), cwd, || async {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        Ok::<_, String>(i)
                    })
                    .await
                    .unwrap()
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(150),
            "took too long: {:?} — files ran sequentially instead of in parallel",
            elapsed
        );
    }

    /// The closure's `Ok` value is handed back to the caller.
    #[tokio::test]
    async fn returns_value() {
        let outcome: Result<i32, String> =
            with_file_mutation_queue("/tmp/retval.txt", Path::new("/tmp"), || async { Ok(99) })
                .await;
        assert_eq!(outcome.unwrap(), 99);
    }

    /// The closure's `Err` value is handed back to the caller.
    #[tokio::test]
    async fn propagates_error() {
        let outcome: Result<i32, String> =
            with_file_mutation_queue("/tmp/error.txt", Path::new("/tmp"), || async {
                Err("oops".to_string())
            })
            .await;
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err(), "oops");
    }

    /// Operations on one file complete in the order they were spawned, even
    /// though earlier ones sleep longer (30, 20, then 10 ms).
    #[tokio::test]
    async fn chains_correctly() {
        let cwd = Path::new("/tmp");
        let order = Arc::new(std::sync::Mutex::new(Vec::new()));

        let tasks: Vec<_> = (0..3)
            .map(|i| {
                let log = Arc::clone(&order);
                tokio::spawn(async move {
                    with_file_mutation_queue("/tmp/chaining.txt", cwd, || async {
                        tokio::time::sleep(Duration::from_millis(10 * (3 - i))).await;
                        log.lock().unwrap().push(i);
                        Ok::<_, String>(())
                    })
                    .await
                    .unwrap()
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        let recorded = order.lock().unwrap();
        assert_eq!(*recorded, vec![0, 1, 2], "operations executed out of order");
    }
}