1use axum::extract::ws::{Message, WebSocket};
2use futures::SinkExt;
3use tokio::sync::RwLock;
4
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use std::collections::HashMap;
8use std::fmt::Debug;
9use tokio::task::JoinHandle;
10
11use futures::stream::SplitSink;
12use std::sync::Arc;
13
14use crate::core::prelude::ProtoTrait;
15
16use super::universe::Universe;
17
18#[derive(Clone, Debug)]
19pub struct Client {
20 out: Arc<RwLock<SplitSink<WebSocket, Message>>>,
21 session_id: usize,
22 queries: Arc<RwLock<HashMap<u64, JoinHandle<()>>>>,
23 next_request_id: Arc<RwLock<u64>>,
24}
25
26impl Client {
27 pub fn new(out: Arc<RwLock<SplitSink<WebSocket, Message>>>, _universe: Universe) -> Self {
28 Self {
29 out,
30 session_id: 0,
31 queries: Arc::new(RwLock::new(HashMap::new())),
32 next_request_id: Arc::new(RwLock::new(0)),
33 }
34 }
35
36 pub fn set_session_id(&mut self, session_id: usize) {
37 self.session_id = session_id;
38 }
39
40 pub async fn handle_msg<P: ProtoTrait + Send + Serialize + DeserializeOwned + Debug>(
41 &self,
42 msg: Vec<u8>,
43 ) where
44 <P as ProtoTrait>::Client: Send,
45 P: ProtoTrait<Client = Self>,
46 {
47 if msg.is_empty() {
48 warn!("{}: Empty message", self.session_id);
49
50 return;
51 }
52
53 let msg = crate::Message::from_bytes(&msg);
54
55 trace!("{}: Message: {:?}", self.session_id, msg);
56
57 let proto = P::from_bytes(&msg.msg);
58
59 debug!("{}: Proto: {:?}", self.session_id, proto);
60
61 let response = proto.dispatch(msg.request_id, self.clone()).await;
62
63 if let Some(response) = response {
64 debug!("{}: Response: {:?}", self.session_id, response);
65
66 let response = response.to_bytes();
67
68 let msg = crate::Message {
69 request_id: self.next_request_id.read().await.clone(),
70 response_id: Some(msg.request_id),
71 msg: response,
72 };
73
74 let response = msg.to_bytes();
75
76 trace!("{}: Sending: {:?}", self.session_id, msg);
77
78 self.out
79 .write()
80 .await
81 .send(Message::Binary(response))
82 .await
83 .unwrap();
84 }
85 }
86
87 pub async fn send<P: ProtoTrait + Send + Serialize + DeserializeOwned>(&self, proto: P) {
88 let msg = proto.to_bytes();
89 let msg = crate::Message {
90 request_id: self.next_request_id.read().await.clone(),
91 response_id: None,
92 msg,
93 };
94 let msg = msg.to_bytes();
95
96 *self.next_request_id.write().await += 1;
97
98 debug!("{}: Sending: {:?}", self.session_id, msg);
99
100 self.out
101 .write()
102 .await
103 .send(Message::Binary(msg))
104 .await
105 .unwrap();
106 }
107
108 pub async fn add_query(&self, request_id: u64, handle: JoinHandle<()>) {
109 debug!("{}: Adding query: {}", self.session_id, request_id);
110
111 self.queries.write().await.insert(request_id, handle);
112 }
113
114 pub async fn abort_queries(&self) {
115 for (_, handle) in self.queries.write().await.iter() {
116 warn!("{}, aborting query", self.session_id);
117
118 handle.abort();
119 }
120 }
121}