socket_flow/
server.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
use crate::config::ServerConfig;
use crate::event::{generate_new_uuid, Event, EventStream};
use crate::handshake::accept_async_with_config;
use crate::stream::SocketFlowStream;
use futures::StreamExt;
use std::io::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio_rustls::{TlsAcceptor, TlsStream};

/// A ready to use websockets server
///
/// This method is used to spawn a websockets server with just several lines of code.
/// It accepts a port where the server will run, and the ServerConfig, which contains custom
/// websockets configurations, and a TLS config option; in case the end-user wants to enable
/// TLS on this server.
/// It returns an EventStream, which is a stream
/// that notifies all the relevant events of the websockets server, like new connected clients
/// messages from a single client, disconnections and errors.
pub async fn start_server_with_config(
    port: u16,
    config: Option<ServerConfig>,
) -> Result<EventStream, Error> {
    let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
    let (tx, rx) = mpsc::channel(1000);
    let web_socket_config = config.clone().unwrap_or_default().web_socket_config;
    let tls_config = config.unwrap_or_default().tls_config;
    // This method will return an EventStream, which holds a Receiver channel. Therefore, this
    // spawned task will be used for processing new connections,
    // messages, disconnections and errors, concurrently.
    tokio::spawn(async move {
        loop {
            // we are using UUID, which is more flexible, and secure than incrementing IDs
            let uuid = generate_new_uuid();
            match listener.accept().await {
                Ok((stream, _)) => {
                    let socket_stream = if let Some(config) = tls_config.clone() {
                        let acceptor = TlsAcceptor::from(config);
                        match acceptor.accept(stream).await {
                            Ok(tls_stream) => SocketFlowStream::Secure(TlsStream::from(tls_stream)),
                            Err(err) => {
                                tx.send(Event::Error(uuid, err.into())).await.unwrap();
                                continue;
                            }
                        }
                    } else {
                        SocketFlowStream::Plain(stream)
                    };

                    let ws_connection =
                        match accept_async_with_config(socket_stream, web_socket_config.clone())
                            .await
                        {
                            Ok(conn) => conn,
                            Err(err) => {
                                tx.send(Event::Error(uuid, err)).await.unwrap();
                                continue;
                            }
                        };
                    // splitting the connection, so we could monitor incoming messages into a
                    // separate task, and handover the writer to the end-user
                    let (mut ws_reader, ws_writer) = ws_connection.split();

                    // send new client event
                    tx.send(Event::NewClient(uuid, ws_writer)).await.unwrap();

                    let tx_task = tx.clone();
                    tokio::spawn(async move {
                        while let Some(result) = ws_reader.next().await {
                            match result {
                                Ok(message) => {
                                    tx_task
                                        .send(Event::NewMessage(uuid, message))
                                        .await
                                        .unwrap();
                                    // send the received message event
                                }
                                Err(err) => {
                                    tx_task.send(Event::Error(uuid, err)).await.unwrap();
                                    break;
                                }
                            }
                        }

                        // send disconnect event when connection closed
                        let _ = tx_task.send(Event::Disconnect(uuid)).await;
                    });
                }
                Err(error) => {
                    tx.send(Event::Error(uuid, error.into())).await.unwrap();
                    continue;
                }
            }
        }
    });

    // Delivery the EventStream to the end-user, without blocking this function call
    // by the spawned task.
    // Thus, processing and sending new events concurrently
    Ok(EventStream::new(rx))
}

/// A ready to use websockets server
///
/// This method is used to spawn a websockets server with just several lines of code.
/// It accepts a port where the server will run, and returns an EventStream, which is a stream
/// that notifies all the relevant events of the websockets server, like new connected clients
/// messages from a single client, disconnections and errors.
pub async fn start_server(port: u16) -> Result<EventStream, Error> {
    start_server_with_config(port, None).await
}