use fluxion_core::{FluxionError, StreamItem, Timestamped};
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
/// Stream adapter that wraps an inner stream and emits a single injected
/// error item once a chosen number of items has been emitted.
///
/// Intended for exercising error-handling paths: values from the inner stream
/// are forwarded wrapped as `StreamItem::Value`, and one `StreamItem::Error`
/// is inserted at the configured position.
pub struct ErrorInjectingStream<S> {
    /// The wrapped stream whose items are forwarded.
    inner: S,
    /// Emission index at which the error is injected; cleared to `None` after
    /// the error has been emitted so it happens at most once.
    inject_error_at: Option<usize>,
    /// Number of items emitted so far (forwarded values plus the injected error).
    count: usize,
}

impl<S> ErrorInjectingStream<S> {
    /// Wraps `inner` so that an error is emitted once exactly
    /// `inject_error_at` items have been emitted by this wrapper.
    ///
    /// A position of `0` makes the error the very first item.
    pub fn new(inner: S, inject_error_at: usize) -> Self {
        // The injection is armed from the start; the emission counter begins at zero.
        let pending_injection = Some(inject_error_at);
        ErrorInjectingStream {
            inner,
            inject_error_at: pending_injection,
            count: 0,
        }
    }
}
impl<S> Stream for ErrorInjectingStream<S>
where
S: Stream + Unpin,
S::Item: Timestamped,
{
type Item = StreamItem<S::Item>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
if let Some(error_pos) = self.inject_error_at {
if self.count == error_pos {
self.inject_error_at = None; self.count += 1;
return Poll::Ready(Some(StreamItem::Error(FluxionError::stream_error(
"Injected test error",
))));
}
}
match Pin::new(&mut self.inner).poll_next(cx) {
Poll::Ready(Some(item)) => {
self.count += 1;
Poll::Ready(Some(StreamItem::Value(item)))
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Sequenced;
    use futures::{stream, StreamExt};

    /// With the error armed at position 1, the output order is
    /// value, error, value.
    #[tokio::test]
    async fn test_error_injection_at_position() {
        let source = stream::iter(vec![
            <Sequenced<_>>::with_timestamp(1, 1_000_000_000),
            <Sequenced<_>>::with_timestamp(2, 2_000_000_000),
            <Sequenced<_>>::with_timestamp(3, 3_000_000_000),
        ]);
        let mut injecting = ErrorInjectingStream::new(source, 1);

        let head = injecting.next().await.unwrap();
        assert!(matches!(head, StreamItem::Value(_)));

        let middle = injecting.next().await.unwrap();
        assert!(matches!(middle, StreamItem::Error(_)));

        let tail = injecting.next().await.unwrap();
        assert!(matches!(tail, StreamItem::Value(_)));
    }

    /// With the error armed at position 0, the error comes first and the
    /// source value follows it.
    #[tokio::test]
    async fn test_error_injection_at_start() {
        let source = stream::iter(vec![<Sequenced<_>>::with_timestamp(1, 1_000_000_000)]);
        let mut injecting = ErrorInjectingStream::new(source, 0);

        let head = injecting.next().await.unwrap();
        if let StreamItem::Error(e) = head {
            assert!(matches!(e, FluxionError::StreamProcessingError { .. }));
        } else {
            panic!("Expected error at position 0");
        }

        let after_error = injecting.next().await.unwrap();
        assert!(matches!(after_error, StreamItem::Value(_)));
    }
}