cataclysm_ws/
web_socket_reader.rs

1use tokio::{
2    net::{TcpStream, tcp::OwnedReadHalf},
3    task::JoinHandle,
4    sync::{OwnedSemaphorePermit}
5};
6use crate::{
7    Frame,
8    Error,
9    FrameParseError,
10    WebSocketThread,
11    communication::read_frame
12};
13
14/// Runner thread for a websockets connection
15pub struct WebSocketReader {
16    read_stream: OwnedReadHalf,
17    permit: Option<OwnedSemaphorePermit>
18}
19
20impl WebSocketReader {
21    /// Generates a new instance of the websocket reader, assuming the handshake has already been performed
22    pub fn new_unchecked(read_stream: OwnedReadHalf) -> WebSocketReader {
23        WebSocketReader {
24            read_stream,
25            permit: None
26        }
27    }
28
29    /// Auxiliar function that cataclysm uses to keep track of connections
30    pub fn set_permit(&mut self, permit: OwnedSemaphorePermit) {
31        self.permit = Some(permit);
32    }
33
34    /// Blocks until a message is received
35    pub async fn try_read_frame(&self) -> Result<Frame, Error> {
36        read_frame(&self).await
37    }
38
39    /// Spawns a tokio thread that dispatches the message to the proved handler
40    pub fn spawn<H: WebSocketThread + 'static>(self, wst: H) -> JoinHandle<<H as WebSocketThread>::Output> {
41        WebSocketCustomChild::new(self).spawn(wst)
42    }
43}
44
45// Reference access to the inner structure
46impl AsRef<TcpStream> for WebSocketReader {
47    fn as_ref(&self) -> &TcpStream {
48        self.read_stream.as_ref()
49    }
50}
51
52pub struct WebSocketCustomChild {
53    automatic_close: bool,
54    wsr: WebSocketReader
55}
56
57impl WebSocketCustomChild {
58    pub fn new(wsr: WebSocketReader) -> WebSocketCustomChild {
59        WebSocketCustomChild {
60            automatic_close: true,
61            wsr
62        }
63    }
64
65    pub fn automatic_close(mut self, value: bool) -> Self {
66        self.automatic_close = value;
67        self
68    }
69
70    /// Spawns a tokio thread that dispatches the message to the proved handler
71    pub fn spawn<H: WebSocketThread + 'static>(self, mut wst: H) -> JoinHandle<<H as WebSocketThread>::Output> {
72        tokio::spawn(async move {
73            wst.on_open().await;
74            loop {
75                match self.wsr.try_read_frame().await {
76                    Ok(frame) => {
77                        if frame.message.is_close() && self.automatic_close {
78                            break wst.on_close(true).await
79                        }
80
81                        wst.on_message(frame.message).await;
82                    },
83                    Err(e) => {
84                        log::debug!("{}", e);
85                        match e {
86                            Error::FrameParse(FrameParseError::Incomplete{..}) => {
87                                // It is likely that a next chunk is missing
88                                continue
89                            },
90                            _ => {
91                                log::debug!("closing connection");
92                                break wst.on_close(false).await
93                            }
94                        }
95                    }
96                }
97            }
98        })
99    }
100}
101
102impl From<WebSocketReader> for OwnedReadHalf {
103    fn from(source: WebSocketReader) -> OwnedReadHalf {
104        source.read_stream
105    }
106}