socket_flow/server.rs
1use crate::config::ServerConfig;
2use crate::event::{generate_new_uuid, Event, EventStream};
3use crate::handshake::accept_async_with_config;
4use crate::stream::SocketFlowStream;
5use futures::StreamExt;
6use std::io::Error;
7use tokio::net::TcpListener;
8use tokio::sync::mpsc;
9use tokio_rustls::{TlsAcceptor, TlsStream};
10
11/// A ready to use websockets server
12///
13/// This method is used to spawn a websockets server with just several lines of code.
14/// It accepts a port where the server will run, and the ServerConfig, which contains custom
15/// websockets configurations, and a TLS config option; in case the end-user wants to enable
16/// TLS on this server.
17/// It returns an EventStream, which is a stream
18/// that notifies all the relevant events of the websockets server, like new connected clients
19/// messages from a single client, disconnections and errors.
20pub async fn start_server_with_config(
21 port: u16,
22 config: Option<ServerConfig>,
23) -> Result<EventStream, Error> {
24 let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
25 let (tx, rx) = mpsc::channel(1000);
26 let web_socket_config = config.clone().unwrap_or_default().web_socket_config;
27 let tls_config = config.unwrap_or_default().tls_config;
28 // This method will return an EventStream, which holds a Receiver channel. Therefore, this
29 // spawned task will be used for processing new connections,
30 // messages, disconnections and errors, concurrently.
31 tokio::spawn(async move {
32 loop {
33 // we are using UUID, which is more flexible, and secure than incrementing IDs
34 let uuid = generate_new_uuid();
35 match listener.accept().await {
36 Ok((stream, _)) => {
37 let socket_stream = if let Some(config) = tls_config.clone() {
38 let acceptor = TlsAcceptor::from(config);
39 match acceptor.accept(stream).await {
40 Ok(tls_stream) => SocketFlowStream::Secure(TlsStream::from(tls_stream)),
41 Err(err) => {
42 tx.send(Event::Error(uuid, err.into())).await.unwrap();
43 continue;
44 }
45 }
46 } else {
47 SocketFlowStream::Plain(stream)
48 };
49
50 let ws_connection =
51 match accept_async_with_config(socket_stream, web_socket_config.clone())
52 .await
53 {
54 Ok(conn) => conn,
55 Err(err) => {
56 tx.send(Event::Error(uuid, err)).await.unwrap();
57 continue;
58 }
59 };
60 // splitting the connection, so we could monitor incoming messages into a
61 // separate task, and handover the writer to the end-user
62 let (mut ws_reader, ws_writer) = ws_connection.split();
63
64 // send new client event
65 tx.send(Event::NewClient(uuid, ws_writer)).await.unwrap();
66
67 let tx_task = tx.clone();
68 tokio::spawn(async move {
69 while let Some(result) = ws_reader.next().await {
70 match result {
71 Ok(message) => {
72 tx_task
73 .send(Event::NewMessage(uuid, message))
74 .await
75 .unwrap();
76 // send the received message event
77 }
78 Err(err) => {
79 tx_task.send(Event::Error(uuid, err)).await.unwrap();
80 break;
81 }
82 }
83 }
84
85 // send disconnect event when connection closed
86 let _ = tx_task.send(Event::Disconnect(uuid)).await;
87 });
88 }
89 Err(error) => {
90 tx.send(Event::Error(uuid, error.into())).await.unwrap();
91 continue;
92 }
93 }
94 }
95 });
96
97 // Delivery the EventStream to the end-user, without blocking this function call
98 // by the spawned task.
99 // Thus, processing and sending new events concurrently
100 Ok(EventStream::new(rx))
101}
102
103/// A ready to use websockets server
104///
105/// This method is used to spawn a websockets server with just several lines of code.
106/// It accepts a port where the server will run, and returns an EventStream, which is a stream
107/// that notifies all the relevant events of the websockets server, like new connected clients
108/// messages from a single client, disconnections and errors.
109pub async fn start_server(port: u16) -> Result<EventStream, Error> {
110 start_server_with_config(port, None).await
111}