use core::fmt::Debug;
use core::future::Future;
use fluxion_core::{CancellationToken, Result};
use futures::stream::{Stream, StreamExt};
pub async fn subscribe_impl<S, T, F, Fut, E, OnError>(
mut stream: S,
on_next_func: F,
on_error_callback: OnError,
cancellation_token: Option<CancellationToken>,
) -> Result<()>
where
S: Stream<Item = T> + Unpin,
F: Fn(T, CancellationToken) -> Fut + Clone,
Fut: Future<Output = core::result::Result<(), E>>,
OnError: Fn(E) + Clone,
T: Debug + Clone,
{
let cancellation_token = cancellation_token.unwrap_or_default();
while let Some(item) = stream.next().await {
if cancellation_token.is_cancelled() {
break;
}
let result = on_next_func(item.clone(), cancellation_token.clone()).await;
if let Err(error) = result {
on_error_callback(error);
}
}
Ok(())
}
macro_rules! define_subscribe_impl {
(@step #[$attr:meta], $($bounds:tt)*) => {
use alloc::boxed::Box;
use async_trait::async_trait;
use core::fmt::Debug;
use core::future::Future;
use fluxion_core::{CancellationToken, Result};
use futures::stream::Stream;
use crate::subscribe::implementation::subscribe_impl;
#[$attr]
pub trait SubscribeExt<T>: Stream<Item = T> + Sized {
async fn subscribe<F, Fut, E, OnError>(
self,
on_next_func: F,
on_error_callback: OnError,
cancellation_token: Option<CancellationToken>,
) -> Result<()>
where
F: Fn(T, CancellationToken) -> Fut + Clone + $($bounds)* 'static,
Fut: Future<Output = core::result::Result<(), E>> + $($bounds)* 'static,
OnError: Fn(E) + Clone + $($bounds)* 'static,
T: Debug + Clone + $($bounds)* 'static,
E: $($bounds)* 'static;
}
#[$attr]
impl<S, T> SubscribeExt<T> for S
where
S: Stream<Item = T> + Unpin + $($bounds)* 'static,
T: $($bounds)* 'static,
{
async fn subscribe<F, Fut, E, OnError>(
self,
on_next_func: F,
on_error_callback: OnError,
cancellation_token: Option<CancellationToken>,
) -> Result<()>
where
F: Fn(T, CancellationToken) -> Fut + Clone + $($bounds)* 'static,
Fut: Future<Output = core::result::Result<(), E>> + $($bounds)* 'static,
OnError: Fn(E) + Clone + $($bounds)* 'static,
T: Debug + Clone + $($bounds)* 'static,
E: $($bounds)* 'static,
{
subscribe_impl(self, on_next_func, on_error_callback, cancellation_token).await
}
}
};
() => {
define_subscribe_impl!(@step #[async_trait(?Send)], );
};
($($bounds:tt)+) => {
define_subscribe_impl!(@step #[async_trait], $($bounds)+);
};
}