tcplane/server/impl.rs
1use crate::*;
2
3/// Provides a default implementation for ServerData.
4impl Default for ServerData {
5 fn default() -> Self {
6 Self {
7 server_config: ServerConfigData::default(),
8 hook: vec![],
9 task_panic: vec![],
10 read_error: vec![],
11 }
12 }
13}
14
15/// Provides a default implementation for ServerControlHook.
16impl Default for ServerControlHook {
17 fn default() -> Self {
18 Self {
19 wait_hook: Arc::new(|| Box::pin(async {})),
20 shutdown_hook: Arc::new(|| Box::pin(async {})),
21 }
22 }
23}
24
25impl ServerData {
26 /// Gets a reference to the configuration.
27 ///
28 /// # Returns
29 ///
30 /// - `&ServerConfig` - Reference to the configuration.
31 pub(crate) fn get_config(&self) -> &ServerConfigData {
32 &self.server_config
33 }
34
35 /// Gets a mutable reference to the server configuration.
36 ///
37 /// # Returns
38 ///
39 /// - `&mut ServerConfigData` - Mutable reference to the server configuration.
40 pub(crate) fn get_mut_server_config(&mut self) -> &mut ServerConfigData {
41 &mut self.server_config
42 }
43
44 /// Gets a reference to the hook list.
45 ///
46 /// # Returns
47 ///
48 /// - `&ServerHookList` - Reference to the hook list.
49 pub(crate) fn get_hook(&self) -> &ServerHookList {
50 &self.hook
51 }
52
53 /// Gets a mutable reference to the hook list.
54 ///
55 /// # Returns
56 ///
57 /// - `&mut ServerHookList` - Mutable reference to the hook list.
58 pub(crate) fn get_mut_hook(&mut self) -> &mut ServerHookList {
59 &mut self.hook
60 }
61
62 /// Gets a reference to the task panic handler list.
63 ///
64 /// # Returns
65 ///
66 /// - `&ServerHookList` - Reference to the task panic handler list.
67 pub(crate) fn get_task_panic(&self) -> &ServerHookList {
68 &self.task_panic
69 }
70
71 /// Gets a mutable reference to the task panic handler list.
72 ///
73 /// # Returns
74 ///
75 /// - `&mut ServerHookList` - Mutable reference to the task panic handler list.
76 pub(crate) fn get_mut_task_panic(&mut self) -> &mut ServerHookList {
77 &mut self.task_panic
78 }
79
80 /// Gets a reference to the read error handler list.
81 ///
82 /// # Returns
83 ///
84 /// - `&ServerHookList` - Reference to the read error handler list.
85 pub(crate) fn get_read_error(&self) -> &ServerHookList {
86 &self.read_error
87 }
88
89 /// Gets a mutable reference to the read error handler list.
90 ///
91 /// # Returns
92 ///
93 /// - `&mut ServerHookList` - Mutable reference to the read error handler list.
94 pub(crate) fn get_mut_read_error(&mut self) -> &mut ServerHookList {
95 &mut self.read_error
96 }
97}
98
99/// Provides a default implementation for Server.
100impl Default for Server {
101 fn default() -> Self {
102 Self(Arc::new(RwLock::new(ServerData::default())))
103 }
104}
105
106/// Implementation of methods for the Server structure.
107impl Server {
108 /// Creates a new Server instance with default settings.
109 ///
110 /// # Returns
111 ///
112 /// - `Self` - A new Server instance.
113 pub fn new() -> Self {
114 Self::default()
115 }
116
117 /// Acquires a read lock on the inner server data.
118 ///
119 /// # Returns
120 ///
121 /// - `ArcRwLockReadGuard<ServerData>` - The read guard.
122 pub(crate) async fn read(&self) -> ArcRwLockReadGuard<'_, ServerData> {
123 self.0.read().await
124 }
125
126 /// Acquires a write lock on the inner server data.
127 ///
128 /// # Returns
129 ///
130 /// - `ArcRwLockWriteGuard<ServerData>` - The write guard.
131 pub(crate) async fn write(&self) -> ArcRwLockWriteGuard<'_, ServerData> {
132 self.0.write().await
133 }
134
135 /// Sets the server configuration.
136 ///
137 /// # Arguments
138 ///
139 /// - `ServerConfig` - The server configuration.
140 ///
141 /// # Returns
142 ///
143 /// - `&Self` - Reference to self for method chaining.
144 pub async fn server_config(&self, config: ServerConfig) -> &Self {
145 *self.write().await.get_mut_server_config() = config.get_data().await;
146 self
147 }
148
149 /// Constructs a bind address string from host and port。
150 ///
151 /// # Arguments
152 ///
153 /// - `AsRef<str>` - Type that can be referenced as a string slice.
154 /// - `u16` - The port number.
155 ///
156 /// # Returns
157 ///
158 /// - `String` - The formatted bind address.
159 #[inline(always)]
160 pub fn get_bind_addr<H>(host: H, port: u16) -> String
161 where
162 H: AsRef<str>,
163 {
164 format!("{}{}{}", host.as_ref(), COLON, port)
165 }
166
167 /// Adds a typed hook to the server's hook list.
168 ///
169 /// # Arguments
170 ///
171 /// - `ServerHook` - The hook type that implements `ServerHook`.
172 ///
173 /// # Returns
174 ///
175 /// - `&Self` - Reference to self for method chaining.
176 pub async fn hook<H>(&self) -> &Self
177 where
178 H: ServerHook,
179 {
180 self.write()
181 .await
182 .get_mut_hook()
183 .push(server_hook_factory::<H>());
184 self
185 }
186
187 /// Adds a panic handler to the server's task panic handler list.
188 ///
189 /// # Arguments
190 ///
191 /// - `ServerHook` - The handler type that implements `ServerHook`.
192 ///
193 /// # Returns
194 ///
195 /// - `&Self` - Reference to self for method chaining.
196 pub async fn task_panic<H>(&self) -> &Self
197 where
198 H: ServerHook,
199 {
200 self.write()
201 .await
202 .get_mut_task_panic()
203 .push(server_hook_factory::<H>());
204 self
205 }
206
207 /// Adds an error handler to the server's error handler list.
208 ///
209 /// # Arguments
210 ///
211 /// - `ServerHook` - The handler type that implements `ServerHook`.
212 ///
213 /// # Returns
214 ///
215 /// - `&Self` - Reference to self for method chaining.
216 pub async fn read_error<H>(&self) -> &Self
217 where
218 H: ServerHook,
219 {
220 self.write()
221 .await
222 .get_mut_read_error()
223 .push(server_hook_factory::<H>());
224 self
225 }
226
227 /// Creates a TCP listener bound to the configured address。
228 ///
229 /// # Returns
230 ///
231 /// - `Result<TcpListener, ServerError>` - The listener on success, or an error on failure.
232 async fn create_tcp_listener(&self) -> Result<TcpListener, ServerError> {
233 let config: ServerConfigData = self.read().await.get_config().clone();
234 let host: String = config.host;
235 let port: u16 = config.port;
236 let addr: String = Self::get_bind_addr(&host, port);
237 TcpListener::bind(&addr)
238 .await
239 .map_err(|e| ServerError::TcpBind(e.to_string()))
240 }
241
242 /// Spawns a new task to handle an incoming connection.
243 ///
244 /// # Arguments
245 ///
246 /// - `ArcRwLockStream` - The stream for the incoming connection.
247 async fn spawn_connection_handler(&self, stream: ArcRwLockStream) {
248 let server: Server = self.clone();
249 let hook: ServerHookList = self.read().await.get_hook().clone();
250 let task_panic: ServerHookList = self.read().await.get_task_panic().clone();
251 let buffer_size: usize = self.read().await.get_config().buffer_size;
252 spawn(async move {
253 server
254 .handle_connection(stream, hook, task_panic, buffer_size)
255 .await;
256 });
257 }
258
259 /// Handles an incoming connection by processing it through the hook chain.
260 ///
261 /// # Arguments
262 ///
263 /// - `ArcRwLockStream` - The stream for the connection.
264 /// - `ServerHookList` - The list of hooks to process.
265 /// - `ServerHookList` - The list of panic handlers.
266 /// - `usize` - The buffer size for reading data.
267 async fn handle_connection(
268 &self,
269 stream: ArcRwLockStream,
270 hook: ServerHookList,
271 task_panic: ServerHookList,
272 buffer_size: usize,
273 ) {
274 let request: Request = match self.read_stream(&stream, buffer_size).await {
275 Ok(data) => data,
276 Err(e) => {
277 self.read_error_handle(e.to_string()).await;
278 return;
279 }
280 };
281 let ctx: Context = self.create_context(stream, request).await;
282
283 for h in hook.iter() {
284 let ctx_clone: Context = ctx.clone();
285 let h_clone: ServerHookHandler = Arc::clone(h);
286 let join_handle: JoinHandle<()> = spawn(async move {
287 h_clone(ctx_clone).await;
288 });
289
290 match join_handle.await {
291 Ok(()) => {}
292 Err(e) if e.is_panic() => {
293 for panic_handler in task_panic.iter() {
294 panic_handler(ctx.clone()).await;
295 }
296 break;
297 }
298 Err(_) => break,
299 }
300 }
301 }
302
303 /// Reads data from the stream into a request.
304 ///
305 /// # Arguments
306 ///
307 /// - `&ArcRwLockStream` - The stream to read from.
308 /// - `usize` - The buffer size for reading.
309 ///
310 /// # Returns
311 ///
312 /// - `Result<Request, ServerError>` - The request data on success, or an error on failure.
313 async fn read_stream(
314 &self,
315 stream: &ArcRwLockStream,
316 buffer_size: usize,
317 ) -> Result<Request, ServerError> {
318 let mut buffer: Vec<u8> = Vec::new();
319 let mut tmp_buf: Vec<u8> = vec![0u8; buffer_size];
320 let mut stream_guard: ArcRwLockWriteGuard<'_, TcpStream> = stream.write().await;
321 loop {
322 match stream_guard.read(&mut tmp_buf).await {
323 Ok(0) => break,
324 Ok(n) => {
325 buffer.extend_from_slice(&tmp_buf[..n]);
326 if tmp_buf[..n].ends_with(SPLIT_REQUEST_BYTES) {
327 let end_pos: usize = buffer.len().saturating_sub(SPLIT_REQUEST_BYTES.len());
328 buffer.truncate(end_pos);
329 break;
330 }
331 if n < tmp_buf.len() {
332 break;
333 }
334 }
335 Err(e) => {
336 return Err(ServerError::TcpRead(e.to_string()));
337 }
338 }
339 }
340 Ok(buffer)
341 }
342
343 /// Creates a context for processing a request.
344 ///
345 /// # Arguments
346 ///
347 /// - `ArcRwLockStream` - The stream for the connection.
348 /// - `Request` - The request data.
349 ///
350 /// # Returns
351 ///
352 /// - `Context` - The created context.
353 async fn create_context(&self, stream: ArcRwLockStream, request: Request) -> Context {
354 let mut data: ContextData = ContextData::new();
355 data.stream = Some(stream);
356 data.request = request;
357 Context::from(data)
358 }
359
360 /// Handles an read error by invoking the configured error handlers.
361 ///
362 /// # Arguments
363 ///
364 /// - `String` - The error message.
365 async fn read_error_handle(&self, error: String) {
366 let error_handlers: ServerHookList = self.read().await.get_read_error().clone();
367 let ctx: Context = Context::new();
368 ctx.set_data("error", error).await;
369 for handler in error_handlers.iter() {
370 handler(ctx.clone()).await;
371 }
372 }
373
374 /// Starts the server and begins accepting connections.
375 ///
376 /// # Returns
377 ///
378 /// - `Result<ServerControlHook, ServerError>` - The control hook on success, or an error on failure.
379 pub async fn run(&self) -> Result<ServerControlHook, ServerError> {
380 let tcp_listener: TcpListener = self.create_tcp_listener().await?;
381 let server: Server = self.clone();
382 let (wait_sender, wait_receiver) = channel(());
383 let (shutdown_sender, mut shutdown_receiver) = channel(());
384 let accept_connections: JoinHandle<()> = spawn(async move {
385 loop {
386 tokio::select! {
387 result = tcp_listener.accept() => {
388 match result {
389 Ok((stream, _)) => {
390 let stream: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
391 server.spawn_connection_handler(stream).await;
392 }
393 Err(_) => break,
394 }
395 }
396 _ = shutdown_receiver.changed() => {
397 break;
398 }
399 }
400 }
401 let _ = wait_sender.send(());
402 });
403 let wait_hook = Arc::new(move || {
404 let mut wait_receiver_clone = wait_receiver.clone();
405 Box::pin(async move {
406 let _ = wait_receiver_clone.changed().await;
407 }) as Pin<Box<dyn Future<Output = ()> + Send + 'static>>
408 });
409 let shutdown_hook = Arc::new(move || {
410 let shutdown_sender_clone: Sender<()> = shutdown_sender.clone();
411 Box::pin(async move {
412 let _ = shutdown_sender_clone.send(());
413 }) as Pin<Box<dyn Future<Output = ()> + Send + 'static>>
414 });
415 spawn(async move {
416 let _ = accept_connections.await;
417 });
418 Ok(ServerControlHook {
419 wait_hook,
420 shutdown_hook,
421 })
422 }
423}
424
425/// Implementation of methods for the ServerControlHook structure.
426impl ServerControlHook {
427 /// Waits for the server to finish.
428 pub async fn wait(&self) {
429 (self.wait_hook)().await;
430 }
431
432 /// Initiates a graceful shutdown of the server.
433 pub async fn shutdown(&self) {
434 (self.shutdown_hook)().await;
435 }
436}