oxidite_core/server/
http3_server.rs1#![cfg(feature = "http3")]
2
3use std::net::SocketAddr;
4use std::sync::Arc;
5use tokio::sync::Notify;
6use rustls::ServerConfig;
7use quinn::{Endpoint, ServerConfig as QuinnServerConfig};
8use h3::server::RequestStream;
9use h3_quinn;
10use bytes::Bytes;
11use http::{Request, Response};
12use http_body_util::BodyExt;
13use crate::error::Result;
14use crate::types::{OxiditeRequest, OxiditeResponse};
15use tower_service::Service;
16
/// HTTP/3 front end that forwards every incoming request to a wrapped service.
///
/// The server itself holds no connection state: `listen` clones the service
/// for each accepted QUIC connection.
pub struct Http3Server<S> {
    /// Request handler invoked for each HTTP/3 request.
    service: S,
}
20
21impl<S> Http3Server<S>
22where
23 S: Service<OxiditeRequest, Response = OxiditeResponse, Error = crate::error::Error>
24 + Clone
25 + Send
26 + Sync
27 + 'static,
28 S::Future: Send + 'static,
29{
30 pub fn new(service: S) -> Self {
31 Self { service }
32 }
33
34 pub async fn listen(self, addr: SocketAddr, tls_config: ServerConfig) -> Result<()> {
35 let crypto = quinn::crypto::rustls::QuicServerConfig::try_from(Arc::new(tls_config))
36 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
37 let quinn_config = QuinnServerConfig::with_crypto(Arc::new(crypto));
38
39 let endpoint = Endpoint::server(quinn_config, addr)
40 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
41
42 println!("HTTP/3 server listening on https://{}", addr);
43
44 let notify_shutdown = Arc::new(Notify::new());
45 let (_shutdown_tx, mut shutdown_rx) = tokio::sync::mpsc::unbounded_channel::<()>();
46
47 loop {
48 tokio::select! {
49 conn = endpoint.accept() => {
50 if let Some(conn) = conn {
51 let quic_conn = match conn.await {
52 Ok(conn) => conn,
53 Err(e) => {
54 eprintln!("Connection error: {}", e);
55 continue;
56 }
57 };
58
59 let service = self.service.clone();
60 let _notify = notify_shutdown.clone();
61
62 tokio::spawn(async move {
63 if let Err(e) = Self::handle_connection(quic_conn, service).await {
64 eprintln!("Connection handler error: {}", e);
65 }
66 });
67 }
68 }
69 _ = shutdown_rx.recv() => {
70 println!("Shutting down HTTP/3 server...");
71 endpoint.close(0u32.into(), b"shutdown");
72 break;
73 }
74 }
75 }
76
77 Ok(())
78 }
79
80 async fn handle_connection(
81 quic_conn: quinn::Connection,
82 service: S,
83 ) -> Result<()> {
84 let mut h3_conn = h3::server::Connection::new(h3_quinn::Connection::new(quic_conn))
85 .await
86 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
87
88 loop {
89 match h3_conn.accept().await {
90 Ok(Some(resolver)) => {
91 let (req, stream) = match resolver.resolve_request().await {
92 Ok(res) => res,
93 Err(e) => {
94 eprintln!("Error resolving request: {}", e);
95 continue;
96 }
97 };
98 Self::handle_request(req, stream, service.clone()).await?;
99 }
100 Ok(None) => break, Err(e) => {
102 eprintln!("Error accepting request: {}", e);
103 break;
104 }
105 }
106 }
107
108 Ok(())
109 }
110
111 async fn handle_request(
112 h3_request: Request<()>,
113 mut stream: RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>,
114 mut service: S,
115 ) -> Result<()> {
116 let (parts, _) = h3_request.into_parts();
118
119 let body = http_body_util::Full::new(Bytes::new()).map_err(|e| match e {}).boxed();
121
122 let oxidite_req = Request::from_parts(parts, body);
123
124 let response = service.call(oxidite_req).await
126 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
127
128 let status = response.status();
130 let response_headers = response.headers().clone();
131
132 let response_body = response.into_inner().into_body().collect().await
134 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?
135 .to_bytes();
136
137 let mut h3_response = Response::builder()
138 .status(status.as_u16());
139
140 *h3_response.headers_mut().unwrap() = response_headers;
141
142 stream.send_response(h3_response.body(()).unwrap()).await
143 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
144
145 stream.send_data(response_body).await
146 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
147
148 stream.finish().await
149 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
150
151 Ok(())
152 }
153}
154
155pub fn create_tls_config(cert_pem: &str, key_pem: &str) -> Result<ServerConfig> {
157 use rustls_pemfile::{certs, pkcs8_private_keys};
158 use std::io::Cursor;
159
160 let cert_chain = certs(&mut Cursor::new(cert_pem))
161 .collect::<std::result::Result<Vec<_>, _>>()
162 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
163
164 let mut keys = pkcs8_private_keys(&mut Cursor::new(key_pem))
165 .collect::<std::result::Result<Vec<_>, _>>()?;
166
167 if keys.is_empty() {
168 return Err(crate::error::Error::InternalServerError("No private keys found".to_string()));
169 }
170
171 let mut config = ServerConfig::builder()
172 .with_no_client_auth()
173 .with_single_cert(cert_chain, rustls::pki_types::PrivateKeyDer::Pkcs8(keys.remove(0)))
174 .map_err(|e| crate::error::Error::InternalServerError(e.to_string()))?;
175
176 config.alpn_protocols = vec![b"h3".to_vec()];
177
178 Ok(config)
179}