Skip to main content

atomr_testkit/
event_filter.rs

1//! `EventFilter` — observes events on an `EventStream` and blocks until
2//! expected number of matches are seen.
3
4use std::any::Any;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use atomr_core::event::{EventStream, Subscription};
10
11pub struct EventFilter {
12    matches: Arc<AtomicUsize>,
13    _sub: Subscription,
14}
15
16impl EventFilter {
17    pub fn new<T: Any + Send + Sync + 'static, F>(stream: &EventStream, predicate: F) -> Self
18    where
19        F: Fn(&T) -> bool + Send + Sync + 'static,
20    {
21        let matches = Arc::new(AtomicUsize::new(0));
22        let c = matches.clone();
23        let sub = stream.subscribe(move |v: &T| {
24            if predicate(v) {
25                c.fetch_add(1, Ordering::Relaxed);
26            }
27        });
28        Self { matches, _sub: sub }
29    }
30
31    pub fn count(&self) -> usize {
32        self.matches.load(Ordering::Relaxed)
33    }
34
35    pub async fn await_count(&self, n: usize, timeout: Duration) -> bool {
36        let deadline = tokio::time::Instant::now() + timeout;
37        while tokio::time::Instant::now() < deadline {
38            if self.count() >= n {
39                return true;
40            }
41            tokio::time::sleep(Duration::from_millis(5)).await;
42        }
43        false
44    }
45}
46
47#[cfg(test)]
48mod tests {
49    use super::*;
50
51    #[tokio::test]
52    async fn filter_counts_matches() {
53        let bus = EventStream::new();
54        let f = EventFilter::new::<u32, _>(&bus, |v| *v > 5);
55        bus.publish(1u32);
56        bus.publish(10u32);
57        bus.publish(7u32);
58        assert!(f.await_count(2, Duration::from_millis(100)).await);
59    }
60}