binary_option_tools_core/pocketoption/ws/
stream.rs

1use std::sync::Arc;
2
3use tracing::debug;
4// use pin_project_lite::pin_project;
5use crate::pocketoption::{
6    error::PocketResult, parser::message::WebSocketMessage, types::update::DataCandle,
7};
8
9use async_channel::Receiver;
10use futures_util::stream::unfold;
11use futures_util::Stream;
12
13#[derive(Clone)]
14pub struct StreamAsset {
15    reciever: Receiver<WebSocketMessage>,
16    asset: String,
17    chunk_size: usize,
18}
19
20impl StreamAsset {
21    pub fn new(reciever: Receiver<WebSocketMessage>, asset: String) -> Self {
22        Self {
23            reciever,
24            asset,
25            chunk_size: 1,
26        }
27    }
28
29    pub fn new_chuncked(
30        reciever: Receiver<WebSocketMessage>,
31        asset: String,
32        chunk_size: usize,
33    ) -> Self {
34        Self {
35            reciever,
36            asset,
37            chunk_size,
38        }
39    }
40
41    pub async fn recieve(&self) -> PocketResult<DataCandle> {
42        while let Ok(candle) = self.reciever.recv().await {
43            debug!(target: "StreamAsset", "Recieved UpdateStream!");
44            if let WebSocketMessage::UpdateStream(candle) = candle {
45                if let Some(candle) = candle.0.first().take_if(|x| x.active == self.asset) {
46                    return Ok(candle.into());
47                }
48            }
49        }
50
51        unreachable!(
52            "This should never happen, please contact Rick-29 at https://github.com/Rick-29"
53        )
54    }
55
56    pub async fn recieve_chunked(&self) -> PocketResult<DataCandle> {
57        let mut chunk = vec![];
58        while let Ok(candle) = self.reciever.recv().await {
59            debug!(target: "StreamAsset", "Recieved UpdateStream!");
60            if let WebSocketMessage::UpdateStream(candle) = candle {
61                if let Some(candle) = candle.0.first().take_if(|x| x.active == self.asset) {
62                    chunk.push(candle.into());
63                    if chunk.len() >= self.chunk_size {
64                        return chunk.try_into();
65                    }
66                }
67            }
68        }
69
70        unreachable!(
71            "This should never happen, please contact Rick-29 at https://github.com/Rick-29"
72        )
73    }
74
75    pub fn to_stream(&self) -> impl Stream<Item = PocketResult<DataCandle>> + '_ {
76        Box::pin(unfold(self, |state| async move {
77            let item = state.recieve().await;
78            Some((item, state))
79        }))
80    }
81
82    pub fn to_stream_chuncked(&self) -> impl Stream<Item = PocketResult<DataCandle>> + '_ {
83        Box::pin(unfold(self, |state| async move {
84            let item = state.recieve_chunked().await;
85            Some((item, state))
86        }))
87    }
88
89    pub fn to_stream_static(
90        self: Arc<Self>,
91    ) -> impl Stream<Item = PocketResult<DataCandle>> + 'static {
92        Box::pin(unfold(self, |state| async move {
93            let item = state.recieve().await;
94            Some((item, state))
95        }))
96    }
97
98    pub fn to_stream_chuncked_static(
99        self: Arc<Self>,
100    ) -> impl Stream<Item = PocketResult<DataCandle>> + 'static {
101        Box::pin(unfold(self, |state| async move {
102            let item = state.recieve_chunked().await;
103            Some((item, state))
104        }))
105    }
106}
107
108
109// impl Stream for StreamAsset {
110//     type Item = Candle;
111
112//     fn poll_next(self: std::pin::Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
113//         match self.reciever.recv()
114
115//         }
116//     }
117// }