use core::pin::Pin;
use core::task::{Context, Poll};
use fluxion_core::{FluxionError, StreamItem, Timestamped};
use futures::Stream;
/// Stream adapter that forwards the items of an inner stream and injects a
/// single error at a configured position.
///
/// Items pulled from the inner stream are wrapped in `StreamItem::Value`; the
/// injected item is a `StreamItem::Error`. The error is produced at most once.
#[derive(Debug)]
pub struct ErrorInjectingStream<S> {
    /// The wrapped stream whose items are forwarded.
    inner: S,
    /// Zero-based position (in terms of items emitted by this adapter) at which
    /// the error is injected; reset to `None` once the error has been emitted.
    inject_error_at: Option<usize>,
    /// Number of items emitted so far, counting forwarded values and the
    /// injected error alike.
    count: usize,
}
impl<S> ErrorInjectingStream<S> {
pub fn new(inner: S, inject_error_at: usize) -> Self {
Self {
inner,
inject_error_at: Some(inject_error_at),
count: 0,
}
}
}
/// Forwards the inner stream's items as `StreamItem::Value`, emitting one
/// `StreamItem::Error` when the emitted-item counter reaches the configured
/// injection position.
impl<S> Stream for ErrorInjectingStream<S>
where
    S: Stream + Unpin,
    S::Item: Timestamped,
{
    type Item = StreamItem<S::Item>;

    /// Polls for the next item.
    ///
    /// If the injection position is still armed and equals the number of items
    /// emitted so far, the error is returned immediately without polling the
    /// inner stream. Otherwise the inner stream is polled and its result is
    /// forwarded, with ready items wrapped in `StreamItem::Value`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // `S: Unpin` makes the whole adapter `Unpin`, so a plain `&mut` is fine.
        let this = self.get_mut();

        if this.inject_error_at == Some(this.count) {
            // Disarm so the error is injected only once; the error itself
            // counts as an emitted item.
            this.inject_error_at = None;
            this.count += 1;
            return Poll::Ready(Some(StreamItem::Error(FluxionError::stream_error(
                "Injected test error",
            ))));
        }

        let polled = Pin::new(&mut this.inner).poll_next(cx);
        // Only an actually delivered item advances the counter; `Pending` and
        // end-of-stream leave it untouched.
        if let Poll::Ready(Some(_)) = polled {
            this.count += 1;
        }
        polled.map(|maybe_item| maybe_item.map(StreamItem::Value))
    }
}