Skip to main content

dactor_test_harness/
events.rs

1use crate::protocol::NodeEvent;
2use std::time::Duration;
3
4/// Client-side event stream wrapper for subscribing to node events.
5pub struct EventStream {
6    inner: tonic::Streaming<NodeEvent>,
7}
8
9impl EventStream {
10    pub fn new(inner: tonic::Streaming<NodeEvent>) -> Self {
11        Self { inner }
12    }
13
14    /// Wait for the next event, with timeout.
15    pub async fn next_event(&mut self, timeout: Duration) -> Option<NodeEvent> {
16        match tokio::time::timeout(timeout, self.inner.message()).await {
17            Ok(Ok(Some(event))) => Some(event),
18            _ => None,
19        }
20    }
21
22    /// Wait for an event matching a predicate, with timeout.
23    pub async fn expect<F>(&mut self, predicate: F, timeout: Duration) -> Option<NodeEvent>
24    where
25        F: Fn(&NodeEvent) -> bool,
26    {
27        let deadline = tokio::time::Instant::now() + timeout;
28        loop {
29            let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
30            if remaining.is_zero() {
31                return None;
32            }
33            match self.next_event(remaining).await {
34                Some(event) if predicate(&event) => return Some(event),
35                Some(_) => continue,
36                None => return None,
37            }
38        }
39    }
40}