use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::mpsc;
use zccache_core::NormalizedPath;
use crate::WatchEvent;
/// Output of [`SettleBuffer::run`]: either a coalesced batch of path changes
/// or a signal that the raw event stream could not be tracked reliably.
#[derive(Debug, Clone)]
pub enum SettledEvent {
/// A coalesced set of changes. Each path appears in at most one of the two
/// lists, according to the last raw event buffered for it.
Batch {
/// Paths whose last buffered event was a modification or creation
/// (including the destination of a rename).
changed: Vec<NormalizedPath>,
/// Paths whose last buffered event was a removal (including the source
/// of a rename).
removed: Vec<NormalizedPath>,
},
/// Emitted when a raw `Overflow` or `Error` event is received. Any changes
/// buffered up to that point are discarded before this is sent.
Overflow,
}
/// Debounces raw watcher events into [`SettledEvent`] batches.
#[derive(Debug)]
pub struct SettleBuffer {
/// Quiet period: how long to wait for a further event before the pending
/// batch is flushed.
settle_window: Duration,
/// Upper bound, measured from the first event of a batch, on how long that
/// batch may keep accumulating before it is flushed.
max_wait: Duration,
}
/// Net effect recorded for a path while a batch is being accumulated.
#[derive(Debug, Clone, Copy)]
enum ChangeKind {
/// The path was created or modified (or is the destination of a rename).
Modified,
/// The path was removed (or is the source of a rename).
Removed,
}
impl SettleBuffer {
    /// Default cap on how long a single batch may accumulate.
    const DEFAULT_MAX_WAIT: Duration = Duration::from_millis(50);

    /// Creates a buffer that flushes a batch once no new event has arrived for
    /// `settle_window`.
    ///
    /// The total accumulation time of one batch is capped at
    /// `max(settle_window, 50 ms)`. The cap is never smaller than the requested
    /// window, so a window longer than the default cap is honored instead of
    /// being silently truncated.
    // NOTE(review): for windows >= 50 ms the cap equals the window, so a
    // continuous stream of events is cut into batches of one window each —
    // confirm whether a larger default cap is wanted.
    #[must_use]
    pub fn new(settle_window: Duration) -> Self {
        Self {
            settle_window,
            max_wait: settle_window.max(Self::DEFAULT_MAX_WAIT),
        }
    }

    /// Creates a buffer with a 50 ms settle window.
    #[must_use]
    pub fn default_window() -> Self {
        Self::new(Duration::from_millis(50))
    }

    /// Consumes raw events from `rx` and forwards settled events on `tx` until
    /// `rx` is closed.
    ///
    /// * Path events are coalesced per path (last event wins) and flushed as a
    ///   [`SettledEvent::Batch`] once no further event arrives within the
    ///   settle window, or once the batch has been open for `max_wait`.
    /// * `WatchEvent::Overflow` and `WatchEvent::Error` discard everything
    ///   buffered so far and emit [`SettledEvent::Overflow`] immediately.
    /// * When `rx` closes, any pending changes are flushed before returning.
    ///
    /// Failures to send on `tx` (receiver dropped) are ignored; the loop keeps
    /// draining `rx` until it closes.
    pub async fn run(
        &self,
        mut rx: mpsc::UnboundedReceiver<WatchEvent>,
        tx: mpsc::UnboundedSender<SettledEvent>,
    ) {
        let mut pending: HashMap<NormalizedPath, ChangeKind> = HashMap::new();
        loop {
            // Idle phase: block until the first event of the next batch.
            let event = match rx.recv().await {
                Some(e) => e,
                None => {
                    Self::flush(&mut pending, &tx);
                    return;
                }
            };
            if matches!(event, WatchEvent::Overflow | WatchEvent::Error(_)) {
                pending.clear();
                let _ = tx.send(SettledEvent::Overflow);
                continue;
            }
            Self::apply_event(&mut pending, event);

            // Accumulation phase: keep absorbing events until the stream goes
            // quiet for `settle_window` or the batch hits its hard deadline.
            let deadline = tokio::time::Instant::now() + self.max_wait;
            loop {
                let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
                let wait = self.settle_window.min(remaining);
                if wait.is_zero() {
                    // Deadline reached (or a zero settle window was configured).
                    Self::flush(&mut pending, &tx);
                    break;
                }
                match tokio::time::timeout(wait, rx.recv()).await {
                    Ok(Some(WatchEvent::Overflow | WatchEvent::Error(_))) => {
                        pending.clear();
                        let _ = tx.send(SettledEvent::Overflow);
                        break;
                    }
                    Ok(Some(event)) => Self::apply_event(&mut pending, event),
                    Ok(None) => {
                        // Input closed mid-batch: emit what we have and stop.
                        Self::flush(&mut pending, &tx);
                        return;
                    }
                    Err(_timeout) => {
                        // Quiet for a full wait interval: the batch has settled.
                        Self::flush(&mut pending, &tx);
                        break;
                    }
                }
            }
        }
    }

    /// Sends the pending changes as one batch if there are any; does nothing
    /// for an empty map. A send failure is deliberately ignored.
    fn flush(
        pending: &mut HashMap<NormalizedPath, ChangeKind>,
        tx: &mpsc::UnboundedSender<SettledEvent>,
    ) {
        if !pending.is_empty() {
            let _ = tx.send(Self::drain(pending));
        }
    }

    /// Records the effect of one raw event in `pending`; a later event for the
    /// same path overwrites the earlier one. A rename is recorded as a removal
    /// of `from` plus a modification of `to`.
    fn apply_event(pending: &mut HashMap<NormalizedPath, ChangeKind>, event: WatchEvent) {
        match event {
            WatchEvent::Modified(p) | WatchEvent::Created(p) => {
                pending.insert(p, ChangeKind::Modified);
            }
            WatchEvent::Removed(p) => {
                pending.insert(p, ChangeKind::Removed);
            }
            WatchEvent::Renamed { from, to } => {
                pending.insert(from, ChangeKind::Removed);
                pending.insert(to, ChangeKind::Modified);
            }
            WatchEvent::Overflow | WatchEvent::Error(_) => {
                // Handled by `run` before it gets here; nothing to record.
            }
        }
    }

    /// Empties `pending` into a [`SettledEvent::Batch`], splitting paths by
    /// their recorded kind. Path order within each list follows hash-map
    /// iteration order and is therefore unspecified.
    fn drain(pending: &mut HashMap<NormalizedPath, ChangeKind>) -> SettledEvent {
        let mut changed = Vec::new();
        let mut removed = Vec::new();
        for (path, kind) in pending.drain() {
            match kind {
                ChangeKind::Modified => changed.push(path),
                ChangeKind::Removed => removed.push(path),
            }
        }
        SettledEvent::Batch { changed, removed }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    type RawTx = mpsc::UnboundedSender<WatchEvent>;
    type SettledRx = mpsc::UnboundedReceiver<SettledEvent>;
    type RunHandle = tokio::task::JoinHandle<()>;

    /// Wires `buffer` between two fresh unbounded channels and runs it on a
    /// spawned task. Returns the raw-event sender, the settled-event receiver
    /// and the task handle.
    fn spawn_buffer(buffer: SettleBuffer) -> (RawTx, SettledRx, RunHandle) {
        let (raw_tx, raw_rx) = mpsc::unbounded_channel();
        let (settled_tx, settled_rx) = mpsc::unbounded_channel();
        let handle = tokio::spawn(async move {
            buffer.run(raw_rx, settled_tx).await;
        });
        (raw_tx, settled_rx, handle)
    }

    /// Same as [`spawn_buffer`] for a buffer with a settle window of
    /// `window_ms` milliseconds.
    fn spawn_with_window(window_ms: u64) -> (RawTx, SettledRx, RunHandle) {
        spawn_buffer(SettleBuffer::new(Duration::from_millis(window_ms)))
    }

    /// Unwraps a batch into `(changed, removed)`, panicking on an overflow.
    fn expect_batch(event: SettledEvent) -> (Vec<NormalizedPath>, Vec<NormalizedPath>) {
        match event {
            SettledEvent::Batch { changed, removed } => (changed, removed),
            SettledEvent::Overflow => panic!("expected batch"),
        }
    }

    #[tokio::test]
    async fn single_event_settles() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(removed.is_empty());
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn rapid_events_coalesce_into_one_batch() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(50);
        for i in 0..5 {
            let path = format!("file_{i}.c");
            raw_tx.send(WatchEvent::Modified(path.into())).unwrap();
        }
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 5);
        assert!(removed.is_empty());
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn same_file_deduplicates() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        let path = NormalizedPath::new("hot.c");
        for _ in 0..3 {
            raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
        }
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(removed.is_empty());
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn modify_then_remove_tracks_as_removed() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        let path = NormalizedPath::new("temp.c");
        raw_tx.send(WatchEvent::Modified(path.clone())).unwrap();
        raw_tx.send(WatchEvent::Removed(path)).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert!(changed.is_empty());
        assert_eq!(removed.len(), 1);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn remove_then_create_tracks_as_modified() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        let path = NormalizedPath::new("replaced.c");
        raw_tx.send(WatchEvent::Removed(path.clone())).unwrap();
        raw_tx.send(WatchEvent::Created(path)).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(changed.contains(&NormalizedPath::new("replaced.c")));
        assert!(removed.is_empty());
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn rename_becomes_remove_and_modify() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        let rename = WatchEvent::Renamed {
            from: "old.c".into(),
            to: "new.c".into(),
        };
        raw_tx.send(rename).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 1);
        assert!(changed.contains(&NormalizedPath::new("new.c")));
        assert_eq!(removed.len(), 1);
        assert!(removed.contains(&NormalizedPath::new("old.c")));
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn overflow_clears_pending_and_emits_immediately() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(50);
        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        raw_tx.send(WatchEvent::Overflow).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn error_events_trigger_overflow() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        let failure = WatchEvent::Error("some error".to_string());
        raw_tx.send(failure).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn default_window_creates_buffer() {
        let (raw_tx, mut settled_rx, handle) = spawn_buffer(SettleBuffer::default_window());
        raw_tx.send(WatchEvent::Modified("x.c".into())).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Batch { .. }));
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn multiple_overflows_in_sequence() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        for _ in 0..3 {
            raw_tx.send(WatchEvent::Overflow).unwrap();
        }
        drop(raw_tx);

        // Only require that at least one overflow is surfaced, so the test
        // does not pin how consecutive overflows are forwarded.
        let mut overflow_count = 0;
        while let Some(event) = settled_rx.recv().await {
            match event {
                SettledEvent::Overflow => overflow_count += 1,
                SettledEvent::Batch { .. } => {}
            }
        }
        assert!(overflow_count >= 1);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn overflow_then_normal_events() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        raw_tx.send(WatchEvent::Overflow).unwrap();
        raw_tx.send(WatchEvent::Modified("after.c".into())).unwrap();
        drop(raw_tx);

        let (mut saw_overflow, mut saw_batch) = (false, false);
        while let Some(event) = settled_rx.recv().await {
            match event {
                SettledEvent::Overflow => saw_overflow = true,
                SettledEvent::Batch { changed, .. } => {
                    assert!(changed.contains(&NormalizedPath::new("after.c")));
                    saw_batch = true;
                }
            }
        }
        assert!(saw_overflow);
        assert!(saw_batch);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn large_batch_coalesces() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(50);
        for i in 0..200 {
            let path = format!("src/file_{i}.c");
            raw_tx.send(WatchEvent::Modified(path.into())).unwrap();
        }
        drop(raw_tx);

        // The events may arrive in one batch or several; only the total counts.
        let mut total_changed = 0;
        while let Some(event) = settled_rx.recv().await {
            if let SettledEvent::Batch { changed, .. } = event {
                total_changed += changed.len();
            }
        }
        assert_eq!(total_changed, 200);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn mixed_event_types_in_burst() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        raw_tx.send(WatchEvent::Created("new.c".into())).unwrap();
        raw_tx.send(WatchEvent::Modified("edit.c".into())).unwrap();
        raw_tx.send(WatchEvent::Removed("gone.c".into())).unwrap();
        let rename = WatchEvent::Renamed {
            from: "old.c".into(),
            to: "renamed.c".into(),
        };
        raw_tx.send(rename).unwrap();
        drop(raw_tx);

        let (changed, removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 3);
        assert_eq!(removed.len(), 2);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn channel_close_mid_coalesce_flushes() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(500);
        raw_tx.send(WatchEvent::Modified("a.c".into())).unwrap();
        raw_tx.send(WatchEvent::Modified("b.c".into())).unwrap();
        drop(raw_tx);

        let (changed, _removed) = expect_batch(settled_rx.recv().await.unwrap());
        assert_eq!(changed.len(), 2);
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn overflow_with_empty_pending() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        raw_tx.send(WatchEvent::Overflow).unwrap();
        drop(raw_tx);

        let first = settled_rx.recv().await.unwrap();
        assert!(matches!(first, SettledEvent::Overflow));
        handle.await.unwrap();
    }

    #[tokio::test]
    async fn empty_close_produces_no_output() {
        let (raw_tx, mut settled_rx, handle) = spawn_with_window(20);
        drop(raw_tx);

        assert!(settled_rx.recv().await.is_none());
        handle.await.unwrap();
    }
}