hyperlane/server/impl.rs
1use crate::*;
2
3/// Provides a default implementation for ServerData.
4impl Default for ServerData {
5 /// Creates a new ServerData instance with default values.
6 ///
7 /// # Returns
8 ///
9 /// - `Self` - A new instance with default configuration.
10 #[inline(always)]
11 fn default() -> Self {
12 Self {
13 server_config: ServerConfigData::default(),
14 request_config: RequestConfigData::default(),
15 task_panic: vec![],
16 request_error: vec![],
17 route_matcher: RouteMatcher::new(),
18 request_middleware: vec![],
19 response_middleware: vec![],
20 }
21 }
22}
23
24/// Implements the `PartialEq` trait for `ServerData`.
25///
26/// This allows for comparing two `ServerData` instances for equality.
27impl PartialEq for ServerData {
28 /// Checks if two `ServerData` instances are equal.
29 ///
30 /// # Arguments
31 ///
32 /// - `&Self`- The other `ServerData` instance to compare against.
33 ///
34 /// # Returns
35 ///
36 /// - `bool`- `true` if the instances are equal, `false` otherwise.
37 fn eq(&self, other: &Self) -> bool {
38 self.get_server_config() == other.get_server_config()
39 && self.get_request_config() == other.get_request_config()
40 && self.get_route_matcher() == other.get_route_matcher()
41 && self.get_task_panic().len() == other.get_task_panic().len()
42 && self.get_request_error().len() == other.get_request_error().len()
43 && self.get_request_middleware().len() == other.get_request_middleware().len()
44 && self.get_response_middleware().len() == other.get_response_middleware().len()
45 && self
46 .get_task_panic()
47 .iter()
48 .zip(other.get_task_panic().iter())
49 .all(|(a, b)| Arc::ptr_eq(a, b))
50 && self
51 .get_request_error()
52 .iter()
53 .zip(other.get_request_error().iter())
54 .all(|(a, b)| Arc::ptr_eq(a, b))
55 && self
56 .get_request_middleware()
57 .iter()
58 .zip(other.get_request_middleware().iter())
59 .all(|(a, b)| Arc::ptr_eq(a, b))
60 && self
61 .get_response_middleware()
62 .iter()
63 .zip(other.get_response_middleware().iter())
64 .all(|(a, b)| Arc::ptr_eq(a, b))
65 }
66}
67
68/// Implements the `Eq` trait for `ServerData`.
69///
70/// This indicates that `ServerData` has a total equality relation.
71impl Eq for ServerData {}
72
73/// Implements the `PartialEq` trait for `Server`.
74///
75/// This allows for comparing two `Server` instances for equality.
76impl PartialEq for Server {
77 /// Checks if two `Server` instances are equal.
78 ///
79 /// # Arguments
80 ///
81 /// - `&Self`- The other `Server` instance to compare against.
82 ///
83 /// # Returns
84 ///
85 /// - `bool`- `true` if the instances are equal, `false` otherwise.
86 #[inline(always)]
87 fn eq(&self, other: &Self) -> bool {
88 if Arc::ptr_eq(self.get_0(), other.get_0()) {
89 return true;
90 }
91 if let (Ok(s), Ok(o)) = (self.get_0().try_read(), other.get_0().try_read()) {
92 *s == *o
93 } else {
94 false
95 }
96 }
97}
98
99/// Implements the `Eq` trait for `Server`.
100///
101/// This indicates that `Server` has a total equality relation.
102impl Eq for Server {}
103
104/// Manages the state for handling a single connection, including the stream and context.
105///
106/// This struct provides a convenient way to pass around the necessary components
107/// for processing a request or WebSocket frame.
108impl HandlerState {
109 /// Creates a new HandlerState instance.
110 ///
111 /// # Arguments
112 ///
113 /// - `&'a ArcRwLockStream` - The network stream.
114 /// - `RequestConfig` - The request config.
115 ///
116 /// # Returns
117 ///
118 /// - `Self` - The newly created hook state.
119 #[inline(always)]
120 pub(super) fn new(stream: ArcRwLockStream, request_config: RequestConfigData) -> Self {
121 Self {
122 stream,
123 request_config,
124 }
125 }
126}
127
128/// Represents the server, providing methods to configure and run it.
129///
130/// This struct wraps the `ServerData` configuration and routing logic,
131/// offering a high-level API for setting up the HTTP and WebSocket server.
132impl Server {
133 /// Creates a new Server instance with default settings.
134 ///
135 /// # Returns
136 ///
137 /// - `Self` - A new Server instance.
138 pub async fn new() -> Self {
139 let server: ServerData = ServerData::default();
140 Self(arc_rwlock(server))
141 }
142
143 /// Acquires a read lock on the inner server data.
144 ///
145 /// # Returns
146 ///
147 /// - `ServerStateReadGuard` - The read guard for ServerData.
148 pub(super) async fn read(&self) -> ServerStateReadGuard<'_> {
149 self.get_0().read().await
150 }
151
152 /// Acquires a write lock on the inner server data.
153 ///
154 /// # Returns
155 ///
156 /// - `ServerStateWriteGuard` - The write guard for ServerData.
157 async fn write(&self) -> ServerStateWriteGuard<'_> {
158 self.get_0().write().await
159 }
160
161 /// Gets the route matcher.
162 ///
163 /// # Returns
164 /// - `RouteMatcher` - The route matcher.
165 pub async fn get_route_matcher(&self) -> RouteMatcher {
166 self.read().await.get_route_matcher().clone()
167 }
168
169 /// Registers a hook into the server's processing pipeline.
170 ///
171 /// This function dispatches the provided `HookType` to the appropriate
172 /// internal hook collection based on its variant. The hook will be executed
173 /// at the corresponding stage of request processing according to its type:
174 /// - `Panic`: Added to panic handlers for error recovery
175 /// - `RequestError`: Added to request error handlers
176 /// - `RequestMiddleware`: Added to pre-route middleware chain
177 /// - `Route`: Registered as a route handler for the specified path
178 /// - `ResponseMiddleware`: Added to post-route middleware chain
179 ///
180 /// # Arguments
181 ///
182 /// - `HookType` - The `HookType` instance containing the hook configuration and factory.
183 pub async fn handle_hook(&self, hook: HookType) {
184 match hook {
185 HookType::TaskPanic(_, hook) => {
186 self.write().await.get_mut_task_panic().push(hook());
187 }
188 HookType::RequestError(_, hook) => {
189 self.write().await.get_mut_request_error().push(hook());
190 }
191 HookType::RequestMiddleware(_, hook) => {
192 self.write().await.get_mut_request_middleware().push(hook());
193 }
194 HookType::Route(path, hook) => {
195 self.write()
196 .await
197 .get_mut_route_matcher()
198 .add(path, hook())
199 .unwrap();
200 }
201 HookType::ResponseMiddleware(_, hook) => {
202 self.write()
203 .await
204 .get_mut_response_middleware()
205 .push(hook());
206 }
207 };
208 }
209
210 /// Sets the server configuration from a string.
211 ///
212 /// # Arguments
213 ///
214 /// - `AsRef<str>` - The configuration.
215 ///
216 /// # Returns
217 ///
218 /// - `&Self` - Reference to self for method chaining.
219 pub async fn config_from_json<C>(&self, json: C) -> &Self
220 where
221 C: AsRef<str>,
222 {
223 let config: ServerConfig = ServerConfig::from_json(json).unwrap();
224 self.write()
225 .await
226 .set_server_config(config.get_data().await);
227 self
228 }
229
230 /// Sets the server configuration.
231 ///
232 /// # Arguments
233 ///
234 /// - `ServerConfig` - The server configuration.
235 ///
236 /// # Returns
237 ///
238 /// - `&Self` - Reference to self for method chaining.
239 pub async fn server_config(&self, config: ServerConfig) -> &Self {
240 self.write()
241 .await
242 .set_server_config(config.get_data().await);
243 self
244 }
245
246 /// Sets the HTTP request config.
247 ///
248 /// # Arguments
249 ///
250 /// - `RequestConfig`- The HTTP request config to set.
251 ///
252 /// # Returns
253 ///
254 /// - `&Self` - Reference to self for method chaining.
255 pub async fn request_config(&self, request_config: RequestConfig) -> &Self {
256 self.write()
257 .await
258 .set_request_config(request_config.get_data().await);
259 self
260 }
261
262 /// Registers a task panic handler to the processing pipeline.
263 ///
264 /// This method allows registering task panic handlers that implement the `ServerHook` trait,
265 /// which will be executed when a panic occurs during request processing.
266 ///
267 /// # Type Parameters
268 ///
269 /// - `ServerHook` - The task panic handler type that implements `ServerHook`.
270 ///
271 /// # Returns
272 ///
273 /// - `&Self` - Reference to self for method chaining.
274 pub async fn task_panic<S>(&self) -> &Self
275 where
276 S: ServerHook,
277 {
278 self.write()
279 .await
280 .get_mut_task_panic()
281 .push(server_hook_factory::<S>());
282 self
283 }
284
285 /// Registers a request error handler to the processing pipeline.
286 ///
287 /// This method allows registering request error handlers that implement the `ServerHook` trait,
288 /// which will be executed when a request error occurs during HTTP request processing.
289 ///
290 /// # Type Parameters
291 ///
292 /// - `ServerHook` - The request error handler type that implements `ServerHook`.
293 ///
294 /// # Returns
295 ///
296 /// - `&Self` - Reference to self for method chaining.
297 pub async fn request_error<S>(&self) -> &Self
298 where
299 S: ServerHook,
300 {
301 self.write()
302 .await
303 .get_mut_request_error()
304 .push(server_hook_factory::<S>());
305 self
306 }
307
308 /// Registers a route hook for a specific path.
309 ///
310 /// This method allows registering route handlers that implement the `ServerHook` trait,
311 /// providing type safety and better code organization.
312 ///
313 /// # Type Parameters
314 ///
315 /// - `ServerHook` - The route hook type that implements `ServerHook`.
316 ///
317 /// # Arguments
318 ///
319 /// - `path` - The route path pattern.
320 ///
321 /// # Returns
322 ///
323 /// - `&Self` - Reference to self for method chaining.
324 pub async fn route<S>(&self, path: impl AsRef<str>) -> &Self
325 where
326 S: ServerHook,
327 {
328 self.write()
329 .await
330 .get_mut_route_matcher()
331 .add(path.as_ref(), server_hook_factory::<S>())
332 .unwrap();
333 self
334 }
335
336 /// Registers request middleware to the processing pipeline.
337 ///
338 /// This method allows registering middleware that implements the `ServerHook` trait,
339 /// which will be executed before route handlers for every incoming request.
340 ///
341 /// # Type Parameters
342 ///
343 /// - `ServerHook` - The middleware type that implements `ServerHook`.
344 ///
345 /// # Returns
346 ///
347 /// - `&Self` - Reference to self for method chaining.
348 pub async fn request_middleware<S>(&self) -> &Self
349 where
350 S: ServerHook,
351 {
352 self.write()
353 .await
354 .get_mut_request_middleware()
355 .push(server_hook_factory::<S>());
356 self
357 }
358
359 /// Registers response middleware to the processing pipeline.
360 ///
361 /// This method allows registering middleware that implements the `ServerHook` trait,
362 /// which will be executed after route handlers for every outgoing response.
363 ///
364 /// # Type Parameters
365 ///
366 /// - `ServerHook` - The middleware type that implements `ServerHook`.
367 ///
368 /// # Returns
369 ///
370 /// - `&Self` - Reference to self for method chaining.
371 pub async fn response_middleware<S>(&self) -> &Self
372 where
373 S: ServerHook,
374 {
375 self.write()
376 .await
377 .get_mut_response_middleware()
378 .push(server_hook_factory::<S>());
379 self
380 }
381
382 /// Get the host and port into a bindable address string.
383 ///
384 /// # Arguments
385 ///
386 /// - `AsRef<str>` - The host address.
387 /// - `u16` - The port number.
388 ///
389 /// # Returns
390 ///
391 /// - `String` - The formatted address string.
392 #[inline(always)]
393 pub fn get_bind_addr<H>(host: H, port: u16) -> String
394 where
395 H: AsRef<str>,
396 {
397 format!("{}{COLON}{port}", host.as_ref())
398 }
399
400 /// Flushes the standard output stream.
401 ///
402 /// # Returns
403 ///
404 /// - `io::Result<()>` - The result of the flush operation.
405 #[inline(always)]
406 pub fn try_flush_stdout() -> io::Result<()> {
407 stdout().flush()
408 }
409
410 /// Flushes the standard error stream.
411 ///
412 /// # Panics
413 ///
414 /// This function will panic if the flush operation fails.
415 #[inline(always)]
416 pub fn flush_stdout() {
417 stdout().flush().unwrap();
418 }
419
420 /// Flushes the standard error stream.
421 ///
422 /// # Returns
423 ///
424 /// - `io::Result<()>` - The result of the flush operation.
425 #[inline(always)]
426 pub fn try_flush_stderr() -> io::Result<()> {
427 stderr().flush()
428 }
429
430 /// Flushes the standard error stream.
431 ///
432 /// # Panics
433 ///
434 /// This function will panic if the flush operation fails.
435 #[inline(always)]
436 pub fn flush_stderr() {
437 stderr().flush().unwrap();
438 }
439
440 /// Flushes both the standard output and error streams.
441 ///
442 /// # Returns
443 ///
444 /// - `io::Result<()>` - The result of the flush operation.
445 #[inline(always)]
446 pub fn try_flush_stdout_and_stderr() -> io::Result<()> {
447 Self::try_flush_stdout()?;
448 Self::try_flush_stderr()
449 }
450
451 /// Flushes both the standard output and error streams.
452 ///
453 /// # Panics
454 ///
455 /// This function will panic if either flush operation fails.
456 #[inline(always)]
457 pub fn flush_stdout_and_stderr() {
458 Self::flush_stdout();
459 Self::flush_stderr();
460 }
461
462 /// Handles a panic that has been captured and associated with a specific request `Context`.
463 ///
464 /// This function is invoked when a panic occurs within a task that has access to the request
465 /// context, such as a route hook or middleware. It ensures that the panic information is
466 /// recorded in the `Context` and then passed to the server's configured panic hook for
467 /// processing.
468 ///
469 /// By associating the panic with the context, the hook can access request-specific details
470 /// to provide more meaningful error logging and responses.
471 ///
472 /// # Arguments
473 ///
474 /// - `&Context` - The context of the request during which the panic occurred.
475 /// - `&PanicData` - The captured panic information.
476 async fn handle_panic_with_context(&self, ctx: &Context, panic: &PanicData) {
477 let panic_clone: PanicData = panic.clone();
478 ctx.cancel_aborted().await.set_task_panic(panic_clone).await;
479 for hook in self.read().await.get_task_panic().iter() {
480 Box::pin(self.task_handler(ctx, hook, false)).await;
481 if ctx.get_aborted().await {
482 return;
483 }
484 }
485 }
486
487 /// Handles a panic that occurred within a spawned Tokio task.
488 ///
489 /// It extracts the panic information from the `JoinError` and processes it.
490 ///
491 /// # Arguments
492 ///
493 /// - `&Context` - The context associated with the task.
494 /// - `JoinError` - The `JoinError` returned from the panicked task.
495 async fn handle_task_panic(&self, ctx: &Context, join_error: JoinError) {
496 let panic: PanicData = PanicData::from_join_error(join_error);
497 ctx.set_response_status_code(HttpStatus::InternalServerError.code())
498 .await;
499 self.handle_panic_with_context(ctx, &panic).await;
500 }
501
502 /// Spawns a task handler for a given context and hook.
503 ///
504 /// # Arguments
505 ///
506 /// - `&Context` - The context of the request.
507 /// - `&ServerHookHandler` - The hook to execute.
508 /// - `bool` - Whether to handle panics that occur during execution.
509 async fn task_handler(&self, ctx: &Context, hook: &ServerHookHandler, progress: bool) {
510 if let Err(join_error) = spawn(hook(ctx)).await
511 && join_error.is_panic()
512 {
513 if progress {
514 Box::pin(self.handle_task_panic(ctx, join_error)).await;
515 } else {
516 eprintln!("{}", join_error);
517 let _ = Self::try_flush_stdout_and_stderr();
518 }
519 }
520 }
521
522 /// Creates and binds a `TcpListener` based on the server's configuration.
523 ///
524 /// # Returns
525 ///
526 /// - `Result<TcpListener, ServerError>` - A `Result` containing the bound `TcpListener` on success,
527 /// or a `ServerError` on failure.
528 async fn create_tcp_listener(&self) -> Result<TcpListener, ServerError> {
529 let config: ServerConfigData = self.read().await.get_server_config().clone();
530 let host: &String = config.get_host();
531 let port: u16 = config.get_port();
532 let addr: String = Self::get_bind_addr(host, port);
533 Ok(TcpListener::bind(&addr).await?)
534 }
535
536 /// Enters a loop to accept incoming TCP connections and spawn handlers for them.
537 ///
538 /// # Arguments
539 ///
540 /// - `&TcpListener` - A reference to the `TcpListener` to accept connections from.
541 ///
542 /// # Returns
543 ///
544 /// - `Result<(), ServerError>` - A `Result` which is typically `Ok(())` unless an unrecoverable
545 /// error occurs.
546 async fn accept_connections(&self, tcp_listener: &TcpListener) -> Result<(), ServerError> {
547 while let Ok((stream, _socket_addr)) = tcp_listener.accept().await {
548 self.configure_stream(&stream).await;
549 let stream: ArcRwLockStream = ArcRwLockStream::from_stream(stream);
550 self.spawn_connection_handler(stream).await;
551 }
552 Ok(())
553 }
554
555 /// Configures socket options for a newly accepted `TcpStream`.
556 ///
557 /// This applies settings like `TCP_NODELAY`, and `IP_TTL` from the server's configuration.
558 ///
559 /// # Arguments
560 ///
561 /// - `&TcpStream` - A reference to the `TcpStream` to configure.
562 async fn configure_stream(&self, stream: &TcpStream) {
563 let config: ServerConfigData = self.read().await.get_server_config().clone();
564 if let Some(nodelay) = config.try_get_nodelay() {
565 let _ = stream.set_nodelay(*nodelay);
566 }
567 if let Some(ttl) = config.try_get_ttl() {
568 let _ = stream.set_ttl(*ttl);
569 }
570 }
571
572 /// Spawns a new asynchronous task to handle a single client connection.
573 ///
574 /// # Arguments
575 ///
576 /// - `ArcRwLockStream` - The thread-safe stream representing the client connection.
577 async fn spawn_connection_handler(&self, stream: ArcRwLockStream) {
578 let server: Server = self.clone();
579 let request_config: RequestConfigData = *self.read().await.get_request_config();
580 spawn(async move {
581 server.handle_connection(stream, request_config).await;
582 });
583 }
584
585 /// Handles errors that occur while processing HTTP requests.
586 ///
587 /// # Arguments
588 ///
589 /// - `&Context` - The request context.
590 /// - `&RequestError` - The error that occurred.
591 pub async fn handle_request_error(&self, ctx: &Context, error: &RequestError) {
592 ctx.cancel_aborted()
593 .await
594 .set_request_error_data(error.clone())
595 .await;
596 for hook in self.read().await.get_request_error().iter() {
597 self.task_handler(ctx, hook, true).await;
598 if ctx.get_aborted().await {
599 return;
600 }
601 }
602 }
603
604 /// Handles a single client connection, determining whether it's an HTTP or WebSocket request.
605 ///
606 /// It reads the initial request from the stream and dispatches it to the appropriate hook.
607 ///
608 /// # Arguments
609 ///
610 /// - `ArcRwLockStream` - The stream for the client connection.
611 /// - `request_config` - The request config to use for reading the initial HTTP request.
612 async fn handle_connection(&self, stream: ArcRwLockStream, request_config: RequestConfigData) {
613 match Request::http_from_stream(&stream, &request_config).await {
614 Ok(request) => {
615 let hook: HandlerState = HandlerState::new(stream, request_config);
616 self.handle_http_requests(&hook, &request).await;
617 }
618 Err(error) => {
619 self.handle_request_error(&stream.into(), &error).await;
620 }
621 }
622 }
623
624 /// The core request handling pipeline.
625 ///
626 /// This function orchestrates the execution of request middleware, the route hook,
627 /// and response middleware. It supports both function-based and trait-based handlers.
628 ///
629 /// # Arguments
630 ///
631 /// - `&HandlerState` - The `HandlerState` for the current connection.
632 /// - `&Request` - The incoming request to be processed.
633 ///
634 /// # Returns
635 ///
636 /// - `bool` - A boolean indicating whether the connection should be kept alive.
637 async fn request_hook(&self, state: &HandlerState, request: &Request) -> bool {
638 let route: &str = request.get_path();
639 let ctx: &Context = &Context::new(state.get_stream(), request);
640 let keep_alive: bool = request.is_enable_keep_alive();
641 if self.handle_request_middleware(ctx).await {
642 return ctx.is_keep_alive(keep_alive).await;
643 }
644 if self.handle_route_matcher(ctx, route).await {
645 return ctx.is_keep_alive(keep_alive).await;
646 }
647 if self.handle_response_middleware(ctx).await {
648 return ctx.is_keep_alive(keep_alive).await;
649 }
650 if let Some(panic) = ctx.try_get_task_panic_data().await {
651 ctx.set_response_status_code(HttpStatus::InternalServerError.code())
652 .await;
653 self.handle_panic_with_context(ctx, &panic).await;
654 }
655 ctx.is_keep_alive(keep_alive).await
656 }
657
658 /// Handles subsequent HTTP requests on a persistent (keep-alive) connection.
659 ///
660 /// # Arguments
661 ///
662 /// - `&HandlerState` - The `HandlerState` for the current connection.
663 /// - `&Request` - The initial request that established the keep-alive connection.
664 async fn handle_http_requests(&self, state: &HandlerState, request: &Request) {
665 if !self.request_hook(state, request).await {
666 return;
667 }
668 let stream: &ArcRwLockStream = state.get_stream();
669 let request_config: &RequestConfigData = state.get_request_config();
670 loop {
671 match Request::http_from_stream(stream, request_config).await {
672 Ok(new_request) => {
673 if !self.request_hook(state, &new_request).await {
674 return;
675 }
676 }
677 Err(error) => {
678 self.handle_request_error(&state.get_stream().into(), &error)
679 .await;
680 return;
681 }
682 }
683 }
684 }
685
686 /// Executes trait-based request middleware in sequence.
687 ///
688 /// # Arguments
689 ///
690 /// - `&Context` - The request context.
691 ///
692 /// # Returns
693 ///
694 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
695 pub(super) async fn handle_request_middleware(&self, ctx: &Context) -> bool {
696 for hook in self.read().await.get_request_middleware().iter() {
697 self.task_handler(ctx, hook, true).await;
698 if ctx.get_aborted().await {
699 return true;
700 }
701 }
702 false
703 }
704
705 /// Executes a trait-based route hook if one matches.
706 ///
707 /// # Arguments
708 ///
709 /// - `&Context` - The request context.
710 /// - `&str` - The request path to match.
711 ///
712 /// # Returns
713 ///
714 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
715 pub(super) async fn handle_route_matcher(&self, ctx: &Context, path: &str) -> bool {
716 if let Some(hook) = self
717 .read()
718 .await
719 .get_route_matcher()
720 .try_resolve_route(ctx, path)
721 .await
722 {
723 self.task_handler(ctx, &hook, true).await;
724 if ctx.get_aborted().await {
725 return true;
726 }
727 }
728 false
729 }
730
731 /// Executes trait-based response middleware in sequence.
732 ///
733 /// # Arguments
734 ///
735 /// - `&Context` - The request context.
736 ///
737 /// # Returns
738 ///
739 /// - `bool` - `true` if the lifecycle was aborted, `false` otherwise.
740 pub(super) async fn handle_response_middleware(&self, ctx: &Context) -> bool {
741 for hook in self.read().await.get_response_middleware().iter() {
742 self.task_handler(ctx, hook, true).await;
743 if ctx.get_aborted().await {
744 return true;
745 }
746 }
747 false
748 }
749
750 /// Starts the server, binds to the configured address, and begins listening for connections.
751 ///
752 /// This is the main entry point to launch the server. It will initialize the panic hook,
753 /// create a TCP listener, and then enter the connection acceptance loop in a background task.
754 ///
755 /// # Returns
756 ///
757 /// Returns a `Result` containing a shutdown function on success.
758 /// Calling this function will shut down the server by aborting its main task.
759 /// Returns an error if the server fails to start.
760 pub async fn run(&self) -> Result<ServerControlHook, ServerError> {
761 let tcp_listener: TcpListener = self.create_tcp_listener().await?;
762 let server: Server = self.clone();
763 let (wait_sender, wait_receiver) = channel(());
764 let (shutdown_sender, mut shutdown_receiver) = channel(());
765 let accept_connections: JoinHandle<()> = spawn(async move {
766 let _ = server.accept_connections(&tcp_listener).await;
767 let _ = wait_sender.send(());
768 });
769 let wait_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
770 let mut wait_receiver_clone: Receiver<()> = wait_receiver.clone();
771 Box::pin(async move {
772 let _ = wait_receiver_clone.changed().await;
773 })
774 });
775 let shutdown_hook: SharedAsyncTaskFactory<()> = Arc::new(move || {
776 let shutdown_sender_clone: Sender<()> = shutdown_sender.clone();
777 Box::pin(async move {
778 let _ = shutdown_sender_clone.send(());
779 })
780 });
781 spawn(async move {
782 let _ = shutdown_receiver.changed().await;
783 accept_connections.abort();
784 });
785 let mut server_control_hook: ServerControlHook = ServerControlHook::default();
786 server_control_hook.set_shutdown_hook(shutdown_hook);
787 server_control_hook.set_wait_hook(wait_hook);
788 Ok(server_control_hook)
789 }
790}