degen_websockets/
websocket_client.rs

1//! A simple example of hooking up stdin/stdout to a WebSocket stream.
2//!
3//! This example will connect to a server specified in the argument list and
4//! then forward all data read on stdin to the server, printing out all data
5//! received on stdout.
6//!
7//! Note that this is not currently optimized for performance, especially around
8//! buffer management. Rather it's intended to show an example of working with a
9//! client.
10//!
11//! You can use this example together with the `server` example.
12
13
14
15use degen_logger;
16
17use futures::Future;
18/*
19
20Add options for auto reconnect ? 
21add crossbeam channels ? 
22
23
24
25
26May have to build some memory slots which keep track of awaiting threads which are waiting on msg responses/ACKs. 
27
28Bc - need a way to send a message that awaits a response ! 
29
30*/
31use serde::{Serialize};
32use serde_json;
33
34use futures_util::{ StreamExt, SinkExt}; 
35use tokio_tungstenite::{connect_async, tungstenite::Message};
36  
37//use tokio_tungstenite::tungstenite::Message;
38use tokio_tungstenite::{WebSocketStream,MaybeTlsStream};
39use tokio::net::TcpStream;
40 
41 //use crossbeam_channel::{ unbounded, Receiver, Sender};
42 use tokio::sync::mpsc::{channel, Sender, Receiver};
43
44use std::thread;
45use tokio::runtime::Runtime;
46 
47
48use crate::util::logtypes::CustomLogStyle;
49
50use super::reliable_message_subsystem::ReliableMessageSubsystem;
51
52 
53use std::sync::Arc; 
54use tokio::sync::{RwLock,Mutex};
55 
56use std::collections::HashMap;
57
58use super::websocket_messages::{
59    SocketMessage,
60    InboundMessage,
61     OutboundMessage,
62     
63   
64    MessageReliability, 
65    MessageReliabilityType,
66    
67    SocketMessageError,
68    
69    SocketMessageDestination, OutboundMessageDestination
70    };
71
72
73use super::websocket_server::WebsocketSystemEvent;
74
75use tokio::time::{interval,Duration};
76 
77 use std::fmt;
78
79
80#[derive(Debug)]
81pub enum WebsocketClientError {
82    UnableToConnect,
83    SendMessageError,
84    SerdeJsonError(String),
85    TokioError(String),
86    SocketMessageErr,
87    NoConnectionError
88    
89}
90 
91 
92impl fmt::Display for WebsocketClientError {
93    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
94        match self {
95            WebsocketClientError::UnableToConnect => write!(f, "Unable to connect"),
96            WebsocketClientError::SendMessageError => write!(f, "Could not send message"),
97            WebsocketClientError::SerdeJsonError(error) => write!(f, "Serde json error: {}", error),
98            WebsocketClientError::TokioError(error) => write!(f, "Tokio Error: {}", error),
99            WebsocketClientError::SocketMessageErr => write!(f,"Socket Message Error"),
100            WebsocketClientError::NoConnectionError => write!(f,"No Connnection Error")
101        
102        }
103    }
104}
105 
106impl From<serde_json::Error> for WebsocketClientError {
107    fn from(err: serde_json::Error) -> Self {
108        // You may want to customize this to better suit your needs
109        WebsocketClientError::SerdeJsonError(format!("Serialization error: {}", err))
110    }
111}
112 
113impl From<tokio_tungstenite::tungstenite::Error> for WebsocketClientError {
114      fn from(err: tokio_tungstenite::tungstenite::Error) -> Self {
115        WebsocketClientError::TokioError(format!("Tokio error: {}", err))
116    }
117}
118
119
120impl From<SocketMessageError> for WebsocketClientError {
121     fn from(err: SocketMessageError) -> Self {
122         WebsocketClientError::SocketMessageErr 
123          
124    }
125    
126}
127
128impl std::error::Error for WebsocketClientError {}
129  
130 
131type SocketWriteSink = futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
132type SocketReadStream = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
133 
134 
135pub struct ConnectionResources {
136    
137    pub write: Arc< Mutex<SocketWriteSink> >,
138     pub read: Option< SocketReadStream > , //can be used like a one-time mutex !  
139    
140   
141     
142 
143    outbound_messages_rx: Option<Receiver<SocketMessage>>,
144    
145    pending_reliable_messages: Arc<RwLock<HashMap<String,SocketMessage>>>,
146  
147    
148     
149    ws_server_events_rx: Option<Receiver<WebsocketSystemEvent>>, 
150    
151     outbound_messages_tx: Sender<SocketMessage>, 
152      ws_server_events_tx: Sender<WebsocketSystemEvent>, 
153        pub socket_connection_uuid: String,
154}
155 
156 
157pub struct Connection { 
158 
159    resources: Arc<Mutex<ConnectionResources>>,
160    
161}
162
163
164
165
166
167impl Connection {
168        
169           
170          
171       
172    
173      pub async fn start_listening(
174         
175        resources: Arc<Mutex<ConnectionResources>>,
176        sender_channel: Sender<InboundMessage>, 
177        ) {
178            
179             let mut resources = resources.lock().await; 
180           
181             let read = resources.read.take().expect("The read stream has already been consumed.");
182             let socket_connection_uuid = resources.socket_connection_uuid.clone();
183 
184             let pending_reliable_messages = Arc::clone(&resources.pending_reliable_messages);
185           
186             
187             let outbound_messages_tx = resources.outbound_messages_tx.clone();
188             let outbound_messages_rx = resources.outbound_messages_rx.take().unwrap();
189            
190             let ws_server_events_tx = resources.ws_server_events_tx.clone();
191             let ws_server_events_rx = resources.ws_server_events_rx.take().unwrap();
192             
193             let write = Arc::clone(&resources.write);
194         
195               
196               
197               //this is nottt working 
198              let forward_inbound_msg_future =  Connection::forward_inbound_messages(   
199                        read,
200                        sender_channel,
201                        socket_connection_uuid.clone(),
202                        outbound_messages_tx.clone(),  // for sending reliability ack 
203                        ws_server_events_tx.clone()
204                 ) ;
205                 
206                 
207                let send_outbound_msg_future = Self::start_forwarding_outbound_messages (
208                    write,
209                    outbound_messages_rx,
210                    
211                    Arc::clone(&pending_reliable_messages),
212                    socket_connection_uuid.clone() 
213                    
214                );
215                 
216                let resend_reliable_messages = ReliableMessageSubsystem::resend_reliable_messages (
217                        Arc::clone(&pending_reliable_messages),  
218                        outbound_messages_tx.clone()
219                    );
220                    
221                let handle_server_events = Self::handle_server_events(
222                 ws_server_events_rx ,
223                 Arc::clone(&pending_reliable_messages   )                     
224                );
225                 
226                    
227                 
228                    let select = tokio::select! {
229                    _ = forward_inbound_msg_future => eprintln!("forward_inbound_msg_handle finished"),
230                    _ = send_outbound_msg_future => eprintln!("send_outbound_msg_handle finished"),
231                    _ = resend_reliable_messages => eprintln!("resend_reliable_messages_handle finished"),
232                    _ = handle_server_events => eprintln!("handle_server_events_handle finished"),
233                   };
234                    
235                    
236                    
237                   degen_logger::log(  format!("WS WARN: TOKIO SELECT DROPPED")  , CustomLogStyle::Error  ) ; 
238 
239              
240                 
241      
242    }
243    
244     
245     
246         
247          async fn start_forwarding_outbound_messages(
248         
249            write: Arc<Mutex<SocketWriteSink>>,
250            mut receiver_channel: Receiver<SocketMessage>,
251            
252            pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
253            socket_connection_uuid:String, 
254        ) -> std::io::Result<()>     
255        {
256            
257              
258                  
259            degen_logger::log(  format!("ws client start_forwarding_outbound_messages")  , CustomLogStyle::Info  ) ; 
260        
261            
262            
263            loop {
264                 
265                 
266            while let Some(socket_message) =  receiver_channel.recv().await {   //let up so other threads in the join  can run 
267                  
268                    
269                    
270                    
271                    
272                       let reliability_type = socket_message.clone().reliability_type;
273         
274                        
275                        if let MessageReliabilityType::Reliable(msg_uuid) = reliability_type {
276                                
277 
278                            //could cause deadlock !? 
279                            pending_reliable_messages.write().await.insert(msg_uuid,   socket_message.clone( ) );
280                        }
281                                      
282                    
283                    
284                
285                     degen_logger::log( format!("ws client is sending out msg: {} ", socket_message)  , CustomLogStyle::Info  ) ; 
286 
287              
288                    
289                    
290                    
291                          
292                          
293                    let message_result = socket_message.to_message();
294                    
295                     if let Ok(message) = message_result {
296                        let send_msg =  write.lock().await.send( message ).await ;    
297                        
298                        if let Err(e) = send_msg  {
299                            
300                                degen_logger::log(  format!("ws client: Error sending message.. {}", e)  , CustomLogStyle::Error  ) ; 
301 
302                            
303                           
304                        }    
305                     }
306                     
307                  //  Ok(())
308                
309            }
310            
311         
312            
313            }
314        
315          //  Ok(())
316        }
317    
318    
319    //should be a SocketMessage sender right... oh well anyways 
320    pub async fn get_outbound_messages_tx(&self) -> Sender<SocketMessage> {
321        
322        self.resources.lock().await.outbound_messages_tx.clone()
323    }
324    
325    
326     pub async fn listen( &self, sender_channel: Sender<InboundMessage>){
327        
328          
329          let resources = Arc::clone(&self.resources);
330          
331        
332        let start_listening_future=   Connection::start_listening(resources, sender_channel); 
333           
334        tokio::join!( start_listening_future ) ;
335        
336        degen_logger::log(  format!("ws client: listen loop ended")  , CustomLogStyle::Error  ) ; 
337 
338    }
339    
340     
341    
342    
343    
344    pub async fn get_socket_connection_uuid(&self) -> String {
345        
346        
347       return  self.resources.lock().await.socket_connection_uuid.clone( )
348        
349        
350    } 
351    
352
353async fn handle_server_events( 
354    mut ws_event_rx: Receiver<WebsocketSystemEvent>,
355    pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
356) -> std::io::Result<()>  {
357    
358    loop {
359    
360        while let Some(evt) = ws_event_rx.recv().await {  //give control back to async executor
361            
362            match evt {
363                
364                WebsocketSystemEvent::ReceivedMessageAck { reliable_msg_uuid } => {
365                    //pending_reliable_messages.write().await.
366                    Self::clear_pending_reliable_message(
367                        Arc::clone(&pending_reliable_messages),
368                        reliable_msg_uuid
369                    ).await;
370                }
371                
372                };
373            
374            }
375        
376    }
377    
378        
379    // Ok(())
380    }
381
382//when we receive an ACK with this message uuid, we clear 
383pub async fn clear_pending_reliable_message( 
384    pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
385    message_uuid: String,
386    
387) {
388    let mut messages = pending_reliable_messages.write().await ;
389    messages.remove(&message_uuid) ;
390}
391    
392    
393    /*
394    pub async fn send_message_immediately(&mut self,  socket_message:  SocketMessage ) 
395    -> Result<(), WebsocketClientError>
396    {  
397          self.write.lock().await.send( socket_message.to_message()? ).await?;
398        
399      Ok(())
400    }*/
401   
402    
403    //this should loop forever and never end 
404    //this also should not take up 100% of the thread use 
405    pub async fn forward_inbound_messages( 
406        mut read: futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>, 
407        sender_channel: Sender<InboundMessage>,
408        socket_connection_uuid:String  ,
409        outbound_messages_tx: Sender<SocketMessage>,
410        ws_server_events_tx: Sender<WebsocketSystemEvent>
411            ){
412                   
413        loop{    
414            
415              
416             
417              degen_logger::log(  format!("ws_client client forward_inbound_messages")  , CustomLogStyle::Info  ) ; 
418 
419           
420                        //this await yields back to the executor so its ok to loop ! 
421                while let Some(message_result) = read.next().await {
422                   
423                      match message_result {
424                        Ok(message) => {
425                            
426                            let socket_message_result = SocketMessage::from_message(message) ;
427                            
428                            if let Ok(socket_message) = socket_message_result {
429                                
430                                
431                             //println!("client got an inbound msg {}", socket_message);
432                                
433                                let inbound_msg = InboundMessage {
434                                    socket_connection_uuid: socket_connection_uuid.clone(),
435                                    message:  socket_message.clone(), 
436                                };
437                                
438                                //parse the message into a SocketMessage 
439                                
440                                //check to see if socket message ie reliable 
441        
442                                // Send the message into the   channel
443                                sender_channel.try_send(inbound_msg.clone()) ;
444                                
445                                degen_logger::log(   format!("client got socket_message {} ", socket_message) , CustomLogStyle::Info  ) ; 
446 
447                                
448                               
449                                if let MessageReliabilityType::Reliable(msg_uuid) = socket_message.reliability_type {
450                                    //we need to send an ack ! 
451                                    let ack_message =  SocketMessage::create_reliability_ack( msg_uuid.clone()) ;
452                                    println!("client creating reliability ack for {}", msg_uuid.clone());
453                                    let send_ack_result = outbound_messages_tx.try_send (  //should be try send as to not block 
454                                       
455                                        ack_message
456                                        
457                                    );
458                                    
459                                    if let Err(send_ack_err) = send_ack_result {
460                                        eprintln!("send ack error {}", send_ack_err);
461                                    }
462                                }
463                                
464                                  if let SocketMessageDestination::AckToReliableMsg( reliable_msg_uuid ) = socket_message.destination   {
465                                       
466                                    degen_logger::log(   format!("client got ack") , CustomLogStyle::Info  ) ; 
467 
468                                     
469                                     ws_server_events_tx.try_send( 
470                                    WebsocketSystemEvent::ReceivedMessageAck { reliable_msg_uuid }   
471                                    ) ;                      
472                          
473                                }      
474                             
475                             
476                            }
477                            
478                         
479                             
480                             
481                        }
482                        Err(e) => {
483                            eprintln!("Error while reading message: {:?}", e);
484                            break;
485                        }
486                    }
487                }
488               
489                        
490        }
491    }
492  
493    
494      
495          
496          
497}
498
499
500
501pub struct WebsocketClient{
502    pub connection: Option<Connection>,
503    
504     
505}
506
507impl WebsocketClient {
508
509    pub fn new() -> Self {
510        
511        Self {
512             connection: None,
513            }
514    }
515     
516  
517
518    pub async fn connect(  connect_addr: String ) 
519    -> Result<Connection, WebsocketClientError > {
520
521
522        let mut final_connect_addr = connect_addr;
523
524        if !final_connect_addr.starts_with("ws://") {
525            // append it to the beginning
526            final_connect_addr = format!("ws://{}", final_connect_addr);
527        }
528
529       
530        let url = url::Url::parse(&final_connect_addr).unwrap(); 
531       
532
533        for  i in 0..9 {
534            match connect_async(url.clone()).await {
535                Ok((ws_stream, _)) => {
536                   
537                     degen_logger::log(   format!("WebSocket handshake has been successfully completed") , CustomLogStyle::Info  ) ; 
538 
539                    
540                    
541                    let (write, read) = ws_stream.split();
542                    
543                    let socket_connection_uuid =  uuid::Uuid::new_v4().to_string();
544                    
545                    let (outbound_messages_tx, outbound_messages_rx) : (Sender<SocketMessage>, Receiver<SocketMessage>)= channel(500);
546                    let (ws_server_events_tx, ws_server_events_rx) : (Sender<WebsocketSystemEvent>, Receiver<WebsocketSystemEvent>)= channel(500);
547                    
548                    let resources = ConnectionResources {
549                        
550                        write:Arc::new(Mutex::new(write)),
551                        read : Some(read),
552                      
553                        pending_reliable_messages: Arc::new(RwLock::new(HashMap::new())),
554                        outbound_messages_rx: Some(outbound_messages_rx),
555                        outbound_messages_tx,
556                        
557                        ws_server_events_rx: Some(ws_server_events_rx),
558                        ws_server_events_tx,
559                         socket_connection_uuid,
560                    };
561                        
562                   return Ok( Connection { 
563                     
564                        
565                        resources: Arc::new(Mutex::new(resources)),
566                      
567                         
568                      
569                    } );
570                    // once connected, break the loop
571                    //break;
572                },
573                Err(e) => {
574                    println!("Failed to connect, retrying in 1 second...");
575                    // wait for 1 second
576                    tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
577                   
578                }
579            }
580        }
581      //  Ok(())
582        return Err(  WebsocketClientError::UnableToConnect );
583    }
584    
585     
586    
587       pub async fn listen_future(
588        &mut self,
589         conn: Connection,
590         channel:  Sender<InboundMessage>  //we send inbound msgs into here 
591         )  
592       -> impl Future<Output = ()> + '_
593        {
594         
595        self.add_connection( conn );
596        
597        self.connection.as_mut().unwrap().listen( channel) 
598                
599        
600     }
601    
602     pub async fn listen(
603        &mut self,
604         conn: Connection,
605         channel:  Sender<InboundMessage>  //we send inbound msgs into here 
606         ){
607         
608        self.add_connection( conn );
609        
610        self.connection.as_mut().unwrap().listen( channel).await ;
611                 
612     }
613    
614   
615    
616    pub fn add_connection( &mut self ,  connection:  Connection ){
617         
618         self.connection = Some(connection);
619         
620    }
621    
622    
623    
624    pub async fn get_outbound_messages_tx(&self) -> Result<Sender<SocketMessage>, WebsocketClientError> { 
625        
626        match &self.connection {
627            Some(conn) => Ok ( conn.get_outbound_messages_tx().await ) ,
628            None => Err(  WebsocketClientError::NoConnectionError )
629        }
630       
631    }
632    
633     
634    
635
636}
637