use async_trait::async_trait;
use futures::{
stream::{Stream, StreamExt},
};
use super::Running;
#[async_trait]
pub trait Writer: Sized {
type Item: Send + 'static;
type Ok: Send;
type Error: std::error::Error + Send;
async fn op(&mut self, item: Self::Item) -> Running<Result<Self::Ok, Self::Error>, Option<Self::Error>>;
async fn handle_result(res: Result<Self::Ok, Self::Error>) -> Running<(), Option<Self::Error>> {
if let Err(_err) = res {
#[cfg(feature = "debug")]
log::error!("{:?}", _err);
}
Running::Continue(())
}
async fn writer_loop<S>(mut self, mut items: S) -> Result<(), Self::Error>
where
S: Stream<Item = Self::Item> + Send + Unpin
{
while let Some(item) = items.next().await {
match self.op(item).await {
Running::Continue(res) => {
match <Self as Writer>::handle_result(res).await {
Running::Continue(_) => { },
Running::Stop(e) => {
match e {
None => return Ok(()),
Some(err) => return Err(err),
}
}
}
},
Running::Stop(e) => {
match e {
None => return Ok(()),
Some(err) => return Err(err),
}
}
}
}
#[cfg(feature = "debug")]
log::debug!("Exiting writer loop");
Ok(())
}
}
#[cfg(test)]
mod tests {
use async_trait::async_trait;
use futures::sink::SinkExt;
use crate::{Writer, Running};
struct TestWriter { }
#[derive(Debug)]
enum TestWriterItem {
Foo(String),
Bar(i32)
}
#[async_trait]
impl super::Writer for TestWriter {
type Item = TestWriterItem;
type Ok = ();
type Error = std::io::Error;
async fn op(&mut self, item: Self::Item) -> Running<Result<Self::Ok, Self::Error>, Option<Self::Error>> {
println!("{:?}", item);
Running::Continue(Ok(()))
}
async fn handle_result(res: Result<Self::Ok, Self::Error>) -> Running<(), Option<Self::Error>> {
if let Err(err) = res {
println!("{:?}", err);
}
Running::Continue(())
}
}
#[tokio::test]
async fn test_writer() {
let w = TestWriter{ };
let (tx, rx) = flume::unbounded();
let mut tx = tx.into_sink();
let handle = tokio::task::spawn(w.writer_loop(rx.into_stream()));
for i in 0..10 {
tx.send(TestWriterItem::Foo(i.to_string())).await
.unwrap();
tx.send(TestWriterItem::Bar(i)).await
.unwrap();
}
drop(tx);
let res = handle.await;
println!("{:?}", res);
}
}