binary_options_tools/pocketoption/modules/
server_time.rs1use std::sync::Arc;
2
3use async_trait::async_trait;
4use binary_options_tools_core_pre::{
5 error::{CoreError, CoreResult},
6 reimports::{AsyncReceiver, AsyncSender, Message},
7 traits::{LightweightModule, Rule, RunnerCommand},
8};
9use tracing::debug;
10
11use crate::pocketoption::{
12 state::State,
13 types::{StreamData, TwoStepRule},
14};
15
16pub struct ServerTimeModule {
17 receiver: AsyncReceiver<Arc<Message>>,
18 state: Arc<State>,
19}
20
21#[async_trait]
22impl LightweightModule<State> for ServerTimeModule {
23 fn new(
24 state: Arc<State>,
25 _: AsyncSender<Message>,
26 ws_receiver: AsyncReceiver<Arc<Message>>,
27 _: AsyncSender<RunnerCommand>,
28 ) -> Self
29 where
30 Self: Sized,
31 {
32 Self {
33 receiver: ws_receiver,
34 state,
35 }
36 }
37
38 async fn run(&mut self) -> CoreResult<()> {
40 while let Ok(msg) = self.receiver.recv().await {
41 match msg.as_ref() {
42 Message::Binary(data) => {
43 if let Ok(candle) = serde_json::from_slice::<StreamData>(data) {
44 debug!("Received candle data (binary): {:?}", candle);
45 self.state.update_server_time(candle.timestamp).await;
46 }
47 }
48 Message::Text(text) => {
49 if let Ok(candle) = serde_json::from_str::<StreamData>(text) {
50 debug!("Received candle data (text): {:?}", candle);
51 self.state.update_server_time(candle.timestamp).await;
52 }
53 }
54 _ => {}
55 }
56 }
57 Err(CoreError::LightweightModuleLoop(
58 "ServerTimeModule".to_string(),
59 ))
60 }
61
62 fn rule() -> Box<dyn Rule + Send + Sync> {
64 Box::new(TwoStepRule::new(r#"451-["updateStream","#))
65 }
66}