easy_esp/server/mod.rs
1//! This module defines a simple TCP server that implements the observer pattern.
2//!
3//! The server manages client connections and validates message formats. It also
4//! facilitates sending and receiving messages between clients.
5//!
6//! # Example
7//!
8//! ```
9//! use std::net::SocketAddr;
10//! use easy_esp::{Server, SendBackHandler};
11//!
12//! #[tokio::main]
13//! async fn main() {
14//! let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
15//! let server = Server::new(addr, SendBackHandler::new());
16//!
17//! server.listen().await;
18//! }
19//! ```
20
21use super::handler::RequestHandler;
22use std::net::SocketAddr;
23use std::sync::{Arc, Mutex};
24use tokio::net::{TcpListener, TcpStream};
25use tokio::sync::broadcast::{self, Receiver, Sender};
26use tokio::task::JoinHandle;
27
28mod conn;
29use conn::Conn;
30
31mod commands;
32pub use commands::ServerCMD;
33
34/// The server manages client connections, verifies message formats, and handles sending and receiving messages.
35///
36/// # Type Parameters
37///
38/// * `H`: Type that implements the `RequestHandler` trait for handling incoming requests.
39///
40/// # Fields
41///
42/// * `address`: The socket address on which the server is listening.
43/// * `handles`: Vector of join handles for spawned connection tasks.
44/// * `send_all_tx`: Sender channel for broadcasting messages to all clients.
45/// * `cmd_rx`: Receiver channel for receiving commands from clients.
46/// * `cmd_tx`: Sender channel for sending commands to clients.
47/// * `message_handler`: Shared handler function to handle all incoming messages.
48pub struct Server<H>
49where
50 H: RequestHandler + Sync + Send + 'static,
51{
52 address: SocketAddr,
53
54 handles: Vec<JoinHandle<()>>,
55 send_all_tx: Sender<String>,
56 cmd_rx: Receiver<ServerCMD>,
57 cmd_tx: Sender<ServerCMD>,
58 message_handler: Arc<Mutex<H>>, // Shared handler func to handle all incoming messages
59}
60
61impl<H> Server<H>
62where
63 H: RequestHandler + Sync + Send,
64{
65 /// Gets the socket address of the server.
66 ///
67 /// # Returns
68 ///
69 /// The socket address on which the server is listening.
70 pub fn get_addr(&self) -> SocketAddr {
71 self.address
72 }
73
74 /// Adds a new connection to the server.
75 ///
76 /// # Arguments
77 ///
78 /// * `conn_stream` - The TCP stream representing the connection to the client.
79 pub fn add_conn(&mut self, conn_stream: TcpStream) {
80 let mut conn = Conn::new(
81 conn_stream,
82 self.message_handler.clone(),
83 self.send_all_tx.subscribe(),
84 self.cmd_tx.clone(),
85 );
86
87 let handle = tokio::spawn(async move {
88 conn.listen().await;
89 });
90
91 self.handles.push(handle);
92 }
93
94 /// Sends a message to all connected clients.
95 ///
96 /// # Arguments
97 ///
98 /// * `message` - The message to be sent to all clients.
99 pub fn send_all(&self, message: String) {
100 self.send_all_tx.send(message).unwrap();
101 }
102
103 /// Starts listening for incoming connections and commands.
104 pub async fn listen(&mut self) {
105 println!("[Server] starting on {}...", self.get_addr());
106 let listener = TcpListener::bind(self.address).await.unwrap();
107
108 loop {
109 tokio::select! {
110 // Accept a new connection
111 Ok((socket, addr)) = listener.accept() => {
112 println!("[Server] Connected with {}", addr);
113 self.add_conn(socket);
114 },
115 // Receive a message from the rx channel
116 Ok(cmd) = self.cmd_rx.recv() => {
117 match cmd {
118 ServerCMD::ShutDown(code) => {
119 println!("[Server] Server shutting down with code {}...", code);
120 }
121 ServerCMD::SendAll(message) => {
122 self.send_all(message);
123 }
124 ServerCMD::Kick(addr) => {
125 println!("[Server] Kicking client with addr {}", addr);
126 }
127 }
128 }
129 }
130 }
131 }
132
133 /// Creates a new instance of `Server`.
134 ///
135 /// # Arguments
136 ///
137 /// * `address` - The socket address on which the server will listen for incoming connections.
138 /// * `message_handler` - The handler for processing incoming requests.
139 ///
140 /// # Returns
141 ///
142 /// A new instance of `Server`.
143 pub fn new(address: SocketAddr, message_handler: H) -> Self {
144 let message_handler: Arc<Mutex<H>> = Arc::new(Mutex::new(message_handler));
145 let handles = vec![];
146
147 // doesn't really matter the count
148 let count = 16;
149
150 let (send_all_tx, _) = broadcast::channel(count);
151 let (cmd_tx, cmd_rx) = broadcast::channel::<ServerCMD>(count);
152
153 Server {
154 address,
155 handles,
156 send_all_tx,
157 cmd_rx,
158 cmd_tx,
159 message_handler,
160 }
161 }
162}