binary_option_tools/pocketoption/parser/
message.rs

1use core::fmt;
2use std::vec;
3
4use serde::Deserialize;
5use serde_json::{from_str, Value};
6use tracing::warn;
7
8use binary_options_tools_core::{
9    general::traits::MessageTransfer,
10    reimports::{Bytes, Message},
11};
12
13use crate::pocketoption::{
14    error::{PocketOptionError, PocketResult},
15    types::{
16        base::{ChangeSymbol, SubscribeSymbol},
17        info::MessageInfo,
18        order::{
19            Deal, FailOpenOrder, FailOpenPendingOrder, OpenOrder, OpenPendingOrder,
20            PocketMessageFail, SuccessCloseOrder, SuccessOpenPendingOrder, UpdateClosedDeals,
21            UpdateOpenedDeals,
22        },
23        success::SuccessAuth,
24        update::{
25            LoadHistoryPeriodResult, UpdateAssets, UpdateBalance, UpdateHistoryNew, UpdateStream,
26        },
27    },
28    ws::ssid::Ssid,
29};
30
31use super::basic::LoadHistoryPeriod;
32
33#[derive(Debug, Deserialize, Clone)]
34#[serde(untagged)]
35pub enum WebSocketMessage {
36    OpenOrder(OpenOrder),
37    ChangeSymbol(ChangeSymbol),
38    Auth(Ssid),
39    GetCandles(LoadHistoryPeriod),
40
41    LoadHistoryPeriod(LoadHistoryPeriodResult),
42    UpdateStream(UpdateStream),
43    UpdateHistoryNew(UpdateHistoryNew),
44    SubscribeSymbol(SubscribeSymbol),
45    UpdateAssets(UpdateAssets),
46    UpdateBalance(UpdateBalance),
47    SuccessAuth(SuccessAuth),
48    UpdateClosedDeals(UpdateClosedDeals),
49    SuccesscloseOrder(SuccessCloseOrder),
50    SuccessopenOrder(Deal),
51    SuccessupdateBalance(UpdateBalance),
52    UpdateOpenedDeals(UpdateOpenedDeals),
53    FailOpenOrder(FailOpenOrder),
54    FailOpenPendingOrder(FailOpenPendingOrder),
55    SuccessupdatePending(Value),
56    OpenPendingOrder(OpenPendingOrder),
57    SuccessOpenPendingOrder(SuccessOpenPendingOrder),
58
59    Raw(String),
60    None,
61}
62
63impl WebSocketMessage {
64    pub fn parse(data: impl ToString) -> PocketResult<Self> {
65        let data = data.to_string();
66        let message: Result<Self, serde_json::Error> = from_str(&data);
67        match message {
68            Ok(message) => Ok(message),
69            Err(e) => {
70                if let Ok(assets) = from_str::<UpdateAssets>(&data) {
71                    return Ok(Self::UpdateAssets(assets));
72                }
73                if let Ok(history) = from_str::<UpdateHistoryNew>(&data) {
74                    return Ok(Self::UpdateHistoryNew(history));
75                }
76                if let Ok(stream) = from_str::<UpdateStream>(&data) {
77                    return Ok(Self::UpdateStream(stream));
78                }
79                if let Ok(balance) = from_str::<UpdateBalance>(&data) {
80                    return Ok(Self::UpdateBalance(balance));
81                }
82                Err(e.into())
83            }
84        }
85    }
86
87    pub fn parse_with_context(data: impl ToString, previous: &MessageInfo) -> PocketResult<Self> {
88        let data = data.to_string();
89        match previous {
90            MessageInfo::OpenOrder => {
91                if let Ok(order) = from_str::<OpenOrder>(&data) {
92                    return Ok(Self::OpenOrder(order));
93                }
94            }
95            MessageInfo::UpdateStream => {
96                if let Ok(stream) = from_str::<UpdateStream>(&data) {
97                    return Ok(Self::UpdateStream(stream));
98                }
99            }
100            MessageInfo::UpdateHistoryNew => {
101                if let Ok(history) = from_str::<UpdateHistoryNew>(&data) {
102                    return Ok(Self::UpdateHistoryNew(history));
103                }
104            }
105            MessageInfo::UpdateAssets => {
106                if let Ok(assets) = from_str::<UpdateAssets>(&data) {
107                    return Ok(Self::UpdateAssets(assets));
108                }
109            }
110            MessageInfo::UpdateBalance => {
111                if let Ok(balance) = from_str::<UpdateBalance>(&data) {
112                    return Ok(Self::UpdateBalance(balance));
113                }
114            }
115            MessageInfo::SuccesscloseOrder => {
116                if let Ok(order) = from_str::<SuccessCloseOrder>(&data) {
117                    return Ok(Self::SuccesscloseOrder(order));
118                }
119            }
120            MessageInfo::Auth => {
121                if let Ok(auth) = from_str::<Ssid>(&data) {
122                    return Ok(Self::Auth(auth));
123                }
124            }
125            MessageInfo::ChangeSymbol => {
126                if let Ok(symbol) = from_str::<ChangeSymbol>(&data) {
127                    return Ok(Self::ChangeSymbol(symbol));
128                }
129            }
130            MessageInfo::SuccessupdateBalance => {
131                if let Ok(balance) = from_str::<UpdateBalance>(&data) {
132                    return Ok(Self::SuccessupdateBalance(balance));
133                }
134            }
135            MessageInfo::SuccessupdatePending => {
136                if let Ok(pending) = from_str::<Value>(&data) {
137                    return Ok(Self::SuccessupdatePending(pending));
138                }
139            }
140            MessageInfo::SubscribeSymbol => {
141                if let Ok(symbol) = from_str::<SubscribeSymbol>(&data) {
142                    return Ok(Self::SubscribeSymbol(symbol));
143                }
144            }
145            MessageInfo::Successauth => {
146                if let Ok(auth) = from_str::<SuccessAuth>(&data) {
147                    return Ok(Self::SuccessAuth(auth));
148                }
149            }
150            MessageInfo::UpdateOpenedDeals => {
151                if let Ok(deals) = from_str::<UpdateOpenedDeals>(&data) {
152                    return Ok(Self::UpdateOpenedDeals(deals));
153                }
154            }
155            MessageInfo::UpdateClosedDeals => {
156                if let Ok(deals) = from_str::<UpdateClosedDeals>(&data) {
157                    return Ok(Self::UpdateClosedDeals(deals));
158                }
159            }
160            MessageInfo::SuccessopenOrder => {
161                if let Ok(order) = from_str::<Deal>(&data) {
162                    return Ok(Self::SuccessopenOrder(order));
163                }
164            }
165            MessageInfo::LoadHistoryPeriod => {
166                if let Ok(history) = from_str::<LoadHistoryPeriodResult>(&data) {
167                    return Ok(Self::LoadHistoryPeriod(history));
168                }
169            }
170            MessageInfo::UpdateCharts => {
171                return Err(PocketOptionError::GeneralParsingError(
172                    "This is expected, there is no parser for the 'updateCharts' message"
173                        .to_string(),
174                ));
175                // TODO: Add this
176            }
177            MessageInfo::GetCandles => {
178                if let Ok(candles) = from_str::<LoadHistoryPeriod>(&data) {
179                    return Ok(Self::GetCandles(candles));
180                }
181            }
182            MessageInfo::FailopenOrder => {
183                if let Ok(fail) = from_str::<FailOpenOrder>(&data) {
184                    return Ok(Self::FailOpenOrder(fail));
185                }
186            }
187            MessageInfo::FailopenPendingOrder => {
188                if let Ok(fail) = from_str::<FailOpenPendingOrder>(&data) {
189                    return Ok(Self::FailOpenPendingOrder(fail));
190                }
191            }
192            MessageInfo::OpenPendingOrder => {
193                if let Ok(order) = from_str::<OpenPendingOrder>(&data) {
194                    return Ok(Self::OpenPendingOrder(order));
195                }
196            }
197            MessageInfo::SuccessopenPendingOrder => {
198                if let Ok(order) = from_str::<SuccessOpenPendingOrder>(&data) {
199                    return Ok(Self::SuccessOpenPendingOrder(order));
200                }
201            }
202            MessageInfo::None => return WebSocketMessage::parse(data.clone()),
203        }
204        warn!("Failed to parse message of type '{previous}':\n {data}");
205        Err(PocketOptionError::GeneralParsingError(format!(
206            "Error parsing message for message type '{}'",
207            previous
208        )))
209    }
210
211    pub fn information(&self) -> MessageInfo {
212        match self {
213            Self::UpdateStream(_) => MessageInfo::UpdateStream,
214            Self::UpdateHistoryNew(_) => MessageInfo::UpdateHistoryNew,
215            Self::UpdateAssets(_) => MessageInfo::UpdateAssets,
216            Self::UpdateBalance(_) => MessageInfo::UpdateBalance,
217            Self::OpenOrder(_) => MessageInfo::OpenOrder,
218            Self::SuccessAuth(_) => MessageInfo::Successauth,
219            Self::UpdateClosedDeals(_) => MessageInfo::UpdateClosedDeals,
220            Self::SuccesscloseOrder(_) => MessageInfo::SuccesscloseOrder,
221            Self::SuccessopenOrder(_) => MessageInfo::SuccessopenOrder,
222            Self::ChangeSymbol(_) => MessageInfo::ChangeSymbol,
223            Self::Auth(_) => MessageInfo::Auth,
224            Self::SuccessupdateBalance(_) => MessageInfo::SuccessupdateBalance,
225            Self::UpdateOpenedDeals(_) => MessageInfo::UpdateOpenedDeals,
226            Self::SubscribeSymbol(_) => MessageInfo::SubscribeSymbol,
227            Self::LoadHistoryPeriod(_) => MessageInfo::LoadHistoryPeriod,
228            Self::GetCandles(_) => MessageInfo::GetCandles,
229            Self::FailOpenOrder(_) => MessageInfo::FailopenOrder,
230            Self::SuccessupdatePending(_) => MessageInfo::SuccessupdatePending,
231            Self::FailOpenPendingOrder(_) => MessageInfo::FailopenPendingOrder,
232            Self::SuccessOpenPendingOrder(_) => MessageInfo::SuccessopenPendingOrder,
233            Self::OpenPendingOrder(_) => MessageInfo::OpenPendingOrder,
234            Self::Raw(_) => MessageInfo::None,
235            Self::None => MessageInfo::None,
236        }
237    }
238}
239
240impl fmt::Display for WebSocketMessage {
241    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242        match self {
243            WebSocketMessage::ChangeSymbol(change_symbol) => {
244                write!(
245                    f,
246                    "42[{},{}]",
247                    serde_json::to_string(&MessageInfo::ChangeSymbol).map_err(|_| fmt::Error)?,
248                    serde_json::to_string(&change_symbol).map_err(|_| fmt::Error)?
249                )
250            }
251            WebSocketMessage::Auth(auth) => auth.fmt(f),
252            WebSocketMessage::GetCandles(candles) => {
253                write!(
254                    f,
255                    "42[{},{}]",
256                    serde_json::to_string(&MessageInfo::LoadHistoryPeriod)
257                        .map_err(|_| fmt::Error)?,
258                    serde_json::to_string(candles).map_err(|_| fmt::Error)?
259                )
260            }
261            WebSocketMessage::OpenOrder(open_order) => {
262                write!(
263                    f,
264                    "42[{},{}]",
265                    serde_json::to_string(&MessageInfo::OpenOrder).map_err(|_| fmt::Error)?,
266                    serde_json::to_string(open_order).map_err(|_| fmt::Error)?
267                )
268            }
269            WebSocketMessage::SubscribeSymbol(subscribe_symbol) => {
270                write!(f, "{:?}", subscribe_symbol)
271            }
272            WebSocketMessage::Raw(text) => text.fmt(f),
273
274            WebSocketMessage::UpdateStream(update_stream) => write!(f, "{:?}", update_stream),
275            WebSocketMessage::UpdateHistoryNew(update_history_new) => {
276                write!(f, "{:?}", update_history_new)
277            }
278            WebSocketMessage::UpdateAssets(update_assets) => write!(f, "{:?}", update_assets),
279            WebSocketMessage::UpdateBalance(update_balance) => write!(f, "{:?}", update_balance),
280            WebSocketMessage::SuccessAuth(success_auth) => write!(f, "{:?}", success_auth),
281            WebSocketMessage::UpdateClosedDeals(update_closed_deals) => {
282                write!(f, "{:?}", update_closed_deals)
283            }
284            WebSocketMessage::SuccesscloseOrder(success_close_order) => {
285                write!(f, "{:?}", success_close_order)
286            }
287            WebSocketMessage::SuccessopenOrder(success_open_order) => {
288                write!(f, "{:?}", success_open_order)
289            }
290            WebSocketMessage::SuccessupdateBalance(update_balance) => {
291                write!(f, "{:?}", update_balance)
292            }
293            WebSocketMessage::UpdateOpenedDeals(update_opened_deals) => {
294                write!(f, "{:?}", update_opened_deals)
295            }
296            WebSocketMessage::SuccessOpenPendingOrder(order) => write!(f, "{:?}", order),
297            WebSocketMessage::FailOpenPendingOrder(order) => write!(f, "{:?}", order),
298            WebSocketMessage::OpenPendingOrder(order) => write!(f, "{:?}", order),
299
300            WebSocketMessage::None => write!(f, "None"),
301            // 42["loadHistoryPeriod",{"asset":"#AXP_otc","index":173384282247,"time":1733482800,"offset":540000,"period":3600}]
302            WebSocketMessage::LoadHistoryPeriod(period) => {
303                write!(
304                    f,
305                    "42[{}, {}]",
306                    serde_json::to_string(&MessageInfo::LoadHistoryPeriod)
307                        .map_err(|_| fmt::Error)?,
308                    serde_json::to_string(&period).map_err(|_| fmt::Error)?
309                )
310            }
311            WebSocketMessage::FailOpenOrder(order) => order.fmt(f),
312            WebSocketMessage::SuccessupdatePending(pending) => pending.fmt(f),
313        }
314    }
315}
316
317impl From<WebSocketMessage> for Message {
318    fn from(value: WebSocketMessage) -> Self {
319        Box::new(value).into()
320    }
321}
322
323impl From<Box<WebSocketMessage>> for Message {
324    fn from(value: Box<WebSocketMessage>) -> Self {
325        if value.info() == MessageInfo::None {
326            return Message::Ping(Bytes::new());
327        }
328        Message::text(value.to_string())
329    }
330}
331
332impl MessageTransfer for WebSocketMessage {
333    type Error = PocketMessageFail;
334
335    type TransferError = PocketMessageFail;
336
337    type Info = MessageInfo;
338
339    fn info(&self) -> Self::Info {
340        self.information()
341    }
342
343    fn error(&self) -> Option<Self::Error> {
344        if let Self::FailOpenOrder(fail) = self {
345            return Some(PocketMessageFail::Order(fail.to_owned()));
346        }
347        None
348    }
349
350    fn to_error(&self) -> Self::TransferError {
351        if let Self::FailOpenOrder(fail) = self {
352            PocketMessageFail::Order(fail.to_owned())
353        } else {
354            PocketMessageFail::Order(FailOpenOrder::new(
355                "This is unexpected and should never happend",
356                1.0,
357                "None",
358            ))
359        }
360    }
361
362    fn error_info(&self) -> Option<Vec<Self::Info>> {
363        if let Self::FailOpenOrder(_) = self {
364            Some(vec![MessageInfo::SuccessopenOrder])
365        } else {
366            None
367        }
368    }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    use std::{
        error::Error,
        fs::{self, File},
        io::BufReader,
    };

    /// Lists the entries of `path`, each returned as `"{path}/{file_name}"`.
    ///
    /// Fails with the first I/O error hit while opening or walking the directory.
    fn get_files_in_directory(path: &str) -> Result<Vec<String>, std::io::Error> {
        fs::read_dir(path)?
            .map(|entry| {
                entry.map(|found| format!("{path}/{}", found.file_name().to_string_lossy()))
            })
            .collect()
    }

    /// Parses a few inline samples plus two fixture files.
    #[test]
    fn test_descerialize_message() -> Result<(), Box<dyn Error>> {
        let samples = [
            r#"[["AUS200_otc",1732830010,6436.06]]"#,
            r#"[["AUS200_otc",1732830108.205,6435.96]]"#,
            r#"[["AEDCNY_otc",1732829668.352,1.89817]]"#,
            r#"[["CADJPY_otc",1732830170.793,109.442]]"#,
        ];
        for sample in &samples {
            let parsed = WebSocketMessage::parse(sample)?;
            dbg!(&parsed);
        }

        let history_text = fs::read_to_string("tests/update_history_new.txt")?;
        let history_new: WebSocketMessage = from_str(&history_text)?;
        dbg!(&history_new);

        let assets_text = fs::read_to_string("tests/data.json")?;
        let assets_raw: WebSocketMessage = from_str(&assets_text)?;
        dbg!(&assets_raw);
        Ok(())
    }

    /// Checks that every `.json` file under `tests` deserializes into a message.
    #[test]
    fn deep_test_descerialize_message() -> anyhow::Result<()> {
        for candidate in get_files_in_directory("tests")? {
            dbg!(&candidate);
            if candidate.ends_with(".json") {
                let reader = BufReader::new(File::open(candidate)?);
                let _: WebSocketMessage = serde_json::from_reader(reader)?;
            }
        }

        Ok(())
    }

    /// Writes one asset symbol per line to `tests/assets.txt`.
    #[test]
    fn test_write_assets() -> anyhow::Result<()> {
        let raw: UpdateAssets = serde_json::from_str(include_str!("../../../tests/data.json"))?;
        let listing: String = raw
            .0
            .iter()
            .map(|asset| format!("{}\n", asset.symbol))
            .collect();
        fs::write("tests/assets.txt", listing.as_bytes())?;
        Ok(())
    }
}