1use super::{
2 channel::{Control, Error as ChannelError, Messages},
3 errors::Error,
4 server::{channels, MonitorEvent, MonitorSender},
5};
6use clibri::{env::logs, server};
7use futures::{SinkExt, StreamExt};
8use log::{debug, error, info, warn};
9use tokio::{
10 join,
11 net::TcpStream,
12 select,
13 sync::{
14 mpsc::{channel, Receiver, Sender, UnboundedSender},
15 oneshot,
16 },
17 task::spawn,
18};
19use tokio_tungstenite::{
20 tungstenite::{
21 error::{Error as TungsteniteError, ProtocolError},
22 protocol::CloseFrame,
23 protocol::Message,
24 },
25 WebSocketStream,
26};
27use tokio_util::sync::CancellationToken;
28use uuid::Uuid;
29
30mod shortcuts {
31 use super::*;
32
33 pub async fn send_event(
34 tx_events: &UnboundedSender<server::Events<Error>>,
35 event: server::Events<Error>,
36 ) {
37 if let Err(e) = tx_events.send(event) {
38 warn!(
39 target: logs::targets::SERVER,
40 "Cannot send event. Error: {}", e
41 );
42 }
43 }
44
45 pub async fn send_message(
46 tx_events: &UnboundedSender<server::Events<Error>>,
47 tx_messages: &UnboundedSender<Messages>,
48 msg: Messages,
49 uuid: Uuid,
50 ) {
51 if let Err(err) = tx_messages.send(msg) {
52 warn!(
53 target: logs::targets::SERVER,
54 "{}:: Fail to send data back to server. Error: {}", uuid, err
55 );
56 if let Err(err) = tx_events.send(server::Events::ConnectionError(
57 Some(uuid),
58 Error::Channel(format!("{}", err)),
59 )) {
60 warn!(
61 target: logs::targets::SERVER,
62 "Cannot send event. Error: {}", err
63 );
64 }
65 }
66 }
67}
68
69enum State {
70 DisconnectByClient(Option<CloseFrame<'static>>),
71 DisconnectByClientWithError(String),
72 DisconnectByServer,
73 Error(ChannelError),
74}
75
76pub struct Connection {
77 uuid: Uuid,
78}
79
80impl Connection {
81 pub fn new(uuid: Uuid) -> Self {
82 Self { uuid }
83 }
84
85 pub async fn attach(
86 &mut self,
87 ws: WebSocketStream<TcpStream>,
88 tx_events: UnboundedSender<server::Events<Error>>,
89 tx_messages: UnboundedSender<Messages>,
90 tx_monitor: Option<MonitorSender>,
91 port: u16,
92 ) -> Result<Sender<Control>, String> {
93 let (tx_control, mut rx_control): (Sender<Control>, Receiver<Control>) =
94 channel(channels::CONNECTION_CONTROL);
95 let uuid = self.uuid;
96 if let Some(tx_monitor) = tx_monitor.as_ref() {
97 tx_monitor
98 .send((port, MonitorEvent::Connected))
99 .await
100 .map_err(|_e| String::from("Fail send monitor event - connected"))?;
101 }
102 spawn(async move {
103 let mut shutdown_resolver: Option<oneshot::Sender<()>> = None;
104 let (mut writer, mut reader) = ws.split();
105 let stop_reading = CancellationToken::new();
106 let stop_reading_emitter = stop_reading.clone();
107 let stop_writing = CancellationToken::new();
108 let stop_writing_emitter = stop_writing.clone();
109 let ((writer, writer_state), (reader, mut reader_state)) = join!(
110 async {
111 while let Some(msg) = select! {
112 msg = reader.next() => msg,
113 _ = stop_reading.cancelled() => None,
114 } {
115 let msg = match msg {
116 Ok(msg) => msg,
117 Err(err) => {
118 if let TungsteniteError::Protocol(ref err) = err {
119 if err == &ProtocolError::ResetWithoutClosingHandshake {
120 debug!(
121 target: logs::targets::SERVER,
122 "{}:: Client disconnected without closing handshake",
123 uuid
124 );
125 stop_writing_emitter.cancel();
126 return (
127 reader,
128 Some(State::DisconnectByClientWithError(
129 err.to_string(),
130 )),
131 );
132 }
133 }
134 warn!(
135 target: logs::targets::SERVER,
136 "{}:: Cannot get message. Error: {:?}", uuid, err
137 );
138 shortcuts::send_event(
139 &tx_events,
140 server::Events::ConnectionError(
141 Some(uuid),
142 Error::InvalidMessage(err.to_string()),
143 ),
144 )
145 .await;
146 stop_writing_emitter.cancel();
147 return (
148 reader,
149 Some(State::Error(ChannelError::ReadSocket(err.to_string()))),
150 );
151 }
152 };
153 match msg {
154 Message::Text(_) => {
155 warn!(
156 target: logs::targets::SERVER,
157 "{}:: has been gotten not binnary data", uuid
158 );
159 shortcuts::send_event(
160 &tx_events,
161 server::Events::ConnectionError(
162 Some(uuid),
163 Error::NonBinaryData,
164 ),
165 )
166 .await;
167 continue;
168 }
169 Message::Binary(buffer) => {
170 info!(
171 target: logs::targets::SERVER,
172 "{}:: binary data {:?}", uuid, buffer
173 );
174 shortcuts::send_message(
175 &tx_events,
176 &tx_messages,
177 Messages::Binary { uuid, buffer },
178 uuid,
179 )
180 .await;
181 }
182 Message::Ping(_) | Message::Pong(_) => {
183 warn!(target: logs::targets::SERVER, "{}:: Ping / Pong", uuid);
184 }
185 Message::Close(close_frame) => {
186 stop_writing_emitter.cancel();
187 return (reader, Some(State::DisconnectByClient(close_frame)));
188 }
189 }
190 }
191 stop_writing_emitter.cancel();
192 (reader, None)
193 },
194 async {
195 while let Some(cmd) = select! {
196 cmd = rx_control.recv() => cmd,
197 _ = stop_writing.cancelled() => None,
198 } {
199 match cmd {
200 Control::Send(buffer) => {
201 if let Err(err) = writer.send(Message::from(buffer)).await {
202 error!(
203 target: logs::targets::SERVER,
204 "{}:: Cannot send data to client. Error: {}", uuid, err
205 );
206 stop_reading_emitter.cancel();
207 return (
208 writer,
209 Some(State::Error(ChannelError::WriteSocket(
210 err.to_string(),
211 ))),
212 );
213 }
214 }
215 Control::Disconnect(tx_shutdown_resolver) => {
216 shutdown_resolver = Some(tx_shutdown_resolver);
217 stop_reading_emitter.cancel();
218 return (writer, Some(State::DisconnectByServer));
219 }
220 };
221 }
222 stop_reading_emitter.cancel();
223 (writer, None)
224 }
225 );
226 debug!(
227 target: logs::targets::SERVER,
228 "{}:: exit from socket listening loop.", uuid
229 );
230 let state: Option<State> = if let Some(state) = reader_state.take() {
231 Some(state)
232 } else {
233 writer_state
234 };
235 let code = if let Some(state) = state {
236 match state {
237 State::DisconnectByServer => None,
238 State::DisconnectByClient(frame) => {
239 if let Some(frame) = frame {
240 Some(frame.code)
241 } else {
242 None
243 }
244 }
245 State::DisconnectByClientWithError(e) => {
246 debug!(
247 target: logs::targets::SERVER,
248 "{}:: client error: {}", uuid, e
249 );
250 None
251 }
252 State::Error(error) => {
253 shortcuts::send_message(
254 &tx_events,
255 &tx_messages,
256 Messages::Error { uuid, error },
257 uuid,
258 )
259 .await;
260 None
261 }
262 }
263 } else {
264 None
265 };
266 shortcuts::send_message(
267 &tx_events,
268 &tx_messages,
269 Messages::Disconnect { uuid, code },
270 uuid,
271 )
272 .await;
273 match writer.reunite(reader) {
274 Ok(mut ws) => {
275 match ws.close(None).await {
276 Ok(()) => {}
277 Err(e) => match e {
278 TungsteniteError::AlreadyClosed
279 | TungsteniteError::ConnectionClosed => {
280 debug!(
281 target: logs::targets::SERVER,
282 "{}:: connection is already closed", uuid
283 );
284 }
285 _ => {
286 error!(
287 target: logs::targets::SERVER,
288 "{}:: fail to close connection", uuid
289 );
290 shortcuts::send_event(
291 &tx_events,
292 server::Events::ConnectionError(
293 Some(uuid),
294 Error::CloseConnection(format!(
295 "{}:: fail to close connection",
296 uuid
297 )),
298 ),
299 )
300 .await;
301 }
302 },
303 };
304 drop(ws);
305 }
306 Err(err) => {
307 error!(
308 target: logs::targets::SERVER,
309 "{}:: fail to close connection (reunite err: {})", uuid, err
310 );
311 shortcuts::send_event(
312 &tx_events,
313 server::Events::ConnectionError(
314 Some(uuid),
315 Error::CloseConnection(format!(
316 "{}:: fail to close connection (reunite err: {})",
317 uuid, err
318 )),
319 ),
320 )
321 .await;
322 }
323 }
324 if let Some(tx_monitor) = tx_monitor.as_ref() {
325 if let Err(_err) = tx_monitor.send((port, MonitorEvent::Disconnected)).await {
326 warn!(
327 target: logs::targets::SERVER,
328 "{}:: Fail send monitor event - disconnected", uuid
329 );
330 }
331 }
332 if let Some(tx_shutdown_resolver) = shutdown_resolver.take() {
333 if tx_shutdown_resolver.send(()).is_err() {
334 error!(
335 target: logs::targets::SERVER,
336 "{}:: Fail send disconnect confirmation", uuid
337 );
338 }
339 }
340 });
341 Ok(tx_control)
342 }
343}