//! WebSocket server entry point: binds a TCP listener and reports connection activity as events.
use crate::event::{generate_new_uuid, Event, EventStream};
use crate::handshake::accept_async;
use futures::StreamExt;
use std::io::Error;
use tokio::net::TcpListener;
use tokio::sync::mpsc;
/// A ready to use websockets server
///
/// This method is used to spawn a websockets server with just several lines of code.
/// Accepts as argument that port, where the server will be running, and returns an `EventStream`.
/// Which implements Stream trait, being capable of processing a stream of events sequentially
/// notifying the end-user, about new client connections, disconnections, messages and errors.
pub async fn start_server(port: u16) -> Result<EventStream, Error> {
let listener = TcpListener::bind(format!("0.0.0.0:{}", port)).await?;
let (tx, rx) = mpsc::channel(1000);
// This method will return an EventStream, which holds a Receiver channel. Therefore, this
// spawned task will be used for processing new connections,
// messages, disconnections and errors, concurrently.
tokio::spawn(async move {
loop {
// we are using UUID, which is more flexible, and secure than incrementing IDs
let uuid = generate_new_uuid();
match listener.accept().await {
Ok((stream, _)) => {
let ws_connection = match accept_async(stream).await {
Ok(conn) => conn,
Err(err) => {
tx.send(Event::Error(uuid, err)).await.unwrap();
break;
}
};
// splitting the connection, so we could monitor incoming messages into a
// separate task, and handover the writer to the end-user
let (mut ws_reader, ws_writer) = ws_connection.split();
// send new client event
tx.send(Event::NewClient(uuid, ws_writer)).await.unwrap();
let tx_task = tx.clone();
tokio::spawn(async move {
while let Some(result) = ws_reader.next().await {
match result {
Ok(message) => {
tx_task
.send(Event::NewMessage(uuid, message))
.await
.unwrap();
// send the received message event
}
Err(err) => {
tx_task.send(Event::Error(uuid, err)).await.unwrap();
break;
}
}
}
// send disconnect event when connection closed
let _ = tx_task.send(Event::Disconnect(uuid)).await;
});
}
Err(error) => {
tx.send(Event::Error(uuid, error.into())).await.unwrap();
break;
}
}
}
});
// Delivery the EventStream to the end-user, without blocking this function call
// by the spawned task.
// Thus, processing and sending new events concurrently
Ok(EventStream::new(rx))
}