fluxion_test_utils/
error_injection.rs

1// Copyright 2025 Umberto Gotti <umberto.gotti@umbertogotti.dev>
2// Licensed under the Apache License, Version 2.0
3// http://www.apache.org/licenses/LICENSE-2.0
4
5//! Test utilities for error injection in streams.
6//!
7//! This module provides stream wrappers that can inject `StreamItem::Error` values
8//! into streams for testing error propagation behavior in stream operators.
9
10use core::pin::Pin;
11use core::task::{Context, Poll};
12use fluxion_core::{FluxionError, StreamItem, Timestamped};
13use futures::Stream;
14
/// A stream wrapper that injects a single error at a specified position.
///
/// Every item produced by the wrapped stream is emitted as `StreamItem::Value`.
/// In addition, exactly one `StreamItem::Error` is emitted when the number of
/// items already emitted by this wrapper equals the configured position.
///
/// Positions are 0-indexed and count the items emitted by the wrapper:
///
/// * position `0` emits the error before any value of the inner stream;
/// * position `n` emits the first `n` inner items, then the error, then the
///   remaining inner items;
/// * if the inner stream ends before `n` items were emitted, no error is
///   injected and the wrapper simply ends.
///
/// The error is injected at most once per wrapper instance.
///
/// # Examples
///
/// ```rust
/// use fluxion_test_utils::Sequenced;
/// use fluxion_test_utils::ErrorInjectingStream;
/// use fluxion_core::{StreamItem, Timestamped};
/// use futures::{stream, StreamExt};
///
/// # async fn example() {
/// let items = vec![
///     <Sequenced<i32> >::with_timestamp(1, 1),
///     <Sequenced<i32> >::with_timestamp(2, 2),
///     <Sequenced<i32> >::with_timestamp(3, 3),
/// ];
///
/// let base_stream = stream::iter(items);
/// let mut error_stream = ErrorInjectingStream::new(base_stream, 1);
///
/// // First item is a value
/// let first = error_stream.next().await.unwrap();
/// assert!(matches!(first, StreamItem::Value(_)));
///
/// // Second item is the injected error
/// let second = error_stream.next().await.unwrap();
/// assert!(matches!(second, StreamItem::Error(_)));
///
/// // Third item is a value again
/// let third = error_stream.next().await.unwrap();
/// assert!(matches!(third, StreamItem::Value(_)));
/// # }
/// ```
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct ErrorInjectingStream<S> {
    /// The wrapped stream whose items are forwarded as values.
    inner: S,
    /// Position at which the error is still to be injected; `None` once the
    /// error has been emitted.
    inject_error_at: Option<usize>,
    /// Number of items (values and the injected error) emitted so far.
    count: usize,
}
56
57impl<S> ErrorInjectingStream<S> {
58    /// Creates a new error-injecting stream wrapper.
59    ///
60    /// # Arguments
61    ///
62    /// * `inner` - The base stream to wrap
63    /// * `inject_error_at` - The position (0-indexed) at which to inject an error
64    ///
65    /// # Examples
66    ///
67    /// ```rust
68    /// use fluxion_test_utils::{Sequenced, ErrorInjectingStream};
69    /// use futures::stream;
70    ///
71    /// let items = vec![Sequenced::new(1), Sequenced::new(2)];
72    /// let base = stream::iter(items);
73    /// let error_stream = ErrorInjectingStream::new(base, 1);
74    /// // Will inject error at position 1 (after first value)
75    /// ```
76    pub fn new(inner: S, inject_error_at: usize) -> Self {
77        Self {
78            inner,
79            inject_error_at: Some(inject_error_at),
80            count: 0,
81        }
82    }
83}
84
85impl<S> Stream for ErrorInjectingStream<S>
86where
87    S: Stream + Unpin,
88    S::Item: Timestamped,
89{
90    type Item = StreamItem<S::Item>;
91
92    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
93        // Check if we should inject an error at this position
94        if let Some(error_pos) = self.inject_error_at {
95            if self.count == error_pos {
96                self.inject_error_at = None; // Only inject once
97                self.count += 1;
98                return Poll::Ready(Some(StreamItem::Error(FluxionError::stream_error(
99                    "Injected test error",
100                ))));
101            }
102        }
103
104        // Otherwise, poll the inner stream and wrap in StreamItem::Value
105        match Pin::new(&mut self.inner).poll_next(cx) {
106            Poll::Ready(Some(item)) => {
107                self.count += 1;
108                Poll::Ready(Some(StreamItem::Value(item)))
109            }
110            Poll::Ready(None) => Poll::Ready(None),
111            Poll::Pending => Poll::Pending,
112        }
113    }
114}