ngyn_hyper/
lib.rs

1use http_body_util::BodyExt;
2use hyper::body::Incoming;
3use hyper::server::conn::http1;
4use hyper::{service::service_fn, Request};
5use hyper_util::rt::TokioIo;
6use hyper_util::server::graceful::GracefulShutdown;
7use ngyn_shared::core::engine::{NgynHttpPlatform, PlatformData};
8use ngyn_shared::server::NgynResponse;
9use std::sync::Arc;
10use tokio::net::TcpListener;
11
/// Configure an [`HyperApplication`].
///
/// Each option is handed to hyper's `http1::Builder` when the application
/// starts listening. The defaults mirror hyper's own defaults, so an
/// untouched configuration behaves like a plain `http1::Builder::new()`.
#[derive(Debug, Clone)]
pub struct HyperConfig {
    /// Passed to `http1::Builder::half_close`.
    h1_half_close: bool,
    /// Passed to `http1::Builder::keep_alive`.
    h1_keep_alive: bool,
    /// Passed to `http1::Builder::title_case_headers`.
    h1_title_case_headers: bool,
    /// Passed to `http1::Builder::preserve_header_case`.
    h1_preserve_header_case: bool,
    /// Passed to `http1::Builder::max_headers` when set; `None` leaves
    /// hyper's built-in limit in place.
    h1_max_headers: Option<usize>,
    /// Passed to `http1::Builder::max_buf_size` when set; `None` leaves
    /// hyper's built-in limit in place.
    max_buf_size: Option<usize>,
    /// Passed to `http1::Builder::pipeline_flush`.
    pipeline_flush: bool,
}

impl Default for HyperConfig {
    fn default() -> Self {
        Self {
            h1_half_close: false,
            // hyper enables keep-alive by default. A derived `Default` would
            // yield `false` here and silently disable it for every
            // application that never customises its configuration.
            h1_keep_alive: true,
            h1_title_case_headers: false,
            h1_preserve_header_case: false,
            h1_max_headers: None,
            max_buf_size: None,
            pipeline_flush: false,
        }
    }
}

impl HyperConfig {
    /// Sets the value forwarded to `http1::Builder::half_close`.
    #[must_use]
    pub fn h1_half_close(mut self, enabled: bool) -> Self {
        self.h1_half_close = enabled;
        self
    }

    /// Sets the value forwarded to `http1::Builder::keep_alive`.
    #[must_use]
    pub fn h1_keep_alive(mut self, enabled: bool) -> Self {
        self.h1_keep_alive = enabled;
        self
    }

    /// Sets the value forwarded to `http1::Builder::title_case_headers`.
    #[must_use]
    pub fn h1_title_case_headers(mut self, enabled: bool) -> Self {
        self.h1_title_case_headers = enabled;
        self
    }

    /// Sets the value forwarded to `http1::Builder::preserve_header_case`.
    #[must_use]
    pub fn h1_preserve_header_case(mut self, enabled: bool) -> Self {
        self.h1_preserve_header_case = enabled;
        self
    }

    /// Sets the value forwarded to `http1::Builder::max_headers`.
    #[must_use]
    pub fn h1_max_headers(mut self, max: usize) -> Self {
        self.h1_max_headers = Some(max);
        self
    }

    /// Sets the value forwarded to `http1::Builder::max_buf_size`.
    ///
    /// NOTE(review): hyper documents a minimum buffer size and rejects
    /// smaller values when the builder is configured — confirm the bound
    /// against the hyper version in use.
    #[must_use]
    pub fn max_buf_size(mut self, max: usize) -> Self {
        self.max_buf_size = Some(max);
        self
    }

    /// Sets the value forwarded to `http1::Builder::pipeline_flush`.
    #[must_use]
    pub fn pipeline_flush(mut self, enabled: bool) -> Self {
        self.pipeline_flush = enabled;
        self
    }
}
23
24/// Represents a Hyper-based application.
25#[derive(Default)]
26pub struct HyperApplication {
27    data: PlatformData,
28    config: HyperConfig,
29}
30
31impl NgynHttpPlatform for HyperApplication {
32    fn data_mut(&mut self) -> &mut PlatformData {
33        &mut self.data
34    }
35}
36
37impl HyperApplication {
38    pub fn with_config(config: HyperConfig) -> Self {
39        Self {
40            data: PlatformData::default(),
41            config,
42        }
43    }
44    /// Listens for incoming connections and serves the application.
45    ///
46    /// ### Arguments
47    ///
48    /// * `address` - The address to listen on.
49    ///
50    /// ### Returns
51    ///
52    /// A `Result` indicating success or failure.
53    pub async fn listen<A: tokio::net::ToSocketAddrs>(
54        self,
55        address: A,
56    ) -> Result<(), std::io::Error> {
57        let server = TcpListener::bind(address).await?;
58        let data = Arc::new(self.data);
59
60        let mut http1 = http1::Builder::new();
61
62        http1
63            .half_close(self.config.h1_half_close)
64            .keep_alive(self.config.h1_keep_alive)
65            .title_case_headers(self.config.h1_title_case_headers)
66            .preserve_header_case(self.config.h1_preserve_header_case)
67            .pipeline_flush(self.config.pipeline_flush);
68
69        if let Some(buff_size) = self.config.max_buf_size {
70            http1.max_buf_size(buff_size);
71        }
72
73        if let Some(max_headers) = self.config.h1_max_headers {
74            http1.max_headers(max_headers);
75        }
76
77        let graceful = GracefulShutdown::new();
78        // when this signal completes, start shutdown
79        let mut signal = std::pin::pin!(shutdown_signal());
80
81        loop {
82            let data = data.clone();
83            tokio::select! {
84                Ok((stream, _)) = server.accept() => {
85                    let io = TokioIo::new(stream);
86                    let conn = http1.serve_connection(io, service_fn(move |req| hyper_service(data.clone(), req)));
87                    let handle = graceful.watch(conn);
88
89                    tokio::task::spawn(async move {
90                        if let Err(e) = handle.await {
91                            eprintln!("server connection error: {}", e);
92                        }
93                    });
94                }
95                _ = &mut signal => {
96                    eprintln!("graceful shutdown signal received");
97                    // stop the accept loop
98                    break;
99                }
100                else => continue, // continue waiting for the next signal or connection
101            }
102        }
103
104        tokio::select! {
105            _ = graceful.shutdown() => {
106                eprintln!("all connections gracefully closed");
107            },
108            _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
109                eprintln!("timed out wait for all connections to close");
110            }
111        }
112
113        Ok(())
114    }
115}
116
117async fn hyper_service(
118    data: Arc<PlatformData>,
119    req: Request<Incoming>,
120) -> Result<NgynResponse, hyper::Error> {
121    let (parts, mut body) = req.into_parts();
122    let body = {
123        let mut buf = Vec::new();
124        // TODO: change this approach. It's not efficient.
125        while let Some(frame) = body.frame().await {
126            if let Ok(bytes) = frame?.into_data() {
127                buf.extend_from_slice(&bytes);
128            } else {
129                break;
130            }
131        }
132        buf
133    };
134    let req = Request::from_parts(parts, body);
135    let res = data.respond(req).await;
136
137    Ok::<_, hyper::Error>(res)
138}
139
140async fn shutdown_signal() {
141    tokio::signal::ctrl_c()
142        .await
143        .expect("failed to listen for signal");
144}