1use std::collections::HashMap;
11use std::time::Duration;
12use tokio::sync::mpsc;
13use zccache_core::NormalizedPath;
14
15use crate::WatchEvent;
16
/// Result emitted by [`SettleBuffer::run`] after raw watch events have been
/// coalesced.
#[derive(Debug, Clone)]
pub enum SettledEvent {
    /// The set of paths accumulated while a batch was open.
    Batch {
        /// Paths whose most recent raw event was a modification, a creation,
        /// or the destination of a rename.
        changed: Vec<NormalizedPath>,
        /// Paths whose most recent raw event was a removal or the source of
        /// a rename.
        removed: Vec<NormalizedPath>,
    },
    /// Emitted when the raw stream delivered `WatchEvent::Overflow` or
    /// `WatchEvent::Error`; any paths accumulated so far were discarded
    /// instead of being reported.
    Overflow,
}
28
/// Coalesces raw `WatchEvent`s into [`SettledEvent`]s.
///
/// The buffer holds only timing configuration; the per-run state lives inside
/// [`SettleBuffer::run`].
#[derive(Debug)]
pub struct SettleBuffer {
    /// Length of the quiet period: a batch is emitted once this much time
    /// passes without a further raw event.
    settle_window: Duration,
    /// Upper bound, measured from the first event of a batch, on how long the
    /// batch may stay open even if events keep arriving.
    max_wait: Duration,
}
43
/// Most recent state recorded for a path while a batch is being accumulated.
///
/// A later event for the same path overwrites the earlier kind.
#[derive(Debug, Clone, Copy)]
enum ChangeKind {
    /// The path was created, modified, or was the destination of a rename.
    Modified,
    /// The path was removed or was the source of a rename.
    Removed,
}
50
51impl SettleBuffer {
52 #[must_use]
54 pub fn new(settle_window: Duration) -> Self {
55 Self {
56 settle_window,
57 max_wait: Duration::from_millis(50),
58 }
59 }
60
61 #[must_use]
63 pub fn default_window() -> Self {
64 Self::new(Duration::from_millis(50))
65 }
66
67 pub async fn run(
74 &self,
75 mut rx: mpsc::UnboundedReceiver<WatchEvent>,
76 tx: mpsc::UnboundedSender<SettledEvent>,
77 ) {
78 let mut pending: HashMap<NormalizedPath, ChangeKind> = HashMap::new();
79
80 loop {
81 let event = match rx.recv().await {
83 Some(e) => e,
84 None => {
85 if !pending.is_empty() {
87 let _ = tx.send(Self::drain(&mut pending));
88 }
89 return;
90 }
91 };
92
93 if matches!(event, WatchEvent::Overflow | WatchEvent::Error(_)) {
100 pending.clear();
101 let _ = tx.send(SettledEvent::Overflow);
102 continue;
103 }
104
105 Self::apply_event(&mut pending, event);
106
107 let deadline = tokio::time::Instant::now() + self.max_wait;
111 loop {
112 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
113 let wait = self.settle_window.min(remaining);
114 if wait.is_zero() {
115 if !pending.is_empty() {
117 let _ = tx.send(Self::drain(&mut pending));
118 }
119 break;
120 }
121 match tokio::time::timeout(wait, rx.recv()).await {
122 Ok(Some(WatchEvent::Overflow | WatchEvent::Error(_))) => {
123 pending.clear();
124 let _ = tx.send(SettledEvent::Overflow);
125 break;
126 }
127 Ok(Some(event)) => {
128 Self::apply_event(&mut pending, event);
129 }
130 Ok(None) => {
131 if !pending.is_empty() {
133 let _ = tx.send(Self::drain(&mut pending));
134 }
135 return;
136 }
137 Err(_timeout) => {
138 if !pending.is_empty() {
140 let _ = tx.send(Self::drain(&mut pending));
141 }
142 break;
143 }
144 }
145 }
146 }
147 }
148
149 fn apply_event(pending: &mut HashMap<NormalizedPath, ChangeKind>, event: WatchEvent) {
150 match event {
151 WatchEvent::Modified(p) | WatchEvent::Created(p) => {
152 pending.insert(p, ChangeKind::Modified);
153 }
154 WatchEvent::Removed(p) => {
155 pending.insert(p, ChangeKind::Removed);
156 }
157 WatchEvent::Renamed { from, to } => {
158 pending.insert(from, ChangeKind::Removed);
159 pending.insert(to, ChangeKind::Modified);
160 }
161 WatchEvent::Overflow | WatchEvent::Error(_) => {
162 }
164 }
165 }
166
167 fn drain(pending: &mut HashMap<NormalizedPath, ChangeKind>) -> SettledEvent {
168 let mut changed = Vec::new();
169 let mut removed = Vec::new();
170 for (path, kind) in pending.drain() {
171 match kind {
172 ChangeKind::Modified => changed.push(path),
173 ChangeKind::Removed => removed.push(path),
174 }
175 }
176 SettledEvent::Batch { changed, removed }
177 }
178}
179
#[cfg(test)]
mod tests {
    use super::*;

    /// Sending half of the raw-event channel fed into the buffer.
    type RawSender = mpsc::UnboundedSender<WatchEvent>;
    /// Receiving half of the settled-event channel written by the buffer.
    type SettledReceiver = mpsc::UnboundedReceiver<SettledEvent>;
    /// Everything a test needs to drive a running buffer.
    type Harness = (RawSender, SettledReceiver, tokio::task::JoinHandle<()>);

    /// Wires `buffer` between two fresh unbounded channels and runs it on a
    /// spawned task.
    fn spawn_buffer(buffer: SettleBuffer) -> Harness {
        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
        let (settled_tx, settled_rx) = mpsc::unbounded_channel();
        let task = tokio::spawn(async move {
            buffer.run(raw_rx, settled_tx).await;
        });
        (raw_tx, settled_rx, task)
    }

    /// Starts a buffer whose settle window is `millis` milliseconds.
    fn spawn_with_window(millis: u64) -> Harness {
        spawn_buffer(SettleBuffer::new(Duration::from_millis(millis)))
    }

    /// Unpacks a batch into `(changed, removed)`, panicking on an overflow.
    fn expect_batch(event: SettledEvent) -> (Vec<NormalizedPath>, Vec<NormalizedPath>) {
        match event {
            SettledEvent::Batch { changed, removed } => (changed, removed),
            SettledEvent::Overflow => panic!("expected batch"),
        }
    }

    /// One modification followed by channel close yields a one-path batch.
    #[tokio::test]
    async fn single_event_settles() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(removed.is_empty());

        task.await.unwrap();
    }

    /// Five distinct modifications sent back to back arrive in a single batch.
    #[tokio::test]
    async fn rapid_events_coalesce_into_one_batch() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(50);

        let burst = (0..5).map(|i| WatchEvent::Modified(format!("file_{i}.c").into()));
        for event in burst {
            raw_tx.send(event).unwrap();
        }
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 5);
        assert!(removed.is_empty());

        task.await.unwrap();
    }

    /// Repeated modifications of one path are reported once.
    #[tokio::test]
    async fn same_file_deduplicates() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        let path = NormalizedPath::new("hot.c");
        for _ in 0..3 {
            raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
        }
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(removed.is_empty());

        task.await.unwrap();
    }

    /// A removal after a modification leaves the path in `removed` only.
    #[tokio::test]
    async fn modify_then_remove_tracks_as_removed() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        let path = NormalizedPath::new("temp.c");
        raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
        raw_tx.send(WatchEvent::Removed(path)).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert!(changed.is_empty());
        assert_eq!(removed.len(), 1);

        task.await.unwrap();
    }

    /// A creation after a removal leaves the path in `changed` only.
    #[tokio::test]
    async fn remove_then_create_tracks_as_modified() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        let path = NormalizedPath::new("replaced.c");
        raw_tx.send(WatchEvent::Removed(path.clone())).unwrap();
        raw_tx.send(WatchEvent::Created(path)).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(changed.contains(&NormalizedPath::new("replaced.c")));
        assert!(removed.is_empty());

        task.await.unwrap();
    }

    /// A rename reports the old path as removed and the new path as changed.
    #[tokio::test]
    async fn rename_becomes_remove_and_modify() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        let rename = WatchEvent::Renamed {
            from: "old.c".into(),
            to: "new.c".into(),
        };
        raw_tx.send(rename).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(changed.contains(&NormalizedPath::new("new.c")));
        assert_eq!(removed.len(), 1);
        assert!(removed.contains(&NormalizedPath::new("old.c")));

        task.await.unwrap();
    }

    /// An overflow arriving after a modification is the first thing emitted.
    #[tokio::test]
    async fn overflow_clears_pending_and_emits_immediately() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(50);

        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        raw_tx.send(WatchEvent::Overflow).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));

        task.await.unwrap();
    }

    /// A raw error event is surfaced downstream as an overflow.
    #[tokio::test]
    async fn error_events_trigger_overflow() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        raw_tx
            .send(WatchEvent::Error("some error".to_string()))
            .unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));

        task.await.unwrap();
    }

    /// A buffer built with `default_window` produces a batch.
    #[tokio::test]
    async fn default_window_creates_buffer() {
        let (raw_tx, mut settled_rx, task) = spawn_buffer(SettleBuffer::default_window());

        raw_tx.send(WatchEvent::Modified("x.c".into())).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Batch { .. }));
        task.await.unwrap();
    }

    /// Consecutive overflows produce at least one downstream overflow.
    #[tokio::test]
    async fn multiple_overflows_in_sequence() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        for _ in 0..3 {
            raw_tx.send(WatchEvent::Overflow).unwrap();
        }
        drop(raw_tx);

        let mut overflow_count = 0;
        loop {
            match settled_rx.recv().await {
                Some(SettledEvent::Overflow) => overflow_count += 1,
                Some(_) => {}
                None => break,
            }
        }
        assert!(overflow_count >= 1);
        task.await.unwrap();
    }

    /// Events sent after an overflow are still batched and delivered.
    #[tokio::test]
    async fn overflow_then_normal_events() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        raw_tx.send(WatchEvent::Overflow).unwrap();
        raw_tx.send(WatchEvent::Modified("after.c".into())).unwrap();
        drop(raw_tx);

        let (mut saw_overflow, mut saw_batch) = (false, false);
        while let Some(event) = settled_rx.recv().await {
            if let SettledEvent::Batch { changed, .. } = event {
                assert!(changed.contains(&NormalizedPath::new("after.c")));
                saw_batch = true;
            } else {
                saw_overflow = true;
            }
        }
        assert!(saw_overflow);
        assert!(saw_batch);
        task.await.unwrap();
    }

    /// Two hundred distinct paths are all delivered, however they are batched.
    #[tokio::test]
    async fn large_batch_coalesces() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(50);

        let burst = (0..200).map(|i| WatchEvent::Modified(format!("src/file_{i}.c").into()));
        for event in burst {
            raw_tx.send(event).unwrap();
        }
        drop(raw_tx);

        let mut total_changed = 0;
        while let Some(event) = settled_rx.recv().await {
            if let SettledEvent::Batch { changed, .. } = event {
                total_changed += changed.len();
            }
        }
        assert_eq!(total_changed, 200);
        task.await.unwrap();
    }

    /// Create, modify, remove and rename in one burst are classified correctly.
    #[tokio::test]
    async fn mixed_event_types_in_burst() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        let burst = [
            WatchEvent::Created("new.c".into()),
            WatchEvent::Modified("edit.c".into()),
            WatchEvent::Removed("gone.c".into()),
            WatchEvent::Renamed {
                from: "old.c".into(),
                to: "renamed.c".into(),
            },
        ];
        for event in burst {
            raw_tx.send(event).unwrap();
        }
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 3);
        assert_eq!(removed.len(), 2);
        task.await.unwrap();
    }

    /// Closing the input while a batch is open flushes the batch.
    #[tokio::test]
    async fn channel_close_mid_coalesce_flushes() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(500);

        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        raw_tx.send(WatchEvent::Modified("b.c".into())).unwrap();
        drop(raw_tx);

        let (changed, _removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 2);
        task.await.unwrap();
    }

    /// An overflow with nothing accumulated is still forwarded.
    #[tokio::test]
    async fn overflow_with_empty_pending() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        raw_tx.send(WatchEvent::Overflow).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));
        task.await.unwrap();
    }

    /// Closing the input without sending anything produces no output.
    #[tokio::test]
    async fn empty_close_produces_no_output() {
        let (raw_tx, mut settled_rx, task) = spawn_with_window(20);

        drop(raw_tx);

        let outcome = settled_rx.recv().await;
        assert!(outcome.is_none());

        task.await.unwrap();
    }
}