use super::wrapper::{H3BodyAdapter, VaneBody};
use crate::common::sys::lifecycle::{Error, Result};
use crate::layers::l7::{
container::{Container, PayloadState},
flow,
model::APPLICATION_REGISTRY,
};
use crate::resources::kv::KvStore;
use fancy_log::{LogLevel, log};
use bytes::{Buf, Bytes};
use h3::server::RequestStream;
use h3_quinn::quinn::Connection;
use http::{HeaderMap, Request, Response};
use http_body_util::BodyExt;
use tokio::sync::{mpsc, oneshot};
pub async fn handle_connection(quic_conn: Connection) -> Result<()> {
log(LogLevel::Debug, "➜ Starting L7 H3 Engine...");
let h3_quinn_conn = h3_quinn::Connection::new(quic_conn);
let mut h3_conn: h3::server::Connection<h3_quinn::Connection, bytes::Bytes> =
match h3::server::Connection::new(h3_quinn_conn).await {
Ok(driver) => driver,
Err(e) => {
return Err(Error::System(format!("H3 Protocol Handshake failed: {e}")));
}
};
loop {
match h3_conn.accept().await {
Ok(Some(resolver)) => {
log(LogLevel::Debug, "➜ Received new request resolver");
tokio::spawn(async move {
match resolver.resolve_request().await {
Ok((req, stream)) => {
if let Err(e) = serve_h3_request(req, stream).await {
log(LogLevel::Error, &format!("✗ H3 Request Error: {e:#}"));
}
}
Err(e) => {
log(
LogLevel::Error,
&format!("✗ Failed to resolve request: {e}"),
);
}
}
});
}
Ok(None) => break,
Err(e) => {
log(LogLevel::Warn, &format!("⚠ H3 Accept Error: {e}"));
break;
}
}
}
Ok(())
}
async fn serve_h3_request<T>(
req: Request<()>,
mut stream: RequestStream<T, bytes::Bytes>,
) -> anyhow::Result<()>
where
T: h3::quic::BidiStream<bytes::Bytes> + Send + Unpin + 'static,
{
let (mut parts, _) = req.into_parts();
let (body_tx, body_rx) = mpsc::channel::<Result<Bytes>>(32);
let mut body_tx = Some(body_tx);
let (res_tx, res_rx) = oneshot::channel::<Response<()>>();
let adapter = H3BodyAdapter::new(body_rx);
let boxed_body = adapter.map_err(|e| e).boxed();
let request_payload = PayloadState::Http(VaneBody::H3(boxed_body));
let response_payload = PayloadState::Empty;
let mut kv = KvStore::new();
kv.insert("req.proto".to_owned(), "h3".to_owned());
kv.insert("req.method".to_owned(), parts.method.to_string());
kv.insert("req.path".to_owned(), parts.uri.path().to_owned());
if let Some(q) = parts.uri.query() {
kv.insert("req.query".to_owned(), q.to_owned());
}
if let Some(host) = parts
.headers
.get("host")
.or_else(|| parts.headers.get(":authority"))
&& let Ok(h) = host.to_str()
{
kv.insert("req.host".to_owned(), h.to_owned());
}
let request_headers = std::mem::take(&mut parts.headers);
let response_headers = HeaderMap::new();
let mut container = Container::new(
kv,
request_headers,
request_payload,
response_headers,
response_payload,
Some(res_tx),
);
let config = {
let registry = APPLICATION_REGISTRY.load();
registry
.get("h3")
.or_else(|| registry.get("httpx"))
.map(|entry| entry.value().clone())
.ok_or_else(|| anyhow::anyhow!("No application config found for 'h3' or 'httpx'"))?
};
let flow_handle = tokio::spawn(async move {
if let Err(e) = flow::execute_l7(&config.pipeline, &mut container, String::new()).await {
log(LogLevel::Error, &format!("✗ L7 Flow Logic Failed: {e:#}"));
return None;
}
let body = super::httpx::extract_response_body_from_container(&mut container);
Some(body)
});
let mut res_rx = res_rx;
let mut flow_task = Some(flow_handle);
let mut response_body_stream: Option<http_body_util::combinators::BoxBody<Bytes, Error>> = None;
let mut request_finished = false;
let mut response_finished = false;
loop {
if request_finished && response_finished {
break;
}
tokio::select! {
recv_result = stream.recv_data(), if !request_finished => {
match recv_result {
Ok(Some(mut buf)) => {
let bytes = buf.copy_to_bytes(buf.remaining());
if let Some(tx) = body_tx.as_ref()
&& tx.send(Ok(bytes)).await.is_err() {
request_finished = true;
body_tx = None;
}
}
Ok(None) => {
request_finished = true;
body_tx = None;
}
Err(e) => {
if let Some(tx) = body_tx.as_ref() {
let _ = tx.send(Err(Error::System(e.to_string()))).await;
}
request_finished = true;
body_tx = None;
}
}
}
res_signal = &mut res_rx, if response_body_stream.is_none() && !response_finished => {
if let Ok(response) = res_signal {
if let Err(e) = stream.send_response(response).await {
log(LogLevel::Error, &format!("✗ Failed to send H3 headers: {e}"));
response_finished = true;
}
if let Some(task) = flow_task.take() {
if let Ok(Some(body)) = task.await {
response_body_stream = Some(body);
} else {
response_finished = true;
let _ = stream.finish().await;
}
} else {
response_finished = true;
let _ = stream.finish().await;
}
} else {
response_finished = true;
let _ = stream.finish().await;
}
}
frame_future = async {
if let Some(b) = response_body_stream.as_mut() {
b.frame().await
} else {
std::future::pending().await
}
}, if response_body_stream.is_some() && !response_finished => {
match frame_future {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data()
&& !data.is_empty()
&& let Err(e) = stream.send_data(data).await {
log(LogLevel::Warn, &format!("Failed to send H3 data: {e}"));
response_finished = true;
}
}
Some(Err(e)) => {
log(LogLevel::Error, &format!("Response Body Error: {e}"));
response_finished = true;
let _ = stream.finish().await;
}
None => {
response_finished = true;
let _ = stream.finish().await;
}
}
}
}
}
Ok(())
}