dactor_test_harness/
events.rs1use crate::protocol::NodeEvent;
2use std::time::Duration;
3
4pub struct EventStream {
6 inner: tonic::Streaming<NodeEvent>,
7}
8
9impl EventStream {
10 pub fn new(inner: tonic::Streaming<NodeEvent>) -> Self {
11 Self { inner }
12 }
13
14 pub async fn next_event(&mut self, timeout: Duration) -> Option<NodeEvent> {
16 match tokio::time::timeout(timeout, self.inner.message()).await {
17 Ok(Ok(Some(event))) => Some(event),
18 _ => None,
19 }
20 }
21
22 pub async fn expect<F>(&mut self, predicate: F, timeout: Duration) -> Option<NodeEvent>
24 where
25 F: Fn(&NodeEvent) -> bool,
26 {
27 let deadline = tokio::time::Instant::now() + timeout;
28 loop {
29 let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
30 if remaining.is_zero() {
31 return None;
32 }
33 match self.next_event(remaining).await {
34 Some(event) if predicate(&event) => return Some(event),
35 Some(_) => continue,
36 None => return None,
37 }
38 }
39 }
40}