easy_esp/server/
mod.rs

//! This module defines a simple TCP server that implements the observer pattern.
//!
//! The server manages client connections and validates message formats. It also
//! facilitates sending and receiving messages between clients.
//!
//! # Example
//!
//! ```no_run
//! use std::net::SocketAddr;
//! use easy_esp::{Server, SendBackHandler};
//!
//! #[tokio::main]
//! async fn main() {
//!     let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
//!     // `listen` takes `&mut self`, so the server binding must be mutable.
//!     let mut server = Server::new(addr, SendBackHandler::new());
//!
//!     server.listen().await;
//! }
//! ```
20
21use super::handler::RequestHandler;
22use std::net::SocketAddr;
23use std::sync::{Arc, Mutex};
24use tokio::net::{TcpListener, TcpStream};
25use tokio::sync::broadcast::{self, Receiver, Sender};
26use tokio::task::JoinHandle;
27
28mod conn;
29use conn::Conn;
30
31mod commands;
32pub use commands::ServerCMD;
33
34/// The server manages client connections, verifies message formats, and handles sending and receiving messages.
35///
36/// # Type Parameters
37///
38/// * `H`: Type that implements the `RequestHandler` trait for handling incoming requests.
39///
40/// # Fields
41///
42/// * `address`: The socket address on which the server is listening.
43/// * `handles`: Vector of join handles for spawned connection tasks.
44/// * `send_all_tx`: Sender channel for broadcasting messages to all clients.
45/// * `cmd_rx`: Receiver channel for receiving commands from clients.
46/// * `cmd_tx`: Sender channel for sending commands to clients.
47/// * `message_handler`: Shared handler function to handle all incoming messages.
48pub struct Server<H>
49where
50    H: RequestHandler + Sync + Send + 'static,
51{
52    address: SocketAddr,
53
54    handles: Vec<JoinHandle<()>>,
55    send_all_tx: Sender<String>,
56    cmd_rx: Receiver<ServerCMD>,
57    cmd_tx: Sender<ServerCMD>,
58    message_handler: Arc<Mutex<H>>, // Shared handler func to handle all incoming messages
59}
60
61impl<H> Server<H>
62where
63    H: RequestHandler + Sync + Send,
64{
65    /// Gets the socket address of the server.
66    ///
67    /// # Returns
68    ///
69    /// The socket address on which the server is listening.
70    pub fn get_addr(&self) -> SocketAddr {
71        self.address
72    }
73
74    /// Adds a new connection to the server.
75    ///
76    /// # Arguments
77    ///
78    /// * `conn_stream` - The TCP stream representing the connection to the client.
79    pub fn add_conn(&mut self, conn_stream: TcpStream) {
80        let mut conn = Conn::new(
81            conn_stream,
82            self.message_handler.clone(),
83            self.send_all_tx.subscribe(),
84            self.cmd_tx.clone(),
85        );
86
87        let handle = tokio::spawn(async move {
88            conn.listen().await;
89        });
90
91        self.handles.push(handle);
92    }
93
94    /// Sends a message to all connected clients.
95    ///
96    /// # Arguments
97    ///
98    /// * `message` - The message to be sent to all clients.
99    pub fn send_all(&self, message: String) {
100        self.send_all_tx.send(message).unwrap();
101    }
102
103    /// Starts listening for incoming connections and commands.
104    pub async fn listen(&mut self) {
105        println!("[Server] starting on {}...", self.get_addr());
106        let listener = TcpListener::bind(self.address).await.unwrap();
107
108        loop {
109            tokio::select! {
110                // Accept a new connection
111                Ok((socket, addr)) = listener.accept() => {
112                    println!("[Server] Connected with {}", addr);
113                    self.add_conn(socket);
114                },
115                // Receive a message from the rx channel
116                Ok(cmd) = self.cmd_rx.recv() => {
117                    match cmd {
118                        ServerCMD::ShutDown(code) =>  {
119                            println!("[Server] Server shutting down with code {}...", code);
120                        }
121                        ServerCMD::SendAll(message) => {
122                            self.send_all(message);
123                        }
124                        ServerCMD::Kick(addr) => {
125                            println!("[Server] Kicking client with addr {}", addr);
126                        }
127                    }
128                }
129            }
130        }
131    }
132
133    /// Creates a new instance of `Server`.
134    ///
135    /// # Arguments
136    ///
137    /// * `address` - The socket address on which the server will listen for incoming connections.
138    /// * `message_handler` - The handler for processing incoming requests.
139    ///
140    /// # Returns
141    ///
142    /// A new instance of `Server`.
143    pub fn new(address: SocketAddr, message_handler: H) -> Self {
144        let message_handler: Arc<Mutex<H>> = Arc::new(Mutex::new(message_handler));
145        let handles = vec![];
146
147        // doesn't really matter the count
148        let count = 16;
149
150        let (send_all_tx, _) = broadcast::channel(count);
151        let (cmd_tx, cmd_rx) = broadcast::channel::<ServerCMD>(count);
152
153        Server {
154            address,
155            handles,
156            send_all_tx,
157            cmd_rx,
158            cmd_tx,
159            message_handler,
160        }
161    }
162}