Skip to main content

binary_options_tools_core_pre/utils/
stream.rs

1use std::{sync::Arc, time::Duration};
2
3use futures_util::{stream::unfold, Stream};
4use kanal::{AsyncReceiver, ReceiveError};
5use tokio_tungstenite::tungstenite::Message;
6
7use crate::{
8    error::{CoreError, CoreResult},
9    traits::Rule,
10    utils::time::timeout,
11};
12
13pub struct RecieverStream {
14    inner: AsyncReceiver<Message>,
15    timeout: Option<Duration>,
16}
17
18pub struct FilteredRecieverStream {
19    inner: AsyncReceiver<Message>,
20    timeout: Option<Duration>,
21    filter: Box<dyn Rule + Send + Sync>,
22}
23
24impl RecieverStream {
25    pub fn new(inner: AsyncReceiver<Message>) -> Self {
26        Self {
27            inner,
28            timeout: None,
29        }
30    }
31
32    pub fn new_timed(inner: AsyncReceiver<Message>, timeout: Option<Duration>) -> Self {
33        Self { inner, timeout }
34    }
35
36    async fn receive(&self) -> CoreResult<Message> {
37        match self.timeout {
38            Some(time) => timeout(time, self.inner.recv(), "RecieverStream".to_string()).await,
39            None => Ok(self.inner.recv().await?),
40        }
41    }
42
43    pub fn to_stream(&self) -> impl Stream<Item = CoreResult<Message>> + '_ {
44        Box::pin(unfold(self, move |state| async move {
45            let item = state.receive().await;
46            Some((item, state))
47        }))
48    }
49
50    pub fn to_stream_static(self: Arc<Self>) -> impl Stream<Item = CoreResult<Message>> + 'static {
51        Box::pin(unfold(self, async |state| {
52            let item = state.receive().await;
53            Some((item, state))
54        }))
55    }
56}
57
58impl FilteredRecieverStream {
59    pub fn new(
60        inner: AsyncReceiver<Message>,
61        timeout: Option<Duration>,
62        filter: Box<dyn Rule + Send + Sync>,
63    ) -> Self {
64        Self {
65            inner,
66            timeout,
67            filter,
68        }
69    }
70
71    pub fn new_base(inner: AsyncReceiver<Message>) -> Self {
72        Self::new(inner, None, default_filter())
73    }
74
75    pub fn new_filtered(
76        inner: AsyncReceiver<Message>,
77        filter: Box<dyn Rule + Send + Sync>,
78    ) -> Self {
79        Self::new(inner, None, filter)
80    }
81
82    async fn recv(&self) -> CoreResult<Message> {
83        while let Ok(msg) = self.inner.recv().await {
84            if self.filter.call(&msg) {
85                return Ok(msg);
86            }
87        }
88        Err(CoreError::ChannelReceiver(ReceiveError::Closed))
89    }
90
91    async fn receive(&self) -> CoreResult<Message> {
92        match self.timeout {
93            Some(time) => timeout(time, self.recv(), "RecieverStream".to_string()).await,
94            None => Ok(self.inner.recv().await?),
95        }
96    }
97
98    pub fn to_stream(&self) -> impl Stream<Item = CoreResult<Message>> + '_ {
99        Box::pin(unfold(self, move |state| async move {
100            let item = state.receive().await;
101            Some((item, state))
102        }))
103    }
104
105    pub fn to_stream_static(self: Arc<Self>) -> impl Stream<Item = CoreResult<Message>> + 'static {
106        Box::pin(unfold(self, async |state| {
107            let item = state.receive().await;
108            Some((item, state))
109        }))
110    }
111}
112
113fn default_filter() -> Box<dyn Rule + Send + Sync> {
114    Box::new(move |_: &Message| true)
115}