use bytes::Bytes;
use tokio::sync::mpsc;
use tokio::sync::oneshot;
use crate::error::{Error, Result};
use crate::headers::Headers;
use crate::response::Response;
use crate::transport::h3::driver::DriverCommand;
use crate::transport::h3::H3Tunnel;
#[derive(Debug, Clone)]
pub struct H3Handle {
command_tx: mpsc::Sender<DriverCommand>,
is_draining: std::sync::Arc<std::sync::atomic::AtomicBool>,
}
impl H3Handle {
pub fn new(
command_tx: mpsc::Sender<DriverCommand>,
is_draining: std::sync::Arc<std::sync::atomic::AtomicBool>,
) -> Self {
Self {
command_tx,
is_draining,
}
}
pub fn is_closed(&self) -> bool {
self.command_tx.is_closed()
}
pub fn is_draining(&self) -> bool {
self.is_draining.load(std::sync::atomic::Ordering::SeqCst)
}
pub async fn send_request(
&self,
method: http::Method,
uri: &http::Uri,
headers: Vec<(String, String)>,
body: Option<Bytes>,
) -> Result<Response> {
let (response_tx, response_rx) = oneshot::channel();
let command = DriverCommand::SendRequest {
method,
uri: uri.clone(),
headers,
body,
response_tx,
};
self.command_tx
.send(command)
.await
.map_err(|_| Error::HttpProtocol("H3 Driver channel closed".into()))?;
let stream_response = response_rx
.await
.map_err(|_| Error::HttpProtocol("H3 Response channel closed".into()))??;
Ok(Response::new(
stream_response.status,
Headers::from(stream_response.headers),
stream_response.body,
"HTTP/3".to_string(),
))
}
pub async fn send_streaming_request(
&self,
method: http::Method,
uri: &http::Uri,
headers: Vec<(String, String)>,
body: Option<Bytes>,
) -> Result<(Response, mpsc::Receiver<Result<Bytes>>)> {
let (headers_tx, headers_rx) = oneshot::channel();
let (body_tx, body_rx) = mpsc::channel(32);
let (driver_body_tx, mut driver_body_rx) = mpsc::unbounded_channel();
tokio::spawn(async move {
while let Some(item) = driver_body_rx.recv().await {
if body_tx.send(item).await.is_err() {
break;
}
}
});
self.command_tx
.send(DriverCommand::SendStreamingRequest {
method,
uri: uri.clone(),
headers,
body,
headers_tx,
body_tx: driver_body_tx,
})
.await
.map_err(|_| Error::HttpProtocol("H3 Driver channel closed".into()))?;
let (status, headers) = headers_rx
.await
.map_err(|_| Error::HttpProtocol("H3 streaming headers channel closed".into()))??;
Ok((
Response::new(
status,
Headers::from(headers),
Bytes::new(),
"HTTP/3".to_string(),
),
body_rx,
))
}
pub async fn open_websocket_tunnel(
&self,
uri: http::Uri,
headers: Vec<(String, String)>,
) -> Result<H3Tunnel> {
let (response_tx, response_rx) = oneshot::channel();
self.command_tx
.send(DriverCommand::OpenWebSocketTunnel {
uri,
headers,
response_tx,
})
.await
.map_err(|_| Error::HttpProtocol("H3 Driver channel closed".into()))?;
response_rx
.await
.map_err(|_| Error::HttpProtocol("H3 tunnel response channel closed".into()))?
}
}