binary_options_tools_core_pre/utils/
stream.rs1use std::{sync::Arc, time::Duration};
2
3use futures_util::{stream::unfold, Stream};
4use kanal::{AsyncReceiver, ReceiveError};
5use tokio_tungstenite::tungstenite::Message;
6
7use crate::{
8 error::{CoreError, CoreResult},
9 traits::Rule,
10 utils::time::timeout,
11};
12
13pub struct RecieverStream {
14 inner: AsyncReceiver<Message>,
15 timeout: Option<Duration>,
16}
17
18pub struct FilteredRecieverStream {
19 inner: AsyncReceiver<Message>,
20 timeout: Option<Duration>,
21 filter: Box<dyn Rule + Send + Sync>,
22}
23
24impl RecieverStream {
25 pub fn new(inner: AsyncReceiver<Message>) -> Self {
26 Self {
27 inner,
28 timeout: None,
29 }
30 }
31
32 pub fn new_timed(inner: AsyncReceiver<Message>, timeout: Option<Duration>) -> Self {
33 Self { inner, timeout }
34 }
35
36 async fn receive(&self) -> CoreResult<Message> {
37 match self.timeout {
38 Some(time) => timeout(time, self.inner.recv(), "RecieverStream".to_string()).await,
39 None => Ok(self.inner.recv().await?),
40 }
41 }
42
43 pub fn to_stream(&self) -> impl Stream<Item = CoreResult<Message>> + '_ {
44 Box::pin(unfold(self, move |state| async move {
45 let item = state.receive().await;
46 Some((item, state))
47 }))
48 }
49
50 pub fn to_stream_static(self: Arc<Self>) -> impl Stream<Item = CoreResult<Message>> + 'static {
51 Box::pin(unfold(self, async |state| {
52 let item = state.receive().await;
53 Some((item, state))
54 }))
55 }
56}
57
58impl FilteredRecieverStream {
59 pub fn new(
60 inner: AsyncReceiver<Message>,
61 timeout: Option<Duration>,
62 filter: Box<dyn Rule + Send + Sync>,
63 ) -> Self {
64 Self {
65 inner,
66 timeout,
67 filter,
68 }
69 }
70
71 pub fn new_base(inner: AsyncReceiver<Message>) -> Self {
72 Self::new(inner, None, default_filter())
73 }
74
75 pub fn new_filtered(
76 inner: AsyncReceiver<Message>,
77 filter: Box<dyn Rule + Send + Sync>,
78 ) -> Self {
79 Self::new(inner, None, filter)
80 }
81
82 async fn recv(&self) -> CoreResult<Message> {
83 while let Ok(msg) = self.inner.recv().await {
84 if self.filter.call(&msg) {
85 return Ok(msg);
86 }
87 }
88 Err(CoreError::ChannelReceiver(ReceiveError::Closed))
89 }
90
91 async fn receive(&self) -> CoreResult<Message> {
92 match self.timeout {
93 Some(time) => timeout(time, self.recv(), "RecieverStream".to_string()).await,
94 None => Ok(self.inner.recv().await?),
95 }
96 }
97
98 pub fn to_stream(&self) -> impl Stream<Item = CoreResult<Message>> + '_ {
99 Box::pin(unfold(self, move |state| async move {
100 let item = state.receive().await;
101 Some((item, state))
102 }))
103 }
104
105 pub fn to_stream_static(self: Arc<Self>) -> impl Stream<Item = CoreResult<Message>> + 'static {
106 Box::pin(unfold(self, async |state| {
107 let item = state.receive().await;
108 Some((item, state))
109 }))
110 }
111}
112
113fn default_filter() -> Box<dyn Rule + Send + Sync> {
114 Box::new(move |_: &Message| true)
115}