use crate::{OrphanedSubscriberError, Subscriber};
/// Adapts a [`Subscriber`] into a stream of values derived from it.
///
/// The generator body repeatedly awaits `subscriber.map_changed`, handing it
/// a closure that forwards to `next_item_fn`, and yields every item that the
/// call resolves to. The first `Err(OrphanedSubscriberError)` breaks out of
/// the loop, after which the generator body returns.
///
/// NOTE(review): judging by its name, `map_changed` presumably waits for the
/// observed value to change before invoking the closure — confirm against
/// the `Subscriber` documentation.
///
/// # Parameters
///
/// * `subscriber` - moved into the returned stream and polled from there.
/// * `next_item_fn` - invoked with a shared reference to the subscriber's
///   value; its return value becomes the next stream item.
pub fn subscriber_into_changed_stream<S, T>(
    mut subscriber: Subscriber<S>,
    mut next_item_fn: impl FnMut(&S) -> T + Send,
) -> impl futures_core::Stream<Item = T> + Send
where
    S: Send + Sync,
    T: Send,
{
    asynk_strim::stream_fn(|mut yielder| async move {
        // Reborrow once so that the per-iteration closure below only
        // captures a reference to the callback.
        let project = &mut next_item_fn;
        // The explicit `match` is kept on purpose: it names the error that
        // terminates the stream directly in the pattern.
        #[expect(clippy::while_let_loop)]
        loop {
            match subscriber.map_changed(|state| project(state)).await {
                Ok(item) => {
                    yielder.yield_item(item).await;
                }
                Err(OrphanedSubscriberError) => break,
            }
        }
    })
}
/// Adapts a [`Subscriber`] into a stream of selectively derived values.
///
/// The generator body repeatedly awaits `subscriber.filter_map_changed`,
/// handing it a closure that forwards to `next_item_fn`, and yields every
/// item that the call resolves to. The first `Err(OrphanedSubscriberError)`
/// breaks out of the loop, after which the generator body returns.
///
/// NOTE(review): judging by its name, `filter_map_changed` presumably keeps
/// waiting while `next_item_fn` returns `None` and only resolves once it
/// returns `Some` — confirm against the `Subscriber` documentation.
///
/// # Parameters
///
/// * `subscriber` - moved into the returned stream and polled from there.
/// * `next_item_fn` - invoked with a shared reference to the subscriber's
///   value; returns an `Option<T>` that is handed to `filter_map_changed`.
pub fn subscriber_into_changed_stream_filtered<S, T>(
    mut subscriber: Subscriber<S>,
    mut next_item_fn: impl FnMut(&S) -> Option<T> + Send,
) -> impl futures_core::Stream<Item = T> + Send
where
    S: Send + Sync,
    T: Send,
{
    asynk_strim::stream_fn(|mut yielder| async move {
        // Reborrow once so that the per-iteration closure below only
        // captures a reference to the callback.
        let select = &mut next_item_fn;
        // The explicit `match` is kept on purpose: it names the error that
        // terminates the stream directly in the pattern.
        #[expect(clippy::while_let_loop)]
        loop {
            match subscriber.filter_map_changed(|state| select(state)).await {
                Ok(item) => {
                    yielder.yield_item(item).await;
                }
                Err(OrphanedSubscriberError) => break,
            }
        }
    })
}