Skip to main content

binary_options_tools/pocketoption/modules/
server_time.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use binary_options_tools_core_pre::{
5    error::{CoreError, CoreResult},
6    reimports::{AsyncReceiver, AsyncSender, Message},
7    traits::{LightweightModule, Rule, RunnerCommand},
8};
9use tracing::debug;
10
11use crate::pocketoption::{
12    state::State,
13    types::{StreamData, TwoStepRule},
14};
15
16pub struct ServerTimeModule {
17    receiver: AsyncReceiver<Arc<Message>>,
18    state: Arc<State>,
19}
20
21#[async_trait]
22impl LightweightModule<State> for ServerTimeModule {
23    fn new(
24        state: Arc<State>,
25        _: AsyncSender<Message>,
26        ws_receiver: AsyncReceiver<Arc<Message>>,
27        _: AsyncSender<RunnerCommand>,
28    ) -> Self
29    where
30        Self: Sized,
31    {
32        Self {
33            receiver: ws_receiver,
34            state,
35        }
36    }
37
38    /// The module's asynchronous run loop.
39    async fn run(&mut self) -> CoreResult<()> {
40        while let Ok(msg) = self.receiver.recv().await {
41            match msg.as_ref() {
42                Message::Binary(data) => {
43                    if let Ok(candle) = serde_json::from_slice::<StreamData>(data) {
44                        debug!("Received candle data (binary): {:?}", candle);
45                        self.state.update_server_time(candle.timestamp).await;
46                    }
47                }
48                Message::Text(text) => {
49                    if let Ok(candle) = serde_json::from_str::<StreamData>(text) {
50                        debug!("Received candle data (text): {:?}", candle);
51                        self.state.update_server_time(candle.timestamp).await;
52                    }
53                }
54                _ => {}
55            }
56        }
57        Err(CoreError::LightweightModuleLoop(
58            "ServerTimeModule".to_string(),
59        ))
60    }
61
62    /// Route only messages for which this returns true.
63    fn rule() -> Box<dyn Rule + Send + Sync> {
64        Box::new(TwoStepRule::new(r#"451-["updateStream","#))
65    }
66}