chitey_server/server/http3_server.rs
1use std::net::SocketAddr;
2use std::sync::Arc;
3use bytes::Bytes;
4use futures_util::StreamExt;
5use h3::error::ErrorLevel;
6use h3::quic::BidiStream;
7use h3::server::RequestStream;
8use http::HeaderValue;
9use http::Request;
10use http::Response;
11use http::StatusCode;
12use hyper::Body;
13use tracing::info;
14// use tracing::{error, info, trace_span};
15use urlpattern::UrlPatternMatchInput;
16
17use crate::server::http3_stream_wrapper::StreamWrapper;
18use crate::web_server::ChiteyError;
19use crate::web_server::Factories;
20
21
22use super::util::TlsCertsKey;
23use super::util::cors_builder;
24use super::util::throw_chitey_internal_server_error;
25
/// Options for the HTTP/3 (QUIC) server.
#[derive(Debug, Clone)]
pub struct Http3ServerOpt {
    /// Socket address the QUIC endpoint is bound to.
    pub listen: SocketAddr,
}
30
31pub async fn launch_http3_server(
32 tls_cert_key: TlsCertsKey,
33 http3_server_opt: Http3ServerOpt,
34 factories: Factories,
35) -> Result<(), ChiteyError> {
36 let TlsCertsKey { certs, key } = tls_cert_key;
37 let Http3ServerOpt { listen } = http3_server_opt;
38
39 let tls_config = {
40 let mut tls_config = rustls::ServerConfig::builder()
41 .with_safe_default_cipher_suites()
42 .with_safe_default_kx_groups()
43 .with_protocol_versions(&[&rustls::version::TLS13])
44 .unwrap()
45 .with_no_client_auth()
46 .with_single_cert(certs.clone(), key.clone()).unwrap();
47 tls_config.max_early_data_size = u32::MAX;
48 let alpn: &[u8] = b"h3";
49 tls_config.alpn_protocols = vec![alpn.into()];
50 tls_config
51 };
52
53 let server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_config));
54 let endpoint = quinn::Endpoint::server(server_config, listen).unwrap();
55
56 while let Some(new_conn) = endpoint.accept().await {
57 // #[cfg(debug_assertions)]
58 // trace_span!("New connection being attempted");
59
60 let factories2 = factories.clone();
61 tokio::spawn(async move {
62 println!("{:?}", factories2.factories.len());
63 match new_conn.await {
64 Ok(conn) => {
65 // #[cfg(debug_assertions)]
66 info!("new connection established");
67
68 let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn))
69 .await
70 .unwrap();
71
72 loop {
73 match h3_conn.accept().await {
74 Ok(Some((req, stream))) => {
75 // #[cfg(debug_assertions)]
76 // eprintln!("new request: {:#?}", req);
77
78 let factories3 = factories2.clone();
79 tokio::spawn(async move {
80 // if let Err(e) = handle_request_http3(req, stream).await {
81 // #[cfg(debug_assertions)]
82 // error!("handling request failed: {}", e);
83 // };
84 if let Err(e) = handle_request_http3(req, stream, factories3).await {
85 tracing::error!("http3: {}", e.to_string());
86 }
87 });
88 }
89
90 // indicating no more streams to be received
91 Ok(None) => {
92 break;
93 }
94
95 Err(err) => {
96 #[cfg(debug_assertions)]
97 eprintln!("error on accept {}", err);
98 match err.get_error_level() {
99 ErrorLevel::ConnectionError => break,
100 ErrorLevel::StreamError => continue,
101 }
102 }
103 }
104 }
105 }
106 Err(_err) => {
107 // #[cfg(debug_assertions)]
108 // eprintln!("accepting connection failed: {:?}", err);
109 }
110 }
111 });
112 }
113
114 // shut down gracefully
115 // wait for connections to be closed before exiting
116 println!("Starting to serve on https://{}.", listen);
117 endpoint.wait_idle().await;
118
119 Ok(())
120}
121
122pub async fn handle_request_http3<T>(
123 req: Request<()>,
124 mut stream: RequestStream<T, Bytes>,
125 factories: Factories,
126) -> Result<(), ChiteyError>
127where
128 T: BidiStream<Bytes> + 'static + Send + Sync,
129{
130 if req.uri().path().contains("..") {
131 let resp = cors_builder().status(StatusCode::NOT_FOUND).body(()).unwrap();
132
133 throw_chitey_internal_server_error(stream.send_response(resp).await)?;
134 throw_chitey_internal_server_error(stream.send_data(Bytes::copy_from_slice(b"page not found")).await)?;
135 throw_chitey_internal_server_error(stream.finish().await)?
136 }
137 let url = req.uri().to_string().parse().unwrap();
138 let input = UrlPatternMatchInput::Url(url);
139
140 let method = req.method().clone();
141 let req_contain_key = req.headers().contains_key("Another-Header");
142
143 let (mut send_stream, recv_stream) = stream.split();
144 let stm: StreamWrapper<T> = StreamWrapper::new(recv_stream);
145 let req = req.map(|_| Body::wrap_stream(stm));
146
147 for (res, factory) in factories.factories {
148 // GET && POST
149 if res.guard == method {
150 if let Ok(Some(_)) = res.rdef.exec(input.clone()) {
151 let factory_loc = factory.lock().await;
152 if factory_loc.analyze_types(input.clone()) {
153 return match factory_loc.handler_func(input.clone(), (req, true, factories.contexts.clone())).await {
154 Ok(resp) => {
155 let (mut parts, mut body) = resp.into_parts();
156 if req_contain_key {
157 parts.headers.append("Another-Header", HeaderValue::from_static("Ack"));
158 }
159 parts.headers.append("Alt-Svc", HeaderValue::from_static("h3=\":443\"; ma=2592000"));
160 throw_chitey_internal_server_error(send_stream.send_response(Response::from_parts(parts, ())).await)?;
161 while let Some(chunk) = body.next().await {
162 match chunk {
163 Ok(chunk) => {
164 throw_chitey_internal_server_error(send_stream.send_data(chunk).await)?;
165 }
166 Err(e) => {
167 eprintln!("エラーが発生しました: {}", e);
168 break;
169 }
170 }
171 }
172 throw_chitey_internal_server_error(send_stream.finish().await)?;
173 Ok(())
174 },
175 Err(e) => Err(ChiteyError::InternalServerError(e.to_string())),
176 }
177 }
178 };
179 }
180 }
181
182
183 // let content_type = content_type_option.unwrap();
184 // let mime_type_result: Result<mime::Mime, _> = match content_type.to_str() {
185 // Ok(s) => s
186 // .parse()
187 // .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
188 // Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
189 // };
190 // if mime_type_result.is_err() {
191 // return Ok(());
192 // }
193 // let mime_type = mime_type_result.unwrap();
194 // if mime_type.essence_str() != "multipart/form-data" {
195 // return Ok(());
196 // }
197 // let boundary = mime_type
198 // .get_param("boundary")
199 // .map(|v| v.to_string())
200 // .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "boundary not found"))?;
201 // // if let Ok(Some(post_data)) = stream.recv_data().await{
202 // // info!("{:?}", post_data.chunk());
203 // // }
204 // let stm: StreamWrapper<T> = StreamWrapper::new(recv_stream);
205 // let mut req = req.map(|_| Body::wrap_stream(stm));
206 // let (re, b) = req.into_parts();
207
208 // let mut multipart_stream = mpart_async::server::MultipartStream::new(
209 // boundary,
210 // b.map_ok(|buf| {
211 // let mut ret = BytesMut::with_capacity(buf.remaining());
212 // ret.put(buf);
213 // ret.freeze()
214 // }),
215 // );
216
217 // while let Ok(Some(mut field)) = multipart_stream.try_next().await {
218 // println!("Field name:{}", field.name().unwrap());
219 // if let Ok(filename) = field.filename() {
220 // println!("Field filename:{}", filename);
221 // let mut writer = BufWriter::new(File::create(filename.as_ref()).unwrap());
222 // let mut bufferlen: i64 = 0;
223 // while let Ok(Some(bytes)) = field.try_next().await {
224 // bufferlen += bytes.len() as i64;
225 // writer.write(&bytes).unwrap();
226 // }
227 // println!("Bytes received:{}", bufferlen);
228 // } else {
229 // let mut buffer = BytesMut::new();
230 // while let Ok(Some(bytes)) = field.try_next().await {
231 // buffer.put(bytes);
232 // }
233 // let value: String = String::from_utf8(buffer.to_vec()).unwrap();
234 // println!("{} = {}", field.name().unwrap(), value);
235 // }
236 // }
237
238 // let resp = http::Response::builder().status(status).body(()).unwrap();
239
240 // // let post_stream = PostStream
241
242 // match send_stream.send_response(resp).await {
243 // Ok(_) => {
244 // #[cfg(debug_assertions)]
245 // info!("successfully respond to connection");
246 // }
247 // Err(err) => {
248 // #[cfg(debug_assertions)]
249 // error!("unable to send response to connection peer: {:?}", err);
250 // }
251 // }
252
253 // if let Ok(Some(post_data)) = recv_stream.recv_data().await{
254 // info!("{:?}", post_data.chunk());
255 // }
256 // info!("{:?}", req.headers().get("content-type"));
257 // match recv_stream.recv_trailers().await {
258 // Ok(v) => match v {
259 // Some(d) => { info!("recv: {:?}", d) },
260 // None => { info!("recv: None"); },
261 // },
262 // Err(v) => { info!("recv: {:?}", v); },
263 // };
264 // let mut num = 0;
265 // loop {
266 // match recv_stream.recv_data().await {
267 // Ok(v) => match v {
268 // Some(d) => {
269 // // info!("{:?}", d.chunk());
270 // // info!("{}", std::str::from_utf8(d.chunk()).unwrap());
271 // num += d.remaining();
272 // // info!("{:?}", d.remaining());
273 // },
274 // None => { info!("None"); break; },
275 // },
276 // Err(v) => { info!("{:?}", v); break; },
277 // }
278 // // tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
279 // }
280 // info!("num: {}", num);
281 // match recv_stream.recv_trailers().await {
282 // Ok(v) => match v {
283 // Some(d) => { info!("recv: {:?}", d) },
284 // None => { info!("recv: None"); },
285 // },
286 // Err(v) => { info!("recv: {:?}", v); },
287 // };
288 // let mut buf = BytesMut::new();
289 // buf.extend_from_slice(b"hello world http3");
290 // send_stream.send_data(buf.freeze()).await?;
291
292 // Ok(send_stream.finish().await?)
293 // Ok(())
294 // } else {
295 let resp = cors_builder().status(StatusCode::NOT_FOUND).header("Alt-Svc", "h3=\":443\"; ma=2592000").body(()).unwrap();
296 throw_chitey_internal_server_error(send_stream.send_response(resp).await)?;
297 throw_chitey_internal_server_error(send_stream.send_data(Bytes::copy_from_slice(b"page not found")).await)?;
298 throw_chitey_internal_server_error(send_stream.finish().await)
299}