Skip to main content

zccache_watcher/
settle.rs

1//! Settle/coalesce buffer for bursty filesystem events.
2//!
3//! Filesystem watchers fire many events in rapid succession (e.g., a `cargo build`
4//! touching 100 files in 10ms). The settle buffer waits for a configurable quiet
5//! period before emitting a single coalesced batch.
6//!
7//! Overflow events bypass coalescing entirely — they clear pending state and
8//! emit immediately, since everything is invalidated.
9
10use std::collections::HashMap;
11use std::time::Duration;
12use tokio::sync::mpsc;
13use zccache_core::NormalizedPath;
14
15use crate::WatchEvent;
16
17/// Output from the settle buffer.
18#[derive(Debug, Clone)]
19pub enum SettledEvent {
20    /// A coalesced batch of file changes after the settle window.
21    Batch {
22        changed: Vec<NormalizedPath>,
23        removed: Vec<NormalizedPath>,
24    },
25    /// Watcher overflow — all cached state should be considered stale.
26    Overflow,
27}
28
29/// Coalesces bursty filesystem events into settled batches.
30///
31/// The daemon's file watcher produces a storm of events during builds.
32/// The settle buffer absorbs them and waits for a quiet period (the settle
33/// window) before emitting a single batch. This prevents the cache from
34/// doing redundant work during a burst.
35#[derive(Debug)]
36pub struct SettleBuffer {
37    settle_window: Duration,
38    /// Maximum time from the first event before forcing batch emission,
39    /// even if events are still arriving. Prevents starvation when
40    /// the daemon writes to watched directories (logs, artifacts).
41    max_wait: Duration,
42}
43
44/// Tracks the most recent change kind for a path during coalescing.
45#[derive(Debug, Clone, Copy)]
46enum ChangeKind {
47    Modified,
48    Removed,
49}
50
51impl SettleBuffer {
52    /// Create a settle buffer with the given settle window and max wait.
53    #[must_use]
54    pub fn new(settle_window: Duration) -> Self {
55        Self {
56            settle_window,
57            max_wait: Duration::from_millis(50),
58        }
59    }
60
61    /// Create a settle buffer with the default 50ms settle window.
62    #[must_use]
63    pub fn default_window() -> Self {
64        Self::new(Duration::from_millis(50))
65    }
66
67    /// Run the settle loop.
68    ///
69    /// Reads raw `WatchEvent`s from `rx`, coalesces them, and sends
70    /// `SettledEvent`s to `tx` after each settle window elapses.
71    ///
72    /// Returns when the input channel is closed.
73    pub async fn run(
74        &self,
75        mut rx: mpsc::UnboundedReceiver<WatchEvent>,
76        tx: mpsc::UnboundedSender<SettledEvent>,
77    ) {
78        let mut pending: HashMap<NormalizedPath, ChangeKind> = HashMap::new();
79
80        loop {
81            // Wait for the first event (or channel close).
82            let event = match rx.recv().await {
83                Some(e) => e,
84                None => {
85                    // Channel closed — flush any remaining events.
86                    if !pending.is_empty() {
87                        let _ = tx.send(Self::drain(&mut pending));
88                    }
89                    return;
90                }
91            };
92
93            // Handle overflow immediately — don't wait for settle.
94            // WatchEvent::Error is also treated as overflow because on Windows,
95            // ReadDirectoryChangesW buffer overflow and watcher death arrive as
96            // errors from the notify crate, not as distinct overflow events.
97            // Treating errors as overflow is conservative but correct: it forces
98            // a full re-stat of all cached entries on the next access.
99            if matches!(event, WatchEvent::Overflow | WatchEvent::Error(_)) {
100                pending.clear();
101                let _ = tx.send(SettledEvent::Overflow);
102                continue;
103            }
104
105            Self::apply_event(&mut pending, event);
106
107            // Coalesce: keep reading until either (a) the settle window elapses
108            // with no new events, or (b) max_wait from the first event is reached.
109            // Without (b), continuous writes (e.g. session log) starve the buffer.
110            let deadline = tokio::time::Instant::now() + self.max_wait;
111            loop {
112                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
113                let wait = self.settle_window.min(remaining);
114                if wait.is_zero() {
115                    // Max wait reached — force emit.
116                    if !pending.is_empty() {
117                        let _ = tx.send(Self::drain(&mut pending));
118                    }
119                    break;
120                }
121                match tokio::time::timeout(wait, rx.recv()).await {
122                    Ok(Some(WatchEvent::Overflow | WatchEvent::Error(_))) => {
123                        pending.clear();
124                        let _ = tx.send(SettledEvent::Overflow);
125                        break;
126                    }
127                    Ok(Some(event)) => {
128                        Self::apply_event(&mut pending, event);
129                    }
130                    Ok(None) => {
131                        // Channel closed — flush remaining.
132                        if !pending.is_empty() {
133                            let _ = tx.send(Self::drain(&mut pending));
134                        }
135                        return;
136                    }
137                    Err(_timeout) => {
138                        // Settle window elapsed — emit batch.
139                        if !pending.is_empty() {
140                            let _ = tx.send(Self::drain(&mut pending));
141                        }
142                        break;
143                    }
144                }
145            }
146        }
147    }
148
149    fn apply_event(pending: &mut HashMap<NormalizedPath, ChangeKind>, event: WatchEvent) {
150        match event {
151            WatchEvent::Modified(p) | WatchEvent::Created(p) => {
152                pending.insert(p, ChangeKind::Modified);
153            }
154            WatchEvent::Removed(p) => {
155                pending.insert(p, ChangeKind::Removed);
156            }
157            WatchEvent::Renamed { from, to } => {
158                pending.insert(from, ChangeKind::Removed);
159                pending.insert(to, ChangeKind::Modified);
160            }
161            WatchEvent::Overflow | WatchEvent::Error(_) => {
162                // Overflow handled in run(). Errors are logged upstream.
163            }
164        }
165    }
166
167    fn drain(pending: &mut HashMap<NormalizedPath, ChangeKind>) -> SettledEvent {
168        let mut changed = Vec::new();
169        let mut removed = Vec::new();
170        for (path, kind) in pending.drain() {
171            match kind {
172                ChangeKind::Modified => changed.push(path),
173                ChangeKind::Removed => removed.push(path),
174            }
175        }
176        SettledEvent::Batch { changed, removed }
177    }
178}
179
180#[cfg(test)]
181mod tests {
182    use super::*;
183
184    #[tokio::test]
185    async fn single_event_settles() {
186        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
187        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
188
189        let buffer = SettleBuffer::new(Duration::from_millis(20));
190        let handle = tokio::spawn(async move {
191            buffer.run(raw_rx, settled_tx).await;
192        });
193
194        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
195        drop(raw_tx);
196
197        let event = settled_rx.recv().await.unwrap();
198        match event {
199            SettledEvent::Batch { changed, removed } => {
200                assert_eq!(changed.len(), 1);
201                assert!(removed.is_empty());
202            }
203            SettledEvent::Overflow => panic!("expected batch"),
204        }
205
206        handle.await.unwrap();
207    }
208
209    #[tokio::test]
210    async fn rapid_events_coalesce_into_one_batch() {
211        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
212        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
213
214        let buffer = SettleBuffer::new(Duration::from_millis(50));
215        let handle = tokio::spawn(async move {
216            buffer.run(raw_rx, settled_tx).await;
217        });
218
219        for i in 0..5 {
220            raw_tx
221                .send(WatchEvent::Modified(format!("file_{i}.c").into()))
222                .unwrap();
223        }
224        drop(raw_tx);
225
226        let event = settled_rx.recv().await.unwrap();
227        match event {
228            SettledEvent::Batch { changed, removed } => {
229                assert_eq!(changed.len(), 5);
230                assert!(removed.is_empty());
231            }
232            SettledEvent::Overflow => panic!("expected batch"),
233        }
234
235        handle.await.unwrap();
236    }
237
238    #[tokio::test]
239    async fn same_file_deduplicates() {
240        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
241        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
242
243        let buffer = SettleBuffer::new(Duration::from_millis(20));
244        let handle = tokio::spawn(async move {
245            buffer.run(raw_rx, settled_tx).await;
246        });
247
248        let path = NormalizedPath::new("hot.c");
249        raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
250        raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
251        raw_tx.send(WatchEvent::Modified(path)).unwrap();
252        drop(raw_tx);
253
254        let event = settled_rx.recv().await.unwrap();
255        match event {
256            SettledEvent::Batch { changed, removed } => {
257                assert_eq!(changed.len(), 1);
258                assert!(removed.is_empty());
259            }
260            SettledEvent::Overflow => panic!("expected batch"),
261        }
262
263        handle.await.unwrap();
264    }
265
266    #[tokio::test]
267    async fn modify_then_remove_tracks_as_removed() {
268        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
269        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
270
271        let buffer = SettleBuffer::new(Duration::from_millis(20));
272        let handle = tokio::spawn(async move {
273            buffer.run(raw_rx, settled_tx).await;
274        });
275
276        let path = NormalizedPath::new("temp.c");
277        raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
278        raw_tx.send(WatchEvent::Removed(path)).unwrap();
279        drop(raw_tx);
280
281        let event = settled_rx.recv().await.unwrap();
282        match event {
283            SettledEvent::Batch { changed, removed } => {
284                assert!(changed.is_empty());
285                assert_eq!(removed.len(), 1);
286            }
287            SettledEvent::Overflow => panic!("expected batch"),
288        }
289
290        handle.await.unwrap();
291    }
292
293    #[tokio::test]
294    async fn remove_then_create_tracks_as_modified() {
295        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
296        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
297
298        let buffer = SettleBuffer::new(Duration::from_millis(20));
299        let handle = tokio::spawn(async move {
300            buffer.run(raw_rx, settled_tx).await;
301        });
302
303        let path = NormalizedPath::new("replaced.c");
304        raw_tx.send(WatchEvent::Removed(path.clone())).unwrap();
305        raw_tx.send(WatchEvent::Created(path)).unwrap();
306        drop(raw_tx);
307
308        let event = settled_rx.recv().await.unwrap();
309        match event {
310            SettledEvent::Batch { changed, removed } => {
311                assert_eq!(changed.len(), 1);
312                assert!(changed.contains(&NormalizedPath::new("replaced.c")));
313                assert!(removed.is_empty());
314            }
315            SettledEvent::Overflow => panic!("expected batch"),
316        }
317
318        handle.await.unwrap();
319    }
320
321    #[tokio::test]
322    async fn rename_becomes_remove_and_modify() {
323        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
324        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
325
326        let buffer = SettleBuffer::new(Duration::from_millis(20));
327        let handle = tokio::spawn(async move {
328            buffer.run(raw_rx, settled_tx).await;
329        });
330
331        raw_tx
332            .send(WatchEvent::Renamed {
333                from: "old.c".into(),
334                to: "new.c".into(),
335            })
336            .unwrap();
337        drop(raw_tx);
338
339        let event = settled_rx.recv().await.unwrap();
340        match event {
341            SettledEvent::Batch { changed, removed } => {
342                assert_eq!(changed.len(), 1);
343                assert!(changed.contains(&NormalizedPath::new("new.c")));
344                assert_eq!(removed.len(), 1);
345                assert!(removed.contains(&NormalizedPath::new("old.c")));
346            }
347            SettledEvent::Overflow => panic!("expected batch"),
348        }
349
350        handle.await.unwrap();
351    }
352
353    #[tokio::test]
354    async fn overflow_clears_pending_and_emits_immediately() {
355        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
356        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
357
358        let buffer = SettleBuffer::new(Duration::from_millis(50));
359        let handle = tokio::spawn(async move {
360            buffer.run(raw_rx, settled_tx).await;
361        });
362
363        // Send some events, then overflow.
364        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
365        raw_tx.send(WatchEvent::Overflow).unwrap();
366        drop(raw_tx);
367
368        let event = settled_rx.recv().await.unwrap();
369        assert!(matches!(event, SettledEvent::Overflow));
370
371        handle.await.unwrap();
372    }
373
374    #[tokio::test]
375    async fn error_events_trigger_overflow() {
376        // On Windows, ReadDirectoryChangesW buffer overflow and watcher death
377        // arrive as errors from the notify crate. We treat them as overflow to
378        // force a full re-stat of all cached entries.
379        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
380        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
381
382        let buffer = SettleBuffer::new(Duration::from_millis(20));
383        let handle = tokio::spawn(async move {
384            buffer.run(raw_rx, settled_tx).await;
385        });
386
387        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
388        raw_tx
389            .send(WatchEvent::Error("some error".to_string()))
390            .unwrap();
391        drop(raw_tx);
392
393        // Error should trigger overflow, discarding the pending Modified event.
394        let event = settled_rx.recv().await.unwrap();
395        assert!(matches!(event, SettledEvent::Overflow));
396
397        handle.await.unwrap();
398    }
399
400    #[tokio::test]
401    async fn default_window_creates_buffer() {
402        let buffer = SettleBuffer::default_window();
403        // Just verify it can run without panicking.
404        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
405        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
406
407        let handle = tokio::spawn(async move {
408            buffer.run(raw_rx, settled_tx).await;
409        });
410
411        raw_tx.send(WatchEvent::Modified("x.c".into())).unwrap();
412        drop(raw_tx);
413
414        let event = settled_rx.recv().await.unwrap();
415        assert!(matches!(event, SettledEvent::Batch { .. }));
416        handle.await.unwrap();
417    }
418
419    #[tokio::test]
420    async fn multiple_overflows_in_sequence() {
421        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
422        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
423
424        let buffer = SettleBuffer::new(Duration::from_millis(20));
425        let handle = tokio::spawn(async move {
426            buffer.run(raw_rx, settled_tx).await;
427        });
428
429        raw_tx.send(WatchEvent::Overflow).unwrap();
430        raw_tx.send(WatchEvent::Overflow).unwrap();
431        raw_tx.send(WatchEvent::Overflow).unwrap();
432        drop(raw_tx);
433
434        // Should get at least one overflow event.
435        let mut overflow_count = 0;
436        while let Some(event) = settled_rx.recv().await {
437            if matches!(event, SettledEvent::Overflow) {
438                overflow_count += 1;
439            }
440        }
441        assert!(overflow_count >= 1);
442        handle.await.unwrap();
443    }
444
445    #[tokio::test]
446    async fn overflow_then_normal_events() {
447        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
448        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
449
450        let buffer = SettleBuffer::new(Duration::from_millis(20));
451        let handle = tokio::spawn(async move {
452            buffer.run(raw_rx, settled_tx).await;
453        });
454
455        raw_tx.send(WatchEvent::Overflow).unwrap();
456        raw_tx.send(WatchEvent::Modified("after.c".into())).unwrap();
457        drop(raw_tx);
458
459        let mut saw_overflow = false;
460        let mut saw_batch = false;
461        while let Some(event) = settled_rx.recv().await {
462            match event {
463                SettledEvent::Overflow => saw_overflow = true,
464                SettledEvent::Batch { changed, .. } => {
465                    assert!(changed.contains(&NormalizedPath::new("after.c")));
466                    saw_batch = true;
467                }
468            }
469        }
470        assert!(saw_overflow);
471        assert!(saw_batch);
472        handle.await.unwrap();
473    }
474
475    #[tokio::test]
476    async fn large_batch_coalesces() {
477        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
478        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
479
480        let buffer = SettleBuffer::new(Duration::from_millis(50));
481        let handle = tokio::spawn(async move {
482            buffer.run(raw_rx, settled_tx).await;
483        });
484
485        for i in 0..200 {
486            raw_tx
487                .send(WatchEvent::Modified(format!("src/file_{i}.c").into()))
488                .unwrap();
489        }
490        drop(raw_tx);
491
492        // Collect all batches — total changed files should be 200.
493        let mut total_changed = 0;
494        while let Some(event) = settled_rx.recv().await {
495            if let SettledEvent::Batch { changed, .. } = event {
496                total_changed += changed.len();
497            }
498        }
499        assert_eq!(total_changed, 200);
500        handle.await.unwrap();
501    }
502
503    #[tokio::test]
504    async fn mixed_event_types_in_burst() {
505        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
506        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
507
508        let buffer = SettleBuffer::new(Duration::from_millis(20));
509        let handle = tokio::spawn(async move {
510            buffer.run(raw_rx, settled_tx).await;
511        });
512
513        raw_tx.send(WatchEvent::Created("new.c".into())).unwrap();
514        raw_tx.send(WatchEvent::Modified("edit.c".into())).unwrap();
515        raw_tx.send(WatchEvent::Removed("gone.c".into())).unwrap();
516        raw_tx
517            .send(WatchEvent::Renamed {
518                from: "old.c".into(),
519                to: "renamed.c".into(),
520            })
521            .unwrap();
522        drop(raw_tx);
523
524        let event = settled_rx.recv().await.unwrap();
525        match event {
526            SettledEvent::Batch { changed, removed } => {
527                // new.c (Created), edit.c (Modified), renamed.c (from Renamed.to)
528                assert_eq!(changed.len(), 3);
529                // gone.c (Removed), old.c (from Renamed.from)
530                assert_eq!(removed.len(), 2);
531            }
532            SettledEvent::Overflow => panic!("expected batch"),
533        }
534        handle.await.unwrap();
535    }
536
537    #[tokio::test]
538    async fn channel_close_mid_coalesce_flushes() {
539        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
540        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
541
542        let buffer = SettleBuffer::new(Duration::from_millis(500)); // Long window
543        let handle = tokio::spawn(async move {
544            buffer.run(raw_rx, settled_tx).await;
545        });
546
547        // Send events then close channel before settle window elapses.
548        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
549        raw_tx.send(WatchEvent::Modified("b.c".into())).unwrap();
550        drop(raw_tx); // Close before 500ms window
551
552        let event = settled_rx.recv().await.unwrap();
553        match event {
554            SettledEvent::Batch { changed, .. } => {
555                assert_eq!(changed.len(), 2);
556            }
557            SettledEvent::Overflow => panic!("expected batch"),
558        }
559        handle.await.unwrap();
560    }
561
562    #[tokio::test]
563    async fn overflow_with_empty_pending() {
564        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
565        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
566
567        let buffer = SettleBuffer::new(Duration::from_millis(20));
568        let handle = tokio::spawn(async move {
569            buffer.run(raw_rx, settled_tx).await;
570        });
571
572        // Overflow with no prior events.
573        raw_tx.send(WatchEvent::Overflow).unwrap();
574        drop(raw_tx);
575
576        let event = settled_rx.recv().await.unwrap();
577        assert!(matches!(event, SettledEvent::Overflow));
578        handle.await.unwrap();
579    }
580
581    #[tokio::test]
582    async fn empty_close_produces_no_output() {
583        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
584        let (settled_tx, mut settled_rx) = mpsc::unbounded_channel();
585
586        let buffer = SettleBuffer::new(Duration::from_millis(20));
587        let handle = tokio::spawn(async move {
588            buffer.run(raw_rx, settled_tx).await;
589        });
590
591        drop(raw_tx);
592
593        // Should get None (channel closed, no events).
594        let event = settled_rx.recv().await;
595        assert!(event.is_none());
596
597        handle.await.unwrap();
598    }
599}