cataclysm_ws/
web_socket_writer.rs1use tokio::net::{TcpStream, tcp::OwnedWriteHalf};
2use crate::{Error, Frame};
3use bytes::Buf;
4
5const CHUNK_SIZE: usize = 4_096;
6
7pub struct WebSocketWriter {
9 write_stream: OwnedWriteHalf
10}
11
12impl WebSocketWriter {
13 pub const FIN_RSV: u8 = 0x80;
14 pub const OP_CODE_CONTINUATION: u8 = 0x00;
15 pub const OP_CODE_TEXT: u8 = 0x01;
16 pub const OP_CODE_BINARY: u8 = 0x02;
17 pub const OP_CODE_CLOSE: u8 = 0x08;
18 pub const OP_CODE_PING: u8 = 0x09;
19 pub const OP_CODE_PONG: u8 = 0x0A;
20
21 pub fn new_unchecked(write_stream: OwnedWriteHalf) -> Self {
22 WebSocketWriter {
23 write_stream
24 }
25 }
26
27 async fn write<A: Into<Vec<u8>>>(&self, content: A) -> Result<(), Error> {
28 let content: Vec<u8> = content.into();
29 let mut chunks_iter = content.chunks(CHUNK_SIZE);
30 #[cfg(feature = "full_log")]
31 log::trace!("writting {} chunks of maximum {} bytes each", chunks_iter.len(), CHUNK_SIZE);
32 let mut current_chunk = match chunks_iter.next() {
34 Some(v) => v,
35 None => return Ok(()) };
37 loop {
38 let stream: &TcpStream = self.write_stream.as_ref();
40 stream.writable().await.unwrap();
41
42 match stream.try_write(¤t_chunk) {
45 Ok(n) => {
46 if n != current_chunk.remaining() {
47 #[cfg(feature = "full_log")]
49 log::debug!("incomplete chunk, trying to serve remaining bytes ({}/{})", current_chunk.len(), CHUNK_SIZE);
50 current_chunk.advance(n);
51 continue;
52 } else {
53 current_chunk = match chunks_iter.next() {
54 Some(v) => v,
55 None => break Ok(())
56 }
57 }
58 }
59 Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
60 continue;
61 }
62 Err(e) => break Err(Error::Io(e))
63 }
64 }
65 }
66
67 pub async fn text<A: Into<String>>(&self, text: A) -> Result<(), Error> {
69 self.write(Frame::text(text)).await
70 }
71
72 pub async fn bytes<A: Into<Vec<u8>>>(&self, bytes: A) -> Result<(), Error> {
74 self.write(Frame::binary(bytes)).await
75 }
76
77 pub async fn ping<A: Into<Vec<u8>>>(&self, payload: A) -> Result<(), Error> {
79 self.write(Frame::ping(payload)).await
80 }
81
82 pub async fn pong<A: Into<Vec<u8>>>(&self, payload: A) -> Result<(), Error> {
84 self.write(Frame::pong(payload)).await
85 }
86
87 pub async fn close(&self) -> Result<(), Error> {
89 self.write(Frame::close()).await
90 }
91}