Skip to main content

tcplane/server/
impl.rs

1use crate::*;
2
3/// Provides a default implementation for ServerData.
4impl Default for ServerData {
5    fn default() -> Self {
6        Self {
7            server_config: ServerConfigData::default(),
8            hook: vec![],
9            task_panic: vec![],
10            read_error: vec![],
11        }
12    }
13}
14
15/// Provides a default implementation for ServerControlHook.
16impl Default for ServerControlHook {
17    fn default() -> Self {
18        Self {
19            wait_hook: Arc::new(|| Box::pin(async {})),
20            shutdown_hook: Arc::new(|| Box::pin(async {})),
21        }
22    }
23}
24
25impl ServerData {
26    /// Gets a reference to the configuration.
27    ///
28    /// # Returns
29    ///
30    /// - `&ServerConfig` - Reference to the configuration.
31    pub(crate) fn get_config(&self) -> &ServerConfigData {
32        &self.server_config
33    }
34
35    /// Gets a mutable reference to the server configuration.
36    ///
37    /// # Returns
38    ///
39    /// - `&mut ServerConfigData` - Mutable reference to the server configuration.
40    pub(crate) fn get_mut_server_config(&mut self) -> &mut ServerConfigData {
41        &mut self.server_config
42    }
43
44    /// Gets a reference to the hook list.
45    ///
46    /// # Returns
47    ///
48    /// - `&ServerHookList` - Reference to the hook list.
49    pub(crate) fn get_hook(&self) -> &ServerHookList {
50        &self.hook
51    }
52
53    /// Gets a mutable reference to the hook list.
54    ///
55    /// # Returns
56    ///
57    /// - `&mut ServerHookList` - Mutable reference to the hook list.
58    pub(crate) fn get_mut_hook(&mut self) -> &mut ServerHookList {
59        &mut self.hook
60    }
61
62    /// Gets a reference to the task panic handler list.
63    ///
64    /// # Returns
65    ///
66    /// - `&ServerHookList` - Reference to the task panic handler list.
67    pub(crate) fn get_task_panic(&self) -> &ServerHookList {
68        &self.task_panic
69    }
70
71    /// Gets a mutable reference to the task panic handler list.
72    ///
73    /// # Returns
74    ///
75    /// - `&mut ServerHookList` - Mutable reference to the task panic handler list.
76    pub(crate) fn get_mut_task_panic(&mut self) -> &mut ServerHookList {
77        &mut self.task_panic
78    }
79
80    /// Gets a reference to the read error handler list.
81    ///
82    /// # Returns
83    ///
84    /// - `&ServerHookList` - Reference to the read error handler list.
85    pub(crate) fn get_read_error(&self) -> &ServerHookList {
86        &self.read_error
87    }
88
89    /// Gets a mutable reference to the read error handler list.
90    ///
91    /// # Returns
92    ///
93    /// - `&mut ServerHookList` - Mutable reference to the read error handler list.
94    pub(crate) fn get_mut_read_error(&mut self) -> &mut ServerHookList {
95        &mut self.read_error
96    }
97}
98
99/// Provides a default implementation for Server.
100impl Default for Server {
101    fn default() -> Self {
102        Self(Arc::new(RwLock::new(ServerData::default())))
103    }
104}
105
106/// Implementation of methods for the Server structure.
107impl Server {
108    /// Creates a new Server instance with default settings.
109    ///
110    /// # Returns
111    ///
112    /// - `Self` - A new Server instance.
113    pub fn new() -> Self {
114        Self::default()
115    }
116
117    /// Acquires a read lock on the inner server data.
118    ///
119    /// # Returns
120    ///
121    /// - `ArcRwLockReadGuard<ServerData>` - The read guard.
122    pub(crate) async fn read(&self) -> ArcRwLockReadGuard<'_, ServerData> {
123        self.0.read().await
124    }
125
126    /// Acquires a write lock on the inner server data.
127    ///
128    /// # Returns
129    ///
130    /// - `ArcRwLockWriteGuard<ServerData>` - The write guard.
131    pub(crate) async fn write(&self) -> ArcRwLockWriteGuard<'_, ServerData> {
132        self.0.write().await
133    }
134
135    /// Sets the server configuration.
136    ///
137    /// # Arguments
138    ///
139    /// - `ServerConfig` - The server configuration.
140    ///
141    /// # Returns
142    ///
143    /// - `&Self` - Reference to self for method chaining.
144    pub async fn server_config(&self, config: ServerConfig) -> &Self {
145        *self.write().await.get_mut_server_config() = config.get_data().await;
146        self
147    }
148
149    /// Constructs a bind address string from host and port。
150    ///
151    /// # Arguments
152    ///
153    /// - `AsRef<str>` - Type that can be referenced as a string slice.
154    /// - `u16` - The port number.
155    ///
156    /// # Returns
157    ///
158    /// - `String` - The formatted bind address.
159    #[inline(always)]
160    pub fn get_bind_addr<H>(host: H, port: u16) -> String
161    where
162        H: AsRef<str>,
163    {
164        format!("{}{}{}", host.as_ref(), COLON, port)
165    }
166
167    /// Adds a typed hook to the server's hook list.
168    ///
169    /// # Arguments
170    ///
171    /// - `ServerHook` - The hook type that implements `ServerHook`.
172    ///
173    /// # Returns
174    ///
175    /// - `&Self` - Reference to self for method chaining.
176    pub async fn hook<H>(&self) -> &Self
177    where
178        H: ServerHook,
179    {
180        self.write()
181            .await
182            .get_mut_hook()
183            .push(server_hook_factory::<H>());
184        self
185    }
186
187    /// Adds a panic handler to the server's task panic handler list.
188    ///
189    /// # Arguments
190    ///
191    /// - `ServerHook` - The handler type that implements `ServerHook`.
192    ///
193    /// # Returns
194    ///
195    /// - `&Self` - Reference to self for method chaining.
196    pub async fn task_panic<H>(&self) -> &Self
197    where
198        H: ServerHook,
199    {
200        self.write()
201            .await
202            .get_mut_task_panic()
203            .push(server_hook_factory::<H>());
204        self
205    }
206
207    /// Adds an error handler to the server's error handler list.
208    ///
209    /// # Arguments
210    ///
211    /// - `ServerHook` - The handler type that implements `ServerHook`.
212    ///
213    /// # Returns
214    ///
215    /// - `&Self` - Reference to self for method chaining.
216    pub async fn read_error<H>(&self) -> &Self
217    where
218        H: ServerHook,
219    {
220        self.write()
221            .await
222            .get_mut_read_error()
223            .push(server_hook_factory::<H>());
224        self
225    }
226
227    /// Creates a TCP listener bound to the configured address。
228    ///
229    /// # Returns
230    ///
231    /// - `Result<TcpListener, ServerError>` - The listener on success, or an error on failure.
232    async fn create_tcp_listener(&self) -> Result<TcpListener, ServerError> {
233        let config: ServerConfigData = self.read().await.get_config().clone();
234        let host: String = config.host;
235        let port: u16 = config.port;
236        let addr: String = Self::get_bind_addr(&host, port);
237        TcpListener::bind(&addr)
238            .await
239            .map_err(|e| ServerError::TcpBind(e.to_string()))
240    }
241
242    /// Spawns a new task to handle an incoming connection.
243    ///
244    /// # Arguments
245    ///
246    /// - `ArcRwLockStream` - The stream for the incoming connection.
247    async fn spawn_connection_handler(&self, stream: ArcRwLockStream) {
248        let server: Server = self.clone();
249        let hook: ServerHookList = self.read().await.get_hook().clone();
250        let task_panic: ServerHookList = self.read().await.get_task_panic().clone();
251        let buffer_size: usize = self.read().await.get_config().buffer_size;
252        spawn(async move {
253            server
254                .handle_connection(stream, hook, task_panic, buffer_size)
255                .await;
256        });
257    }
258
259    /// Handles an incoming connection by processing it through the hook chain.
260    ///
261    /// # Arguments
262    ///
263    /// - `ArcRwLockStream` - The stream for the connection.
264    /// - `ServerHookList` - The list of hooks to process.
265    /// - `ServerHookList` - The list of panic handlers.
266    /// - `usize` - The buffer size for reading data.
267    async fn handle_connection(
268        &self,
269        stream: ArcRwLockStream,
270        hook: ServerHookList,
271        task_panic: ServerHookList,
272        buffer_size: usize,
273    ) {
274        let request: Request = match self.read_stream(&stream, buffer_size).await {
275            Ok(data) => data,
276            Err(e) => {
277                self.read_error_handle(e.to_string()).await;
278                return;
279            }
280        };
281        let ctx: Context = self.create_context(stream, request).await;
282
283        for h in hook.iter() {
284            let ctx_clone: Context = ctx.clone();
285            let h_clone: ServerHookHandler = Arc::clone(h);
286            let join_handle: JoinHandle<()> = spawn(async move {
287                h_clone(ctx_clone).await;
288            });
289
290            match join_handle.await {
291                Ok(()) => {}
292                Err(e) if e.is_panic() => {
293                    for panic_handler in task_panic.iter() {
294                        panic_handler(ctx.clone()).await;
295                    }
296                    break;
297                }
298                Err(_) => break,
299            }
300        }
301    }
302
303    /// Reads data from the stream into a request.
304    ///
305    /// # Arguments
306    ///
307    /// - `&ArcRwLockStream` - The stream to read from.
308    /// - `usize` - The buffer size for reading.
309    ///
310    /// # Returns
311    ///
312    /// - `Result<Request, ServerError>` - The request data on success, or an error on failure.
313    async fn read_stream(
314        &self,
315        stream: &ArcRwLockStream,
316        buffer_size: usize,
317    ) -> Result<Request, ServerError> {
318        let mut buffer: Vec<u8> = Vec::new();
319        let mut tmp_buf: Vec<u8> = vec![0u8; buffer_size];
320        let mut stream_guard: ArcRwLockWriteGuard<'_, TcpStream> = stream.write().await;
321        loop {
322            match stream_guard.read(&mut tmp_buf).await {
323                Ok(0) => break,
324                Ok(n) => {
325                    buffer.extend_from_slice(&tmp_buf[..n]);
326                    if tmp_buf[..n].ends_with(SPLIT_REQUEST_BYTES) {
327                        let end_pos: usize = buffer.len().saturating_sub(SPLIT_REQUEST_BYTES.len());
328                        buffer.truncate(end_pos);
329                        break;
330                    }
331                    if n < tmp_buf.len() {
332                        break;
333                    }
334                }
335                Err(e) => {
336                    return Err(ServerError::TcpRead(e.to_string()));
337                }
338            }
339        }
340        Ok(buffer)
341    }
342
343    /// Creates a context for processing a request.
344    ///
345    /// # Arguments
346    ///
347    /// - `ArcRwLockStream` - The stream for the connection.
348    /// - `Request` - The request data.
349    ///
350    /// # Returns
351    ///
352    /// - `Context` - The created context.
353    async fn create_context(&self, stream: ArcRwLockStream, request: Request) -> Context {
354        let mut data: ContextData = ContextData::new();
355        data.stream = Some(stream);
356        data.request = request;
357        Context::from(data)
358    }
359
360    /// Handles an read error by invoking the configured error handlers.
361    ///
362    /// # Arguments
363    ///
364    /// - `String` - The error message.
365    async fn read_error_handle(&self, error: String) {
366        let error_handlers: ServerHookList = self.read().await.get_read_error().clone();
367        let ctx: Context = Context::new();
368        ctx.set_data("error", error).await;
369        for handler in error_handlers.iter() {
370            handler(ctx.clone()).await;
371        }
372    }
373
374    /// Starts the server and begins accepting connections.
375    ///
376    /// # Returns
377    ///
378    /// - `Result<ServerControlHook, ServerError>` - The control hook on success, or an error on failure.
379    pub async fn run(&self) -> Result<ServerControlHook, ServerError> {
380        let tcp_listener: TcpListener = self.create_tcp_listener().await?;
381        let server: Server = self.clone();
382        let (wait_sender, wait_receiver) = channel(());
383        let (shutdown_sender, mut shutdown_receiver) = channel(());
384        let accept_connections: JoinHandle<()> = spawn(async move {
385            loop {
386                tokio::select! {
387                    result = tcp_listener.accept() => {
388                        match result {
389                            Ok((stream, _)) => {
390                                let stream: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
391                                server.spawn_connection_handler(stream).await;
392                            }
393                            Err(_) => break,
394                        }
395                    }
396                    _ = shutdown_receiver.changed() => {
397                        break;
398                    }
399                }
400            }
401            let _ = wait_sender.send(());
402        });
403        let wait_hook = Arc::new(move || {
404            let mut wait_receiver_clone = wait_receiver.clone();
405            Box::pin(async move {
406                let _ = wait_receiver_clone.changed().await;
407            }) as Pin<Box<dyn Future<Output = ()> + Send + 'static>>
408        });
409        let shutdown_hook = Arc::new(move || {
410            let shutdown_sender_clone: Sender<()> = shutdown_sender.clone();
411            Box::pin(async move {
412                let _ = shutdown_sender_clone.send(());
413            }) as Pin<Box<dyn Future<Output = ()> + Send + 'static>>
414        });
415        spawn(async move {
416            let _ = accept_connections.await;
417        });
418        Ok(ServerControlHook {
419            wait_hook,
420            shutdown_hook,
421        })
422    }
423}
424
425/// Implementation of methods for the ServerControlHook structure.
426impl ServerControlHook {
427    /// Waits for the server to finish.
428    pub async fn wait(&self) {
429        (self.wait_hook)().await;
430    }
431
432    /// Initiates a graceful shutdown of the server.
433    pub async fn shutdown(&self) {
434        (self.shutdown_hook)().await;
435    }
436}