use futures::{Sink, stream::Stream, StreamExt, Future, SinkExt};
use std::convert::TryFrom;
use std::pin::Pin;
pub type GenFut<'a, T> =
Pin<Box<dyn Future<Output = T> + Send + 'a>>;
pub enum ProcRes<T> {
None,
One(T),
Many(Vec<T>),
}
impl<T> From<Option<T>> for ProcRes<T> {
fn from(opt: Option<T>) -> Self {
match opt {
Some(v) => Self::One(v),
None => Self::None,
}
}
}
pub trait Processor
where Self::Error: Send + 'static,
Self::Item: Send + 'static,
Self: Send + Unpin + Sized,
Self::ResultItem: Send,
{
type Item;
type Error;
type ResultItem;
fn process(
&mut self,
item: Self::Item
) -> GenFut<'_, Result<ProcRes<Self::ResultItem>, Self::Error>>;
fn stopped(&mut self, _: Option<Self::Error>) -> GenFut<'_, ()> { Box::pin(futures::future::ready(())) }
fn on_error(&mut self, _error: Self::Error) -> GenFut<'_, ()> { Box::pin(futures::future::ready(())) }
fn process_builder() -> ProcessBuilder<Self::Item, Self::Error> {
ProcessBuilder::new()
}
}
pub struct Handle;
struct Runner<TStream, TItem, TError, TSink, TSinkItem, TSinkError>
where TStream: Stream<Item = Result<TItem, TError>> + Unpin,
TSink: Sink<TSinkItem, Error = TSinkError> + Unpin,
{
stream: TStream,
sink: TSink,
_marker: std::marker::PhantomData<TSinkItem>,
}
impl
<TStream,
TItem,
TError,
TSink,
TSinkItem,
TSinkError>
Runner
<TStream,
TItem,
TError,
TSink,
TSinkItem,
TSinkError>
where TStream: Stream<Item = Result<TItem, TError>> + Unpin,
TSink: Sink<TSinkItem, Error = TSinkError> + Unpin,
TError: From<TSinkError> + Send + 'static,
TItem: Send + 'static,
TSinkItem: Send,
{
fn new(stream: TStream, sink: TSink) -> Self {
Self {
stream,
sink,
_marker: Default::default(),
}
}
async fn run<P>(mut self, mut processor: P) -> Result<(), ()>
where P: Processor<Item = TItem, Error = TError, ResultItem = TSinkItem>,
{
loop {
let incoming = match self.stream.next().await {
Some(Ok(incoming)) => incoming,
Some(Err(err)) => {
processor.on_error(err).await;
continue;
},
None => {
println!("Stream empty, stopping");
processor.stopped(None).await;
return Ok(());
}
};
let proc_res = match processor.process(incoming).await {
Ok(proc_res) => proc_res,
Err(err) => {
processor.on_error(err).await;
continue;
}
};
match proc_res {
ProcRes::None => continue,
ProcRes::One(item) => {
if let Err(_failed) = self.sink.send(item).await {
continue;
}
if let Err(err) = self.sink.flush().await {
processor.on_error(TError::from(err));
}
}
ProcRes::Many(v) => {
let mut st = futures::stream::iter(v.into_iter());
if let Err(err) = self.sink.send_all(&mut st).await {
processor.on_error(TError::from(err)).await;
}
}
}
}
}
}
pub struct ProcessBuilder<TItem, TError>{
streams: Vec<Box<dyn Stream<Item = Result<TItem, TError>> + Send + Unpin>>
}
impl
<TItem,
TError>
ProcessBuilder
<TItem,
TError>
where
TError: Sized + Send + 'static,
TItem: Sized + Send + 'static,
{
pub fn new() -> Self {
Self { streams: Vec::new() }
}
pub fn add_stream<'a, TStream, TStreamItem>(&'a mut self, stream: TStream) -> &mut Self
where
TStream: Stream<Item = TStreamItem> + Send + Unpin + 'static,
TStreamItem: Into<TItem>,
{
self.streams.push(Box::new(stream.map(|item| Ok(item.into()))));
self
}
pub fn add_try<'a, TStream, TStreamItem>(&'a mut self, stream: TStream) -> &mut Self
where
TStream: Stream<Item = TStreamItem> + Send + Unpin + 'static,
TItem: TryFrom<TStreamItem, Error = TError>,
{
let stream = stream.map(|stream_item| TItem::try_from(stream_item));
self.streams.push(Box::new(stream));
self
}
pub fn add_try_stream<'a, TStream, TStreamError>(&'a mut self, stream: TStream) -> &mut Self
where
TStream: Stream<Item = Result<TItem, TStreamError>> + Send + Unpin + 'static,
TStreamError: 'static,
TError: From<TStreamError>,
{
use futures::TryStreamExt;
let stream = stream
.map_err(TError::from);
self.streams.push(Box::new(stream));
self
}
pub async fn run<TSink, TSinkError, TProcessor>(
mut self,
sink: TSink,
processor: TProcessor
) -> Result<(), ()>
where
TProcessor: Processor<Item = TItem, Error = TError>,
TSink: Sink<TProcessor::ResultItem, Error = TSinkError> + Unpin,
TError: From<TSinkError> + Send + 'static
{
let joined_streams = futures::stream::select_all(self.streams.drain(0..));
let runner = Runner::new(joined_streams, sink);
runner.run(processor).await
}
}