Skip to main content

datum/
testkit.rs

1//! Stream testing probes built on Datum's normal runtime.
2//!
3//! The testkit is always available instead of feature-gated: it has no extra
4//! dependencies, adds no alternate execution path, and later parity work
5//! packages need it in ordinary crate tests to validate stream semantics.
6//!
7//! Probe semantics follow Datum's real pull-based runtime rather than Akka's
8//! push + batched-request model. In particular, terminal signals are observed
9//! only when downstream performs a pull, so [`TestSubscriberProbe::expect_complete`]
10//! and [`TestSubscriberProbe::expect_error`] require an outstanding credit.
11//! For empty or immediately failed sources, the expected pattern is
12//! `request(1)` and then `expect_complete()` / `expect_error()`. The testkit
13//! intentionally does not issue speculative pulls on behalf of the test,
14//! because that would consume elements the test never requested.
15//!
16//! Likewise, [`TestSubscriberProbe::cancel`] is cooperative with the pull loop:
17//! cancellation is noticed at the worker's next credit wait. If the worker is
18//! currently blocked inside upstream `next()`, it unblocks when upstream yields
19//! or when the upstream probe is dropped. [`TestPublisherProbe::drop`] fails
20//! open streams, so in probe-driven tests any such stall is bounded by the
21//! test's lifetime rather than leaking indefinitely.
22
23use std::{
24    collections::VecDeque,
25    fmt::Debug,
26    panic::panic_any,
27    sync::{Arc, Condvar, Mutex, MutexGuard},
28    time::{Duration, Instant},
29};
30
31use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
32use crate::{StreamError, StreamResult};
33
34const DEFAULT_PROBE_TIMEOUT: Duration = Duration::from_secs(3);
35
36pub struct TestSource;
37
38impl TestSource {
39    #[must_use]
40    pub fn probe<T: Send + 'static>() -> Source<T, TestPublisherProbe<T>> {
41        Source::from_materialized_factory(|_| {
42            let shared = Arc::new(SourceProbeShared::default());
43            let stream = Box::new(TestSourceStream {
44                shared: Arc::clone(&shared),
45                waiting_for_command: false,
46            }) as BoxStream<T>;
47            Ok((stream, TestPublisherProbe::new(shared)))
48        })
49    }
50}
51
52pub struct TestSink;
53
54impl TestSink {
55    #[must_use]
56    pub fn probe<T: Send + 'static>() -> Sink<T, TestSubscriberProbe<T>> {
57        Sink::from_runner(|mut input, materializer| {
58            let shared = Arc::new(SinkProbeShared::default());
59            let task_shared = Arc::clone(&shared);
60            let completion = materializer.spawn_stream(move |_cancelled| {
61                loop {
62                    task_shared.wait_for_request()?;
63                    match input.next() {
64                        Some(Ok(item)) => task_shared.push_event(SinkEvent::Next(item)),
65                        Some(Err(error)) => {
66                            task_shared.push_event(SinkEvent::Error(error.clone()));
67                            return Err(error);
68                        }
69                        None => {
70                            task_shared.push_event(SinkEvent::Complete);
71                            return Ok(NotUsed);
72                        }
73                    }
74                }
75            });
76            Ok(TestSubscriberProbe::new(shared, completion))
77        })
78    }
79}
80
81pub fn assert_next_eq<T>(actual: &T, expected: &T)
82where
83    T: Debug + PartialEq,
84{
85    assert_eq!(
86        actual, expected,
87        "expected next element {expected:?}, got {actual:?}"
88    );
89}
90
91pub fn assert_next_n_eq<T>(actual: &[T], expected: &[T])
92where
93    T: Debug + PartialEq,
94{
95    assert_eq!(
96        actual, expected,
97        "expected next elements {expected:?}, got {actual:?}"
98    );
99}
100
101pub struct TestPublisherProbe<T> {
102    shared: Arc<SourceProbeShared<T>>,
103    timeout: Duration,
104}
105
106impl<T> TestPublisherProbe<T> {
107    fn new(shared: Arc<SourceProbeShared<T>>) -> Self {
108        Self {
109            shared,
110            timeout: DEFAULT_PROBE_TIMEOUT,
111        }
112    }
113
114    pub fn set_timeout(&mut self, timeout: Duration) {
115        self.timeout = timeout;
116    }
117
118    pub fn send_next(&self, element: T) {
119        self.shared.enqueue(SourceCommand::Next(element));
120    }
121
122    pub fn send_complete(&self) {
123        self.shared.enqueue(SourceCommand::Complete);
124    }
125
126    pub fn send_error(&self, error: StreamError) {
127        self.shared.enqueue(SourceCommand::Error(error));
128    }
129
130    /// Waits for one downstream pull and returns the observed demand unit.
131    ///
132    /// In Datum's current pull model this reports one unit per `next()` call
133    /// from downstream, so it currently returns `1` for each observed demand
134    /// event instead of Akka-style batched `request(n)` totals. Tests must not
135    /// assume batching here; later buffering work may change pull granularity.
136    pub fn expect_request(&self) -> usize {
137        self.shared.expect_request(self.timeout)
138    }
139
140    pub fn expect_cancellation(&self) {
141        self.shared.expect_cancellation(self.timeout);
142    }
143}
144
145impl<T> Drop for TestPublisherProbe<T> {
146    fn drop(&mut self) {
147        self.shared.fail_if_open(StreamError::Failed(
148            "test source probe dropped before completion".to_owned(),
149        ));
150    }
151}
152
153pub struct TestSubscriberProbe<T> {
154    shared: Arc<SinkProbeShared<T>>,
155    timeout: Duration,
156    completion: Option<StreamCompletion<NotUsed>>,
157}
158
159impl<T> TestSubscriberProbe<T> {
160    fn new(shared: Arc<SinkProbeShared<T>>, completion: StreamCompletion<NotUsed>) -> Self {
161        Self {
162            shared,
163            timeout: DEFAULT_PROBE_TIMEOUT,
164            completion: Some(completion),
165        }
166    }
167
168    pub fn set_timeout(&mut self, timeout: Duration) {
169        self.timeout = timeout;
170    }
171
172    pub fn request(&self, n: usize) {
173        assert!(n > 0, "request count must be positive, got {n}");
174        self.shared.request(n);
175    }
176
177    pub fn expect_next(&self) -> T {
178        match self.shared.expect_event(self.timeout, "next element") {
179            SinkEvent::Next(item) => item,
180            SinkEvent::Complete => panic_any(format!(
181                "expected next element, got stream completion after waiting {:?}",
182                self.timeout
183            )),
184            SinkEvent::Error(error) => {
185                panic_any(format!("expected next element, got stream error {error:?}"))
186            }
187        }
188    }
189
190    pub fn assert_next(&self, expected: T)
191    where
192        T: Debug + PartialEq,
193    {
194        let actual = self.expect_next();
195        assert_next_eq(&actual, &expected);
196    }
197
198    pub fn expect_next_n(&self, n: usize) -> Vec<T> {
199        (0..n).map(|_| self.expect_next()).collect()
200    }
201
202    pub fn assert_next_n<I>(&self, expected: I)
203    where
204        T: Debug + PartialEq,
205        I: IntoIterator<Item = T>,
206    {
207        let expected: Vec<T> = expected.into_iter().collect();
208        let actual = self.expect_next_n(expected.len());
209        assert_next_n_eq(&actual, &expected);
210    }
211
212    /// Expects stream completion after a downstream pull has been issued.
213    ///
214    /// Datum's probes observe terminal signals through the normal pull loop, so
215    /// completion is not surfaced until a credit is outstanding. For empty
216    /// sources, call `request(1)` before `expect_complete()`.
217    pub fn expect_complete(&self) {
218        match self.shared.expect_event(self.timeout, "stream completion") {
219            SinkEvent::Complete => {}
220            SinkEvent::Next(_) => panic_any("expected stream completion, got next element"),
221            SinkEvent::Error(error) => panic_any(format!(
222                "expected stream completion, got stream error {error:?}"
223            )),
224        }
225    }
226
227    /// Expects a stream error after a downstream pull has been issued.
228    ///
229    /// Datum's probes observe terminal signals through the normal pull loop, so
230    /// an immediately failed source still requires `request(1)` before
231    /// `expect_error()`.
232    pub fn expect_error(&self) -> StreamError {
233        match self.shared.expect_event(self.timeout, "stream error") {
234            SinkEvent::Error(error) => error,
235            SinkEvent::Next(_) => panic_any("expected stream error, got next element"),
236            SinkEvent::Complete => panic_any("expected stream error, got stream completion"),
237        }
238    }
239
240    pub fn expect_no_message(&self, timeout: Duration) {
241        self.shared.expect_no_message(timeout);
242    }
243
244    /// Drains all remaining queued events until stream completion, returning
245    /// every collected value. Uses the probe's configured timeout for each
246    /// event wait. Panics if the stream fails instead of completing.
247    ///
248    /// This is load-robust: it drains every `Next` event before expecting
249    /// `Complete`, so it tolerates operators that may emit extra elements
250    /// under thread contention without the test needing to know the exact
251    /// count ahead of time.
252    #[must_use]
253    pub fn drain_until_complete(&self) -> Vec<T> {
254        self.request(usize::MAX / 2);
255        let mut values = Vec::new();
256        loop {
257            match self.shared.expect_event(self.timeout, "stream completion") {
258                SinkEvent::Next(item) => values.push(item),
259                SinkEvent::Complete => return values,
260                SinkEvent::Error(error) => panic_any(format!(
261                    "expected stream completion, got stream error {error:?}"
262                )),
263            }
264        }
265    }
266
267    /// Cancels the sink probe.
268    ///
269    /// Cancellation is observed at the worker's next credit wait. If the
270    /// worker is already blocked inside upstream `next()`, it will unblock when
271    /// upstream yields or when an upstream test source probe is dropped.
272    pub fn cancel(&mut self) {
273        self.shared.cancel();
274        let _ = self.completion.take();
275    }
276}
277
278impl<T> Drop for TestSubscriberProbe<T> {
279    fn drop(&mut self) {
280        self.shared.cancel();
281        let _ = self.completion.take();
282    }
283}
284
285struct TestSourceStream<T> {
286    shared: Arc<SourceProbeShared<T>>,
287    waiting_for_command: bool,
288}
289
290impl<T> Iterator for TestSourceStream<T> {
291    type Item = StreamResult<T>;
292
293    fn next(&mut self) -> Option<Self::Item> {
294        if !self.waiting_for_command {
295            self.shared.record_demand();
296            self.waiting_for_command = true;
297        }
298
299        match self.shared.next_command() {
300            Some(SourceCommand::Next(item)) => {
301                self.waiting_for_command = false;
302                Some(Ok(item))
303            }
304            Some(SourceCommand::Complete) => {
305                self.waiting_for_command = false;
306                None
307            }
308            Some(SourceCommand::Error(error)) => {
309                self.waiting_for_command = false;
310                Some(Err(error))
311            }
312            None => {
313                self.waiting_for_command = false;
314                None
315            }
316        }
317    }
318}
319
320impl<T> Drop for TestSourceStream<T> {
321    fn drop(&mut self) {
322        self.shared.mark_cancelled();
323    }
324}
325
326enum SourceCommand<T> {
327    Next(T),
328    Complete,
329    Error(StreamError),
330}
331
332struct SourceProbeShared<T> {
333    state: Mutex<SourceProbeState<T>>,
334    condvar: Condvar,
335}
336
337struct SourceProbeState<T> {
338    commands: VecDeque<SourceCommand<T>>,
339    request_events: VecDeque<usize>,
340    cancelled: bool,
341    terminated: bool,
342}
343
344impl<T> Default for SourceProbeShared<T> {
345    fn default() -> Self {
346        Self {
347            state: Mutex::new(SourceProbeState {
348                commands: VecDeque::new(),
349                request_events: VecDeque::new(),
350                cancelled: false,
351                terminated: false,
352            }),
353            condvar: Condvar::new(),
354        }
355    }
356}
357
358impl<T> SourceProbeShared<T> {
359    fn enqueue(&self, command: SourceCommand<T>) {
360        let mut state = lock_unpoison(&self.state);
361        if state.terminated {
362            panic_any("test source probe is already terminated");
363        }
364        state.commands.push_back(command);
365        if !matches!(state.commands.back(), Some(SourceCommand::Next(_))) {
366            state.terminated = true;
367        }
368        self.condvar.notify_all();
369    }
370
371    fn fail_if_open(&self, error: StreamError) {
372        let mut state = lock_unpoison(&self.state);
373        if state.terminated {
374            return;
375        }
376        state.commands.push_back(SourceCommand::Error(error));
377        state.terminated = true;
378        self.condvar.notify_all();
379    }
380
381    fn record_demand(&self) {
382        let mut state = lock_unpoison(&self.state);
383        if state.terminated {
384            return;
385        }
386        state.request_events.push_back(1);
387        self.condvar.notify_all();
388    }
389
390    fn next_command(&self) -> Option<SourceCommand<T>> {
391        let mut state = lock_unpoison(&self.state);
392        loop {
393            if let Some(command) = state.commands.pop_front() {
394                if matches!(command, SourceCommand::Complete | SourceCommand::Error(_)) {
395                    state.terminated = true;
396                }
397                return Some(command);
398            }
399            if state.terminated {
400                return None;
401            }
402            state = wait_unpoison(&self.condvar, state);
403        }
404    }
405
406    fn expect_request(&self, timeout: Duration) -> usize {
407        let deadline = Instant::now() + timeout;
408        let mut state = lock_unpoison(&self.state);
409        loop {
410            if let Some(requested) = state.request_events.pop_front() {
411                return requested;
412            }
413            if state.cancelled {
414                panic_any("expected downstream demand, but the stream was cancelled");
415            }
416            state = wait_until(&self.condvar, state, deadline, "downstream demand");
417        }
418    }
419
420    fn expect_cancellation(&self, timeout: Duration) {
421        let deadline = Instant::now() + timeout;
422        let mut state = lock_unpoison(&self.state);
423        while !state.cancelled {
424            state = wait_until(&self.condvar, state, deadline, "stream cancellation");
425        }
426    }
427
428    fn mark_cancelled(&self) {
429        let mut state = lock_unpoison(&self.state);
430        state.cancelled = true;
431        state.terminated = true;
432        self.condvar.notify_all();
433    }
434}
435
436enum SinkEvent<T> {
437    Next(T),
438    Complete,
439    Error(StreamError),
440}
441
442struct SinkProbeShared<T> {
443    state: Mutex<SinkProbeState<T>>,
444    condvar: Condvar,
445}
446
447struct SinkProbeState<T> {
448    requested: usize,
449    events: VecDeque<SinkEvent<T>>,
450    cancelled: bool,
451}
452
453impl<T> Default for SinkProbeShared<T> {
454    fn default() -> Self {
455        Self {
456            state: Mutex::new(SinkProbeState {
457                requested: 0,
458                events: VecDeque::new(),
459                cancelled: false,
460            }),
461            condvar: Condvar::new(),
462        }
463    }
464}
465
466impl<T> SinkProbeShared<T> {
467    fn request(&self, n: usize) {
468        let mut state = lock_unpoison(&self.state);
469        state.requested = state.requested.saturating_add(n);
470        self.condvar.notify_all();
471    }
472
473    fn wait_for_request(&self) -> StreamResult<()> {
474        let mut state = lock_unpoison(&self.state);
475        loop {
476            if state.cancelled {
477                return Err(StreamError::Cancelled);
478            }
479            if state.requested > 0 {
480                state.requested -= 1;
481                return Ok(());
482            }
483            state = wait_unpoison(&self.condvar, state);
484        }
485    }
486
487    fn push_event(&self, event: SinkEvent<T>) {
488        let mut state = lock_unpoison(&self.state);
489        state.events.push_back(event);
490        self.condvar.notify_all();
491    }
492
493    fn expect_event(&self, timeout: Duration, expected: &str) -> SinkEvent<T> {
494        let deadline = Instant::now() + timeout;
495        let mut state = lock_unpoison(&self.state);
496        loop {
497            if let Some(event) = state.events.pop_front() {
498                return event;
499            }
500            state = wait_until(&self.condvar, state, deadline, expected);
501        }
502    }
503
504    fn expect_no_message(&self, timeout: Duration) {
505        let deadline = Instant::now() + timeout;
506        let mut state = lock_unpoison(&self.state);
507        while state.events.is_empty() {
508            let remaining = deadline.saturating_duration_since(Instant::now());
509            if remaining.is_zero() {
510                return;
511            }
512            let (next_state, result) = wait_timeout_unpoison(&self.condvar, state, remaining);
513            state = next_state;
514            if result.timed_out() && state.events.is_empty() {
515                return;
516            }
517        }
518        let event = state
519            .events
520            .pop_front()
521            .expect("queued sink event present after wake");
522        panic_any(format!(
523            "expected no stream message for {timeout:?}, got {}",
524            describe_event(&event)
525        ));
526    }
527
528    fn cancel(&self) {
529        let mut state = lock_unpoison(&self.state);
530        state.cancelled = true;
531        self.condvar.notify_all();
532    }
533}
534
535fn wait_until<'a, T>(
536    condvar: &Condvar,
537    state: MutexGuard<'a, T>,
538    deadline: Instant,
539    expected: &str,
540) -> MutexGuard<'a, T> {
541    let started = Instant::now();
542    let remaining = deadline.saturating_duration_since(Instant::now());
543    if remaining.is_zero() {
544        panic_any(format!(
545            "timed out waiting for {expected} after {:?}",
546            started.elapsed()
547        ));
548    }
549    let (state, result) = wait_timeout_unpoison(condvar, state, remaining);
550    if result.timed_out() {
551        panic_any(format!(
552            "timed out waiting for {expected} after {:?}",
553            started.elapsed()
554        ));
555    }
556    state
557}
558
559fn lock_unpoison<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
560    mutex
561        .lock()
562        .unwrap_or_else(|poisoned| poisoned.into_inner())
563}
564
565fn wait_unpoison<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
566    condvar
567        .wait(guard)
568        .unwrap_or_else(|poisoned| poisoned.into_inner())
569}
570
571fn wait_timeout_unpoison<'a, T>(
572    condvar: &Condvar,
573    guard: MutexGuard<'a, T>,
574    timeout: Duration,
575) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
576    condvar
577        .wait_timeout(guard, timeout)
578        .unwrap_or_else(|poisoned| poisoned.into_inner())
579}
580
581fn describe_event<T>(event: &SinkEvent<T>) -> String {
582    match event {
583        SinkEvent::Next(_) => "next element".to_owned(),
584        SinkEvent::Complete => "stream completion".to_owned(),
585        SinkEvent::Error(error) => format!("stream error {error:?}"),
586    }
587}
588
589#[cfg(test)]
590mod tests {
591    use super::*;
592    use crate::{Keep, Materializer, Sink, Source};
593    use std::panic::{self, AssertUnwindSafe};
594
595    fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
596        match payload.downcast::<String>() {
597            Ok(message) => *message,
598            Err(payload) => match payload.downcast::<&'static str>() {
599                Ok(message) => (*message).to_owned(),
600                Err(_) => "<non-string panic payload>".to_owned(),
601            },
602        }
603    }
604
605    #[test]
606    fn test_source_and_sink_probes_drive_map_and_completion() {
607        let materializer = Materializer::new();
608        let (source, sink) = TestSource::probe::<i32>()
609            .map(|value| value * 2)
610            .to_mat(TestSink::probe(), Keep::both)
611            .run_with_materializer(&materializer)
612            .expect("test graph materializes");
613
614        sink.request(1);
615        assert_eq!(source.expect_request(), 1);
616        source.send_next(2);
617        sink.assert_next(4);
618
619        sink.request(1);
620        assert_eq!(source.expect_request(), 1);
621        source.send_complete();
622        sink.expect_complete();
623    }
624
625    #[test]
626    fn test_sink_probe_validates_take_and_completion() {
627        let sink = Source::from_iter(1..=5)
628            .map(|value| value + 10)
629            .take(2)
630            .run_with(TestSink::probe())
631            .expect("test sink materializes");
632
633        sink.request(2);
634        sink.assert_next_n([11, 12]);
635        sink.request(1);
636        sink.expect_complete();
637    }
638
639    #[test]
640    fn test_source_probe_surfaces_stream_errors() {
641        let materializer = Materializer::new();
642        let (source, sink) = TestSource::probe::<i32>()
643            .to_mat(TestSink::probe(), Keep::both)
644            .run_with_materializer(&materializer)
645            .expect("test graph materializes");
646
647        sink.request(1);
648        assert_eq!(source.expect_request(), 1);
649        source.send_error(StreamError::Failed("boom".to_owned()));
650
651        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
652    }
653
654    #[test]
655    fn test_source_probe_observes_downstream_cancellation() {
656        let materializer = Materializer::new();
657        let (source, completion) = TestSource::probe::<i32>()
658            .take(1)
659            .to_mat(Sink::collect(), Keep::both)
660            .run_with_materializer(&materializer)
661            .expect("test graph materializes");
662
663        assert_eq!(source.expect_request(), 1);
664        source.send_next(7);
665        assert_eq!(completion.wait().expect("take collects one item"), vec![7]);
666        source.expect_cancellation();
667    }
668
669    #[test]
670    fn test_sink_probe_observes_empty_source_completion_after_request() {
671        let sink = Source::<i32>::empty()
672            .run_with(TestSink::probe())
673            .expect("test sink materializes");
674
675        sink.request(1);
676        sink.expect_complete();
677    }
678
679    #[test]
680    fn test_sink_probe_observes_failed_source_error_after_request() {
681        let sink = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
682            .run_with(TestSink::probe())
683            .expect("test sink materializes");
684
685        sink.request(1);
686        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
687    }
688
689    #[test]
690    fn test_testkit_blueprints_materialize_independent_probe_pairs() {
691        let blueprint = TestSource::probe::<i32>()
692            .map(|value| value * 10)
693            .to_mat(TestSink::probe(), Keep::both);
694        let materializer = Materializer::new();
695
696        let (source_a, sink_a) = blueprint
697            .run_with_materializer(&materializer)
698            .expect("first probe pair materializes");
699        let (source_b, sink_b) = blueprint
700            .run_with_materializer(&materializer)
701            .expect("second probe pair materializes");
702
703        sink_a.request(1);
704        assert_eq!(source_a.expect_request(), 1);
705        source_a.send_next(2);
706        sink_a.assert_next(20);
707        sink_b.expect_no_message(Duration::from_millis(25));
708
709        sink_b.request(1);
710        assert_eq!(source_b.expect_request(), 1);
711        source_b.send_next(3);
712        sink_b.assert_next(30);
713        sink_a.expect_no_message(Duration::from_millis(25));
714
715        sink_a.request(1);
716        assert_eq!(source_a.expect_request(), 1);
717        source_a.send_complete();
718        sink_a.expect_complete();
719
720        sink_b.request(1);
721        sink_b.expect_no_message(Duration::from_millis(25));
722        source_b.send_complete();
723        sink_b.expect_complete();
724    }
725
726    #[test]
727    fn test_assert_next_reports_expected_and_actual_values() {
728        let sink = Source::single(1)
729            .run_with(TestSink::probe())
730            .expect("test sink materializes");
731        sink.request(1);
732
733        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.assert_next(2)))
734            .expect_err("assert_next should panic on mismatch");
735        let message = panic_message(panic);
736
737        assert!(message.contains("expected next element 2, got 1"));
738    }
739
740    #[test]
741    fn test_expect_complete_times_out_with_clear_message() {
742        let materializer = Materializer::new();
743        let (source, mut sink) = TestSource::probe::<i32>()
744            .to_mat(TestSink::probe(), Keep::both)
745            .run_with_materializer(&materializer)
746            .expect("test graph materializes");
747        sink.set_timeout(Duration::from_millis(50));
748
749        sink.request(1);
750        assert_eq!(source.expect_request(), 1);
751        source.send_next(1);
752        sink.assert_next(1);
753
754        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.expect_complete()))
755            .expect_err("expect_complete should panic on timeout");
756        let message = panic_message(panic);
757
758        assert!(message.contains("timed out waiting for stream completion"));
759    }
760
761    #[test]
762    fn test_expect_next_times_out_with_clear_message() {
763        let materializer = Materializer::new();
764        let (_source, mut sink) = TestSource::probe::<i32>()
765            .to_mat(TestSink::probe(), Keep::both)
766            .run_with_materializer(&materializer)
767            .expect("test graph materializes");
768        sink.set_timeout(Duration::from_millis(50));
769        sink.request(1);
770
771        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.expect_next()))
772            .expect_err("expect_next should panic on timeout");
773        let message = panic_message(panic);
774
775        assert!(message.contains("timed out waiting for next element"));
776    }
777}