binary_options_tools_core/general/
stream.rs1use async_channel::Receiver;
2use futures_util::{stream::unfold, Stream};
3
4pub struct RecieverStream<T> {
5 inner: Receiver<T>,
6}
7
8impl<T> RecieverStream<T> {
9 pub fn new(inner: Receiver<T>) -> Self {
10 Self { inner }
11 }
12
13 async fn receive(&self) -> anyhow::Result<T> {
14 Ok(self.inner.recv().await?)
15 }
16
17 pub fn to_stream(&self) -> impl Stream<Item = anyhow::Result<T>> + '_ {
18 Box::pin(unfold(self, |state| async move {
19 let item = state.receive().await;
20 Some((item, state))
21 }))
22 }
23}