1use http_body_util::BodyExt;
2use hyper::body::Incoming;
3use hyper::server::conn::http1;
4use hyper::{service::service_fn, Request};
5use hyper_util::rt::TokioIo;
6use hyper_util::server::graceful::GracefulShutdown;
7use ngyn_shared::core::engine::{NgynHttpPlatform, PlatformData};
8use ngyn_shared::server::NgynResponse;
9use std::sync::Arc;
10use tokio::net::TcpListener;
11
/// HTTP/1 connection options applied to every connection served by
/// `HyperApplication::listen`.
///
/// The fields are public so that a configuration can be built with
/// struct-update syntax, for example
/// `HyperConfig { h1_keep_alive: true, ..Default::default() }`.
///
/// The derived [`Default`] sets every flag to `false` and every limit to
/// `None`.
///
/// NOTE(review): the default therefore passes `false` to the HTTP/1 builder's
/// `keep_alive` setting — confirm that disabling keep-alive by default is
/// intended.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct HyperConfig {
    /// Value passed to the HTTP/1 builder's `half_close` setting.
    pub h1_half_close: bool,
    /// Value passed to the HTTP/1 builder's `keep_alive` setting.
    pub h1_keep_alive: bool,
    /// Value passed to the HTTP/1 builder's `title_case_headers` setting.
    pub h1_title_case_headers: bool,
    /// Value passed to the HTTP/1 builder's `preserve_header_case` setting.
    pub h1_preserve_header_case: bool,
    /// When `Some`, passed to the HTTP/1 builder's `max_headers` setting;
    /// when `None`, the builder's own value is left untouched.
    pub h1_max_headers: Option<usize>,
    /// When `Some`, passed to the HTTP/1 builder's `max_buf_size` setting;
    /// when `None`, the builder's own value is left untouched.
    pub max_buf_size: Option<usize>,
    /// Value passed to the HTTP/1 builder's `pipeline_flush` setting.
    pub pipeline_flush: bool,
}
23
24#[derive(Default)]
26pub struct HyperApplication {
27 data: PlatformData,
28 config: HyperConfig,
29}
30
31impl NgynHttpPlatform for HyperApplication {
32 fn data_mut(&mut self) -> &mut PlatformData {
33 &mut self.data
34 }
35}
36
37impl HyperApplication {
38 pub fn with_config(config: HyperConfig) -> Self {
39 Self {
40 data: PlatformData::default(),
41 config,
42 }
43 }
44 pub async fn listen<A: tokio::net::ToSocketAddrs>(
54 self,
55 address: A,
56 ) -> Result<(), std::io::Error> {
57 let server = TcpListener::bind(address).await?;
58 let data = Arc::new(self.data);
59
60 let mut http1 = http1::Builder::new();
61
62 http1
63 .half_close(self.config.h1_half_close)
64 .keep_alive(self.config.h1_keep_alive)
65 .title_case_headers(self.config.h1_title_case_headers)
66 .preserve_header_case(self.config.h1_preserve_header_case)
67 .pipeline_flush(self.config.pipeline_flush);
68
69 if let Some(buff_size) = self.config.max_buf_size {
70 http1.max_buf_size(buff_size);
71 }
72
73 if let Some(max_headers) = self.config.h1_max_headers {
74 http1.max_headers(max_headers);
75 }
76
77 let graceful = GracefulShutdown::new();
78 let mut signal = std::pin::pin!(shutdown_signal());
80
81 loop {
82 let data = data.clone();
83 tokio::select! {
84 Ok((stream, _)) = server.accept() => {
85 let io = TokioIo::new(stream);
86 let conn = http1.serve_connection(io, service_fn(move |req| hyper_service(data.clone(), req)));
87 let handle = graceful.watch(conn);
88
89 tokio::task::spawn(async move {
90 if let Err(e) = handle.await {
91 eprintln!("server connection error: {}", e);
92 }
93 });
94 }
95 _ = &mut signal => {
96 eprintln!("graceful shutdown signal received");
97 break;
99 }
100 else => continue, }
102 }
103
104 tokio::select! {
105 _ = graceful.shutdown() => {
106 eprintln!("all connections gracefully closed");
107 },
108 _ = tokio::time::sleep(std::time::Duration::from_secs(10)) => {
109 eprintln!("timed out wait for all connections to close");
110 }
111 }
112
113 Ok(())
114 }
115}
116
117async fn hyper_service(
118 data: Arc<PlatformData>,
119 req: Request<Incoming>,
120) -> Result<NgynResponse, hyper::Error> {
121 let (parts, mut body) = req.into_parts();
122 let body = {
123 let mut buf = Vec::new();
124 while let Some(frame) = body.frame().await {
126 if let Ok(bytes) = frame?.into_data() {
127 buf.extend_from_slice(&bytes);
128 } else {
129 break;
130 }
131 }
132 buf
133 };
134 let req = Request::from_parts(parts, body);
135 let res = data.respond(req).await;
136
137 Ok::<_, hyper::Error>(res)
138}
139
140async fn shutdown_signal() {
141 tokio::signal::ctrl_c()
142 .await
143 .expect("failed to listen for signal");
144}