Skip to main content

rab/builtin/
file_mutation_queue.rs

1use std::collections::HashMap;
2use std::path::Path;
3use std::sync::{Arc, LazyLock, Mutex};
4use tokio::sync::Notify;
5
6/// Per-file queue entries. Each entry is a `Notify` that the NEXT operation
7/// will wait on. Operations chain through these to serialize access.
8static FILE_QUEUES: LazyLock<Mutex<HashMap<String, Arc<Notify>>>> =
9    LazyLock::new(|| Mutex::new(HashMap::new()));
10
/// Resolve a file path (relative or absolute) into the string used as the
/// queue key for that file.
///
/// Relative paths are joined onto `cwd`; absolute paths ignore `cwd`. The
/// result is then normalized lexically so that different spellings of the
/// same location (`a.txt`, `./a.txt`, `sub/../a.txt`) produce the same key:
///
/// - `.` components and repeated/trailing separators are dropped,
/// - `..` removes the preceding normal component (and is ignored at the root),
/// - backslashes are replaced with forward slashes on every result, not only
///   on relative ones.
///
/// The filesystem is never consulted, so symlinks are not resolved and the
/// path does not have to exist.
fn resolve_path(path: &str, cwd: &Path) -> String {
    let requested = Path::new(path);
    let absolute = if requested.is_absolute() {
        requested.to_path_buf()
    } else {
        cwd.join(requested)
    };

    let mut normalized = std::path::PathBuf::new();
    for component in absolute.components() {
        match component {
            std::path::Component::CurDir => {}
            std::path::Component::ParentDir => {
                let last = normalized.components().next_back();
                let removable = matches!(last, Some(std::path::Component::Normal(_)));
                let anchored = matches!(
                    last,
                    Some(std::path::Component::RootDir) | Some(std::path::Component::Prefix(_))
                );
                if removable {
                    normalized.pop();
                } else if !anchored {
                    // Nothing left to strip (relative `cwd`): keep the `..`
                    // so the key still distinguishes the location.
                    normalized.push("..");
                }
            }
            other => normalized.push(other.as_os_str()),
        }
    }

    normalized.to_string_lossy().replace('\\', "/")
}
22
23/// Serialize file mutation operations targeting the same file.
24///
25/// Operations for different files still run in parallel. This mirrors pi's
26/// `withFileMutationQueue` in file-mutation-queue.ts.
27///
28/// The implementation:
29/// - Each file has a `Notify` stored in a global map, representing the
30///   "next operation" signal.
31/// - An operation registers by replacing the entry with its own `Notify`
32///   (for the operation after it), and picking up the previous `Notify`
33///   to wait on.
34/// - When the operation finishes, it signals its own `Notify` (which the
35///   next operation is waiting on) and, if it is still the latest entry,
36///   cleans up.
37pub async fn with_file_mutation_queue<T, E, F, Fut>(
38    file_path: &str,
39    cwd: &Path,
40    f: F,
41) -> Result<T, E>
42where
43    F: FnOnce() -> Fut,
44    Fut: std::future::Future<Output = Result<T, E>>,
45{
46    let key = resolve_path(file_path, cwd);
47
48    // ── Registration phase ─────────────────────────────────────
49    // Atomically: pick up the previous Notify (if any) and store ours.
50    let our_notify = Arc::new(Notify::new());
51    let prev_notify = {
52        let mut queues = FILE_QUEUES.lock().unwrap();
53        queues.insert(key.clone(), our_notify.clone())
54    };
55
56    // ── Wait for the previous operation to finish ──────────────
57    if let Some(prev) = &prev_notify {
58        prev.notified().await;
59    }
60
61    // ── Run the operation ──────────────────────────────────────
62    let result = f().await;
63
64    // ── Signal the next operation ──────────────────────────────
65    // Our Notify may have been picked up by the next operation as
66    // its prev_notify. Signal it so the next operation can proceed.
67    our_notify.notify_one();
68
69    // ── Clean up if we're still the latest entry ───────────────
70    let mut queues = FILE_QUEUES.lock().unwrap();
71    if let Some(current) = queues.get(&key)
72        && Arc::ptr_eq(current, &our_notify)
73    {
74        // No new operation registered after us — clean up.
75        queues.remove(&key);
76    }
77    // If a new operation registered, its own Notify is now in the
78    // map; we leave it there for the next cleanup cycle.
79
80    result
81}
82
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::time::Duration;

    /// A lone operation on a file runs and its side effect is visible.
    #[tokio::test]
    async fn runs_without_previous() {
        let mut executed = false;
        let outcome =
            with_file_mutation_queue("/tmp/test_file_1.txt", Path::new("/tmp"), || async {
                executed = true;
                Ok::<_, String>(42)
            })
            .await;
        outcome.unwrap();
        assert!(executed);
    }

    /// Ten tasks hitting one file must never overlap inside the closure.
    #[tokio::test]
    async fn serializes_concurrent_access() {
        let cwd = Path::new("/tmp");
        let in_flight = Arc::new(AtomicUsize::new(0));
        let peak = Arc::new(AtomicUsize::new(0));

        let tasks: Vec<_> = (0..10)
            .map(|_| {
                let in_flight = Arc::clone(&in_flight);
                let peak = Arc::clone(&peak);
                tokio::spawn(async move {
                    with_file_mutation_queue("/tmp/test_serial.txt", cwd, || async {
                        // Number of operations inside the closure, us included.
                        let active = in_flight.fetch_add(1, Ordering::SeqCst) + 1;
                        // Record the highest concurrency seen so far.
                        let earlier_peak = peak.fetch_max(active, Ordering::SeqCst);
                        // Stay inside long enough for an overlap to show up.
                        tokio::time::sleep(Duration::from_millis(5)).await;
                        in_flight.fetch_sub(1, Ordering::SeqCst);
                        // More than one active operation means the queue failed.
                        if earlier_peak >= 1 && active > 1 {
                            panic!("concurrent access detected: v={}", active);
                        }
                        Ok::<_, String>(())
                    })
                    .await
                    .unwrap();
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        // Serialized access never has more than one operation in flight.
        assert_eq!(peak.load(Ordering::SeqCst), 1);
    }

    /// Operations on five distinct files overlap instead of queueing.
    #[tokio::test]
    async fn different_files_run_in_parallel() {
        let cwd = Path::new("/tmp");
        let started = std::time::Instant::now();

        let tasks: Vec<_> = (0..5)
            .map(|i| {
                tokio::spawn(async move {
                    let target = format!("/tmp/parallel_{}.txt", i);
                    with_file_mutation_queue(&target, cwd, || async {
                        tokio::time::sleep(Duration::from_millis(50)).await;
                        Ok::<_, String>(i)
                    })
                    .await
                    .unwrap()
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        // Five parallel 50ms sleeps take ~50ms in total; run back to back
        // they would take ~250ms.
        let elapsed = started.elapsed();
        assert!(
            elapsed < Duration::from_millis(150),
            "took too long: {:?} — files ran sequentially instead of in parallel",
            elapsed
        );
    }

    /// The closure's `Ok` value is handed back untouched.
    #[tokio::test]
    async fn returns_value() {
        let outcome: Result<i32, String> =
            with_file_mutation_queue("/tmp/retval.txt", Path::new("/tmp"), || async { Ok(99) })
                .await;
        assert_eq!(outcome, Ok(99));
    }

    /// The closure's `Err` value is handed back untouched.
    #[tokio::test]
    async fn propagates_error() {
        let outcome: Result<i32, String> =
            with_file_mutation_queue("/tmp/error.txt", Path::new("/tmp"), || async {
                Err("oops".to_string())
            })
            .await;
        assert_eq!(outcome, Err("oops".to_string()));
    }

    /// Three operations on one file complete in the order they were started,
    /// even though the earliest one sleeps the longest.
    #[tokio::test]
    async fn chains_correctly() {
        let cwd = Path::new("/tmp");
        let completed = Arc::new(std::sync::Mutex::new(Vec::new()));

        let tasks: Vec<_> = (0..3)
            .map(|i| {
                let completed = Arc::clone(&completed);
                tokio::spawn(async move {
                    with_file_mutation_queue("/tmp/chaining.txt", cwd, || async {
                        // Task 0 sleeps 30ms, task 1 20ms, task 2 10ms.
                        tokio::time::sleep(Duration::from_millis(10 * (3 - i))).await;
                        completed.lock().unwrap().push(i);
                        Ok::<_, String>(())
                    })
                    .await
                    .unwrap()
                })
            })
            .collect();

        for task in tasks {
            task.await.unwrap();
        }

        // Without serialization the shortest sleep (task 2) would finish
        // first; with it, completion order equals start order.
        let completed = completed.lock().unwrap();
        assert_eq!(*completed, vec![0, 1, 2], "operations executed out of order");
    }
}