stomp_rs/
connection.rs

1use crate::protocol::BNF_LF;
2use crate::protocol::{ClientCommand, Frame, FrameParser, ParseError, ServerCommand, StompMessage};
3use log::debug;
4use std::error::Error;
5use std::fmt::{Display, Formatter};
6use std::sync::Arc;
7use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind};
8use tokio::net::TcpStream;
9use tokio::sync::mpsc::error::SendError;
10use tokio::sync::mpsc::{channel, Sender};
11use tokio::sync::Mutex;
12
13#[derive(Debug)]
14pub enum ClosingReason {
15    ParseError(ParseError),
16    ConnectionError(std::io::Error),
17    Shutdown,
18}
19
20#[derive(Debug)]
21pub enum ConnectionError {
22    Closing(ClosingReason),
23}
24
25impl Display for ConnectionError {
26    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27        write!(f, "Client error")
28    }
29}
30
31impl Error for ConnectionError {}
32
33pub struct Connection {
34    client_sender: Sender<StompMessage<ClientCommand>>,
35    server_sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
36    close_sender: Sender<()>,
37    is_closed: Arc<Mutex<bool>>,
38}
39
40impl Connection {
41    pub async fn new(
42        mut tcp_stream: TcpStream,
43        server_sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
44    ) -> Self {
45        let (sender, mut receiver) = channel(5);
46
47        let (close_sender, mut close_receiver) = channel(1);
48        let inner_close_sender = close_sender.clone();
49        let is_closed = Arc::new(Mutex::new(false));
50        let inner_is_closed = Arc::clone(&is_closed);
51
52        let connection = Self {
53            client_sender: sender,
54            server_sender: server_sender.clone(),
55            close_sender,
56            is_closed,
57        };
58
59        tokio::spawn(async move {
60            let mut msg = vec![0; 8096];
61            let mut parser: FrameParser<ServerCommand> = FrameParser::new();
62            let mut closing = false;
63
64            loop {
65                tokio::select! {
66                    frame = receiver.recv(), if !closing => {
67                         if let Some(message) = frame {
68                            match message {
69                                StompMessage::Frame(frame) => tcp_stream.write_all(&frame.to_bytes()).await.unwrap(),
70                                StompMessage::Ping => tcp_stream.write_u8(BNF_LF).await.unwrap()
71                            }
72
73                            tcp_stream.flush().await.unwrap();
74                        }
75                    },
76                    read = tcp_stream.read(&mut msg), if !closing => {
77                        match read {
78                            Ok(n) => {
79                                match parser.parse(&msg[..n]) {
80                                    Ok(messages) => {
81                                        for message in messages {
82                                            debug!("Message received {:?}", message.clone());
83                                            server_sender.send(Ok(message)).await.unwrap();
84                                        }
85                                    }
86                                    Err(e) => {
87                                        debug!("Parsing error, closing {:?}", e);
88                                        if server_sender.send(Err(ConnectionError::Closing(ClosingReason::ParseError(e))))
89                                            .await
90                                            .is_err() {
91                                            debug!("Could not inform client");
92                                        }
93                                        inner_close_sender.send(()).await.unwrap();
94                                        closing = true;
95                                    }
96                                }
97                            }
98                            Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
99                            Err(e) => {
100                                debug!("Connection error, closing {:?}", e);
101                                if server_sender.send(Err(ConnectionError::Closing(ClosingReason::ConnectionError(e))))
102                                    .await
103                                .is_err() {
104                                    debug!("Could not inform client");
105                                }
106                                inner_close_sender.send(()).await.unwrap();
107                                closing = true;
108                            }
109                        }
110                    }
111                    _ = close_receiver.recv() => {
112                        debug!("Closing connection");
113                        tcp_stream.shutdown()
114                            .await
115                            .unwrap();
116
117                        receiver.close();
118
119                        let mut guard = inner_is_closed.lock().await;
120                        *guard = true;
121                        break;
122                    }
123                };
124            }
125        });
126
127        connection
128    }
129
130    pub async fn is_closed(&self) -> bool {
131        *self.is_closed.lock().await
132    }
133    pub async fn emit<T: Into<Frame<ClientCommand>>>(
134        &self,
135        frame: T,
136    ) -> Result<(), SendError<StompMessage<ClientCommand>>> {
137        self.client_sender
138            .send(StompMessage::Frame(frame.into()))
139            .await
140    }
141
142    pub async fn heartbeat(&self) -> Result<(), SendError<StompMessage<ClientCommand>>> {
143        self.client_sender.send(StompMessage::Ping).await
144    }
145
146    pub async fn close(&self) {
147        if self
148            .server_sender
149            .send(Err(ConnectionError::Closing(ClosingReason::Shutdown)))
150            .await
151            .is_err()
152        {
153            debug!("Could not inform client");
154        }
155
156        self.close_sender.send(()).await;
157    }
158}