1use crate::protocol::BNF_LF;
2use crate::protocol::{ClientCommand, Frame, FrameParser, ParseError, ServerCommand, StompMessage};
3use log::debug;
4use std::error::Error;
5use std::fmt::{Display, Formatter};
6use std::sync::Arc;
7use tokio::io::{AsyncReadExt, AsyncWriteExt, ErrorKind};
8use tokio::net::TcpStream;
9use tokio::sync::mpsc::error::SendError;
10use tokio::sync::mpsc::{channel, Sender};
11use tokio::sync::Mutex;
12
13#[derive(Debug)]
14pub enum ClosingReason {
15 ParseError(ParseError),
16 ConnectionError(std::io::Error),
17 Shutdown,
18}
19
20#[derive(Debug)]
21pub enum ConnectionError {
22 Closing(ClosingReason),
23}
24
25impl Display for ConnectionError {
26 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
27 write!(f, "Client error")
28 }
29}
30
31impl Error for ConnectionError {}
32
33pub struct Connection {
34 client_sender: Sender<StompMessage<ClientCommand>>,
35 server_sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
36 close_sender: Sender<()>,
37 is_closed: Arc<Mutex<bool>>,
38}
39
40impl Connection {
41 pub async fn new(
42 mut tcp_stream: TcpStream,
43 server_sender: Sender<Result<StompMessage<ServerCommand>, ConnectionError>>,
44 ) -> Self {
45 let (sender, mut receiver) = channel(5);
46
47 let (close_sender, mut close_receiver) = channel(1);
48 let inner_close_sender = close_sender.clone();
49 let is_closed = Arc::new(Mutex::new(false));
50 let inner_is_closed = Arc::clone(&is_closed);
51
52 let connection = Self {
53 client_sender: sender,
54 server_sender: server_sender.clone(),
55 close_sender,
56 is_closed,
57 };
58
59 tokio::spawn(async move {
60 let mut msg = vec![0; 8096];
61 let mut parser: FrameParser<ServerCommand> = FrameParser::new();
62 let mut closing = false;
63
64 loop {
65 tokio::select! {
66 frame = receiver.recv(), if !closing => {
67 if let Some(message) = frame {
68 match message {
69 StompMessage::Frame(frame) => tcp_stream.write_all(&frame.to_bytes()).await.unwrap(),
70 StompMessage::Ping => tcp_stream.write_u8(BNF_LF).await.unwrap()
71 }
72
73 tcp_stream.flush().await.unwrap();
74 }
75 },
76 read = tcp_stream.read(&mut msg), if !closing => {
77 match read {
78 Ok(n) => {
79 match parser.parse(&msg[..n]) {
80 Ok(messages) => {
81 for message in messages {
82 debug!("Message received {:?}", message.clone());
83 server_sender.send(Ok(message)).await.unwrap();
84 }
85 }
86 Err(e) => {
87 debug!("Parsing error, closing {:?}", e);
88 if server_sender.send(Err(ConnectionError::Closing(ClosingReason::ParseError(e))))
89 .await
90 .is_err() {
91 debug!("Could not inform client");
92 }
93 inner_close_sender.send(()).await.unwrap();
94 closing = true;
95 }
96 }
97 }
98 Err(ref e) if e.kind() == ErrorKind::WouldBlock => {}
99 Err(e) => {
100 debug!("Connection error, closing {:?}", e);
101 if server_sender.send(Err(ConnectionError::Closing(ClosingReason::ConnectionError(e))))
102 .await
103 .is_err() {
104 debug!("Could not inform client");
105 }
106 inner_close_sender.send(()).await.unwrap();
107 closing = true;
108 }
109 }
110 }
111 _ = close_receiver.recv() => {
112 debug!("Closing connection");
113 tcp_stream.shutdown()
114 .await
115 .unwrap();
116
117 receiver.close();
118
119 let mut guard = inner_is_closed.lock().await;
120 *guard = true;
121 break;
122 }
123 };
124 }
125 });
126
127 connection
128 }
129
130 pub async fn is_closed(&self) -> bool {
131 *self.is_closed.lock().await
132 }
133 pub async fn emit<T: Into<Frame<ClientCommand>>>(
134 &self,
135 frame: T,
136 ) -> Result<(), SendError<StompMessage<ClientCommand>>> {
137 self.client_sender
138 .send(StompMessage::Frame(frame.into()))
139 .await
140 }
141
142 pub async fn heartbeat(&self) -> Result<(), SendError<StompMessage<ClientCommand>>> {
143 self.client_sender.send(StompMessage::Ping).await
144 }
145
146 pub async fn close(&self) {
147 if self
148 .server_sender
149 .send(Err(ConnectionError::Closing(ClosingReason::Shutdown)))
150 .await
151 .is_err()
152 {
153 debug!("Could not inform client");
154 }
155
156 self.close_sender.send(()).await;
157 }
158}