chitey_server/server/
http3_server.rs

1use std::net::SocketAddr;
2use std::sync::Arc;
3use bytes::Bytes;
4use futures_util::StreamExt;
5use h3::error::ErrorLevel;
6use h3::quic::BidiStream;
7use h3::server::RequestStream;
8use http::HeaderValue;
9use http::Request;
10use http::Response;
11use http::StatusCode;
12use hyper::Body;
13use tracing::info;
14// use tracing::{error, info, trace_span};
15use urlpattern::UrlPatternMatchInput;
16
17use crate::server::http3_stream_wrapper::StreamWrapper;
18use crate::web_server::ChiteyError;
19use crate::web_server::Factories;
20
21
22use super::util::TlsCertsKey;
23use super::util::cors_builder;
24use super::util::throw_chitey_internal_server_error;
25
/// Options for the HTTP/3 server.
#[derive(Clone, Debug)]
pub struct Http3ServerOpt {
    /// Socket address the QUIC endpoint is bound to.
    pub listen: SocketAddr,
}
30
31pub async fn launch_http3_server(
32    tls_cert_key: TlsCertsKey,
33    http3_server_opt: Http3ServerOpt,
34    factories: Factories,
35) -> Result<(), ChiteyError> {
36    let TlsCertsKey { certs, key } = tls_cert_key;
37    let Http3ServerOpt { listen } = http3_server_opt;
38
39    let tls_config = {
40        let mut tls_config = rustls::ServerConfig::builder()
41            .with_safe_default_cipher_suites()
42            .with_safe_default_kx_groups()
43            .with_protocol_versions(&[&rustls::version::TLS13])
44            .unwrap()
45            .with_no_client_auth()
46            .with_single_cert(certs.clone(), key.clone()).unwrap();
47        tls_config.max_early_data_size = u32::MAX;
48        let alpn: &[u8] = b"h3";
49        tls_config.alpn_protocols = vec![alpn.into()];
50        tls_config
51    };
52
53    let server_config = quinn::ServerConfig::with_crypto(Arc::new(tls_config));
54    let endpoint = quinn::Endpoint::server(server_config, listen).unwrap();
55
56    while let Some(new_conn) = endpoint.accept().await {
57        // #[cfg(debug_assertions)]
58        // trace_span!("New connection being attempted");
59
60        let factories2 = factories.clone();
61        tokio::spawn(async move {
62            println!("{:?}", factories2.factories.len());
63            match new_conn.await {
64                Ok(conn) => {
65                    // #[cfg(debug_assertions)]
66                    info!("new connection established");
67
68                    let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(conn))
69                        .await
70                        .unwrap();
71
72                    loop {
73                        match h3_conn.accept().await {
74                            Ok(Some((req, stream))) => {
75                                // #[cfg(debug_assertions)]
76                                // eprintln!("new request: {:#?}", req);
77
78                                let factories3 = factories2.clone();
79                                tokio::spawn(async move {
80                                    // if let Err(e) = handle_request_http3(req, stream).await {
81                                    //     #[cfg(debug_assertions)]
82                                    //     error!("handling request failed: {}", e);
83                                    // };
84                                    if let Err(e) = handle_request_http3(req, stream, factories3).await {
85                                        tracing::error!("http3: {}", e.to_string());
86                                    }
87                                });
88                            }
89
90                            // indicating no more streams to be received
91                            Ok(None) => {
92                                break;
93                            }
94
95                            Err(err) => {
96                                #[cfg(debug_assertions)]
97                                eprintln!("error on accept {}", err);
98                                match err.get_error_level() {
99                                    ErrorLevel::ConnectionError => break,
100                                    ErrorLevel::StreamError => continue,
101                                }
102                            }
103                        }
104                    }
105                }
106                Err(_err) => {
107                    // #[cfg(debug_assertions)]
108                    // eprintln!("accepting connection failed: {:?}", err);
109                }
110            }
111        });
112    }
113
114    // shut down gracefully
115    // wait for connections to be closed before exiting
116    println!("Starting to serve on https://{}.", listen);
117    endpoint.wait_idle().await;
118
119    Ok(())
120}
121
122pub async fn handle_request_http3<T>(
123    req: Request<()>,
124    mut stream: RequestStream<T, Bytes>,
125    factories: Factories,
126) -> Result<(), ChiteyError>
127where
128    T: BidiStream<Bytes> + 'static + Send + Sync,
129{
130    if req.uri().path().contains("..") {
131        let resp = cors_builder().status(StatusCode::NOT_FOUND).body(()).unwrap();
132
133        throw_chitey_internal_server_error(stream.send_response(resp).await)?;
134        throw_chitey_internal_server_error(stream.send_data(Bytes::copy_from_slice(b"page not found")).await)?;
135        throw_chitey_internal_server_error(stream.finish().await)?
136    }
137    let url = req.uri().to_string().parse().unwrap();
138    let input = UrlPatternMatchInput::Url(url);
139
140    let method = req.method().clone();
141    let req_contain_key = req.headers().contains_key("Another-Header");
142
143    let (mut send_stream, recv_stream) = stream.split();
144    let stm: StreamWrapper<T> = StreamWrapper::new(recv_stream);
145    let req = req.map(|_| Body::wrap_stream(stm));
146
147    for (res, factory) in factories.factories {
148        // GET && POST
149        if res.guard == method {
150            if let Ok(Some(_)) = res.rdef.exec(input.clone()) {
151                let factory_loc = factory.lock().await;
152                if factory_loc.analyze_types(input.clone()) {
153                    return match factory_loc.handler_func(input.clone(), (req, true, factories.contexts.clone())).await {
154                    Ok(resp) => {
155                        let (mut parts, mut body) = resp.into_parts();
156                        if req_contain_key {
157                            parts.headers.append("Another-Header", HeaderValue::from_static("Ack"));
158                        }
159                        parts.headers.append("Alt-Svc", HeaderValue::from_static("h3=\":443\"; ma=2592000"));
160                        throw_chitey_internal_server_error(send_stream.send_response(Response::from_parts(parts, ())).await)?;
161                        while let Some(chunk) = body.next().await {
162                            match chunk {
163                                Ok(chunk) => {
164                                    throw_chitey_internal_server_error(send_stream.send_data(chunk).await)?;
165                                }
166                                Err(e) => {
167                                    eprintln!("エラーが発生しました: {}", e);
168                                    break;
169                                }
170                            }
171                        }
172                        throw_chitey_internal_server_error(send_stream.finish().await)?;
173                        Ok(())
174                    },
175                    Err(e) => Err(ChiteyError::InternalServerError(e.to_string())),
176                    }
177                }
178            };
179        }
180    }
181
182
183    // let content_type = content_type_option.unwrap();
184    // let mime_type_result: Result<mime::Mime, _> = match content_type.to_str() {
185    //     Ok(s) => s
186    //         .parse()
187    //         .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err)),
188    //     Err(err) => Err(std::io::Error::new(std::io::ErrorKind::Other, err)),
189    // };
190    // if mime_type_result.is_err() {
191    //     return Ok(());
192    // }
193    // let mime_type = mime_type_result.unwrap();
194    // if mime_type.essence_str() != "multipart/form-data" {
195    //     return Ok(());
196    // }
197    // let boundary = mime_type
198    //     .get_param("boundary")
199    //     .map(|v| v.to_string())
200    //     .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "boundary not found"))?;
201    // // if let Ok(Some(post_data)) = stream.recv_data().await{
202    // //   info!("{:?}", post_data.chunk());
203    // // }
204    // let stm: StreamWrapper<T> = StreamWrapper::new(recv_stream);
205    // let mut req = req.map(|_| Body::wrap_stream(stm));
206    //     let (re, b)  = req.into_parts();
207
208    // let mut multipart_stream = mpart_async::server::MultipartStream::new(
209    //     boundary,
210    //     b.map_ok(|buf| {
211    //         let mut ret = BytesMut::with_capacity(buf.remaining());
212    //         ret.put(buf);
213    //         ret.freeze()
214    //     }),
215    // );
216
217    // while let Ok(Some(mut field)) = multipart_stream.try_next().await {
218    //     println!("Field name:{}", field.name().unwrap());
219    //     if let Ok(filename) = field.filename() {
220    //         println!("Field filename:{}", filename);
221    //         let mut writer = BufWriter::new(File::create(filename.as_ref()).unwrap());
222    //         let mut bufferlen: i64 = 0;
223    //         while let Ok(Some(bytes)) = field.try_next().await {
224    //             bufferlen += bytes.len() as i64;
225    //             writer.write(&bytes).unwrap();
226    //         }
227    //         println!("Bytes received:{}", bufferlen);
228    //     } else {
229    //         let mut buffer = BytesMut::new();
230    //         while let Ok(Some(bytes)) = field.try_next().await {
231    //             buffer.put(bytes);
232    //         }
233    //         let value: String = String::from_utf8(buffer.to_vec()).unwrap();
234    //         println!("{} = {}", field.name().unwrap(), value);
235    //     }
236    // }
237
238    // let resp = http::Response::builder().status(status).body(()).unwrap();
239
240    // // let post_stream = PostStream
241
242    // match send_stream.send_response(resp).await {
243    //     Ok(_) => {
244    //         #[cfg(debug_assertions)]
245    //         info!("successfully respond to connection");
246    //     }
247    //     Err(err) => {
248    //         #[cfg(debug_assertions)]
249    //         error!("unable to send response to connection peer: {:?}", err);
250    //     }
251    // }
252
253    // if let Ok(Some(post_data)) = recv_stream.recv_data().await{
254    //   info!("{:?}", post_data.chunk());
255    // }
256    // info!("{:?}", req.headers().get("content-type"));
257    // match recv_stream.recv_trailers().await {
258    //   Ok(v) => match v {
259    //     Some(d) => { info!("recv: {:?}", d) },
260    //     None => { info!("recv: None"); },
261    //   },
262    //   Err(v) => { info!("recv: {:?}", v); },
263    // };
264    // let mut num = 0;
265    // loop {
266    //   match recv_stream.recv_data().await {
267    //     Ok(v) => match v {
268    //       Some(d) => {
269    //         // info!("{:?}", d.chunk());
270    //         // info!("{}", std::str::from_utf8(d.chunk()).unwrap());
271    //         num += d.remaining();
272    //         // info!("{:?}", d.remaining());
273    //       },
274    //       None => { info!("None"); break; },
275    //     },
276    //     Err(v) => { info!("{:?}", v); break; },
277    //   }
278    //   // tokio::time::sleep(std::time::Duration::from_millis(1000)).await;
279    // }
280    // info!("num: {}", num);
281    // match recv_stream.recv_trailers().await {
282    //   Ok(v) => match v {
283    //     Some(d) => { info!("recv: {:?}", d) },
284    //     None => { info!("recv: None"); },
285    //   },
286    //   Err(v) => { info!("recv: {:?}", v); },
287    // };
288    // let mut buf = BytesMut::new();
289    // buf.extend_from_slice(b"hello world http3");
290    // send_stream.send_data(buf.freeze()).await?;
291
292    // Ok(send_stream.finish().await?)
293    //     Ok(())
294    // } else {
295    let resp = cors_builder().status(StatusCode::NOT_FOUND).header("Alt-Svc", "h3=\":443\"; ma=2592000").body(()).unwrap();
296    throw_chitey_internal_server_error(send_stream.send_response(resp).await)?;
297    throw_chitey_internal_server_error(send_stream.send_data(Bytes::copy_from_slice(b"page not found")).await)?;
298    throw_chitey_internal_server_error(send_stream.finish().await)
299}