atomr_testkit/
event_filter.rs1use std::any::Any;
5use std::sync::atomic::{AtomicUsize, Ordering};
6use std::sync::Arc;
7use std::time::Duration;
8
9use atomr_core::event::{EventStream, Subscription};
10
11pub struct EventFilter {
12 matches: Arc<AtomicUsize>,
13 _sub: Subscription,
14}
15
16impl EventFilter {
17 pub fn new<T: Any + Send + Sync + 'static, F>(stream: &EventStream, predicate: F) -> Self
18 where
19 F: Fn(&T) -> bool + Send + Sync + 'static,
20 {
21 let matches = Arc::new(AtomicUsize::new(0));
22 let c = matches.clone();
23 let sub = stream.subscribe(move |v: &T| {
24 if predicate(v) {
25 c.fetch_add(1, Ordering::Relaxed);
26 }
27 });
28 Self { matches, _sub: sub }
29 }
30
31 pub fn count(&self) -> usize {
32 self.matches.load(Ordering::Relaxed)
33 }
34
35 pub async fn await_count(&self, n: usize, timeout: Duration) -> bool {
36 let deadline = tokio::time::Instant::now() + timeout;
37 while tokio::time::Instant::now() < deadline {
38 if self.count() >= n {
39 return true;
40 }
41 tokio::time::sleep(Duration::from_millis(5)).await;
42 }
43 false
44 }
45}
46
47#[cfg(test)]
48mod tests {
49 use super::*;
50
51 #[tokio::test]
52 async fn filter_counts_matches() {
53 let bus = EventStream::new();
54 let f = EventFilter::new::<u32, _>(&bus, |v| *v > 5);
55 bus.publish(1u32);
56 bus.publish(10u32);
57 bus.publish(7u32);
58 assert!(f.await_count(2, Duration::from_millis(100)).await);
59 }
60}