clibri_transport_server/
connection.rs

1use super::{
2    channel::{Control, Error as ChannelError, Messages},
3    errors::Error,
4    server::{channels, MonitorEvent, MonitorSender},
5};
6use clibri::{env::logs, server};
7use futures::{SinkExt, StreamExt};
8use log::{debug, error, info, warn};
9use tokio::{
10    join,
11    net::TcpStream,
12    select,
13    sync::{
14        mpsc::{channel, Receiver, Sender, UnboundedSender},
15        oneshot,
16    },
17    task::spawn,
18};
19use tokio_tungstenite::{
20    tungstenite::{
21        error::{Error as TungsteniteError, ProtocolError},
22        protocol::CloseFrame,
23        protocol::Message,
24    },
25    WebSocketStream,
26};
27use tokio_util::sync::CancellationToken;
28use uuid::Uuid;
29
30mod shortcuts {
31    use super::*;
32
33    pub async fn send_event(
34        tx_events: &UnboundedSender<server::Events<Error>>,
35        event: server::Events<Error>,
36    ) {
37        if let Err(e) = tx_events.send(event) {
38            warn!(
39                target: logs::targets::SERVER,
40                "Cannot send event. Error: {}", e
41            );
42        }
43    }
44
45    pub async fn send_message(
46        tx_events: &UnboundedSender<server::Events<Error>>,
47        tx_messages: &UnboundedSender<Messages>,
48        msg: Messages,
49        uuid: Uuid,
50    ) {
51        if let Err(err) = tx_messages.send(msg) {
52            warn!(
53                target: logs::targets::SERVER,
54                "{}:: Fail to send data back to server. Error: {}", uuid, err
55            );
56            if let Err(err) = tx_events.send(server::Events::ConnectionError(
57                Some(uuid),
58                Error::Channel(format!("{}", err)),
59            )) {
60                warn!(
61                    target: logs::targets::SERVER,
62                    "Cannot send event. Error: {}", err
63                );
64            }
65        }
66    }
67}
68
69enum State {
70    DisconnectByClient(Option<CloseFrame<'static>>),
71    DisconnectByClientWithError(String),
72    DisconnectByServer,
73    Error(ChannelError),
74}
75
76pub struct Connection {
77    uuid: Uuid,
78}
79
80impl Connection {
81    pub fn new(uuid: Uuid) -> Self {
82        Self { uuid }
83    }
84
85    pub async fn attach(
86        &mut self,
87        ws: WebSocketStream<TcpStream>,
88        tx_events: UnboundedSender<server::Events<Error>>,
89        tx_messages: UnboundedSender<Messages>,
90        tx_monitor: Option<MonitorSender>,
91        port: u16,
92    ) -> Result<Sender<Control>, String> {
93        let (tx_control, mut rx_control): (Sender<Control>, Receiver<Control>) =
94            channel(channels::CONNECTION_CONTROL);
95        let uuid = self.uuid;
96        if let Some(tx_monitor) = tx_monitor.as_ref() {
97            tx_monitor
98                .send((port, MonitorEvent::Connected))
99                .await
100                .map_err(|_e| String::from("Fail send monitor event - connected"))?;
101        }
102        spawn(async move {
103            let mut shutdown_resolver: Option<oneshot::Sender<()>> = None;
104            let (mut writer, mut reader) = ws.split();
105            let stop_reading = CancellationToken::new();
106            let stop_reading_emitter = stop_reading.clone();
107            let stop_writing = CancellationToken::new();
108            let stop_writing_emitter = stop_writing.clone();
109            let ((writer, writer_state), (reader, mut reader_state)) = join!(
110                async {
111                    while let Some(msg) = select! {
112                        msg = reader.next() => msg,
113                        _ = stop_reading.cancelled() => None,
114                    } {
115                        let msg = match msg {
116                            Ok(msg) => msg,
117                            Err(err) => {
118                                if let TungsteniteError::Protocol(ref err) = err {
119                                    if err == &ProtocolError::ResetWithoutClosingHandshake {
120                                        debug!(
121                                            target: logs::targets::SERVER,
122                                            "{}:: Client disconnected without closing handshake",
123                                            uuid
124                                        );
125                                        stop_writing_emitter.cancel();
126                                        return (
127                                            reader,
128                                            Some(State::DisconnectByClientWithError(
129                                                err.to_string(),
130                                            )),
131                                        );
132                                    }
133                                }
134                                warn!(
135                                    target: logs::targets::SERVER,
136                                    "{}:: Cannot get message. Error: {:?}", uuid, err
137                                );
138                                shortcuts::send_event(
139                                    &tx_events,
140                                    server::Events::ConnectionError(
141                                        Some(uuid),
142                                        Error::InvalidMessage(err.to_string()),
143                                    ),
144                                )
145                                .await;
146                                stop_writing_emitter.cancel();
147                                return (
148                                    reader,
149                                    Some(State::Error(ChannelError::ReadSocket(err.to_string()))),
150                                );
151                            }
152                        };
153                        match msg {
154                            Message::Text(_) => {
155                                warn!(
156                                    target: logs::targets::SERVER,
157                                    "{}:: has been gotten not binnary data", uuid
158                                );
159                                shortcuts::send_event(
160                                    &tx_events,
161                                    server::Events::ConnectionError(
162                                        Some(uuid),
163                                        Error::NonBinaryData,
164                                    ),
165                                )
166                                .await;
167                                continue;
168                            }
169                            Message::Binary(buffer) => {
170                                info!(
171                                    target: logs::targets::SERVER,
172                                    "{}:: binary data {:?}", uuid, buffer
173                                );
174                                shortcuts::send_message(
175                                    &tx_events,
176                                    &tx_messages,
177                                    Messages::Binary { uuid, buffer },
178                                    uuid,
179                                )
180                                .await;
181                            }
182                            Message::Ping(_) | Message::Pong(_) => {
183                                warn!(target: logs::targets::SERVER, "{}:: Ping / Pong", uuid);
184                            }
185                            Message::Close(close_frame) => {
186                                stop_writing_emitter.cancel();
187                                return (reader, Some(State::DisconnectByClient(close_frame)));
188                            }
189                        }
190                    }
191                    stop_writing_emitter.cancel();
192                    (reader, None)
193                },
194                async {
195                    while let Some(cmd) = select! {
196                        cmd = rx_control.recv() => cmd,
197                        _ = stop_writing.cancelled() => None,
198                    } {
199                        match cmd {
200                            Control::Send(buffer) => {
201                                if let Err(err) = writer.send(Message::from(buffer)).await {
202                                    error!(
203                                        target: logs::targets::SERVER,
204                                        "{}:: Cannot send data to client. Error: {}", uuid, err
205                                    );
206                                    stop_reading_emitter.cancel();
207                                    return (
208                                        writer,
209                                        Some(State::Error(ChannelError::WriteSocket(
210                                            err.to_string(),
211                                        ))),
212                                    );
213                                }
214                            }
215                            Control::Disconnect(tx_shutdown_resolver) => {
216                                shutdown_resolver = Some(tx_shutdown_resolver);
217                                stop_reading_emitter.cancel();
218                                return (writer, Some(State::DisconnectByServer));
219                            }
220                        };
221                    }
222                    stop_reading_emitter.cancel();
223                    (writer, None)
224                }
225            );
226            debug!(
227                target: logs::targets::SERVER,
228                "{}:: exit from socket listening loop.", uuid
229            );
230            let state: Option<State> = if let Some(state) = reader_state.take() {
231                Some(state)
232            } else {
233                writer_state
234            };
235            let code = if let Some(state) = state {
236                match state {
237                    State::DisconnectByServer => None,
238                    State::DisconnectByClient(frame) => {
239                        if let Some(frame) = frame {
240                            Some(frame.code)
241                        } else {
242                            None
243                        }
244                    }
245                    State::DisconnectByClientWithError(e) => {
246                        debug!(
247                            target: logs::targets::SERVER,
248                            "{}:: client error: {}", uuid, e
249                        );
250                        None
251                    }
252                    State::Error(error) => {
253                        shortcuts::send_message(
254                            &tx_events,
255                            &tx_messages,
256                            Messages::Error { uuid, error },
257                            uuid,
258                        )
259                        .await;
260                        None
261                    }
262                }
263            } else {
264                None
265            };
266            shortcuts::send_message(
267                &tx_events,
268                &tx_messages,
269                Messages::Disconnect { uuid, code },
270                uuid,
271            )
272            .await;
273            match writer.reunite(reader) {
274                Ok(mut ws) => {
275                    match ws.close(None).await {
276                        Ok(()) => {}
277                        Err(e) => match e {
278                            TungsteniteError::AlreadyClosed
279                            | TungsteniteError::ConnectionClosed => {
280                                debug!(
281                                    target: logs::targets::SERVER,
282                                    "{}:: connection is already closed", uuid
283                                );
284                            }
285                            _ => {
286                                error!(
287                                    target: logs::targets::SERVER,
288                                    "{}:: fail to close connection", uuid
289                                );
290                                shortcuts::send_event(
291                                    &tx_events,
292                                    server::Events::ConnectionError(
293                                        Some(uuid),
294                                        Error::CloseConnection(format!(
295                                            "{}:: fail to close connection",
296                                            uuid
297                                        )),
298                                    ),
299                                )
300                                .await;
301                            }
302                        },
303                    };
304                    drop(ws);
305                }
306                Err(err) => {
307                    error!(
308                        target: logs::targets::SERVER,
309                        "{}:: fail to close connection (reunite err: {})", uuid, err
310                    );
311                    shortcuts::send_event(
312                        &tx_events,
313                        server::Events::ConnectionError(
314                            Some(uuid),
315                            Error::CloseConnection(format!(
316                                "{}:: fail to close connection (reunite err: {})",
317                                uuid, err
318                            )),
319                        ),
320                    )
321                    .await;
322                }
323            }
324            if let Some(tx_monitor) = tx_monitor.as_ref() {
325                if let Err(_err) = tx_monitor.send((port, MonitorEvent::Disconnected)).await {
326                    warn!(
327                        target: logs::targets::SERVER,
328                        "{}:: Fail send monitor event - disconnected", uuid
329                    );
330                }
331            }
332            if let Some(tx_shutdown_resolver) = shutdown_resolver.take() {
333                if tx_shutdown_resolver.send(()).is_err() {
334                    error!(
335                        target: logs::targets::SERVER,
336                        "{}:: Fail send disconnect confirmation", uuid
337                    );
338                }
339            }
340        });
341        Ok(tx_control)
342    }
343}