1use std::ops::Deref;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_channel::{bounded, Receiver, RecvError, Sender};
6use futures_util::future::try_join4;
7use futures_util::stream::{SplitSink, SplitStream};
8use futures_util::{SinkExt, StreamExt};
9use tokio::net::TcpStream;
10use tokio::task::JoinHandle;
11use tokio::time::sleep;
12use tokio_tungstenite::tungstenite::Message;
13use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
14use tracing::{debug, error, info, warn};
15
16use crate::contstants::{MAX_CHANNEL_CAPACITY, RECONNECT_CALLBACK};
17use crate::error::{BinaryOptionsResult, BinaryOptionsToolsError};
18use crate::general::types::MessageType;
19use crate::utils::time::timeout;
20
21use super::traits::{Callback, Connect, Credentials, DataHandler, MessageHandler, MessageTransfer};
22use super::types::Data;
23
/// Number of consecutive failed reconnection attempts after which the event
/// loop gives up.
const MAX_ALLOWED_LOOPS: u32 = 8;
/// Seconds to wait after a failed reconnection attempt before trying again.
const SLEEP_INTERVAL: u64 = 2;
26
27#[derive(Clone)]
28pub struct WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
29where
30 Transfer: MessageTransfer,
31 Handler: MessageHandler,
32 Connector: Connect,
33 Creds: Credentials,
34 T: DataHandler,
35 C: Callback,
36{
37 inner: Arc<WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>>,
38}
39
40pub struct WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>
41where
42 Transfer: MessageTransfer,
43 Handler: MessageHandler,
44 Connector: Connect,
45 Creds: Credentials,
46 T: DataHandler,
47 C: Callback,
48{
49 pub credentials: Creds,
50 pub connector: Connector,
51 pub handler: Handler,
52 pub data: Data<T, Transfer>,
53 pub sender: SenderMessage<Transfer>,
54 pub reconnect_callback: Option<C>,
55 _event_loop: JoinHandle<BinaryOptionsResult<()>>,
56}
57
58impl<Transfer, Handler, Connector, Creds, T, C> Deref
59 for WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
60where
61 Transfer: MessageTransfer,
62 Handler: MessageHandler,
63 Connector: Connect,
64 Creds: Credentials,
65 T: DataHandler,
66 C: Callback,
67{
68 type Target = WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>;
69
70 fn deref(&self) -> &Self::Target {
71 self.inner.as_ref()
72 }
73}
74
75impl<Transfer, Handler, Connector, Creds, T, C>
76 WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
77where
78 Transfer: MessageTransfer + 'static,
79 Handler: MessageHandler<Transfer = Transfer> + 'static,
80 Creds: Credentials + 'static,
81 Connector: Connect<Creds = Creds> + 'static,
82 T: DataHandler<Transfer = Transfer> + 'static,
83 C: Callback<T = T, Transfer = Transfer> + 'static,
84{
85 pub async fn init(
86 credentials: Creds,
87 connector: Connector,
88 data: Data<T, Transfer>,
89 handler: Handler,
90 timeout: Duration,
91 reconnect_callback: Option<C>,
92 ) -> BinaryOptionsResult<Self> {
93 let inner = WebSocketInnerClient::init(
94 credentials,
95 connector,
96 data,
97 handler,
98 timeout,
99 reconnect_callback,
100 )
101 .await?;
102 Ok(Self {
103 inner: Arc::new(inner),
104 })
105 }
106}
107
108impl<Transfer, Handler, Connector, Creds, T, C>
109 WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>
110where
111 Transfer: MessageTransfer + 'static,
112 Handler: MessageHandler<Transfer = Transfer> + 'static,
113 Creds: Credentials + 'static,
114 Connector: Connect<Creds = Creds> + 'static,
115 T: DataHandler<Transfer = Transfer> + 'static,
116 C: Callback<T = T, Transfer = Transfer> + 'static,
117{
118 pub async fn init(
119 credentials: Creds,
120 connector: Connector,
121 data: Data<T, Transfer>,
122 handler: Handler,
123 timeout: Duration,
124 reconnect_callback: Option<C>,
125 ) -> BinaryOptionsResult<Self> {
126 let _connection = connector.connect(credentials.clone()).await?;
127 let (_event_loop, sender) = Self::start_loops(
128 handler.clone(),
129 credentials.clone(),
130 data.clone(),
131 connector.clone(),
132 reconnect_callback.clone(),
133 )
134 .await?;
135 info!("Started WebSocketClient");
136 sleep(timeout).await;
137 Ok(Self {
138 credentials,
139 connector,
140 handler,
141 data,
142 sender,
143 reconnect_callback,
144 _event_loop,
145 })
146 }
147
148 async fn start_loops(
149 handler: Handler,
150 credentials: Creds,
151 data: Data<T, Transfer>,
152 connector: Connector,
153 reconnect_callback: Option<C>,
154 ) -> BinaryOptionsResult<(JoinHandle<BinaryOptionsResult<()>>, SenderMessage<Transfer>)> {
155 let (mut write, mut read) = connector.connect(credentials.clone()).await?.split();
156 let (sender, mut reciever) = bounded(MAX_CHANNEL_CAPACITY);
157 let (msg_sender, mut msg_reciever) = bounded(MAX_CHANNEL_CAPACITY);
158 let msg_sender = SenderMessage::new(msg_sender).clone();
159 let sender_msg = msg_sender.clone();
160 let task = tokio::task::spawn(async move {
161 let previous = None;
162 let mut loops = 0;
163 let mut reconnected = false;
164 loop {
165 let listener_future = WebSocketInnerClient::<
166 Transfer,
167 Handler,
168 Connector,
169 Creds,
170 T,
171 C,
172 >::listener_loop(
173 previous.clone(),
174 &data,
175 handler.clone(),
176 &sender,
177 &mut read,
178 );
179 let sender_future =
180 WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::sender_loop(
181 &mut write,
182 &mut reciever,
183 );
184 let update_loop =
185 WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::api_loop(
186 &mut msg_reciever,
187 &sender,
188 );
189 let callback = WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::reconnect_callback(reconnect_callback.clone(), data.clone(), sender_msg.clone(), reconnected);
190
191 match try_join4(listener_future, sender_future, update_loop, callback).await {
192 Ok(_) => {
193 if let Ok(websocket) = connector.connect(credentials.clone()).await {
194 (write, read) = websocket.split();
195 info!("Reconnected successfully!");
196 loops = 0;
197 reconnected = true;
198 } else {
199 loops += 1;
200 warn!("Error reconnecting... trying again in {SLEEP_INTERVAL} seconds (try {loops} of {MAX_ALLOWED_LOOPS}");
201 sleep(Duration::from_secs(SLEEP_INTERVAL)).await;
202 if loops >= MAX_ALLOWED_LOOPS {
203 panic!("Too many failed connections");
204 }
205 }
206 }
207 Err(e) => {
208 warn!("Error in event loop, {e}, reconnecting...");
209 println!("Reconnecting...");
210 if let Ok(websocket) = connector.connect(credentials.clone()).await {
211 (write, read) = websocket.split();
212 info!("Reconnected successfully!");
213 println!("Reconnected successfully!");
214 loops = 0;
215 reconnected = true;
216 } else {
217 loops += 1;
218 warn!("Error reconnecting... trying again in {SLEEP_INTERVAL} seconds (try {loops} of {MAX_ALLOWED_LOOPS}");
219 sleep(Duration::from_secs(SLEEP_INTERVAL)).await;
220 if loops >= MAX_ALLOWED_LOOPS {
221 error!("Too many failed connections");
222 break;
223 }
224 }
225 }
226 }
227 }
228 Ok(())
229 });
230 Ok((task, msg_sender))
231 }
232
233 async fn listener_loop(
234 mut previous: Option<<<Handler as MessageHandler>::Transfer as MessageTransfer>::Info>,
235 data: &Data<T, Transfer>,
236 handler: Handler,
237 sender: &Sender<Message>,
238 ws: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
239 ) -> BinaryOptionsResult<()> {
240 while let Some(msg) = &ws.next().await {
241 let msg = msg
242 .as_ref()
243 .inspect_err(|e| warn!("Error recieving websocket message, {e}"))
244 .map_err(|e| {
245 BinaryOptionsToolsError::WebsocketRecievingConnectionError(e.to_string())
246 })?;
247 match handler.process_message(msg, &previous, sender).await {
248 Ok((msg, close)) => {
249 if close {
250 info!("Recieved closing frame");
251 return Err(BinaryOptionsToolsError::WebsocketConnectionClosed(
252 "Recieved closing frame".into(),
253 ));
254 }
255 if let Some(msg) = msg {
256 match msg {
257 MessageType::Info(info) => {
258 debug!("Recieved info: {}", info);
259 previous = Some(info);
260 }
261 MessageType::Transfer(transfer) => {
262 debug!("Recieved data of type: {}", transfer.info());
263 if let Some(senders) = data.update_data(transfer.clone()).await? {
264 for sender in senders {
265 sender.send(transfer.clone()).await.map_err(|e| {
266 BinaryOptionsToolsError::ChannelRequestSendingError(
267 e.to_string(),
268 )
269 })?;
270 }
271 }
272 }
273 }
274 }
275 }
276 Err(e) => {
277 debug!("Error processing message, {e}");
278 }
279 }
280 }
281 todo!()
282 }
283
284 async fn sender_loop(
285 ws: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
286 reciever: &mut Receiver<Message>,
287 ) -> BinaryOptionsResult<()> {
288 while let Ok(msg) = reciever.recv().await {
289 match ws.send(msg).await {
290 Ok(_) => debug!("Sent message"),
291 Err(e) => {
292 warn!("Error sending messge: {e}");
293 return Err(e.into());
294 }
295 }
296 ws.flush().await?;
297 }
298 Ok(())
299 }
300
301 async fn api_loop(
302 reciever: &mut Receiver<Transfer>,
303 sender: &Sender<Message>,
304 ) -> BinaryOptionsResult<()> {
305 while let Ok(msg) = reciever.recv().await {
306 sender.send(msg.into()).await?;
307 }
308 Ok(())
309 }
310
311 async fn reconnect_callback(
312 reconnect_callback: Option<C>,
313 data: Data<T, Transfer>,
314 sender: SenderMessage<Transfer>,
315 reconnect: bool,
316 ) -> BinaryOptionsResult<BinaryOptionsResult<()>> {
317 Ok(tokio::spawn(async move {
318 sleep(Duration::from_secs(RECONNECT_CALLBACK)).await;
319 if reconnect {
320 if let Some(callback) = &reconnect_callback {
321 callback.call(data.clone(), &sender).await.inspect_err(
322 |e| error!(target: "EventLoop","Error calling callback, {e}"),
323 )?;
324 }
325 }
326 Ok(())
327 })
328 .await?)
329 }
330 pub async fn send_message(
331 &self,
332 msg: Transfer,
333 response_type: Transfer::Info,
334 validator: impl Fn(&Transfer) -> bool + Send + Sync,
335 ) -> BinaryOptionsResult<Transfer> {
336 self.sender
337 .send_message(&self.data, msg, response_type, validator)
338 .await
339 }
340
341 pub async fn send_message_with_timout(
342 &self,
343 timeout: Duration,
344 task: impl ToString,
345 msg: Transfer,
346 response_type: Transfer::Info,
347 validator: impl Fn(&Transfer) -> bool + Send + Sync,
348 ) -> BinaryOptionsResult<Transfer> {
349 self.sender
350 .send_message_with_timout(timeout, task, &self.data, msg, response_type, validator)
351 .await
352 }
353
354 pub async fn send_message_with_timeout_and_retry(
355 &self,
356 timeout: Duration,
357 task: impl ToString,
358 msg: Transfer,
359 response_type: Transfer::Info,
360 validator: impl Fn(&Transfer) -> bool + Send + Sync,
361 ) -> BinaryOptionsResult<Transfer> {
362 self.sender
363 .send_message_with_timeout_and_retry(
364 timeout,
365 task,
366 &self.data,
367 msg,
368 response_type,
369 validator,
370 )
371 .await
372 }
373}
374
375pub fn validate<Transfer>(
376 validator: impl Fn(&Transfer) -> bool + Send + Sync,
377 message: Transfer,
378) -> BinaryOptionsResult<Option<Transfer>>
379where
380 Transfer: MessageTransfer,
381{
382 if let Some(e) = message.error() {
383 Err(BinaryOptionsToolsError::WebSocketMessageError(
384 e.to_string(),
385 ))
386 } else if validator(&message) {
387 Ok(Some(message))
388 } else {
389 Ok(None)
390 }
391}
392
393#[derive(Clone)]
394pub struct SenderMessage<Transfer>
395where
396 Transfer: MessageTransfer,
397{
398 sender: Sender<Transfer>,
399 }
401
402impl<Transfer> SenderMessage<Transfer>
403where
404 Transfer: MessageTransfer,
405{
406 fn new(sender: Sender<Transfer>) -> Self {
416 Self { sender }
417 }
418 async fn reciever<T: DataHandler<Transfer = Transfer>>(
419 &self,
420 data: &Data<T, Transfer>,
421 msg: Transfer,
422 response_type: Transfer::Info,
423 ) -> BinaryOptionsResult<Receiver<Transfer>> {
424 let reciever = data.add_request(response_type).await;
425
426 self.sender
427 .send(msg)
428 .await
429 .map_err(|e| BinaryOptionsToolsError::ThreadMessageSendingErrorMPCS(e.to_string()))?;
430 Ok(reciever)
431 }
432
433 pub async fn send_message<T: DataHandler<Transfer = Transfer>>(
452 &self,
453 data: &Data<T, Transfer>,
454 msg: Transfer,
455 response_type: Transfer::Info,
456 validator: impl Fn(&Transfer) -> bool + Send + Sync,
457 ) -> BinaryOptionsResult<Transfer> {
458 let reciever = self.reciever(data, msg, response_type).await?;
459
460 while let Ok(msg) = reciever.recv().await {
461 if let Some(msg) =
462 validate(&validator, msg).inspect_err(|e| error!("Failed to place trade {e}"))?
463 {
464 return Ok(msg);
465 }
466 }
467 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
468 RecvError,
469 ))
470 }
471
472 pub async fn send_message_with_timout<T: DataHandler<Transfer = Transfer>>(
519 &self,
520 time: Duration,
521 task: impl ToString,
522 data: &Data<T, Transfer>,
523 msg: Transfer,
524 response_type: Transfer::Info,
525 validator: impl Fn(&Transfer) -> bool + Send + Sync,
526 ) -> BinaryOptionsResult<Transfer> {
527 let reciever = self.reciever(data, msg, response_type).await?;
528
529 timeout(
530 time,
531 async {
532 while let Ok(msg) = reciever.recv().await {
533 if let Some(msg) = validate(&validator, msg)
534 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
535 {
536 return Ok(msg);
537 }
538 }
539 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
540 RecvError,
541 ))
542 },
543 task.to_string(),
544 )
545 .await
546 }
547
548 pub async fn send_message_with_timeout_and_retry<T: DataHandler<Transfer = Transfer>>(
549 &self,
550 time: Duration,
551 task: impl ToString,
552 data: &Data<T, Transfer>,
553 msg: Transfer,
554 response_type: Transfer::Info,
555 validator: impl Fn(&Transfer) -> bool + Send + Sync,
556 ) -> BinaryOptionsResult<Transfer> {
557 let reciever = self
558 .reciever(data, msg.clone(), response_type.clone())
559 .await?;
560
561 let call1 = timeout(
562 time,
563 async {
564 while let Ok(msg) = reciever.recv().await {
565 if let Some(msg) = validate(&validator, msg)
566 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
567 {
568 return Ok(msg);
569 }
570 }
571 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
572 RecvError,
573 ))
574 },
575 task.to_string(),
576 )
577 .await;
578 match call1 {
579 Ok(res) => Ok(res),
580 Err(_) => {
581 println!("Failded 1 trying again");
582 let reciever = self.reciever(data, msg, response_type).await?;
583 timeout(
584 time,
585 async {
586 while let Ok(msg) = reciever.recv().await {
587 if let Some(msg) = validate(&validator, msg)
588 .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
589 {
590 return Ok(msg);
591 }
592 }
593 Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
594 RecvError,
595 ))
596 },
597 task.to_string(),
598 )
599 .await
600 }
601 }
602 }
603}
604
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use async_channel::{bounded, Receiver, Sender};
    use futures_util::{
        future::try_join,
        stream::{select_all, unfold},
        Stream, StreamExt,
    };
    use rand::{distributions::Alphanumeric, Rng};
    use tokio::time::sleep;
    use tracing::info;

    use crate::utils::tracing::start_tracing;

    /// Adapter exposing an `async_channel` receiver as a `Stream`.
    struct RecieverStream<T> {
        inner: Receiver<T>,
    }

    impl<T> RecieverStream<T> {
        fn new(inner: Receiver<T>) -> Self {
            Self { inner }
        }

        /// Receives one item, converting the channel error into `anyhow`.
        async fn receive(&self) -> anyhow::Result<T> {
            self.inner.recv().await.map_err(anyhow::Error::from)
        }

        /// Stream yielding the result of every `receive` call. It never
        /// yields `None`; receive errors are yielded as items.
        fn to_stream(&self) -> impl Stream<Item = anyhow::Result<T>> + '_ {
            Box::pin(unfold(self, |this| async move {
                Some((this.receive().await, this))
            }))
        }
    }

    /// Builds a random 7-character alphanumeric string.
    fn random_token() -> String {
        rand::thread_rng()
            .sample_iter(&Alphanumeric)
            .take(7)
            .map(char::from)
            .collect()
    }

    /// Drains only the priority channel for up to five seconds, then reads
    /// both channels through one merged stream until an item is an error.
    async fn recieve_dif(
        reciever: Receiver<String>,
        receiver_priority: Receiver<String>,
    ) -> anyhow::Result<()> {
        async fn receiv(r: &Receiver<String>) -> anyhow::Result<()> {
            while let Ok(t) = r.recv().await {
                info!(target: "High priority", "Recieved: {}", t);
            }
            Ok(())
        }

        tokio::select! {
            _ = receiv(&receiver_priority) => {},
            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
        }

        let normal = RecieverStream::new(reciever);
        let priority = RecieverStream::new(receiver_priority);
        let mut merged = select_all([normal.to_stream(), priority.to_stream()]);
        while let Some(item) = merged.next().await {
            info!(target: "Fused", "Recieved: {}", item?);
        }

        Ok(())
    }

    /// Sends one random string per second on each channel; returns only when
    /// a send fails.
    async fn sender_dif(
        sender: Sender<String>,
        sender_priority: Sender<String>,
    ) -> anyhow::Result<()> {
        loop {
            let normal_payload = random_token();
            let priority_payload = random_token();
            sender.send(normal_payload).await?;
            sender_priority.send(priority_payload).await?;
            sleep(Duration::from_secs(1)).await;
        }
    }

    // NOTE(review): both halves loop until a channel operation fails, so this
    // test does not appear to finish on its own - confirm whether it is meant
    // to be run manually.
    #[tokio::test]
    async fn test_multi_priority_reciever() -> anyhow::Result<()> {
        start_tracing(true)?;
        let (tx, rx) = bounded(8);
        let (priority_tx, priority_rx) = bounded(8);
        try_join(sender_dif(tx, priority_tx), recieve_dif(rx, priority_rx)).await?;
        Ok(())
    }
}