binary_options_tools_core/general/
stream.rs

1use async_channel::Receiver;
2use futures_util::{stream::unfold, Stream};
3
4pub struct RecieverStream<T> {
5    inner: Receiver<T>,
6}
7
8impl<T> RecieverStream<T> {
9    pub fn new(inner: Receiver<T>) -> Self {
10        Self { inner }
11    }
12
13    async fn receive(&self) -> anyhow::Result<T> {
14        Ok(self.inner.recv().await?)
15    }
16
17    pub fn to_stream(&self) -> impl Stream<Item = anyhow::Result<T>> + '_ {
18        Box::pin(unfold(self, |state| async move {
19            let item = state.receive().await;
20            Some((item, state))
21        }))
22    }
23}