binary_option_tools_core/general/
client.rs

1use std::ops::Deref;
2use std::sync::Arc;
3use std::time::Duration;
4
5use async_channel::{bounded, Receiver, RecvError, Sender};
6use futures_util::future::try_join4;
7use futures_util::stream::{SplitSink, SplitStream};
8use futures_util::{SinkExt, StreamExt};
9use tokio::net::TcpStream;
10use tokio::task::JoinHandle;
11use tokio::time::sleep;
12use tokio_tungstenite::tungstenite::Message;
13use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
14use tracing::{debug, error, info, warn};
15
16use crate::contstants::{MAX_CHANNEL_CAPACITY, RECONNECT_CALLBACK};
17use crate::error::{BinaryOptionsResult, BinaryOptionsToolsError};
18use crate::general::types::MessageType;
19use crate::utils::time::timeout;
20
21use super::traits::{Callback, Connect, Credentials, DataHandler, MessageHandler, MessageTransfer};
22use super::types::Data;
23
24const MAX_ALLOWED_LOOPS: u32 = 8;
25const SLEEP_INTERVAL: u64 = 2;
26
27#[derive(Clone)]
28pub struct WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
29where
30    Transfer: MessageTransfer,
31    Handler: MessageHandler,
32    Connector: Connect,
33    Creds: Credentials,
34    T: DataHandler,
35    C: Callback,
36{
37    inner: Arc<WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>>,
38}
39
40pub struct WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>
41where
42    Transfer: MessageTransfer,
43    Handler: MessageHandler,
44    Connector: Connect,
45    Creds: Credentials,
46    T: DataHandler,
47    C: Callback,
48{
49    pub credentials: Creds,
50    pub connector: Connector,
51    pub handler: Handler,
52    pub data: Data<T, Transfer>,
53    pub sender: SenderMessage<Transfer>,
54    pub reconnect_callback: Option<C>,
55    _event_loop: JoinHandle<BinaryOptionsResult<()>>,
56}
57
58impl<Transfer, Handler, Connector, Creds, T, C> Deref
59    for WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
60where
61    Transfer: MessageTransfer,
62    Handler: MessageHandler,
63    Connector: Connect,
64    Creds: Credentials,
65    T: DataHandler,
66    C: Callback,
67{
68    type Target = WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>;
69
70    fn deref(&self) -> &Self::Target {
71        self.inner.as_ref()
72    }
73}
74
75impl<Transfer, Handler, Connector, Creds, T, C>
76    WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
77where
78    Transfer: MessageTransfer + 'static,
79    Handler: MessageHandler<Transfer = Transfer> + 'static,
80    Creds: Credentials + 'static,
81    Connector: Connect<Creds = Creds> + 'static,
82    T: DataHandler<Transfer = Transfer> + 'static,
83    C: Callback<T = T, Transfer = Transfer> + 'static,
84{
85    pub async fn init(
86        credentials: Creds,
87        connector: Connector,
88        data: Data<T, Transfer>,
89        handler: Handler,
90        timeout: Duration,
91        reconnect_callback: Option<C>,
92    ) -> BinaryOptionsResult<Self> {
93        let inner = WebSocketInnerClient::init(
94            credentials,
95            connector,
96            data,
97            handler,
98            timeout,
99            reconnect_callback,
100        )
101        .await?;
102        Ok(Self {
103            inner: Arc::new(inner),
104        })
105    }
106}
107
108impl<Transfer, Handler, Connector, Creds, T, C>
109    WebSocketInnerClient<Transfer, Handler, Connector, Creds, T, C>
110where
111    Transfer: MessageTransfer + 'static,
112    Handler: MessageHandler<Transfer = Transfer> + 'static,
113    Creds: Credentials + 'static,
114    Connector: Connect<Creds = Creds> + 'static,
115    T: DataHandler<Transfer = Transfer> + 'static,
116    C: Callback<T = T, Transfer = Transfer> + 'static,
117{
118    pub async fn init(
119        credentials: Creds,
120        connector: Connector,
121        data: Data<T, Transfer>,
122        handler: Handler,
123        timeout: Duration,
124        reconnect_callback: Option<C>,
125    ) -> BinaryOptionsResult<Self> {
126        let _connection = connector.connect(credentials.clone()).await?;
127        let (_event_loop, sender) = Self::start_loops(
128            handler.clone(),
129            credentials.clone(),
130            data.clone(),
131            connector.clone(),
132            reconnect_callback.clone(),
133        )
134        .await?;
135        info!("Started WebSocketClient");
136        sleep(timeout).await;
137        Ok(Self {
138            credentials,
139            connector,
140            handler,
141            data,
142            sender,
143            reconnect_callback,
144            _event_loop,
145        })
146    }
147
148    async fn start_loops(
149        handler: Handler,
150        credentials: Creds,
151        data: Data<T, Transfer>,
152        connector: Connector,
153        reconnect_callback: Option<C>,
154    ) -> BinaryOptionsResult<(JoinHandle<BinaryOptionsResult<()>>, SenderMessage<Transfer>)> {
155        let (mut write, mut read) = connector.connect(credentials.clone()).await?.split();
156        let (sender, mut reciever) = bounded(MAX_CHANNEL_CAPACITY);
157        let (msg_sender, mut msg_reciever) = bounded(MAX_CHANNEL_CAPACITY);
158        let msg_sender = SenderMessage::new(msg_sender).clone();
159        let sender_msg = msg_sender.clone();
160        let task = tokio::task::spawn(async move {
161            let previous = None;
162            let mut loops = 0;
163            let mut reconnected = false;
164            loop {
165                let listener_future = WebSocketInnerClient::<
166                    Transfer,
167                    Handler,
168                    Connector,
169                    Creds,
170                    T,
171                    C,
172                >::listener_loop(
173                    previous.clone(),
174                    &data,
175                    handler.clone(),
176                    &sender,
177                    &mut read,
178                );
179                let sender_future =
180                    WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::sender_loop(
181                        &mut write,
182                        &mut reciever,
183                    );
184                let update_loop =
185                    WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::api_loop(
186                        &mut msg_reciever,
187                        &sender,
188                    );
189                let callback = WebSocketInnerClient::<Transfer, Handler, Connector, Creds, T, C>::reconnect_callback(reconnect_callback.clone(), data.clone(), sender_msg.clone(), reconnected);
190
191                match try_join4(listener_future, sender_future, update_loop, callback).await {
192                    Ok(_) => {
193                        if let Ok(websocket) = connector.connect(credentials.clone()).await {
194                            (write, read) = websocket.split();
195                            info!("Reconnected successfully!");
196                            loops = 0;
197                            reconnected = true;
198                        } else {
199                            loops += 1;
200                            warn!("Error reconnecting... trying again in {SLEEP_INTERVAL} seconds (try {loops} of {MAX_ALLOWED_LOOPS}");
201                            sleep(Duration::from_secs(SLEEP_INTERVAL)).await;
202                            if loops >= MAX_ALLOWED_LOOPS {
203                                panic!("Too many failed connections");
204                            }
205                        }
206                    }
207                    Err(e) => {
208                        warn!("Error in event loop, {e}, reconnecting...");
209                        println!("Reconnecting...");
210                        if let Ok(websocket) = connector.connect(credentials.clone()).await {
211                            (write, read) = websocket.split();
212                            info!("Reconnected successfully!");
213                            println!("Reconnected successfully!");
214                            loops = 0;
215                            reconnected = true;
216                        } else {
217                            loops += 1;
218                            warn!("Error reconnecting... trying again in {SLEEP_INTERVAL} seconds (try {loops} of {MAX_ALLOWED_LOOPS}");
219                            sleep(Duration::from_secs(SLEEP_INTERVAL)).await;
220                            if loops >= MAX_ALLOWED_LOOPS {
221                                error!("Too many failed connections");
222                                break;
223                            }
224                        }
225                    }
226                }
227            }
228            Ok(())
229        });
230        Ok((task, msg_sender))
231    }
232
233    async fn listener_loop(
234        mut previous: Option<<<Handler as MessageHandler>::Transfer as MessageTransfer>::Info>,
235        data: &Data<T, Transfer>,
236        handler: Handler,
237        sender: &Sender<Message>,
238        ws: &mut SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
239    ) -> BinaryOptionsResult<()> {
240        while let Some(msg) = &ws.next().await {
241            let msg = msg
242                .as_ref()
243                .inspect_err(|e| warn!("Error recieving websocket message, {e}"))
244                .map_err(|e| {
245                    BinaryOptionsToolsError::WebsocketRecievingConnectionError(e.to_string())
246                })?;
247            match handler.process_message(msg, &previous, sender).await {
248                Ok((msg, close)) => {
249                    if close {
250                        info!("Recieved closing frame");
251                        return Err(BinaryOptionsToolsError::WebsocketConnectionClosed(
252                            "Recieved closing frame".into(),
253                        ));
254                    }
255                    if let Some(msg) = msg {
256                        match msg {
257                            MessageType::Info(info) => {
258                                debug!("Recieved info: {}", info);
259                                previous = Some(info);
260                            }
261                            MessageType::Transfer(transfer) => {
262                                debug!("Recieved data of type: {}", transfer.info());
263                                if let Some(senders) = data.update_data(transfer.clone()).await? {
264                                    for sender in senders {
265                                        sender.send(transfer.clone()).await.map_err(|e| {
266                                            BinaryOptionsToolsError::ChannelRequestSendingError(
267                                                e.to_string(),
268                                            )
269                                        })?;
270                                    }
271                                }
272                            }
273                        }
274                    }
275                }
276                Err(e) => {
277                    debug!("Error processing message, {e}");
278                }
279            }
280        }
281        todo!()
282    }
283
284    async fn sender_loop(
285        ws: &mut SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>,
286        reciever: &mut Receiver<Message>,
287    ) -> BinaryOptionsResult<()> {
288        while let Ok(msg) = reciever.recv().await {
289            match ws.send(msg).await {
290                Ok(_) => debug!("Sent message"),
291                Err(e) => {
292                    warn!("Error sending messge: {e}");
293                    return Err(e.into());
294                }
295            }
296            ws.flush().await?;
297        }
298        Ok(())
299    }
300
301    async fn api_loop(
302        reciever: &mut Receiver<Transfer>,
303        sender: &Sender<Message>,
304    ) -> BinaryOptionsResult<()> {
305        while let Ok(msg) = reciever.recv().await {
306            sender.send(msg.into()).await?;
307        }
308        Ok(())
309    }
310
311    async fn reconnect_callback(
312        reconnect_callback: Option<C>,
313        data: Data<T, Transfer>,
314        sender: SenderMessage<Transfer>,
315        reconnect: bool,
316    ) -> BinaryOptionsResult<BinaryOptionsResult<()>> {
317        Ok(tokio::spawn(async move {
318            sleep(Duration::from_secs(RECONNECT_CALLBACK)).await;
319            if reconnect {
320                if let Some(callback) = &reconnect_callback {
321                    callback.call(data.clone(), &sender).await.inspect_err(
322                        |e| error!(target: "EventLoop","Error calling callback, {e}"),
323                    )?;
324                }
325            }
326            Ok(())
327        })
328        .await?)
329    }
330    pub async fn send_message(
331        &self,
332        msg: Transfer,
333        response_type: Transfer::Info,
334        validator: impl Fn(&Transfer) -> bool + Send + Sync,
335    ) -> BinaryOptionsResult<Transfer> {
336        self.sender
337            .send_message(&self.data, msg, response_type, validator)
338            .await
339    }
340
341    pub async fn send_message_with_timout(
342        &self,
343        timeout: Duration,
344        task: impl ToString,
345        msg: Transfer,
346        response_type: Transfer::Info,
347        validator: impl Fn(&Transfer) -> bool + Send + Sync,
348    ) -> BinaryOptionsResult<Transfer> {
349        self.sender
350            .send_message_with_timout(timeout, task, &self.data, msg, response_type, validator)
351            .await
352    }
353
354    pub async fn send_message_with_timeout_and_retry(
355        &self,
356        timeout: Duration,
357        task: impl ToString,
358        msg: Transfer,
359        response_type: Transfer::Info,
360        validator: impl Fn(&Transfer) -> bool + Send + Sync,
361    ) -> BinaryOptionsResult<Transfer> {
362        self.sender
363            .send_message_with_timeout_and_retry(
364                timeout,
365                task,
366                &self.data,
367                msg,
368                response_type,
369                validator,
370            )
371            .await
372    }
373}
374
375pub fn validate<Transfer>(
376    validator: impl Fn(&Transfer) -> bool + Send + Sync,
377    message: Transfer,
378) -> BinaryOptionsResult<Option<Transfer>>
379where
380    Transfer: MessageTransfer,
381{
382    if let Some(e) = message.error() {
383        Err(BinaryOptionsToolsError::WebSocketMessageError(
384            e.to_string(),
385        ))
386    } else if validator(&message) {
387        Ok(Some(message))
388    } else {
389        Ok(None)
390    }
391}
392
393#[derive(Clone)]
394pub struct SenderMessage<Transfer>
395where
396    Transfer: MessageTransfer,
397{
398    sender: Sender<Transfer>,
399    // sender_priority: Sender<Transfer>
400}
401
402impl<Transfer> SenderMessage<Transfer>
403where
404    Transfer: MessageTransfer,
405{
406    // fn new(cap: usize) -> (Self, (Receiver<Transfer>, Receiver<Transfer>)) {
407    //     let (s, r) = bounded(cap);
408    //     let (sp, rp) = bounded(cap);
409
410    //     (
411    //         Self { sender: s, sender_priority: sp },
412    //         (r, rp)
413    //     )
414    // }
415    fn new(sender: Sender<Transfer>) -> Self {
416        Self { sender }
417    }
418    async fn reciever<T: DataHandler<Transfer = Transfer>>(
419        &self,
420        data: &Data<T, Transfer>,
421        msg: Transfer,
422        response_type: Transfer::Info,
423    ) -> BinaryOptionsResult<Receiver<Transfer>> {
424        let reciever = data.add_request(response_type).await;
425
426        self.sender
427            .send(msg)
428            .await
429            .map_err(|e| BinaryOptionsToolsError::ThreadMessageSendingErrorMPCS(e.to_string()))?;
430        Ok(reciever)
431    }
432
433    // pub async fn send(&self, msg: Transfer) -> BinaryOptionsResult<()> {
434    //     self.sender.send(msg).await.map_err(|e| {
435    //         BinaryOptionsToolsError::ChannelRequestSendingError(
436    //             e.to_string(),
437    //         )
438    //     })?;
439    //     Ok(())
440    // }
441
442    // pub async fn priority_send(&self, msg: Transfer) -> BinaryOptionsResult<()> {
443    //     self.sender_priority.send(msg).await.map_err(|e| {
444    //         BinaryOptionsToolsError::ChannelRequestSendingError(
445    //             e.to_string(),
446    //         )
447    //     })?;
448    //     Ok(())
449    // }
450
451    pub async fn send_message<T: DataHandler<Transfer = Transfer>>(
452        &self,
453        data: &Data<T, Transfer>,
454        msg: Transfer,
455        response_type: Transfer::Info,
456        validator: impl Fn(&Transfer) -> bool + Send + Sync,
457    ) -> BinaryOptionsResult<Transfer> {
458        let reciever = self.reciever(data, msg, response_type).await?;
459
460        while let Ok(msg) = reciever.recv().await {
461            if let Some(msg) =
462                validate(&validator, msg).inspect_err(|e| error!("Failed to place trade {e}"))?
463            {
464                return Ok(msg);
465            }
466        }
467        Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
468            RecvError,
469        ))
470    }
471
472    // pub async fn send_message_with_timout<T: DataHandler<Transfer = Transfer>>(
473    //     &self,
474    //     timeout: Duration,
475    //     task: impl ToString,
476    //     data: &Data<T, Transfer>,
477    //     msg: Transfer,
478    //     response_type: Transfer::Info,
479    //     validator: impl Fn(&Transfer) -> bool + Send + Sync,
480    // ) -> BinaryOptionsResult<Transfer> {
481    //     let reciever = data.add_request(response_type).await;
482
483    //     self.sender
484    //         .send(msg)
485    //         .await
486    //         .map_err(|e| BinaryOptionsToolsError::ThreadMessageSendingErrorMPCS(e.to_string()))?;
487
488    //     let start_time = Instant::now();
489
490    //     loop {
491    //         match reciever.try_recv() {
492    //             Ok(msg) => {
493    //                 println!("Called");
494    //                 if let Some(msg) = validate(&validator, msg)
495    //                     .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
496    //                 {
497    //                     return Ok(msg);
498    //                 }
499    //             }
500    //             Err(err) => match err {
501    //                 TryRecvError::Closed => {
502    //                     return Err(BinaryOptionsToolsError::Unallowed(
503    //                         "Api channel connectionc closed".into(),
504    //                     ))
505    //                 }
506    //                 TryRecvError::Empty => {}
507    //             },
508    //         }
509    //         if Instant::now() - start_time >= timeout {
510    //             return Err(BinaryOptionsToolsError::TimeoutError {
511    //                 task: task.to_string(),
512    //                 duration: timeout,
513    //             });
514    //         }
515    //     }
516    // }
517
518    pub async fn send_message_with_timout<T: DataHandler<Transfer = Transfer>>(
519        &self,
520        time: Duration,
521        task: impl ToString,
522        data: &Data<T, Transfer>,
523        msg: Transfer,
524        response_type: Transfer::Info,
525        validator: impl Fn(&Transfer) -> bool + Send + Sync,
526    ) -> BinaryOptionsResult<Transfer> {
527        let reciever = self.reciever(data, msg, response_type).await?;
528
529        timeout(
530            time,
531            async {
532                while let Ok(msg) = reciever.recv().await {
533                    if let Some(msg) = validate(&validator, msg)
534                        .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
535                    {
536                        return Ok(msg);
537                    }
538                }
539                Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
540                    RecvError,
541                ))
542            },
543            task.to_string(),
544        )
545        .await
546    }
547
548    pub async fn send_message_with_timeout_and_retry<T: DataHandler<Transfer = Transfer>>(
549        &self,
550        time: Duration,
551        task: impl ToString,
552        data: &Data<T, Transfer>,
553        msg: Transfer,
554        response_type: Transfer::Info,
555        validator: impl Fn(&Transfer) -> bool + Send + Sync,
556    ) -> BinaryOptionsResult<Transfer> {
557        let reciever = self
558            .reciever(data, msg.clone(), response_type.clone())
559            .await?;
560
561        let call1 = timeout(
562            time,
563            async {
564                while let Ok(msg) = reciever.recv().await {
565                    if let Some(msg) = validate(&validator, msg)
566                        .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
567                    {
568                        return Ok(msg);
569                    }
570                }
571                Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
572                    RecvError,
573                ))
574            },
575            task.to_string(),
576        )
577        .await;
578        match call1 {
579            Ok(res) => Ok(res),
580            Err(_) => {
581                println!("Failded 1 trying again");
582                let reciever = self.reciever(data, msg, response_type).await?;
583                timeout(
584                    time,
585                    async {
586                        while let Ok(msg) = reciever.recv().await {
587                            if let Some(msg) = validate(&validator, msg)
588                                .inspect_err(|e| eprintln!("Failed to place trade {e}"))?
589                            {
590                                return Ok(msg);
591                            }
592                        }
593                        Err(BinaryOptionsToolsError::ChannelRequestRecievingError(
594                            RecvError,
595                        ))
596                    },
597                    task.to_string(),
598                )
599                .await
600            }
601        }
602    }
603}
604
605// impl<Transfer, Handler, Connector, Creds, T, C> Drop
606//     for WebSocketClient<Transfer, Handler, Connector, Creds, T, C>
607// where
608//     Transfer: MessageTransfer,
609//     Handler: MessageHandler,
610//     Connector: Connect,
611//     Creds: Credentials,
612//     T: DataHandler,
613//     C: Callback,
614// {
615//     fn drop(&mut self) {
616//         self._event_loop.abort();
617//         info!(target: "Drop", "Dropping WebSocketClient instance");
618//     }
619// }
620
621#[cfg(test)]
622mod tests {
623    use std::time::Duration;
624
625    use async_channel::{bounded, Receiver, Sender};
626    use futures_util::{future::try_join, stream::{select_all, unfold}, Stream, StreamExt};
627    use rand::{distributions::Alphanumeric, Rng};
628    use tokio::time::sleep;
629    use tracing::info;
630
631    use crate::utils::tracing::start_tracing;
632
633    struct RecieverStream<T> {
634        inner: Receiver<T>
635    }
636
637    impl<T> RecieverStream<T> {
638        fn new(inner: Receiver<T>) -> Self {
639            Self { inner }
640        }
641
642        async fn receive(&self) -> anyhow::Result<T> {
643            Ok(self.inner.recv().await?)
644        }
645
646        fn to_stream(&self) -> impl Stream<Item = anyhow::Result<T>> + '_ {
647            Box::pin(unfold(self, |state| async move {
648                let item = state.receive().await;
649                Some((item, state))
650            }))        
651        }
652        
653    }
654
655
656
657    async fn recieve_dif(reciever: Receiver<String>, receiver_priority: Receiver<String>) -> anyhow::Result<()> {
658        async fn receiv(r: &Receiver<String>) -> anyhow::Result<()> {
659            while let Ok(t) = r.recv().await {
660                info!(target: "High priority", "Recieved: {}", t);
661            }
662            Ok(())
663        }
664        tokio::select! {
665            _ = receiv(&receiver_priority) => {
666
667            },
668            _ = tokio::time::sleep(Duration::from_secs(5)) => {}
669        }
670        let receiver = RecieverStream::new(reciever);
671        let receiver_priority = RecieverStream::new(receiver_priority);
672        let mut fused = select_all([receiver.to_stream(), receiver_priority.to_stream()]);
673        while let Some(value) = fused.next().await {
674            info!(target: "Fused", "Recieved: {}", value?);
675        }
676    
677        Ok(())
678    }
679
680    async fn sender_dif(sender: Sender<String>, sender_priority: Sender<String>) -> anyhow::Result<()> {
681        loop {
682            let s1: String = rand::thread_rng()
683            .sample_iter(&Alphanumeric)
684            .take(7)
685            .map(char::from)
686            .collect();
687            let s2: String = rand::thread_rng()
688            .sample_iter(&Alphanumeric)
689            .take(7)
690            .map(char::from)
691            .collect();
692            sender.send(s1).await?;
693            sender_priority.send(s2).await?;
694            sleep(Duration::from_secs(1)).await;
695        }
696    }
697
698    #[tokio::test]
699    async fn test_multi_priority_reciever() -> anyhow::Result<()> {
700        start_tracing(true)?;
701        let (s, r) = bounded(8);
702        let (sp, rp) = bounded(8);
703        try_join(sender_dif(s, sp), recieve_dif(r, rp)).await?;
704        Ok(())
705    }
706}