cataclysm_ws/
web_socket_reader.rs1use tokio::{
2 net::{TcpStream, tcp::OwnedReadHalf},
3 task::JoinHandle,
4 sync::{OwnedSemaphorePermit}
5};
6use crate::{
7 Frame,
8 Error,
9 FrameParseError,
10 WebSocketThread,
11 communication::read_frame
12};
13
14pub struct WebSocketReader {
16 read_stream: OwnedReadHalf,
17 permit: Option<OwnedSemaphorePermit>
18}
19
20impl WebSocketReader {
21 pub fn new_unchecked(read_stream: OwnedReadHalf) -> WebSocketReader {
23 WebSocketReader {
24 read_stream,
25 permit: None
26 }
27 }
28
29 pub fn set_permit(&mut self, permit: OwnedSemaphorePermit) {
31 self.permit = Some(permit);
32 }
33
34 pub async fn try_read_frame(&self) -> Result<Frame, Error> {
36 read_frame(&self).await
37 }
38
39 pub fn spawn<H: WebSocketThread + 'static>(self, wst: H) -> JoinHandle<<H as WebSocketThread>::Output> {
41 WebSocketCustomChild::new(self).spawn(wst)
42 }
43}
44
45impl AsRef<TcpStream> for WebSocketReader {
47 fn as_ref(&self) -> &TcpStream {
48 self.read_stream.as_ref()
49 }
50}
51
52pub struct WebSocketCustomChild {
53 automatic_close: bool,
54 wsr: WebSocketReader
55}
56
57impl WebSocketCustomChild {
58 pub fn new(wsr: WebSocketReader) -> WebSocketCustomChild {
59 WebSocketCustomChild {
60 automatic_close: true,
61 wsr
62 }
63 }
64
65 pub fn automatic_close(mut self, value: bool) -> Self {
66 self.automatic_close = value;
67 self
68 }
69
70 pub fn spawn<H: WebSocketThread + 'static>(self, mut wst: H) -> JoinHandle<<H as WebSocketThread>::Output> {
72 tokio::spawn(async move {
73 wst.on_open().await;
74 loop {
75 match self.wsr.try_read_frame().await {
76 Ok(frame) => {
77 if frame.message.is_close() && self.automatic_close {
78 break wst.on_close(true).await
79 }
80
81 wst.on_message(frame.message).await;
82 },
83 Err(e) => {
84 log::debug!("{}", e);
85 match e {
86 Error::FrameParse(FrameParseError::Incomplete{..}) => {
87 continue
89 },
90 _ => {
91 log::debug!("closing connection");
92 break wst.on_close(false).await
93 }
94 }
95 }
96 }
97 }
98 })
99 }
100}
101
102impl From<WebSocketReader> for OwnedReadHalf {
103 fn from(source: WebSocketReader) -> OwnedReadHalf {
104 source.read_stream
105 }
106}