1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
pub use self::web_socket_reader::WebSocketReader;
pub use self::web_socket_writer::WebSocketWriter;
pub use self::frame::Frame;
pub use self::message::Message;
pub use self::error::Error;
mod web_socket_reader;
mod web_socket_writer;
mod frame;
mod message;
mod error;
use tokio::{
io::AsyncReadExt,
net::{tcp::OwnedReadHalf}
};
use bytes::{BytesMut};
/// State owned by the background task that services the read side of one
/// WebSocket connection.
///
/// Instances are created by `WebSocketThread::spawn`, which moves the value
/// into a Tokio task; despite the name, no OS thread is created here.
pub struct WebSocketThread {
    /// Read half of the TCP connection that incoming frames are read from.
    read_stream: OwnedReadHalf,
    /// Handler notified when the connection opens, for each received message,
    /// and when the read loop ends.
    web_socket_reader: Box<dyn WebSocketReader>,
}
impl WebSocketThread {
pub fn spawn(read_stream: OwnedReadHalf, web_socket_reader: Box<dyn WebSocketReader>) {
let mut web_socket_thread = WebSocketThread {
read_stream,
web_socket_reader
};
tokio::spawn(async move {
web_socket_thread.web_socket_reader.on_open().await;
match web_socket_thread.read_loop().await {
Ok(_) => log::debug!("Leaving read loop in a nice manner"),
Err(e) => log::debug!("Leaving read loop with error, {}", e)
};
web_socket_thread.web_socket_reader.on_close().await;
});
}
async fn read_loop(&mut self) -> Result<(), Error> {
let mut buf = BytesMut::with_capacity(8 * 1024);
loop {
let maybe_frame = loop {
match Frame::parse(&buf) {
Ok(frame) => break Ok(Some(frame)),
Err(Error::Parse(e)) => {
log::debug!("{}, clearing buffer", e);
buf.clear();
},
Err(Error::Incomplete) => (),
Err(e) => return Err(e)
};
if 0 == self.read_stream.read_buf(&mut buf).await.unwrap() {
if buf.is_empty() {
break Ok(None);
} else {
break Err(Error::ConnectionReset);
}
}
}?;
if let Some(frame) = maybe_frame {
buf.clear();
if let Some(message) = frame.message {
self.web_socket_reader.on_message(message).await;
}
} else {
break Ok(());
}
}
}
}