Skip to main content

comet/server/
client.rs

1use axum::extract::ws::{Message, WebSocket};
2use futures::SinkExt;
3use tokio::sync::RwLock;
4
5use serde::de::DeserializeOwned;
6use serde::Serialize;
7use std::collections::HashMap;
8use std::fmt::Debug;
9use tokio::task::JoinHandle;
10
11use futures::stream::SplitSink;
12use std::sync::Arc;
13
14use crate::core::prelude::ProtoTrait;
15
16use super::universe::Universe;
17
18#[derive(Clone, Debug)]
19pub struct Client {
20    out: Arc<RwLock<SplitSink<WebSocket, Message>>>,
21    session_id: usize,
22    queries: Arc<RwLock<HashMap<u64, JoinHandle<()>>>>,
23    next_request_id: Arc<RwLock<u64>>,
24}
25
26impl Client {
27    pub fn new(out: Arc<RwLock<SplitSink<WebSocket, Message>>>, _universe: Universe) -> Self {
28        Self {
29            out,
30            session_id: 0,
31            queries: Arc::new(RwLock::new(HashMap::new())),
32            next_request_id: Arc::new(RwLock::new(0)),
33        }
34    }
35
36    pub fn set_session_id(&mut self, session_id: usize) {
37        self.session_id = session_id;
38    }
39
40    pub async fn handle_msg<P: ProtoTrait + Send + Serialize + DeserializeOwned + Debug>(
41        &self,
42        msg: Vec<u8>,
43    ) where
44        <P as ProtoTrait>::Client: Send,
45        P: ProtoTrait<Client = Self>,
46    {
47        if msg.is_empty() {
48            warn!("{}: Empty message", self.session_id);
49
50            return;
51        }
52
53        let msg = crate::Message::from_bytes(&msg);
54
55        trace!("{}: Message: {:?}", self.session_id, msg);
56
57        let proto = P::from_bytes(&msg.msg);
58
59        debug!("{}: Proto: {:?}", self.session_id, proto);
60
61        let response = proto.dispatch(msg.request_id, self.clone()).await;
62
63        if let Some(response) = response {
64            debug!("{}: Response: {:?}", self.session_id, response);
65
66            let response = response.to_bytes();
67
68            let msg = crate::Message {
69                request_id: self.next_request_id.read().await.clone(),
70                response_id: Some(msg.request_id),
71                msg: response,
72            };
73
74            let response = msg.to_bytes();
75
76            trace!("{}: Sending: {:?}", self.session_id, msg);
77
78            self.out
79                .write()
80                .await
81                .send(Message::Binary(response))
82                .await
83                .unwrap();
84        }
85    }
86
87    pub async fn send<P: ProtoTrait + Send + Serialize + DeserializeOwned>(&self, proto: P) {
88        let msg = proto.to_bytes();
89        let msg = crate::Message {
90            request_id: self.next_request_id.read().await.clone(),
91            response_id: None,
92            msg,
93        };
94        let msg = msg.to_bytes();
95
96        *self.next_request_id.write().await += 1;
97
98        debug!("{}: Sending: {:?}", self.session_id, msg);
99
100        self.out
101            .write()
102            .await
103            .send(Message::Binary(msg))
104            .await
105            .unwrap();
106    }
107
108    pub async fn add_query(&self, request_id: u64, handle: JoinHandle<()>) {
109        debug!("{}: Adding query: {}", self.session_id, request_id);
110
111        self.queries.write().await.insert(request_id, handle);
112    }
113
114    pub async fn abort_queries(&self) {
115        for (_, handle) in self.queries.write().await.iter() {
116            warn!("{}, aborting query", self.session_id);
117
118            handle.abort();
119        }
120    }
121}