fluxion_test_utils/error_injection.rs
1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Test utilities for error injection in streams.
6//!
7//! This module provides stream wrappers that can inject `StreamItem::Error` values
8//! into streams for testing error propagation behavior in stream operators.
9
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use fluxion_core::{FluxionError, StreamItem, Timestamped};
13use futures::Stream;
14
15/// A stream wrapper that injects errors at specified positions.
16///
17/// This wrapper takes a stream that produces ordered values and wraps them in
18/// `StreamItem::Value`, optionally injecting `StreamItem::Error` at a specified position.
19///
20/// # Examples
21///
22/// ```rust
23/// use fluxion_test_utils::Sequenced;
24/// use fluxion_test_utils::ErrorInjectingStream;
25/// use fluxion_core::{StreamItem, Timestamped };
26/// use futures::{stream, StreamExt};
27///
28/// # async fn example() {
29/// let items = vec![
30/// <Sequenced<i32> >::with_timestamp(1, 1),
31/// <Sequenced<i32> >::with_timestamp(2, 2),
32/// <Sequenced<i32> >::with_timestamp(3, 3),
33/// ];
34///
35/// let base_stream = stream::iter(items);
36/// let mut error_stream = ErrorInjectingStream::new(base_stream, 1);
37///
38/// // First item is a value
39/// let first = error_stream.next().await.unwrap();
40/// assert!(matches!(first, StreamItem::Value(_)));
41///
42/// // Second item is the injected error
43/// let second = error_stream.next().await.unwrap();
44/// assert!(matches!(second, StreamItem::Error(_)));
45///
46/// // Third item is a value again
47/// let third = error_stream.next().await.unwrap();
48/// assert!(matches!(third, StreamItem::Value(_)));
49/// # }
50/// ```
#[derive(Debug)]
pub struct ErrorInjectingStream<S> {
    /// The wrapped stream whose items are forwarded as `StreamItem::Value`.
    inner: S,
    /// Output position at which the error is injected; reset to `None` once
    /// the error has been emitted so that it is injected only once.
    inject_error_at: Option<usize>,
    /// Number of items emitted so far (forwarded values plus the injected error).
    count: usize,
}
56
57impl<S> ErrorInjectingStream<S> {
58 /// Creates a new error-injecting stream wrapper.
59 ///
60 /// # Arguments
61 ///
62 /// * `inner` - The base stream to wrap
63 /// * `inject_error_at` - The position (0-indexed) at which to inject an error
64 ///
65 /// # Examples
66 ///
67 /// ```rust
68 /// use fluxion_test_utils::{Sequenced, ErrorInjectingStream};
69 /// use futures::stream;
70 ///
71 /// let items = vec![Sequenced::new(1), Sequenced::new(2)];
72 /// let base = stream::iter(items);
73 /// let error_stream = ErrorInjectingStream::new(base, 1);
74 /// // Will inject error at position 1 (after first value)
75 /// ```
76 pub fn new(inner: S, inject_error_at: usize) -> Self {
77 Self {
78 inner,
79 inject_error_at: Some(inject_error_at),
80 count: 0,
81 }
82 }
83}
84
85impl<S> Stream for ErrorInjectingStream<S>
86where
87 S: Stream + Unpin,
88 S::Item: Timestamped,
89{
90 type Item = StreamItem<S::Item>;
91
92 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93 // Check if we should inject an error at this position
94 if let Some(error_pos) = self.inject_error_at {
95 if self.count == error_pos {
96 self.inject_error_at = None; // Only inject once
97 self.count += 1;
98 return Poll::Ready(Some(StreamItem::Error(FluxionError::stream_error(
99 "Injected test error",
100 ))));
101 }
102 }
103
104 // Otherwise, poll the inner stream and wrap in StreamItem::Value
105 match Pin::new(&mut self.inner).poll_next(cx) {
106 Poll::Ready(Some(item)) => {
107 self.count += 1;
108 Poll::Ready(Some(StreamItem::Value(item)))
109 }
110 Poll::Ready(None) => Poll::Ready(None),
111 Poll::Pending => Poll::Pending,
112 }
113 }
114}