1use core::fmt;
2use std::vec;
3
4use serde::Deserialize;
5use serde_json::{from_str, Value};
6use tracing::warn;
7
8use binary_options_tools_core::{
9 general::traits::MessageTransfer,
10 reimports::{Bytes, Message},
11};
12
13use crate::pocketoption::{
14 error::{PocketOptionError, PocketResult},
15 types::{
16 base::{ChangeSymbol, SubscribeSymbol},
17 info::MessageInfo,
18 order::{
19 Deal, FailOpenOrder, FailOpenPendingOrder, OpenOrder, OpenPendingOrder,
20 PocketMessageFail, SuccessCloseOrder, SuccessOpenPendingOrder, UpdateClosedDeals,
21 UpdateOpenedDeals,
22 },
23 success::SuccessAuth,
24 update::{
25 LoadHistoryPeriodResult, UpdateAssets, UpdateBalance, UpdateHistoryNew, UpdateStream,
26 },
27 },
28 ws::ssid::Ssid,
29};
30
31use super::basic::LoadHistoryPeriod;
32
33#[derive(Debug, Deserialize, Clone)]
34#[serde(untagged)]
35pub enum WebSocketMessage {
36 OpenOrder(OpenOrder),
37 ChangeSymbol(ChangeSymbol),
38 Auth(Ssid),
39 GetCandles(LoadHistoryPeriod),
40
41 LoadHistoryPeriod(LoadHistoryPeriodResult),
42 UpdateStream(UpdateStream),
43 UpdateHistoryNew(UpdateHistoryNew),
44 SubscribeSymbol(SubscribeSymbol),
45 UpdateAssets(UpdateAssets),
46 UpdateBalance(UpdateBalance),
47 SuccessAuth(SuccessAuth),
48 UpdateClosedDeals(UpdateClosedDeals),
49 SuccesscloseOrder(SuccessCloseOrder),
50 SuccessopenOrder(Deal),
51 SuccessupdateBalance(UpdateBalance),
52 UpdateOpenedDeals(UpdateOpenedDeals),
53 FailOpenOrder(FailOpenOrder),
54 FailOpenPendingOrder(FailOpenPendingOrder),
55 SuccessupdatePending(Value),
56 OpenPendingOrder(OpenPendingOrder),
57 SuccessOpenPendingOrder(SuccessOpenPendingOrder),
58
59 Raw(String),
60 None,
61}
62
63impl WebSocketMessage {
64 pub fn parse(data: impl ToString) -> PocketResult<Self> {
65 let data = data.to_string();
66 let message: Result<Self, serde_json::Error> = from_str(&data);
67 match message {
68 Ok(message) => Ok(message),
69 Err(e) => {
70 if let Ok(assets) = from_str::<UpdateAssets>(&data) {
71 return Ok(Self::UpdateAssets(assets));
72 }
73 if let Ok(history) = from_str::<UpdateHistoryNew>(&data) {
74 return Ok(Self::UpdateHistoryNew(history));
75 }
76 if let Ok(stream) = from_str::<UpdateStream>(&data) {
77 return Ok(Self::UpdateStream(stream));
78 }
79 if let Ok(balance) = from_str::<UpdateBalance>(&data) {
80 return Ok(Self::UpdateBalance(balance));
81 }
82 Err(e.into())
83 }
84 }
85 }
86
87 pub fn parse_with_context(data: impl ToString, previous: &MessageInfo) -> PocketResult<Self> {
88 let data = data.to_string();
89 match previous {
90 MessageInfo::OpenOrder => {
91 if let Ok(order) = from_str::<OpenOrder>(&data) {
92 return Ok(Self::OpenOrder(order));
93 }
94 }
95 MessageInfo::UpdateStream => {
96 if let Ok(stream) = from_str::<UpdateStream>(&data) {
97 return Ok(Self::UpdateStream(stream));
98 }
99 }
100 MessageInfo::UpdateHistoryNew => {
101 if let Ok(history) = from_str::<UpdateHistoryNew>(&data) {
102 return Ok(Self::UpdateHistoryNew(history));
103 }
104 }
105 MessageInfo::UpdateAssets => {
106 if let Ok(assets) = from_str::<UpdateAssets>(&data) {
107 return Ok(Self::UpdateAssets(assets));
108 }
109 }
110 MessageInfo::UpdateBalance => {
111 if let Ok(balance) = from_str::<UpdateBalance>(&data) {
112 return Ok(Self::UpdateBalance(balance));
113 }
114 }
115 MessageInfo::SuccesscloseOrder => {
116 if let Ok(order) = from_str::<SuccessCloseOrder>(&data) {
117 return Ok(Self::SuccesscloseOrder(order));
118 }
119 }
120 MessageInfo::Auth => {
121 if let Ok(auth) = from_str::<Ssid>(&data) {
122 return Ok(Self::Auth(auth));
123 }
124 }
125 MessageInfo::ChangeSymbol => {
126 if let Ok(symbol) = from_str::<ChangeSymbol>(&data) {
127 return Ok(Self::ChangeSymbol(symbol));
128 }
129 }
130 MessageInfo::SuccessupdateBalance => {
131 if let Ok(balance) = from_str::<UpdateBalance>(&data) {
132 return Ok(Self::SuccessupdateBalance(balance));
133 }
134 }
135 MessageInfo::SuccessupdatePending => {
136 if let Ok(pending) = from_str::<Value>(&data) {
137 return Ok(Self::SuccessupdatePending(pending));
138 }
139 }
140 MessageInfo::SubscribeSymbol => {
141 if let Ok(symbol) = from_str::<SubscribeSymbol>(&data) {
142 return Ok(Self::SubscribeSymbol(symbol));
143 }
144 }
145 MessageInfo::Successauth => {
146 if let Ok(auth) = from_str::<SuccessAuth>(&data) {
147 return Ok(Self::SuccessAuth(auth));
148 }
149 }
150 MessageInfo::UpdateOpenedDeals => {
151 if let Ok(deals) = from_str::<UpdateOpenedDeals>(&data) {
152 return Ok(Self::UpdateOpenedDeals(deals));
153 }
154 }
155 MessageInfo::UpdateClosedDeals => {
156 if let Ok(deals) = from_str::<UpdateClosedDeals>(&data) {
157 return Ok(Self::UpdateClosedDeals(deals));
158 }
159 }
160 MessageInfo::SuccessopenOrder => {
161 if let Ok(order) = from_str::<Deal>(&data) {
162 return Ok(Self::SuccessopenOrder(order));
163 }
164 }
165 MessageInfo::LoadHistoryPeriod => {
166 if let Ok(history) = from_str::<LoadHistoryPeriodResult>(&data) {
167 return Ok(Self::LoadHistoryPeriod(history));
168 }
169 }
170 MessageInfo::UpdateCharts => {
171 return Err(PocketOptionError::GeneralParsingError(
172 "This is expected, there is no parser for the 'updateCharts' message"
173 .to_string(),
174 ));
175 }
177 MessageInfo::GetCandles => {
178 if let Ok(candles) = from_str::<LoadHistoryPeriod>(&data) {
179 return Ok(Self::GetCandles(candles));
180 }
181 }
182 MessageInfo::FailopenOrder => {
183 if let Ok(fail) = from_str::<FailOpenOrder>(&data) {
184 return Ok(Self::FailOpenOrder(fail));
185 }
186 }
187 MessageInfo::FailopenPendingOrder => {
188 if let Ok(fail) = from_str::<FailOpenPendingOrder>(&data) {
189 return Ok(Self::FailOpenPendingOrder(fail));
190 }
191 }
192 MessageInfo::OpenPendingOrder => {
193 if let Ok(order) = from_str::<OpenPendingOrder>(&data) {
194 return Ok(Self::OpenPendingOrder(order));
195 }
196 }
197 MessageInfo::SuccessopenPendingOrder => {
198 if let Ok(order) = from_str::<SuccessOpenPendingOrder>(&data) {
199 return Ok(Self::SuccessOpenPendingOrder(order));
200 }
201 }
202 MessageInfo::None => return WebSocketMessage::parse(data.clone()),
203 }
204 warn!("Failed to parse message of type '{previous}':\n {data}");
205 Err(PocketOptionError::GeneralParsingError(format!(
206 "Error parsing message for message type '{}'",
207 previous
208 )))
209 }
210
211 pub fn information(&self) -> MessageInfo {
212 match self {
213 Self::UpdateStream(_) => MessageInfo::UpdateStream,
214 Self::UpdateHistoryNew(_) => MessageInfo::UpdateHistoryNew,
215 Self::UpdateAssets(_) => MessageInfo::UpdateAssets,
216 Self::UpdateBalance(_) => MessageInfo::UpdateBalance,
217 Self::OpenOrder(_) => MessageInfo::OpenOrder,
218 Self::SuccessAuth(_) => MessageInfo::Successauth,
219 Self::UpdateClosedDeals(_) => MessageInfo::UpdateClosedDeals,
220 Self::SuccesscloseOrder(_) => MessageInfo::SuccesscloseOrder,
221 Self::SuccessopenOrder(_) => MessageInfo::SuccessopenOrder,
222 Self::ChangeSymbol(_) => MessageInfo::ChangeSymbol,
223 Self::Auth(_) => MessageInfo::Auth,
224 Self::SuccessupdateBalance(_) => MessageInfo::SuccessupdateBalance,
225 Self::UpdateOpenedDeals(_) => MessageInfo::UpdateOpenedDeals,
226 Self::SubscribeSymbol(_) => MessageInfo::SubscribeSymbol,
227 Self::LoadHistoryPeriod(_) => MessageInfo::LoadHistoryPeriod,
228 Self::GetCandles(_) => MessageInfo::GetCandles,
229 Self::FailOpenOrder(_) => MessageInfo::FailopenOrder,
230 Self::SuccessupdatePending(_) => MessageInfo::SuccessupdatePending,
231 Self::FailOpenPendingOrder(_) => MessageInfo::FailopenPendingOrder,
232 Self::SuccessOpenPendingOrder(_) => MessageInfo::SuccessopenPendingOrder,
233 Self::OpenPendingOrder(_) => MessageInfo::OpenPendingOrder,
234 Self::Raw(_) => MessageInfo::None,
235 Self::None => MessageInfo::None,
236 }
237 }
238}
239
240impl fmt::Display for WebSocketMessage {
241 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
242 match self {
243 WebSocketMessage::ChangeSymbol(change_symbol) => {
244 write!(
245 f,
246 "42[{},{}]",
247 serde_json::to_string(&MessageInfo::ChangeSymbol).map_err(|_| fmt::Error)?,
248 serde_json::to_string(&change_symbol).map_err(|_| fmt::Error)?
249 )
250 }
251 WebSocketMessage::Auth(auth) => auth.fmt(f),
252 WebSocketMessage::GetCandles(candles) => {
253 write!(
254 f,
255 "42[{},{}]",
256 serde_json::to_string(&MessageInfo::LoadHistoryPeriod)
257 .map_err(|_| fmt::Error)?,
258 serde_json::to_string(candles).map_err(|_| fmt::Error)?
259 )
260 }
261 WebSocketMessage::OpenOrder(open_order) => {
262 write!(
263 f,
264 "42[{},{}]",
265 serde_json::to_string(&MessageInfo::OpenOrder).map_err(|_| fmt::Error)?,
266 serde_json::to_string(open_order).map_err(|_| fmt::Error)?
267 )
268 }
269 WebSocketMessage::SubscribeSymbol(subscribe_symbol) => {
270 write!(f, "{:?}", subscribe_symbol)
271 }
272 WebSocketMessage::Raw(text) => text.fmt(f),
273
274 WebSocketMessage::UpdateStream(update_stream) => write!(f, "{:?}", update_stream),
275 WebSocketMessage::UpdateHistoryNew(update_history_new) => {
276 write!(f, "{:?}", update_history_new)
277 }
278 WebSocketMessage::UpdateAssets(update_assets) => write!(f, "{:?}", update_assets),
279 WebSocketMessage::UpdateBalance(update_balance) => write!(f, "{:?}", update_balance),
280 WebSocketMessage::SuccessAuth(success_auth) => write!(f, "{:?}", success_auth),
281 WebSocketMessage::UpdateClosedDeals(update_closed_deals) => {
282 write!(f, "{:?}", update_closed_deals)
283 }
284 WebSocketMessage::SuccesscloseOrder(success_close_order) => {
285 write!(f, "{:?}", success_close_order)
286 }
287 WebSocketMessage::SuccessopenOrder(success_open_order) => {
288 write!(f, "{:?}", success_open_order)
289 }
290 WebSocketMessage::SuccessupdateBalance(update_balance) => {
291 write!(f, "{:?}", update_balance)
292 }
293 WebSocketMessage::UpdateOpenedDeals(update_opened_deals) => {
294 write!(f, "{:?}", update_opened_deals)
295 }
296 WebSocketMessage::SuccessOpenPendingOrder(order) => write!(f, "{:?}", order),
297 WebSocketMessage::FailOpenPendingOrder(order) => write!(f, "{:?}", order),
298 WebSocketMessage::OpenPendingOrder(order) => write!(f, "{:?}", order),
299
300 WebSocketMessage::None => write!(f, "None"),
301 WebSocketMessage::LoadHistoryPeriod(period) => {
303 write!(
304 f,
305 "42[{}, {}]",
306 serde_json::to_string(&MessageInfo::LoadHistoryPeriod)
307 .map_err(|_| fmt::Error)?,
308 serde_json::to_string(&period).map_err(|_| fmt::Error)?
309 )
310 }
311 WebSocketMessage::FailOpenOrder(order) => order.fmt(f),
312 WebSocketMessage::SuccessupdatePending(pending) => pending.fmt(f),
313 }
314 }
315}
316
317impl From<WebSocketMessage> for Message {
318 fn from(value: WebSocketMessage) -> Self {
319 Box::new(value).into()
320 }
321}
322
323impl From<Box<WebSocketMessage>> for Message {
324 fn from(value: Box<WebSocketMessage>) -> Self {
325 if value.info() == MessageInfo::None {
326 return Message::Ping(Bytes::new());
327 }
328 Message::text(value.to_string())
329 }
330}
331
332impl MessageTransfer for WebSocketMessage {
333 type Error = PocketMessageFail;
334
335 type TransferError = PocketMessageFail;
336
337 type Info = MessageInfo;
338
339 fn info(&self) -> Self::Info {
340 self.information()
341 }
342
343 fn error(&self) -> Option<Self::Error> {
344 if let Self::FailOpenOrder(fail) = self {
345 return Some(PocketMessageFail::Order(fail.to_owned()));
346 }
347 None
348 }
349
350 fn to_error(&self) -> Self::TransferError {
351 if let Self::FailOpenOrder(fail) = self {
352 PocketMessageFail::Order(fail.to_owned())
353 } else {
354 PocketMessageFail::Order(FailOpenOrder::new(
355 "This is unexpected and should never happend",
356 1.0,
357 "None",
358 ))
359 }
360 }
361
362 fn error_info(&self) -> Option<Vec<Self::Info>> {
363 if let Self::FailOpenOrder(_) = self {
364 Some(vec![MessageInfo::SuccessopenOrder])
365 } else {
366 None
367 }
368 }
369}
370
#[cfg(test)]
mod tests {
    use super::*;

    use std::{
        error::Error,
        fs::{self, File},
        io::{BufReader, Write},
        path::Path,
    };

    /// Lists the entries of `path`, each returned as `"{path}/{file_name}"`.
    ///
    /// Fails with the underlying I/O error if the directory or any entry cannot be read.
    fn get_files_in_directory(path: &str) -> Result<Vec<String>, std::io::Error> {
        fs::read_dir(Path::new(path))?
            .map(|entry| {
                entry.map(|found| {
                    let file_name = found.file_name().to_string_lossy().to_string();
                    format!("{path}/{file_name}")
                })
            })
            .collect()
    }

    /// Parses a few inline stream samples plus two fixture files from `tests/`.
    #[test]
    fn test_descerialize_message() -> Result<(), Box<dyn Error>> {
        let tests = [
            r#"[["AUS200_otc",1732830010,6436.06]]"#,
            r#"[["AUS200_otc",1732830108.205,6435.96]]"#,
            r#"[["AEDCNY_otc",1732829668.352,1.89817]]"#,
            r#"[["CADJPY_otc",1732830170.793,109.442]]"#,
        ];
        for item in &tests {
            let val = WebSocketMessage::parse(item)?;
            dbg!(&val);
        }

        let content = fs::read_to_string("tests/update_history_new.txt")?;
        let history_new: WebSocketMessage = from_str(&content)?;
        dbg!(&history_new);

        let content = fs::read_to_string("tests/data.json")?;
        let assets_raw: WebSocketMessage = from_str(&content)?;
        dbg!(&assets_raw);
        Ok(())
    }

    /// Deserializes every `.json` file found in `tests/` as a `WebSocketMessage`.
    #[test]
    fn deep_test_descerialize_message() -> anyhow::Result<()> {
        for dir in get_files_in_directory("tests")? {
            dbg!(&dir);
            if dir.ends_with(".json") {
                let reader = BufReader::new(File::open(dir)?);
                let _: WebSocketMessage = serde_json::from_reader(reader)?;
            }
        }

        Ok(())
    }

    /// Writes the symbol of every asset in the fixture to `tests/assets.txt`, one per line.
    #[test]
    fn test_write_assets() -> anyhow::Result<()> {
        let raw: UpdateAssets = serde_json::from_str(include_str!("../../../tests/data.json"))?;
        let mut file = File::create("tests/assets.txt")?;
        let data: String = raw
            .0
            .iter()
            .map(|a| format!("{}\n", a.symbol))
            .collect();
        file.write_all(data.as_bytes())?;
        Ok(())
    }
}