hyperlane/server/impl.rs
1use crate::*;
2
3/// Provides a default implementation for ServerInner.
4impl Default for ServerInner {
5 /// Creates a new ServerInner instance with default values.
6 ///
7 /// # Returns
8 ///
9 /// - `Self` - A new instance with default configuration.
10 fn default() -> Self {
11 Self {
12 config: ServerConfigInner::default(),
13 panic_hook: vec![],
14 route_matcher: RouteMatcher::new(),
15 request_middleware: vec![],
16 response_middleware: vec![],
17 }
18 }
19}
20
21/// Implements the `PartialEq` trait for `ServerInner`.
22///
23/// This allows for comparing two `ServerInner` instances for equality.
24impl PartialEq for ServerInner {
25 /// Checks if two `ServerInner` instances are equal.
26 ///
27 /// # Arguments
28 ///
29 /// - `&Self`- The other `ServerInner` instance to compare against.
30 ///
31 /// # Returns
32 ///
33 /// - `bool`- `true` if the instances are equal, `false` otherwise.
34 fn eq(&self, other: &Self) -> bool {
35 self.config == other.config
36 && self.route_matcher == other.route_matcher
37 && self.panic_hook.len() == other.panic_hook.len()
38 && self.request_middleware.len() == other.request_middleware.len()
39 && self.response_middleware.len() == other.response_middleware.len()
40 && self
41 .panic_hook
42 .iter()
43 .zip(other.panic_hook.iter())
44 .all(|(a, b)| Arc::ptr_eq(a, b))
45 && self
46 .request_middleware
47 .iter()
48 .zip(other.request_middleware.iter())
49 .all(|(a, b)| Arc::ptr_eq(a, b))
50 && self
51 .response_middleware
52 .iter()
53 .zip(other.response_middleware.iter())
54 .all(|(a, b)| Arc::ptr_eq(a, b))
55 }
56}
57
58/// Implements the `Eq` trait for `ServerInner`.
59///
60/// This indicates that `ServerInner` has a total equality relation.
61impl Eq for ServerInner {}
62
63/// Implements the `PartialEq` trait for `Server`.
64///
65/// This allows for comparing two `Server` instances for equality.
66impl PartialEq for Server {
67 /// Checks if two `Server` instances are equal.
68 ///
69 /// # Arguments
70 ///
71 /// - `&Self`- The other `Server` instance to compare against.
72 ///
73 /// # Returns
74 ///
75 /// - `bool`- `true` if the instances are equal, `false` otherwise.
76 fn eq(&self, other: &Self) -> bool {
77 if Arc::ptr_eq(self.get_0(), other.get_0()) {
78 return true;
79 }
80 if let (Ok(s), Ok(o)) = (self.get_0().try_read(), other.get_0().try_read()) {
81 *s == *o
82 } else {
83 false
84 }
85 }
86}
87
88/// Implements the `Eq` trait for `Server`.
89///
90/// This indicates that `Server` has a total equality relation.
91impl Eq for Server {}
92
93/// Manages the state for handling a single connection, including the stream and context.
94///
95/// This struct provides a convenient way to pass around the necessary components
96/// for processing a request or WebSocket frame.
97impl HandlerState {
98 /// Creates a new HandlerState instance.
99 ///
100 /// # Arguments
101 ///
102 /// - `&'a ArcRwLockStream` - The network stream.
103 /// - `&'a Context` - The request context.
104 /// - `usize` - The buffer size for reading HTTP requests.
105 ///
106 /// # Returns
107 ///
108 /// - `Self` - The newly created handler state.
109 #[inline]
110 pub(super) fn new(stream: ArcRwLockStream, ctx: Context, buffer: usize) -> Self {
111 Self {
112 stream,
113 ctx,
114 buffer,
115 }
116 }
117}
118
119/// Represents the server, providing methods to configure and run it.
120///
121/// This struct wraps the `ServerInner` configuration and routing logic,
122/// offering a high-level API for setting up the HTTP and WebSocket server.
123impl Server {
124 /// Creates a new Server instance with default settings.
125 ///
126 /// # Returns
127 ///
128 /// - `Self` - A new Server instance.
129 pub async fn new() -> Self {
130 let server: ServerInner = ServerInner::default();
131 Self(arc_rwlock(server))
132 }
133
134 /// Creates a new Server instance from a configuration.
135 ///
136 /// # Arguments
137 ///
138 /// - `ServerConfig` - The server configuration.
139 ///
140 /// # Returns
141 ///
142 /// - `Self` - A new Server instance.
143 pub async fn from(config: ServerConfig) -> Self {
144 let server: Self = Self::new().await;
145 server.config(config).await;
146 server
147 }
148
149 /// Acquires a read lock on the inner server data.
150 ///
151 /// # Returns
152 ///
153 /// - `ServerStateReadGuard` - The read guard for ServerInner.
154 pub(super) async fn read(&self) -> ServerStateReadGuard<'_> {
155 self.get_0().read().await
156 }
157
158 /// Acquires a write lock on the inner server data.
159 ///
160 /// # Returns
161 ///
162 /// - `ServerStateWriteGuard` - The write guard for ServerInner.
163 async fn write(&self) -> ServerStateWriteGuard<'_> {
164 self.get_0().write().await
165 }
166
167 /// Gets the route matcher.
168 ///
169 /// # Returns
170 /// - `RouteMatcher` - The route matcher.
171 pub async fn get_route_matcher(&self) -> RouteMatcher {
172 self.read().await.get_route_matcher().clone()
173 }
174
175 /// Handle a given hook macro asynchronously.
176 ///
177 /// This function dispatches the provided `HookMacro` to the appropriate
178 /// internal handler based on its `HookType`. Supported hook types include
179 /// panic hooks, request/response middleware, and route.
180 ///
181 /// # Arguments
182 ///
183 /// - `HookMacro`- The `HookMacro` instance containing the `HookType` and its handler.
184 pub async fn handle_hook(&self, hook: HookMacro) {
185 match (hook.hook_type, hook.handler) {
186 (HookType::PanicHook(_), HookHandler::Handler(handler)) => {
187 self.write().await.get_mut_panic_hook().push(handler);
188 }
189 (HookType::PanicHook(_), HookHandler::Factory(factory)) => {
190 self.write().await.get_mut_panic_hook().push(factory());
191 }
192 (HookType::RequestMiddleware(_), HookHandler::Handler(handler)) => {
193 self.write()
194 .await
195 .get_mut_request_middleware()
196 .push(handler);
197 }
198 (HookType::RequestMiddleware(_), HookHandler::Factory(factory)) => {
199 self.write()
200 .await
201 .get_mut_request_middleware()
202 .push(factory());
203 }
204 (HookType::Route(path), HookHandler::Handler(handler)) => {
205 self.write()
206 .await
207 .get_mut_route_matcher()
208 .add(path, handler)
209 .unwrap();
210 }
211 (HookType::Route(path), HookHandler::Factory(factory)) => {
212 self.write()
213 .await
214 .get_mut_route_matcher()
215 .add(path, factory())
216 .unwrap();
217 }
218 (HookType::ResponseMiddleware(_), HookHandler::Handler(handler)) => {
219 self.write()
220 .await
221 .get_mut_response_middleware()
222 .push(handler);
223 }
224 (HookType::ResponseMiddleware(_), HookHandler::Factory(factory)) => {
225 self.write()
226 .await
227 .get_mut_response_middleware()
228 .push(factory());
229 }
230 };
231 }
232
233 /// Sets the server configuration from a string.
234 ///
235 /// # Arguments
236 ///
237 /// - `C: ToString` - The configuration.
238 ///
239 /// # Returns
240 ///
241 /// - `&Self` - Reference to self for method chaining.
242 pub async fn config_str<C: ToString>(&self, config_str: C) -> &Self {
243 let config: ServerConfig = ServerConfig::from_json_str(&config_str.to_string()).unwrap();
244 self.write().await.set_config(config.get_inner().await);
245 self
246 }
247
248 /// Sets the server configuration.
249 ///
250 /// # Arguments
251 ///
252 /// - `ServerConfig` - The server configuration.
253 ///
254 /// # Returns
255 ///
256 /// - `&Self` - Reference to self for method chaining.
257 pub async fn config(&self, config: ServerConfig) -> &Self {
258 self.write().await.set_config(config.get_inner().await);
259 self
260 }
261
262 /// Registers a panic hook handler to the processing pipeline.
263 ///
264 /// This method allows registering panic hooks that implement the `ServerHook` trait,
265 /// which will be executed when a panic occurs during request processing.
266 ///
267 /// # Type Parameters
268 ///
269 /// - `ServerHook` - The panic hook type that implements `ServerHook`.
270 ///
271 /// # Returns
272 ///
273 /// - `&Self` - Reference to self for method chaining.
274 pub async fn panic_hook<S>(&self) -> &Self
275 where
276 S: ServerHook,
277 {
278 self.write()
279 .await
280 .get_mut_panic_hook()
281 .push(server_hook_factory::<S>());
282 self
283 }
284
285 /// Registers a route handler for a specific path.
286 ///
287 /// This method allows registering route handlers that implement the `ServerHook` trait,
288 /// providing type safety and better code organization.
289 ///
290 /// # Type Parameters
291 ///
292 /// - `ServerHook` - The route handler type that implements `ServerHook`.
293 ///
294 /// # Arguments
295 ///
296 /// - `path` - The route path pattern.
297 ///
298 /// # Returns
299 ///
300 /// - `&Self` - Reference to self for method chaining.
301 pub async fn route<S>(&self, path: impl ToString) -> &Self
302 where
303 S: ServerHook,
304 {
305 self.write()
306 .await
307 .get_mut_route_matcher()
308 .add(&path.to_string(), server_hook_factory::<S>())
309 .unwrap();
310 self
311 }
312
313 /// Registers request middleware to the processing pipeline.
314 ///
315 /// This method allows registering middleware that implements the `ServerHook` trait,
316 /// which will be executed before route handlers for every incoming request.
317 ///
318 /// # Type Parameters
319 ///
320 /// - `ServerHook` - The middleware type that implements `ServerHook`.
321 ///
322 /// # Returns
323 ///
324 /// - `&Self` - Reference to self for method chaining.
325 pub async fn request_middleware<S>(&self) -> &Self
326 where
327 S: ServerHook,
328 {
329 self.write()
330 .await
331 .get_mut_request_middleware()
332 .push(server_hook_factory::<S>());
333 self
334 }
335
336 /// Registers response middleware to the processing pipeline.
337 ///
338 /// This method allows registering middleware that implements the `ServerHook` trait,
339 /// which will be executed after route handlers for every outgoing response.
340 ///
341 /// # Type Parameters
342 ///
343 /// - `ServerHook` - The middleware type that implements `ServerHook`.
344 ///
345 /// # Returns
346 ///
347 /// - `&Self` - Reference to self for method chaining.
348 pub async fn response_middleware<S>(&self) -> &Self
349 where
350 S: ServerHook,
351 {
352 self.write()
353 .await
354 .get_mut_response_middleware()
355 .push(server_hook_factory::<S>());
356 self
357 }
358
359 /// Formats the host and port into a bindable address string.
360 ///
361 /// # Arguments
362 ///
363 /// - `H: ToString` - The host address.
364 /// - `usize` - The port number.
365 ///
366 /// # Returns
367 ///
368 /// - `String` - The formatted address string.
369 #[inline]
370 pub fn format_host_port<H: ToString>(host: H, port: usize) -> String {
371 format!("{}{COLON}{port}", host.to_string())
372 }
373
374 /// Handles a panic that has been captured and associated with a specific request `Context`.
375 ///
376 /// This function is invoked when a panic occurs within a task that has access to the request
377 /// context, such as a route handler or middleware. It ensures that the panic information is
378 /// recorded in the `Context` and then passed to the server's configured panic hook for
379 /// processing.
380 ///
381 /// By associating the panic with the context, the handler can access request-specific details
382 /// to provide more meaningful error logging and responses.
383 ///
384 /// # Arguments
385 ///
386 /// - `&Context` - The context of the request during which the panic occurred.
387 /// - `&Panic` - The captured panic information.
388 async fn handle_panic_with_context(&self, ctx: &Context, panic: &Panic) {
389 let panic_clone: Panic = panic.clone();
390 ctx.cancel_aborted().await.set_panic(panic_clone).await;
391 for hook in self.read().await.get_panic_hook().iter() {
392 if let Err(join_error) = spawn(hook(ctx)).await
393 && join_error.is_panic()
394 {
395 eprintln!("Panic occurred in panic hook: {:?}", join_error);
396 }
397 if ctx.get_aborted().await {
398 return;
399 }
400 }
401 }
402
403 /// Handles a panic that occurred within a spawned Tokio task.
404 ///
405 /// It extracts the panic information from the `JoinError` and processes it.
406 ///
407 /// # Arguments
408 ///
409 /// - `&Context` - The context associated with the task.
410 /// - `JoinError` - The `JoinError` returned from the panicked task.
411 async fn handle_task_panic(&self, ctx: &Context, join_error: JoinError) {
412 let panic: Panic = Panic::from_join_error(join_error);
413 self.handle_panic_with_context(ctx, &panic).await;
414 }
415
416 /// Executes a middleware handler and manages the request lifecycle.
417 ///
418 /// This function executes middleware with spawn to catch panics properly.
419 /// While this adds some overhead, it's necessary to ensure panic hooks
420 /// can send error responses to clients.
421 ///
422 /// # Arguments
423 ///
424 /// - `&Context` - The request context.
425 /// - `&mut RequestLifecycle` - A mutable reference to the current request lifecycle state.
426 /// - `&ServerHookHandler` - The middleware handler to execute.
427 async fn handle_middleware_with_lifecycle(
428 &self,
429 ctx: &Context,
430 lifecycle: &mut RequestLifecycle,
431 handler: &ServerHookHandler,
432 ) {
433 ctx.update_lifecycle_status(lifecycle).await;
434 if let Err(join_error) = spawn(handler(ctx)).await
435 && join_error.is_panic()
436 {
437 self.handle_task_panic(ctx, join_error).await;
438 }
439 }
440
441 /// Executes a route handler and manages the request lifecycle.
442 ///
443 /// This function executes the route handler with spawn to catch panics properly.
444 /// While this adds some overhead, it's necessary to ensure panic hooks
445 /// can send error responses to clients.
446 ///
447 /// # Arguments
448 ///
449 /// - `&Context` - The request context.
450 /// - `&mut RequestLifecycle` - A mutable reference to the current request lifecycle state.
451 /// - `&ServerHookHandler` - The route handler to execute.
452 async fn handle_route_matcher_with_lifecycle(
453 &self,
454 ctx: &Context,
455 lifecycle: &mut RequestLifecycle,
456 handler: &ServerHookHandler,
457 ) {
458 ctx.update_lifecycle_status(lifecycle).await;
459 if let Err(join_error) = spawn(handler(ctx)).await
460 && join_error.is_panic()
461 {
462 self.handle_task_panic(ctx, join_error).await;
463 }
464 }
465
466 /// Creates and binds a `TcpListener` based on the server's configuration.
467 ///
468 /// # Returns
469 ///
470 /// Returns a `ServerResult` containing the bound `TcpListener` on success,
471 /// or a `ServerError` on failure.
472 async fn create_tcp_listener(&self) -> ServerResult<TcpListener> {
473 let config: ServerConfigInner = self.read().await.get_config().clone();
474 let host: String = config.get_host().clone();
475 let port: usize = *config.get_port();
476 let addr: String = Self::format_host_port(host, port);
477 TcpListener::bind(&addr)
478 .await
479 .map_err(|err| ServerError::TcpBind(err.to_string()))
480 }
481
482 /// Enters a loop to accept incoming TCP connections and spawn handlers for them.
483 ///
484 /// # Arguments
485 ///
486 /// - `&TcpListener` - A reference to the `TcpListener` to accept connections from.
487 ///
488 /// # Returns
489 ///
490 /// - `ServerResult<()>` - A `ServerResult` which is typically `Ok(())` unless an unrecoverable
491 /// error occurs.
492 async fn accept_connections(&self, tcp_listener: &TcpListener) -> ServerResult<()> {
493 while let Ok((stream, _socket_addr)) = tcp_listener.accept().await {
494 self.configure_stream(&stream).await;
495 let stream: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
496 self.spawn_connection_handler(stream).await;
497 }
498 Ok(())
499 }
500
501 /// Configures socket options for a newly accepted `TcpStream`.
502 ///
503 /// This applies settings like `SO_LINGER`, `TCP_NODELAY`, and `IP_TTL` from the server's configuration.
504 ///
505 /// # Arguments
506 ///
507 /// - `&TcpStream` - A reference to the `TcpStream` to configure.
508 async fn configure_stream(&self, stream: &TcpStream) {
509 let server_inner: ServerStateReadGuard = self.read().await;
510 let config: &ServerConfigInner = server_inner.get_config();
511 let linger_opt: &OptionDuration = config.get_linger();
512 let nodelay_opt: &OptionBool = config.get_nodelay();
513 let ttl_opt: &OptionU32 = config.get_ttl();
514 let _ = stream.set_linger(*linger_opt);
515 if let Some(nodelay) = nodelay_opt {
516 let _ = stream.set_nodelay(*nodelay);
517 }
518 if let Some(ttl) = ttl_opt {
519 let _ = stream.set_ttl(*ttl);
520 }
521 }
522
523 /// Spawns a new asynchronous task to handle a single client connection.
524 ///
525 /// # Arguments
526 ///
527 /// - `ArcRwLockStream` - The thread-safe stream representing the client connection.
528 async fn spawn_connection_handler(&self, stream: ArcRwLockStream) {
529 let server: Server = self.clone();
530 let buffer: usize = *self.read().await.get_config().get_buffer();
531 spawn(async move {
532 server.handle_connection(stream, buffer).await;
533 });
534 }
535
536 /// Handles a single client connection, determining whether it's an HTTP or WebSocket request.
537 ///
538 /// It reads the initial request from the stream and dispatches it to the appropriate handler.
539 ///
540 /// # Arguments
541 ///
542 /// - `ArcRwLockStream` - The stream for the client connection.
543 /// - `usize` - The buffer size to use for reading the initial HTTP request.
544 async fn handle_connection(&self, stream: ArcRwLockStream, buffer: usize) {
545 if let Ok(request) = Request::http_from_stream(&stream, buffer).await {
546 let ctx: Context = Context::create_context(&stream, &request);
547 let handler: HandlerState = HandlerState::new(stream, ctx, buffer);
548 self.handle_http_requests(&handler, &request).await;
549 }
550 }
551
552 /// The core request handling pipeline.
553 ///
554 /// This function orchestrates the execution of request middleware, the route handler,
555 /// and response middleware. It supports both function-based and trait-based handlers.
556 ///
557 /// # Arguments
558 ///
559 /// - `&HandlerState` - The `HandlerState` for the current connection.
560 /// - `&Request` - The incoming request to be processed.
561 ///
562 /// # Returns
563 ///
564 /// - `bool` - A boolean indicating whether the connection should be kept alive.
565 async fn request_hook(&self, state: &HandlerState, request: &Request) -> bool {
566 let route: &str = request.get_path();
567 let ctx: &Context = state.get_ctx();
568 ctx.set_request(request).await;
569 let mut lifecycle: RequestLifecycle = RequestLifecycle::new(request.is_enable_keep_alive());
570 if self.handle_request_middleware(ctx, &mut lifecycle).await {
571 return lifecycle.keep_alive();
572 }
573 if self.handle_route_matcher(ctx, route, &mut lifecycle).await {
574 return lifecycle.keep_alive();
575 }
576 if self.handle_response_middleware(ctx, &mut lifecycle).await {
577 return lifecycle.keep_alive();
578 }
579 if let Some(panic) = ctx.try_get_panic().await {
580 self.handle_panic_with_context(ctx, &panic).await;
581 }
582 lifecycle.keep_alive()
583 }
584
585 /// Handles subsequent HTTP requests on a persistent (keep-alive) connection.
586 ///
587 /// # Arguments
588 ///
589 /// - `&HandlerState` - The `HandlerState` for the current connection.
590 /// - `&Request` - The initial request that established the keep-alive connection.
591 async fn handle_http_requests(&self, state: &HandlerState, request: &Request) {
592 if self.request_hook(state, request).await {
593 return;
594 }
595 let stream: &ArcRwLockStream = state.get_stream();
596 let buffer: usize = *state.get_buffer();
597 while let Ok(new_request) = &Request::http_from_stream(stream, buffer).await {
598 if !self.request_hook(state, new_request).await {
599 return;
600 }
601 }
602 }
603
604 /// Executes trait-based request middleware in sequence.
605 ///
606 /// # Arguments
607 ///
608 /// - `&Context` - The request context.
609 /// - `&mut RequestLifecycle` - A mutable reference to the request lifecycle state.
610 ///
611 /// # Returns
612 ///
613 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
614 pub(super) async fn handle_request_middleware(
615 &self,
616 ctx: &Context,
617 lifecycle: &mut RequestLifecycle,
618 ) -> bool {
619 for handler in self.read().await.get_request_middleware().iter() {
620 self.handle_middleware_with_lifecycle(ctx, lifecycle, handler)
621 .await;
622 if lifecycle.is_aborted() {
623 return true;
624 }
625 }
626 false
627 }
628
629 /// Executes a trait-based route handler if one matches.
630 ///
631 /// # Arguments
632 ///
633 /// - `&Context` - The request context.
634 /// - `&str` - The request path to match.
635 /// - `&mut RequestLifecycle` - A mutable reference to the request lifecycle state.
636 ///
637 /// # Returns
638 ///
639 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
640 pub(super) async fn handle_route_matcher(
641 &self,
642 ctx: &Context,
643 path: &str,
644 lifecycle: &mut RequestLifecycle,
645 ) -> bool {
646 let route_matcher: RouteMatcher = self.read().await.get_route_matcher().clone();
647 if let Some(handler) = route_matcher.try_resolve_route(ctx, path).await {
648 self.handle_route_matcher_with_lifecycle(ctx, lifecycle, &handler)
649 .await;
650 if lifecycle.is_aborted() {
651 return true;
652 }
653 }
654 false
655 }
656
657 /// Executes trait-based response middleware in sequence.
658 ///
659 /// # Arguments
660 ///
661 /// - `&Context` - The request context.
662 /// - `&mut RequestLifecycle` - A mutable reference to the request lifecycle state.
663 ///
664 /// # Returns
665 ///
666 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
667 pub(super) async fn handle_response_middleware(
668 &self,
669 ctx: &Context,
670 lifecycle: &mut RequestLifecycle,
671 ) -> bool {
672 for handler in self.read().await.get_response_middleware().iter() {
673 self.handle_middleware_with_lifecycle(ctx, lifecycle, handler)
674 .await;
675 if lifecycle.is_aborted() {
676 return true;
677 }
678 }
679 false
680 }
681
682 /// Starts the server, binds to the configured address, and begins listening for connections.
683 ///
684 /// This is the main entry point to launch the server. It will initialize the panic hook,
685 /// create a TCP listener, and then enter the connection acceptance loop in a background task.
686 ///
687 /// # Returns
688 ///
689 /// Returns a `ServerResult` containing a shutdown function on success.
690 /// Calling this function will shut down the server by aborting its main task.
691 /// Returns an error if the server fails to start.
692 pub async fn run(&self) -> ServerResult<ServerControlHook> {
693 let tcp_listener: TcpListener = self.create_tcp_listener().await?;
694 let server: Server = self.clone();
695 let (wait_sender, wait_receiver) = channel(());
696 let (shutdown_sender, mut shutdown_receiver) = channel(());
697 let accept_connections: JoinHandle<()> = spawn(async move {
698 let _ = server.accept_connections(&tcp_listener).await;
699 let _ = wait_sender.send(());
700 });
701 let wait_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
702 let mut wait_receiver_clone: Receiver<()> = wait_receiver.clone();
703 Box::pin(async move {
704 let _ = wait_receiver_clone.changed().await;
705 })
706 });
707 let shutdown_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
708 let shutdown_sender_clone: Sender<()> = shutdown_sender.clone();
709 Box::pin(async move {
710 let _ = shutdown_sender_clone.send(());
711 })
712 });
713 spawn(async move {
714 let _ = shutdown_receiver.changed().await;
715 accept_connections.abort();
716 });
717 let mut server_lifecycle: ServerControlHook = ServerControlHook::default();
718 server_lifecycle.set_shutdown_hook(shutdown_hook);
719 server_lifecycle.set_wait_hook(wait_hook);
720 Ok(server_lifecycle)
721 }
722}