1use degen_logger;
16
17use futures::Future;
18use serde::{Serialize};
32use serde_json;
33
34use futures_util::{ StreamExt, SinkExt};
35use tokio_tungstenite::{connect_async, tungstenite::Message};
36
37use tokio_tungstenite::{WebSocketStream,MaybeTlsStream};
39use tokio::net::TcpStream;
40
41 use tokio::sync::mpsc::{channel, Sender, Receiver};
43
44use std::thread;
45use tokio::runtime::Runtime;
46
47
48use crate::util::logtypes::CustomLogStyle;
49
50use super::reliable_message_subsystem::ReliableMessageSubsystem;
51
52
53use std::sync::Arc;
54use tokio::sync::{RwLock,Mutex};
55
56use std::collections::HashMap;
57
58use super::websocket_messages::{
59 SocketMessage,
60 InboundMessage,
61 OutboundMessage,
62
63
64 MessageReliability,
65 MessageReliabilityType,
66
67 SocketMessageError,
68
69 SocketMessageDestination, OutboundMessageDestination
70 };
71
72
73use super::websocket_server::WebsocketSystemEvent;
74
75use tokio::time::{interval,Duration};
76
77 use std::fmt;
78
79
/// Errors reported by the websocket client.
#[derive(Debug)]
pub enum WebsocketClientError {
    /// Every connection attempt failed.
    UnableToConnect,
    /// A message could not be sent.
    SendMessageError,
    /// A `serde_json` error, kept as its rendered text.
    SerdeJsonError(String),
    /// A websocket transport error, kept as its rendered text.
    TokioError(String),
    /// A socket message error; the underlying error value is not retained.
    SocketMessageErr,
    /// An operation needed a connection but the client does not hold one.
    NoConnectionError,
}

/// Human-readable rendering of each error variant.
impl fmt::Display for WebsocketClientError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            Self::UnableToConnect => f.write_str("Unable to connect"),
            Self::SendMessageError => f.write_str("Could not send message"),
            Self::SerdeJsonError(error) => write!(f, "Serde json error: {}", error),
            Self::TokioError(error) => write!(f, "Tokio Error: {}", error),
            Self::SocketMessageErr => f.write_str("Socket Message Error"),
            // Spelling fixed: this message previously read "No Connnection Error".
            Self::NoConnectionError => f.write_str("No Connection Error"),
        }
    }
}
105
106impl From<serde_json::Error> for WebsocketClientError {
107 fn from(err: serde_json::Error) -> Self {
108 WebsocketClientError::SerdeJsonError(format!("Serialization error: {}", err))
110 }
111}
112
113impl From<tokio_tungstenite::tungstenite::Error> for WebsocketClientError {
114 fn from(err: tokio_tungstenite::tungstenite::Error) -> Self {
115 WebsocketClientError::TokioError(format!("Tokio error: {}", err))
116 }
117}
118
119
120impl From<SocketMessageError> for WebsocketClientError {
121 fn from(err: SocketMessageError) -> Self {
122 WebsocketClientError::SocketMessageErr
123
124 }
125
126}
127
128impl std::error::Error for WebsocketClientError {}
129
130
131type SocketWriteSink = futures_util::stream::SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>;
132type SocketReadStream = futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>;
133
134
135pub struct ConnectionResources {
136
137 pub write: Arc< Mutex<SocketWriteSink> >,
138 pub read: Option< SocketReadStream > , outbound_messages_rx: Option<Receiver<SocketMessage>>,
144
145 pending_reliable_messages: Arc<RwLock<HashMap<String,SocketMessage>>>,
146
147
148
149 ws_server_events_rx: Option<Receiver<WebsocketSystemEvent>>,
150
151 outbound_messages_tx: Sender<SocketMessage>,
152 ws_server_events_tx: Sender<WebsocketSystemEvent>,
153 pub socket_connection_uuid: String,
154}
155
156
157pub struct Connection {
158
159 resources: Arc<Mutex<ConnectionResources>>,
160
161}
162
163
164
165
166
167impl Connection {
168
169
170
171
172
173 pub async fn start_listening(
174
175 resources: Arc<Mutex<ConnectionResources>>,
176 sender_channel: Sender<InboundMessage>,
177 ) {
178
179 let mut resources = resources.lock().await;
180
181 let read = resources.read.take().expect("The read stream has already been consumed.");
182 let socket_connection_uuid = resources.socket_connection_uuid.clone();
183
184 let pending_reliable_messages = Arc::clone(&resources.pending_reliable_messages);
185
186
187 let outbound_messages_tx = resources.outbound_messages_tx.clone();
188 let outbound_messages_rx = resources.outbound_messages_rx.take().unwrap();
189
190 let ws_server_events_tx = resources.ws_server_events_tx.clone();
191 let ws_server_events_rx = resources.ws_server_events_rx.take().unwrap();
192
193 let write = Arc::clone(&resources.write);
194
195
196
197 let forward_inbound_msg_future = Connection::forward_inbound_messages(
199 read,
200 sender_channel,
201 socket_connection_uuid.clone(),
202 outbound_messages_tx.clone(), ws_server_events_tx.clone()
204 ) ;
205
206
207 let send_outbound_msg_future = Self::start_forwarding_outbound_messages (
208 write,
209 outbound_messages_rx,
210
211 Arc::clone(&pending_reliable_messages),
212 socket_connection_uuid.clone()
213
214 );
215
216 let resend_reliable_messages = ReliableMessageSubsystem::resend_reliable_messages (
217 Arc::clone(&pending_reliable_messages),
218 outbound_messages_tx.clone()
219 );
220
221 let handle_server_events = Self::handle_server_events(
222 ws_server_events_rx ,
223 Arc::clone(&pending_reliable_messages )
224 );
225
226
227
228 let select = tokio::select! {
229 _ = forward_inbound_msg_future => eprintln!("forward_inbound_msg_handle finished"),
230 _ = send_outbound_msg_future => eprintln!("send_outbound_msg_handle finished"),
231 _ = resend_reliable_messages => eprintln!("resend_reliable_messages_handle finished"),
232 _ = handle_server_events => eprintln!("handle_server_events_handle finished"),
233 };
234
235
236
237 degen_logger::log( format!("WS WARN: TOKIO SELECT DROPPED") , CustomLogStyle::Error ) ;
238
239
240
241
242 }
243
244
245
246
247 async fn start_forwarding_outbound_messages(
248
249 write: Arc<Mutex<SocketWriteSink>>,
250 mut receiver_channel: Receiver<SocketMessage>,
251
252 pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
253 socket_connection_uuid:String,
254 ) -> std::io::Result<()>
255 {
256
257
258
259 degen_logger::log( format!("ws client start_forwarding_outbound_messages") , CustomLogStyle::Info ) ;
260
261
262
263 loop {
264
265
266 while let Some(socket_message) = receiver_channel.recv().await { let reliability_type = socket_message.clone().reliability_type;
273
274
275 if let MessageReliabilityType::Reliable(msg_uuid) = reliability_type {
276
277
278 pending_reliable_messages.write().await.insert(msg_uuid, socket_message.clone( ) );
280 }
281
282
283
284
285 degen_logger::log( format!("ws client is sending out msg: {} ", socket_message) , CustomLogStyle::Info ) ;
286
287
288
289
290
291
292
293 let message_result = socket_message.to_message();
294
295 if let Ok(message) = message_result {
296 let send_msg = write.lock().await.send( message ).await ;
297
298 if let Err(e) = send_msg {
299
300 degen_logger::log( format!("ws client: Error sending message.. {}", e) , CustomLogStyle::Error ) ;
301
302
303
304 }
305 }
306
307 }
310
311
312
313 }
314
315 }
317
318
319 pub async fn get_outbound_messages_tx(&self) -> Sender<SocketMessage> {
321
322 self.resources.lock().await.outbound_messages_tx.clone()
323 }
324
325
326 pub async fn listen( &self, sender_channel: Sender<InboundMessage>){
327
328
329 let resources = Arc::clone(&self.resources);
330
331
332 let start_listening_future= Connection::start_listening(resources, sender_channel);
333
334 tokio::join!( start_listening_future ) ;
335
336 degen_logger::log( format!("ws client: listen loop ended") , CustomLogStyle::Error ) ;
337
338 }
339
340
341
342
343
344 pub async fn get_socket_connection_uuid(&self) -> String {
345
346
347 return self.resources.lock().await.socket_connection_uuid.clone( )
348
349
350 }
351
352
353async fn handle_server_events(
354 mut ws_event_rx: Receiver<WebsocketSystemEvent>,
355 pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
356) -> std::io::Result<()> {
357
358 loop {
359
360 while let Some(evt) = ws_event_rx.recv().await { match evt {
363
364 WebsocketSystemEvent::ReceivedMessageAck { reliable_msg_uuid } => {
365 Self::clear_pending_reliable_message(
367 Arc::clone(&pending_reliable_messages),
368 reliable_msg_uuid
369 ).await;
370 }
371
372 };
373
374 }
375
376 }
377
378
379 }
381
382pub async fn clear_pending_reliable_message(
384 pending_reliable_messages: Arc<RwLock<HashMap<String, SocketMessage>>>,
385 message_uuid: String,
386
387) {
388 let mut messages = pending_reliable_messages.write().await ;
389 messages.remove(&message_uuid) ;
390}
391
392
393 pub async fn forward_inbound_messages(
406 mut read: futures_util::stream::SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
407 sender_channel: Sender<InboundMessage>,
408 socket_connection_uuid:String ,
409 outbound_messages_tx: Sender<SocketMessage>,
410 ws_server_events_tx: Sender<WebsocketSystemEvent>
411 ){
412
413 loop{
414
415
416
417 degen_logger::log( format!("ws_client client forward_inbound_messages") , CustomLogStyle::Info ) ;
418
419
420 while let Some(message_result) = read.next().await {
422
423 match message_result {
424 Ok(message) => {
425
426 let socket_message_result = SocketMessage::from_message(message) ;
427
428 if let Ok(socket_message) = socket_message_result {
429
430
431 let inbound_msg = InboundMessage {
434 socket_connection_uuid: socket_connection_uuid.clone(),
435 message: socket_message.clone(),
436 };
437
438 sender_channel.try_send(inbound_msg.clone()) ;
444
445 degen_logger::log( format!("client got socket_message {} ", socket_message) , CustomLogStyle::Info ) ;
446
447
448
449 if let MessageReliabilityType::Reliable(msg_uuid) = socket_message.reliability_type {
450 let ack_message = SocketMessage::create_reliability_ack( msg_uuid.clone()) ;
452 println!("client creating reliability ack for {}", msg_uuid.clone());
453 let send_ack_result = outbound_messages_tx.try_send ( ack_message
456
457 );
458
459 if let Err(send_ack_err) = send_ack_result {
460 eprintln!("send ack error {}", send_ack_err);
461 }
462 }
463
464 if let SocketMessageDestination::AckToReliableMsg( reliable_msg_uuid ) = socket_message.destination {
465
466 degen_logger::log( format!("client got ack") , CustomLogStyle::Info ) ;
467
468
469 ws_server_events_tx.try_send(
470 WebsocketSystemEvent::ReceivedMessageAck { reliable_msg_uuid }
471 ) ;
472
473 }
474
475
476 }
477
478
479
480
481 }
482 Err(e) => {
483 eprintln!("Error while reading message: {:?}", e);
484 break;
485 }
486 }
487 }
488
489
490 }
491 }
492
493
494
495
496
497}
498
499
500
501pub struct WebsocketClient{
502 pub connection: Option<Connection>,
503
504
505}
506
507impl WebsocketClient {
508
509 pub fn new() -> Self {
510
511 Self {
512 connection: None,
513 }
514 }
515
516
517
518 pub async fn connect( connect_addr: String )
519 -> Result<Connection, WebsocketClientError > {
520
521
522 let mut final_connect_addr = connect_addr;
523
524 if !final_connect_addr.starts_with("ws://") {
525 final_connect_addr = format!("ws://{}", final_connect_addr);
527 }
528
529
530 let url = url::Url::parse(&final_connect_addr).unwrap();
531
532
533 for i in 0..9 {
534 match connect_async(url.clone()).await {
535 Ok((ws_stream, _)) => {
536
537 degen_logger::log( format!("WebSocket handshake has been successfully completed") , CustomLogStyle::Info ) ;
538
539
540
541 let (write, read) = ws_stream.split();
542
543 let socket_connection_uuid = uuid::Uuid::new_v4().to_string();
544
545 let (outbound_messages_tx, outbound_messages_rx) : (Sender<SocketMessage>, Receiver<SocketMessage>)= channel(500);
546 let (ws_server_events_tx, ws_server_events_rx) : (Sender<WebsocketSystemEvent>, Receiver<WebsocketSystemEvent>)= channel(500);
547
548 let resources = ConnectionResources {
549
550 write:Arc::new(Mutex::new(write)),
551 read : Some(read),
552
553 pending_reliable_messages: Arc::new(RwLock::new(HashMap::new())),
554 outbound_messages_rx: Some(outbound_messages_rx),
555 outbound_messages_tx,
556
557 ws_server_events_rx: Some(ws_server_events_rx),
558 ws_server_events_tx,
559 socket_connection_uuid,
560 };
561
562 return Ok( Connection {
563
564
565 resources: Arc::new(Mutex::new(resources)),
566
567
568
569 } );
570 },
573 Err(e) => {
574 println!("Failed to connect, retrying in 1 second...");
575 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
577
578 }
579 }
580 }
581 return Err( WebsocketClientError::UnableToConnect );
583 }
584
585
586
587 pub async fn listen_future(
588 &mut self,
589 conn: Connection,
590 channel: Sender<InboundMessage> )
592 -> impl Future<Output = ()> + '_
593 {
594
595 self.add_connection( conn );
596
597 self.connection.as_mut().unwrap().listen( channel)
598
599
600 }
601
602 pub async fn listen(
603 &mut self,
604 conn: Connection,
605 channel: Sender<InboundMessage> ){
607
608 self.add_connection( conn );
609
610 self.connection.as_mut().unwrap().listen( channel).await ;
611
612 }
613
614
615
616 pub fn add_connection( &mut self , connection: Connection ){
617
618 self.connection = Some(connection);
619
620 }
621
622
623
624 pub async fn get_outbound_messages_tx(&self) -> Result<Sender<SocketMessage>, WebsocketClientError> {
625
626 match &self.connection {
627 Some(conn) => Ok ( conn.get_outbound_messages_tx().await ) ,
628 None => Err( WebsocketClientError::NoConnectionError )
629 }
630
631 }
632
633
634
635
636}
637