cataclysm_ws/
web_socket_writer.rs

1use tokio::net::{TcpStream, tcp::OwnedWriteHalf};
2use crate::{Error, Frame};
3use bytes::Buf;
4
5const CHUNK_SIZE: usize = 4_096;
6
7/// Sending part of web sockets connection
8pub struct WebSocketWriter {
9    write_stream: OwnedWriteHalf
10}
11
12impl WebSocketWriter {
13    pub const FIN_RSV: u8 = 0x80;
14    pub const OP_CODE_CONTINUATION: u8 = 0x00;
15    pub const OP_CODE_TEXT: u8 = 0x01;
16    pub const OP_CODE_BINARY: u8 = 0x02;
17    pub const OP_CODE_CLOSE: u8 = 0x08;
18    pub const OP_CODE_PING: u8 = 0x09;
19    pub const OP_CODE_PONG: u8 = 0x0A;
20
21    pub fn new_unchecked(write_stream: OwnedWriteHalf) -> Self {
22        WebSocketWriter {
23            write_stream
24        }
25    }
26
27    async fn write<A: Into<Vec<u8>>>(&self, content: A) -> Result<(), Error> {
28        let content: Vec<u8> = content.into();
29        let mut chunks_iter = content.chunks(CHUNK_SIZE);
30        #[cfg(feature = "full_log")]
31        log::trace!("writting {} chunks of maximum {} bytes each", chunks_iter.len(), CHUNK_SIZE);
32        // We check the first chunk
33        let mut current_chunk = match chunks_iter.next() {
34            Some(v) => v,
35            None => return Ok(()) // Zero length response
36        };
37        loop {
38            // Wait for the socket to be writable
39            let stream: &TcpStream = self.write_stream.as_ref();
40            stream.writable().await.unwrap();
41    
42            // Try to write data, this may still fail with `WouldBlock`
43            // if the readiness event is a false positive.
44            match stream.try_write(&current_chunk) {
45                Ok(n) => {
46                    if n != current_chunk.remaining() {
47                        // There are some bytes still to be written in this chunk
48                        #[cfg(feature = "full_log")]
49                        log::debug!("incomplete chunk, trying to serve remaining bytes ({}/{})", current_chunk.len(), CHUNK_SIZE);
50                        current_chunk.advance(n);
51                        continue;
52                    } else {
53                        current_chunk = match chunks_iter.next() {
54                            Some(v) => v,
55                            None => break Ok(())
56                        }
57                    }
58                }
59                Err(ref e) if e.kind() == tokio::io::ErrorKind::WouldBlock => {
60                    continue;
61                }
62                Err(e) => break Err(Error::Io(e))
63            }
64        }
65    }
66
67    /// Sends a text message through the websockets connection
68    pub async fn text<A: Into<String>>(&self, text: A) -> Result<(), Error> {
69        self.write(Frame::text(text)).await
70    }
71
72    /// Sends a text message through the websockets connection
73    pub async fn bytes<A: Into<Vec<u8>>>(&self, bytes: A) -> Result<(), Error> {
74        self.write(Frame::binary(bytes)).await
75    }
76
77    /// Sends a ping message through the websockets connection
78    pub async fn ping<A: Into<Vec<u8>>>(&self, payload: A) -> Result<(), Error> {
79        self.write(Frame::ping(payload)).await
80    }
81
82    /// Sends a pong message through the websockets connection
83    pub async fn pong<A: Into<Vec<u8>>>(&self, payload: A) -> Result<(), Error> {
84        self.write(Frame::pong(payload)).await
85    }
86
87    /// Closes the write part of the socket
88    pub async fn close(&self) -> Result<(), Error> {
89        self.write(Frame::close()).await
90    }
91}