binary_option_tools_core/pocketoption/ws/
stream.rs1use std::sync::Arc;
2
3use tracing::debug;
4use crate::pocketoption::{
6 error::PocketResult, parser::message::WebSocketMessage, types::update::DataCandle,
7};
8
9use async_channel::Receiver;
10use futures_util::stream::unfold;
11use futures_util::Stream;
12
13#[derive(Clone)]
14pub struct StreamAsset {
15 reciever: Receiver<WebSocketMessage>,
16 asset: String,
17 chunk_size: usize,
18}
19
20impl StreamAsset {
21 pub fn new(reciever: Receiver<WebSocketMessage>, asset: String) -> Self {
22 Self {
23 reciever,
24 asset,
25 chunk_size: 1,
26 }
27 }
28
29 pub fn new_chuncked(
30 reciever: Receiver<WebSocketMessage>,
31 asset: String,
32 chunk_size: usize,
33 ) -> Self {
34 Self {
35 reciever,
36 asset,
37 chunk_size,
38 }
39 }
40
41 pub async fn recieve(&self) -> PocketResult<DataCandle> {
42 while let Ok(candle) = self.reciever.recv().await {
43 debug!(target: "StreamAsset", "Recieved UpdateStream!");
44 if let WebSocketMessage::UpdateStream(candle) = candle {
45 if let Some(candle) = candle.0.first().take_if(|x| x.active == self.asset) {
46 return Ok(candle.into());
47 }
48 }
49 }
50
51 unreachable!(
52 "This should never happen, please contact Rick-29 at https://github.com/Rick-29"
53 )
54 }
55
56 pub async fn recieve_chunked(&self) -> PocketResult<DataCandle> {
57 let mut chunk = vec![];
58 while let Ok(candle) = self.reciever.recv().await {
59 debug!(target: "StreamAsset", "Recieved UpdateStream!");
60 if let WebSocketMessage::UpdateStream(candle) = candle {
61 if let Some(candle) = candle.0.first().take_if(|x| x.active == self.asset) {
62 chunk.push(candle.into());
63 if chunk.len() >= self.chunk_size {
64 return chunk.try_into();
65 }
66 }
67 }
68 }
69
70 unreachable!(
71 "This should never happen, please contact Rick-29 at https://github.com/Rick-29"
72 )
73 }
74
75 pub fn to_stream(&self) -> impl Stream<Item = PocketResult<DataCandle>> + '_ {
76 Box::pin(unfold(self, |state| async move {
77 let item = state.recieve().await;
78 Some((item, state))
79 }))
80 }
81
82 pub fn to_stream_chuncked(&self) -> impl Stream<Item = PocketResult<DataCandle>> + '_ {
83 Box::pin(unfold(self, |state| async move {
84 let item = state.recieve_chunked().await;
85 Some((item, state))
86 }))
87 }
88
89 pub fn to_stream_static(
90 self: Arc<Self>,
91 ) -> impl Stream<Item = PocketResult<DataCandle>> + 'static {
92 Box::pin(unfold(self, |state| async move {
93 let item = state.recieve().await;
94 Some((item, state))
95 }))
96 }
97
98 pub fn to_stream_chuncked_static(
99 self: Arc<Self>,
100 ) -> impl Stream<Item = PocketResult<DataCandle>> + 'static {
101 Box::pin(unfold(self, |state| async move {
102 let item = state.recieve_chunked().await;
103 Some((item, state))
104 }))
105 }
106}
107
108
109