#[cfg(all(
feature = "crossbeam-backend",
not(feature = "tokio-backend"),
not(feature = "std-backend")
))]
use crate::stream::crossbeam::AsyncStream;
#[cfg(all(feature = "std-backend", not(feature = "tokio-backend")))]
use crate::stream::std::AsyncStream;
#[cfg(feature = "tokio-backend")]
use crate::stream::tokio::AsyncStream;
use std::future::Future;
use std::pin::Pin;
use tokio::sync::mpsc;
pub struct EmitterBuilder<T> {
inner: Box<dyn EmitterImpl<T>>,
}
pub type EmitterFuture<T> =
Pin<Box<dyn Future<Output = Result<Vec<T>, Box<dyn std::error::Error + Send>>> + Send>>;
pub trait EmitterImpl<T>: Send {
fn execute(self: Box<Self>) -> EmitterFuture<T>;
}
impl<T: Send + 'static + super::NotResult> EmitterBuilder<T> {
pub fn new(inner: Box<dyn EmitterImpl<T>>) -> Self {
Self { inner }
}
pub fn emit<FOk, FErr>(self, on_ok: FOk, on_err: FErr) -> AsyncStream<T>
where
FOk: FnOnce(Vec<T>) -> Vec<T> + Send + 'static,
FErr: FnOnce(Box<dyn std::error::Error + Send>) + Send + 'static,
{
let (tx, rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
match self.inner.execute().await {
Ok(items) => {
for item in on_ok(items) {
if tx.send(item).is_err() {
break;
}
}
}
Err(e) => on_err(e),
}
});
AsyncStream::new(rx)
}
}
#[macro_export]
macro_rules! emit {
($stream:expr, Ok($items:ident) => $ok:expr, Err($e:ident) => $err:expr) => {
$stream.emit(|$items| $ok, |$e| $err)
};
}