use super::quic_pool;
use crate::common::sys::lifecycle::{Error, Result};
use crate::layers::l7::{
container::{Container, PayloadState},
http::wrapper::{H3BodyAdapter, VaneBody},
};
use bytes::Buf;
use fancy_log::{LogLevel, log};
use http::{Request, Uri};
use http_body_util::{BodyExt, Full, combinators::BoxBody};
use std::str::FromStr;
use tokio::sync::mpsc;
pub async fn execute_quinn_request(
container: &mut Container,
url_str: &str,
method_str: Option<&str>,
skip_verify: bool,
) -> Result<()> {
let uri =
Uri::from_str(url_str).map_err(|e| Error::Configuration(format!("Invalid URL: {e}")))?;
let host = uri
.host()
.ok_or_else(|| Error::Configuration("URL missing host".into()))?;
let port = uri.port_u16().unwrap_or(443);
let mut send_request = quic_pool::get_or_create_connection(host, port, skip_verify).await?;
let req_method = if let Some(m) = method_str {
http::Method::from_str(m).unwrap_or(http::Method::GET)
} else {
container
.kv
.get("req.method")
.and_then(|m| http::Method::from_str(m).ok())
.unwrap_or(http::Method::GET)
};
let mut req = Request::builder().method(req_method).uri(uri);
if let Some(headers) = req.headers_mut() {
*headers = container.request_headers.clone();
headers.remove(http::header::HOST);
}
let request = req.body(()).unwrap();
let stream = send_request
.send_request(request)
.await
.map_err(|e| Error::System(format!("Failed to send H3 headers: {e}")))?;
let (mut driver_send, mut driver_recv) = stream.split();
let req_payload = std::mem::replace(&mut container.request_body, PayloadState::Empty);
let req_body_stream: Option<BoxBody<bytes::Bytes, Error>> = match req_payload {
PayloadState::Http(vane_body) => Some(vane_body.boxed()),
PayloadState::Buffered(bytes, _guard) => Some(Full::new(bytes).map_err(|e| match e {}).boxed()),
_ => None,
};
if let Some(mut body) = req_body_stream {
tokio::spawn(async move {
loop {
match body.frame().await {
Some(Ok(frame)) => {
if let Ok(data) = frame.into_data()
&& !data.is_empty()
&& let Err(e) = driver_send.send_data(data).await
{
log(LogLevel::Warn, &format!("⚠ H3 Upload interrupted: {e}"));
break;
}
}
Some(Err(e)) => {
log(LogLevel::Error, &format!("✗ H3 Request Read Error: {e}"));
break;
}
None => {
break;
}
}
}
let _ = driver_send.finish().await;
});
} else {
let _ = driver_send.finish().await;
}
let response = match driver_recv.recv_response().await {
Ok(res) => res,
Err(e) => {
return Err(Error::System(format!("Failed to receive H3 response: {e}")));
}
};
let status = response.status();
log(
LogLevel::Debug,
&format!("✓ H3 Upstream Responded: {status}"),
);
container
.kv
.insert("res.status".to_owned(), status.as_u16().to_string());
container.response_headers = response.headers().clone();
let (res_body_tx, res_body_rx) = mpsc::channel::<Result<bytes::Bytes>>(32);
tokio::spawn(async move {
loop {
match driver_recv.recv_data().await {
Ok(Some(mut chunk)) => {
let bytes = chunk.copy_to_bytes(chunk.remaining());
if res_body_tx.send(Ok(bytes)).await.is_err() {
driver_recv.stop_sending(h3::error::Code::H3_REQUEST_CANCELLED);
break;
}
}
Ok(None) => {
break;
}
Err(e) => {
log(LogLevel::Error, &format!("✗ H3 Download Error: {e}"));
let _ = res_body_tx.send(Err(Error::System(e.to_string()))).await;
break;
}
}
}
});
let adapter = H3BodyAdapter::new(res_body_rx);
container.response_body = PayloadState::Http(VaneBody::H3(adapter.boxed()));
Ok(())
}