socket_flow/
server.rs

1use crate::config::ServerConfig;
2use crate::event::{generate_new_uuid, Event, EventStream};
3use crate::handshake::accept_async_with_config;
4use crate::stream::SocketFlowStream;
5use futures::StreamExt;
6use std::io::Error;
7use tokio::net::TcpListener;
8use tokio::sync::mpsc;
9use tokio_rustls::{TlsAcceptor, TlsStream};
10
11/// A ready to use websockets server
12///
13/// This method is used to spawn a websockets server with just several lines of code.
14/// It accepts a port where the server will run, and the ServerConfig, which contains custom
15/// websockets configurations, and a TLS config option; in case the end-user wants to enable
16/// TLS on this server.
17/// It returns an EventStream, which is a stream
18/// that notifies all the relevant events of the websockets server, like new connected clients
19/// messages from a single client, disconnections and errors.
20pub async fn start_server_with_config(
21    port: u16,
22    config: Option<ServerConfig>,
23) -> Result<EventStream, Error> {
24    let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
25    let (tx, rx) = mpsc::channel(1000);
26    let web_socket_config = config.clone().unwrap_or_default().web_socket_config;
27    let tls_config = config.unwrap_or_default().tls_config;
28    // This method will return an EventStream, which holds a Receiver channel. Therefore, this
29    // spawned task will be used for processing new connections,
30    // messages, disconnections and errors, concurrently.
31    tokio::spawn(async move {
32        loop {
33            // we are using UUID, which is more flexible, and secure than incrementing IDs
34            let uuid = generate_new_uuid();
35            match listener.accept().await {
36                Ok((stream, _)) => {
37                    let socket_stream = if let Some(config) = tls_config.clone() {
38                        let acceptor = TlsAcceptor::from(config);
39                        match acceptor.accept(stream).await {
40                            Ok(tls_stream) => SocketFlowStream::Secure(TlsStream::from(tls_stream)),
41                            Err(err) => {
42                                tx.send(Event::Error(uuid, err.into())).await.unwrap();
43                                continue;
44                            }
45                        }
46                    } else {
47                        SocketFlowStream::Plain(stream)
48                    };
49
50                    let ws_connection =
51                        match accept_async_with_config(socket_stream, web_socket_config.clone())
52                            .await
53                        {
54                            Ok(conn) => conn,
55                            Err(err) => {
56                                tx.send(Event::Error(uuid, err)).await.unwrap();
57                                continue;
58                            }
59                        };
60                    // splitting the connection, so we could monitor incoming messages into a
61                    // separate task, and handover the writer to the end-user
62                    let (mut ws_reader, ws_writer) = ws_connection.split();
63
64                    // send new client event
65                    tx.send(Event::NewClient(uuid, ws_writer)).await.unwrap();
66
67                    let tx_task = tx.clone();
68                    tokio::spawn(async move {
69                        while let Some(result) = ws_reader.next().await {
70                            match result {
71                                Ok(message) => {
72                                    tx_task
73                                        .send(Event::NewMessage(uuid, message))
74                                        .await
75                                        .unwrap();
76                                    // send the received message event
77                                }
78                                Err(err) => {
79                                    tx_task.send(Event::Error(uuid, err)).await.unwrap();
80                                    break;
81                                }
82                            }
83                        }
84
85                        // send disconnect event when connection closed
86                        let _ = tx_task.send(Event::Disconnect(uuid)).await;
87                    });
88                }
89                Err(error) => {
90                    tx.send(Event::Error(uuid, error.into())).await.unwrap();
91                    continue;
92                }
93            }
94        }
95    });
96
97    // Delivery the EventStream to the end-user, without blocking this function call
98    // by the spawned task.
99    // Thus, processing and sending new events concurrently
100    Ok(EventStream::new(rx))
101}
102
103/// A ready to use websockets server
104///
105/// This method is used to spawn a websockets server with just several lines of code.
106/// It accepts a port where the server will run, and returns an EventStream, which is a stream
107/// that notifies all the relevant events of the websockets server, like new connected clients
108/// messages from a single client, disconnections and errors.
109pub async fn start_server(port: u16) -> Result<EventStream, Error> {
110    start_server_with_config(port, None).await
111}