1use std::{
24 collections::VecDeque,
25 fmt::Debug,
26 panic::panic_any,
27 sync::{Arc, Condvar, Mutex, MutexGuard},
28 time::{Duration, Instant},
29};
30
31use crate::stream::{BoxStream, NotUsed, Sink, Source, StreamCompletion};
32use crate::{StreamError, StreamResult};
33
/// Timeout (three seconds) assigned to every newly constructed probe; it can
/// be overridden per probe through the probes' `set_timeout` methods.
const DEFAULT_PROBE_TIMEOUT: Duration = Duration::from_secs(3);
35
36pub struct TestSource;
37
38impl TestSource {
39 #[must_use]
40 pub fn probe<T: Send + 'static>() -> Source<T, TestPublisherProbe<T>> {
41 Source::from_materialized_factory(|_| {
42 let shared = Arc::new(SourceProbeShared::default());
43 let stream = Box::new(TestSourceStream {
44 shared: Arc::clone(&shared),
45 waiting_for_command: false,
46 }) as BoxStream<T>;
47 Ok((stream, TestPublisherProbe::new(shared)))
48 })
49 }
50}
51
52pub struct TestSink;
53
54impl TestSink {
55 #[must_use]
56 pub fn probe<T: Send + 'static>() -> Sink<T, TestSubscriberProbe<T>> {
57 Sink::from_runner(|mut input, materializer| {
58 let shared = Arc::new(SinkProbeShared::default());
59 let task_shared = Arc::clone(&shared);
60 let completion = materializer.spawn_stream(move |_cancelled| {
61 loop {
62 task_shared.wait_for_request()?;
63 match input.next() {
64 Some(Ok(item)) => task_shared.push_event(SinkEvent::Next(item)),
65 Some(Err(error)) => {
66 task_shared.push_event(SinkEvent::Error(error.clone()));
67 return Err(error);
68 }
69 None => {
70 task_shared.push_event(SinkEvent::Complete);
71 return Ok(NotUsed);
72 }
73 }
74 }
75 });
76 Ok(TestSubscriberProbe::new(shared, completion))
77 })
78 }
79}
80
/// Asserts that a received element equals the expected one.
///
/// # Panics
///
/// Panics when `actual != expected`; the message names both values using
/// their `Debug` representation.
pub fn assert_next_eq<T: Debug + PartialEq>(actual: &T, expected: &T) {
    assert_eq!(
        actual, expected,
        "expected next element {expected:?}, got {actual:?}"
    );
}
90
/// Asserts that a run of received elements equals the expected run.
///
/// # Panics
///
/// Panics when the slices differ; the message names both slices using their
/// `Debug` representation.
pub fn assert_next_n_eq<T: Debug + PartialEq>(actual: &[T], expected: &[T]) {
    assert_eq!(
        actual, expected,
        "expected next elements {expected:?}, got {actual:?}"
    );
}
100
101pub struct TestPublisherProbe<T> {
102 shared: Arc<SourceProbeShared<T>>,
103 timeout: Duration,
104}
105
106impl<T> TestPublisherProbe<T> {
107 fn new(shared: Arc<SourceProbeShared<T>>) -> Self {
108 Self {
109 shared,
110 timeout: DEFAULT_PROBE_TIMEOUT,
111 }
112 }
113
114 pub fn set_timeout(&mut self, timeout: Duration) {
115 self.timeout = timeout;
116 }
117
118 pub fn send_next(&self, element: T) {
119 self.shared.enqueue(SourceCommand::Next(element));
120 }
121
122 pub fn send_complete(&self) {
123 self.shared.enqueue(SourceCommand::Complete);
124 }
125
126 pub fn send_error(&self, error: StreamError) {
127 self.shared.enqueue(SourceCommand::Error(error));
128 }
129
130 pub fn expect_request(&self) -> usize {
137 self.shared.expect_request(self.timeout)
138 }
139
140 pub fn expect_cancellation(&self) {
141 self.shared.expect_cancellation(self.timeout);
142 }
143}
144
145impl<T> Drop for TestPublisherProbe<T> {
146 fn drop(&mut self) {
147 self.shared.fail_if_open(StreamError::Failed(
148 "test source probe dropped before completion".to_owned(),
149 ));
150 }
151}
152
153pub struct TestSubscriberProbe<T> {
154 shared: Arc<SinkProbeShared<T>>,
155 timeout: Duration,
156 completion: Option<StreamCompletion<NotUsed>>,
157}
158
159impl<T> TestSubscriberProbe<T> {
160 fn new(shared: Arc<SinkProbeShared<T>>, completion: StreamCompletion<NotUsed>) -> Self {
161 Self {
162 shared,
163 timeout: DEFAULT_PROBE_TIMEOUT,
164 completion: Some(completion),
165 }
166 }
167
168 pub fn set_timeout(&mut self, timeout: Duration) {
169 self.timeout = timeout;
170 }
171
172 pub fn request(&self, n: usize) {
173 assert!(n > 0, "request count must be positive, got {n}");
174 self.shared.request(n);
175 }
176
177 pub fn expect_next(&self) -> T {
178 match self.shared.expect_event(self.timeout, "next element") {
179 SinkEvent::Next(item) => item,
180 SinkEvent::Complete => panic_any(format!(
181 "expected next element, got stream completion after waiting {:?}",
182 self.timeout
183 )),
184 SinkEvent::Error(error) => {
185 panic_any(format!("expected next element, got stream error {error:?}"))
186 }
187 }
188 }
189
190 pub fn assert_next(&self, expected: T)
191 where
192 T: Debug + PartialEq,
193 {
194 let actual = self.expect_next();
195 assert_next_eq(&actual, &expected);
196 }
197
198 pub fn expect_next_n(&self, n: usize) -> Vec<T> {
199 (0..n).map(|_| self.expect_next()).collect()
200 }
201
202 pub fn assert_next_n<I>(&self, expected: I)
203 where
204 T: Debug + PartialEq,
205 I: IntoIterator<Item = T>,
206 {
207 let expected: Vec<T> = expected.into_iter().collect();
208 let actual = self.expect_next_n(expected.len());
209 assert_next_n_eq(&actual, &expected);
210 }
211
212 pub fn expect_complete(&self) {
218 match self.shared.expect_event(self.timeout, "stream completion") {
219 SinkEvent::Complete => {}
220 SinkEvent::Next(_) => panic_any("expected stream completion, got next element"),
221 SinkEvent::Error(error) => panic_any(format!(
222 "expected stream completion, got stream error {error:?}"
223 )),
224 }
225 }
226
227 pub fn expect_error(&self) -> StreamError {
233 match self.shared.expect_event(self.timeout, "stream error") {
234 SinkEvent::Error(error) => error,
235 SinkEvent::Next(_) => panic_any("expected stream error, got next element"),
236 SinkEvent::Complete => panic_any("expected stream error, got stream completion"),
237 }
238 }
239
240 pub fn expect_no_message(&self, timeout: Duration) {
241 self.shared.expect_no_message(timeout);
242 }
243
244 #[must_use]
253 pub fn drain_until_complete(&self) -> Vec<T> {
254 self.request(usize::MAX / 2);
255 let mut values = Vec::new();
256 loop {
257 match self.shared.expect_event(self.timeout, "stream completion") {
258 SinkEvent::Next(item) => values.push(item),
259 SinkEvent::Complete => return values,
260 SinkEvent::Error(error) => panic_any(format!(
261 "expected stream completion, got stream error {error:?}"
262 )),
263 }
264 }
265 }
266
267 pub fn cancel(&mut self) {
273 self.shared.cancel();
274 let _ = self.completion.take();
275 }
276}
277
278impl<T> Drop for TestSubscriberProbe<T> {
279 fn drop(&mut self) {
280 self.shared.cancel();
281 let _ = self.completion.take();
282 }
283}
284
285struct TestSourceStream<T> {
286 shared: Arc<SourceProbeShared<T>>,
287 waiting_for_command: bool,
288}
289
290impl<T> Iterator for TestSourceStream<T> {
291 type Item = StreamResult<T>;
292
293 fn next(&mut self) -> Option<Self::Item> {
294 if !self.waiting_for_command {
295 self.shared.record_demand();
296 self.waiting_for_command = true;
297 }
298
299 match self.shared.next_command() {
300 Some(SourceCommand::Next(item)) => {
301 self.waiting_for_command = false;
302 Some(Ok(item))
303 }
304 Some(SourceCommand::Complete) => {
305 self.waiting_for_command = false;
306 None
307 }
308 Some(SourceCommand::Error(error)) => {
309 self.waiting_for_command = false;
310 Some(Err(error))
311 }
312 None => {
313 self.waiting_for_command = false;
314 None
315 }
316 }
317 }
318}
319
320impl<T> Drop for TestSourceStream<T> {
321 fn drop(&mut self) {
322 self.shared.mark_cancelled();
323 }
324}
325
/// Instruction queued by a `TestPublisherProbe` for its `TestSourceStream`.
enum SourceCommand<T> {
    /// Emit the wrapped element.
    Next(T),
    /// End the stream normally.
    Complete,
    /// End the stream with the wrapped error.
    Error(StreamError),
}
331
332struct SourceProbeShared<T> {
333 state: Mutex<SourceProbeState<T>>,
334 condvar: Condvar,
335}
336
337struct SourceProbeState<T> {
338 commands: VecDeque<SourceCommand<T>>,
339 request_events: VecDeque<usize>,
340 cancelled: bool,
341 terminated: bool,
342}
343
344impl<T> Default for SourceProbeShared<T> {
345 fn default() -> Self {
346 Self {
347 state: Mutex::new(SourceProbeState {
348 commands: VecDeque::new(),
349 request_events: VecDeque::new(),
350 cancelled: false,
351 terminated: false,
352 }),
353 condvar: Condvar::new(),
354 }
355 }
356}
357
358impl<T> SourceProbeShared<T> {
359 fn enqueue(&self, command: SourceCommand<T>) {
360 let mut state = lock_unpoison(&self.state);
361 if state.terminated {
362 panic_any("test source probe is already terminated");
363 }
364 state.commands.push_back(command);
365 if !matches!(state.commands.back(), Some(SourceCommand::Next(_))) {
366 state.terminated = true;
367 }
368 self.condvar.notify_all();
369 }
370
371 fn fail_if_open(&self, error: StreamError) {
372 let mut state = lock_unpoison(&self.state);
373 if state.terminated {
374 return;
375 }
376 state.commands.push_back(SourceCommand::Error(error));
377 state.terminated = true;
378 self.condvar.notify_all();
379 }
380
381 fn record_demand(&self) {
382 let mut state = lock_unpoison(&self.state);
383 if state.terminated {
384 return;
385 }
386 state.request_events.push_back(1);
387 self.condvar.notify_all();
388 }
389
390 fn next_command(&self) -> Option<SourceCommand<T>> {
391 let mut state = lock_unpoison(&self.state);
392 loop {
393 if let Some(command) = state.commands.pop_front() {
394 if matches!(command, SourceCommand::Complete | SourceCommand::Error(_)) {
395 state.terminated = true;
396 }
397 return Some(command);
398 }
399 if state.terminated {
400 return None;
401 }
402 state = wait_unpoison(&self.condvar, state);
403 }
404 }
405
406 fn expect_request(&self, timeout: Duration) -> usize {
407 let deadline = Instant::now() + timeout;
408 let mut state = lock_unpoison(&self.state);
409 loop {
410 if let Some(requested) = state.request_events.pop_front() {
411 return requested;
412 }
413 if state.cancelled {
414 panic_any("expected downstream demand, but the stream was cancelled");
415 }
416 state = wait_until(&self.condvar, state, deadline, "downstream demand");
417 }
418 }
419
420 fn expect_cancellation(&self, timeout: Duration) {
421 let deadline = Instant::now() + timeout;
422 let mut state = lock_unpoison(&self.state);
423 while !state.cancelled {
424 state = wait_until(&self.condvar, state, deadline, "stream cancellation");
425 }
426 }
427
428 fn mark_cancelled(&self) {
429 let mut state = lock_unpoison(&self.state);
430 state.cancelled = true;
431 state.terminated = true;
432 self.condvar.notify_all();
433 }
434}
435
/// Outcome recorded by the sink task for the `TestSubscriberProbe` to inspect.
enum SinkEvent<T> {
    /// The upstream produced the wrapped element.
    Next(T),
    /// The upstream finished normally.
    Complete,
    /// The upstream failed with the wrapped error.
    Error(StreamError),
}
441
442struct SinkProbeShared<T> {
443 state: Mutex<SinkProbeState<T>>,
444 condvar: Condvar,
445}
446
447struct SinkProbeState<T> {
448 requested: usize,
449 events: VecDeque<SinkEvent<T>>,
450 cancelled: bool,
451}
452
453impl<T> Default for SinkProbeShared<T> {
454 fn default() -> Self {
455 Self {
456 state: Mutex::new(SinkProbeState {
457 requested: 0,
458 events: VecDeque::new(),
459 cancelled: false,
460 }),
461 condvar: Condvar::new(),
462 }
463 }
464}
465
466impl<T> SinkProbeShared<T> {
467 fn request(&self, n: usize) {
468 let mut state = lock_unpoison(&self.state);
469 state.requested = state.requested.saturating_add(n);
470 self.condvar.notify_all();
471 }
472
473 fn wait_for_request(&self) -> StreamResult<()> {
474 let mut state = lock_unpoison(&self.state);
475 loop {
476 if state.cancelled {
477 return Err(StreamError::Cancelled);
478 }
479 if state.requested > 0 {
480 state.requested -= 1;
481 return Ok(());
482 }
483 state = wait_unpoison(&self.condvar, state);
484 }
485 }
486
487 fn push_event(&self, event: SinkEvent<T>) {
488 let mut state = lock_unpoison(&self.state);
489 state.events.push_back(event);
490 self.condvar.notify_all();
491 }
492
493 fn expect_event(&self, timeout: Duration, expected: &str) -> SinkEvent<T> {
494 let deadline = Instant::now() + timeout;
495 let mut state = lock_unpoison(&self.state);
496 loop {
497 if let Some(event) = state.events.pop_front() {
498 return event;
499 }
500 state = wait_until(&self.condvar, state, deadline, expected);
501 }
502 }
503
504 fn expect_no_message(&self, timeout: Duration) {
505 let deadline = Instant::now() + timeout;
506 let mut state = lock_unpoison(&self.state);
507 while state.events.is_empty() {
508 let remaining = deadline.saturating_duration_since(Instant::now());
509 if remaining.is_zero() {
510 return;
511 }
512 let (next_state, result) = wait_timeout_unpoison(&self.condvar, state, remaining);
513 state = next_state;
514 if result.timed_out() && state.events.is_empty() {
515 return;
516 }
517 }
518 let event = state
519 .events
520 .pop_front()
521 .expect("queued sink event present after wake");
522 panic_any(format!(
523 "expected no stream message for {timeout:?}, got {}",
524 describe_event(&event)
525 ));
526 }
527
528 fn cancel(&self) {
529 let mut state = lock_unpoison(&self.state);
530 state.cancelled = true;
531 self.condvar.notify_all();
532 }
533}
534
535fn wait_until<'a, T>(
536 condvar: &Condvar,
537 state: MutexGuard<'a, T>,
538 deadline: Instant,
539 expected: &str,
540) -> MutexGuard<'a, T> {
541 let started = Instant::now();
542 let remaining = deadline.saturating_duration_since(Instant::now());
543 if remaining.is_zero() {
544 panic_any(format!(
545 "timed out waiting for {expected} after {:?}",
546 started.elapsed()
547 ));
548 }
549 let (state, result) = wait_timeout_unpoison(condvar, state, remaining);
550 if result.timed_out() {
551 panic_any(format!(
552 "timed out waiting for {expected} after {:?}",
553 started.elapsed()
554 ));
555 }
556 state
557}
558
/// Locks `mutex`, recovering the guard even when the mutex is poisoned.
fn lock_unpoison<T>(mutex: &Mutex<T>) -> MutexGuard<'_, T> {
    match mutex.lock() {
        Ok(guard) => guard,
        Err(poisoned) => poisoned.into_inner(),
    }
}
564
/// Waits on `condvar`, recovering the guard even when the mutex is poisoned.
fn wait_unpoison<'a, T>(condvar: &Condvar, guard: MutexGuard<'a, T>) -> MutexGuard<'a, T> {
    match condvar.wait(guard) {
        Ok(reacquired) => reacquired,
        Err(poisoned) => poisoned.into_inner(),
    }
}
570
/// Waits on `condvar` for at most `timeout`, recovering the guard and the
/// wait result even when the mutex is poisoned.
fn wait_timeout_unpoison<'a, T>(
    condvar: &Condvar,
    guard: MutexGuard<'a, T>,
    timeout: Duration,
) -> (MutexGuard<'a, T>, std::sync::WaitTimeoutResult) {
    match condvar.wait_timeout(guard, timeout) {
        Ok(outcome) => outcome,
        Err(poisoned) => poisoned.into_inner(),
    }
}
580
581fn describe_event<T>(event: &SinkEvent<T>) -> String {
582 match event {
583 SinkEvent::Next(_) => "next element".to_owned(),
584 SinkEvent::Complete => "stream completion".to_owned(),
585 SinkEvent::Error(error) => format!("stream error {error:?}"),
586 }
587}
588
/// Tests that drive the source and sink probes through small stream graphs.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Keep, Materializer, Sink, Source};
    use std::panic::{self, AssertUnwindSafe};

    /// Extracts the text of a panic payload, covering both `String` and
    /// `&'static str` payloads; anything else yields a placeholder.
    fn panic_message(payload: Box<dyn std::any::Any + Send>) -> String {
        match payload.downcast::<String>() {
            Ok(message) => *message,
            Err(payload) => match payload.downcast::<&'static str>() {
                Ok(message) => (*message).to_owned(),
                Err(_) => "<non-string panic payload>".to_owned(),
            },
        }
    }

    /// Demand from the sink probe reaches the source probe, an element passes
    /// through `map`, and completion is delivered after a second request.
    #[test]
    fn test_source_and_sink_probes_drive_map_and_completion() {
        let materializer = Materializer::new();
        let (source, sink) = TestSource::probe::<i32>()
            .map(|value| value * 2)
            .to_mat(TestSink::probe(), Keep::both)
            .run_with_materializer(&materializer)
            .expect("test graph materializes");

        sink.request(1);
        assert_eq!(source.expect_request(), 1);
        source.send_next(2);
        sink.assert_next(4);

        sink.request(1);
        assert_eq!(source.expect_request(), 1);
        source.send_complete();
        sink.expect_complete();
    }

    /// The sink probe sees exactly the two elements allowed by `take(2)` and
    /// then completion once further demand is signalled.
    #[test]
    fn test_sink_probe_validates_take_and_completion() {
        let sink = Source::from_iter(1..=5)
            .map(|value| value + 10)
            .take(2)
            .run_with(TestSink::probe())
            .expect("test sink materializes");

        sink.request(2);
        sink.assert_next_n([11, 12]);
        sink.request(1);
        sink.expect_complete();
    }

    /// An error sent through the source probe is observed by the sink probe.
    #[test]
    fn test_source_probe_surfaces_stream_errors() {
        let materializer = Materializer::new();
        let (source, sink) = TestSource::probe::<i32>()
            .to_mat(TestSink::probe(), Keep::both)
            .run_with_materializer(&materializer)
            .expect("test graph materializes");

        sink.request(1);
        assert_eq!(source.expect_request(), 1);
        source.send_error(StreamError::Failed("boom".to_owned()));

        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
    }

    /// After `take(1)` has collected its element, the source probe observes
    /// cancellation.
    #[test]
    fn test_source_probe_observes_downstream_cancellation() {
        let materializer = Materializer::new();
        let (source, completion) = TestSource::probe::<i32>()
            .take(1)
            .to_mat(Sink::collect(), Keep::both)
            .run_with_materializer(&materializer)
            .expect("test graph materializes");

        assert_eq!(source.expect_request(), 1);
        source.send_next(7);
        assert_eq!(completion.wait().expect("take collects one item"), vec![7]);
        source.expect_cancellation();
    }

    /// An empty source reports completion once demand has been signalled.
    #[test]
    fn test_sink_probe_observes_empty_source_completion_after_request() {
        let sink = Source::<i32>::empty()
            .run_with(TestSink::probe())
            .expect("test sink materializes");

        sink.request(1);
        sink.expect_complete();
    }

    /// A failed source reports its error once demand has been signalled.
    #[test]
    fn test_sink_probe_observes_failed_source_error_after_request() {
        let sink = Source::<i32>::failed(StreamError::Failed("boom".to_owned()))
            .run_with(TestSink::probe())
            .expect("test sink materializes");

        sink.request(1);
        assert_eq!(sink.expect_error(), StreamError::Failed("boom".to_owned()));
    }

    /// Running the same blueprint twice yields two probe pairs that do not
    /// observe each other's traffic.
    #[test]
    fn test_testkit_blueprints_materialize_independent_probe_pairs() {
        let blueprint = TestSource::probe::<i32>()
            .map(|value| value * 10)
            .to_mat(TestSink::probe(), Keep::both);
        let materializer = Materializer::new();

        let (source_a, sink_a) = blueprint
            .run_with_materializer(&materializer)
            .expect("first probe pair materializes");
        let (source_b, sink_b) = blueprint
            .run_with_materializer(&materializer)
            .expect("second probe pair materializes");

        // Traffic on pair A must not show up on pair B.
        sink_a.request(1);
        assert_eq!(source_a.expect_request(), 1);
        source_a.send_next(2);
        sink_a.assert_next(20);
        sink_b.expect_no_message(Duration::from_millis(25));

        // And the other way around.
        sink_b.request(1);
        assert_eq!(source_b.expect_request(), 1);
        source_b.send_next(3);
        sink_b.assert_next(30);
        sink_a.expect_no_message(Duration::from_millis(25));

        sink_a.request(1);
        assert_eq!(source_a.expect_request(), 1);
        source_a.send_complete();
        sink_a.expect_complete();

        // Completing pair A must not complete pair B.
        sink_b.request(1);
        sink_b.expect_no_message(Duration::from_millis(25));
        source_b.send_complete();
        sink_b.expect_complete();
    }

    /// A mismatch in `assert_next` panics with a message naming both values.
    #[test]
    fn test_assert_next_reports_expected_and_actual_values() {
        let sink = Source::single(1)
            .run_with(TestSink::probe())
            .expect("test sink materializes");
        sink.request(1);

        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.assert_next(2)))
            .expect_err("assert_next should panic on mismatch");
        let message = panic_message(panic);

        assert!(message.contains("expected next element 2, got 1"));
    }

    /// `expect_complete` panics with a timeout message when the stream stays
    /// open past the (shortened) probe timeout.
    #[test]
    fn test_expect_complete_times_out_with_clear_message() {
        let materializer = Materializer::new();
        let (source, mut sink) = TestSource::probe::<i32>()
            .to_mat(TestSink::probe(), Keep::both)
            .run_with_materializer(&materializer)
            .expect("test graph materializes");
        sink.set_timeout(Duration::from_millis(50));

        sink.request(1);
        assert_eq!(source.expect_request(), 1);
        source.send_next(1);
        sink.assert_next(1);

        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.expect_complete()))
            .expect_err("expect_complete should panic on timeout");
        let message = panic_message(panic);

        assert!(message.contains("timed out waiting for stream completion"));
    }

    /// `expect_next` panics with a timeout message when no element arrives
    /// within the (shortened) probe timeout.
    #[test]
    fn test_expect_next_times_out_with_clear_message() {
        let materializer = Materializer::new();
        let (_source, mut sink) = TestSource::probe::<i32>()
            .to_mat(TestSink::probe(), Keep::both)
            .run_with_materializer(&materializer)
            .expect("test graph materializes");
        sink.set_timeout(Duration::from_millis(50));
        sink.request(1);

        let panic = panic::catch_unwind(AssertUnwindSafe(|| sink.expect_next()))
            .expect_err("expect_next should panic on timeout");
        let message = panic_message(panic);

        assert!(message.contains("timed out waiting for next element"));
    }
}
777}