1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
use tokio::{
    net::{TcpStream, tcp::OwnedReadHalf},
    task::JoinHandle
};
use crate::{
    Frame,
    Error,
    FrameParseError,
    WebSocketThread,
    communication::read_frame
};

/// Read side of a websocket connection.
///
/// Wraps the owned read half of a TCP stream; frames are pulled from it
/// through [`WebSocketReader::try_read_frame`].
pub struct WebSocketReader {
    /// Owned read half of the underlying TCP stream.
    read_stream: OwnedReadHalf,
}

impl WebSocketReader {
    /// Wraps an already-split read half in a reader.
    ///
    /// Nothing is checked here: the value is stored as-is, so the caller is
    /// expected to have completed the websocket handshake beforehand.
    pub fn new_unchecked(read_stream: OwnedReadHalf) -> WebSocketReader {
        Self { read_stream }
    }

    /// Waits asynchronously for the next frame.
    ///
    /// Delegates to `read_frame` and returns its result unchanged.
    pub async fn try_read_frame(&self) -> Result<Frame, Error> {
        let outcome = read_frame(&self).await;
        outcome
    }

    /// Spawns a tokio task that dispatches incoming messages to the provided handler.
    ///
    /// The reader is wrapped in a [`WebSocketCustomChild`] with its default
    /// settings, which is then spawned with `wst`.
    pub fn spawn<H>(self, wst: H) -> JoinHandle<<H as WebSocketThread>::Output>
    where
        H: WebSocketThread + 'static,
    {
        let child = WebSocketCustomChild::new(self);
        child.spawn(wst)
    }
}

// Reference access to the inner structure
impl AsRef<TcpStream> for WebSocketReader {
    fn as_ref(&self) -> &TcpStream {
        self.read_stream.as_ref()
    }
}

/// Configurable wrapper around a [`WebSocketReader`] used to spawn the
/// read loop as a tokio task.
pub struct WebSocketCustomChild {
    /// Reader the spawned task pulls frames from.
    wsr: WebSocketReader,
    /// When `true`, a received close message ends the read loop instead of
    /// being forwarded to the handler's `on_message`.
    automatic_close: bool,
}

impl WebSocketCustomChild {
    pub fn new(wsr: WebSocketReader) -> WebSocketCustomChild {
        WebSocketCustomChild {
            automatic_close: true,
            wsr
        }
    }

    pub fn automatic_close(mut self, value: bool) -> Self {
        self.automatic_close = value;
        self
    }

    /// Spawns a tokio thread that dispatches the message to the proved handler
    pub fn spawn<H: WebSocketThread + 'static>(self, mut wst: H) -> JoinHandle<<H as WebSocketThread>::Output> {
        tokio::spawn(async move {
            wst.on_open().await;
            loop {
                match self.wsr.try_read_frame().await {
                    Ok(frame) => {
                        if frame.message.is_close() && self.automatic_close {
                            break wst.on_close(true).await
                        }

                        wst.on_message(frame.message).await;
                    },
                    Err(e) => {
                        log::debug!("{}", e);
                        match e {
                            Error::FrameParse(FrameParseError::Incomplete{..}) => {
                                // It is likely that a next chunk is missing
                                continue
                            },
                            _ => {
                                log::debug!("closing connection");
                                break wst.on_close(false).await
                            }
                        }
                    }
                }
            }
        })
    }
}

impl From<WebSocketReader> for OwnedReadHalf {
    fn from(source: WebSocketReader) -> OwnedReadHalf {
        source.read_stream
    }
}