use bytes::Bytes;
use tokio::sync::mpsc;
use crate::error::{Error, Result};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct H3TunnelOutbound {
pub bytes: Bytes,
pub fin: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum H3TunnelEvent {
Data(Bytes),
EndStream,
Reset(String),
GoAway { id: u64 },
}
#[derive(Debug)]
pub struct H3Tunnel {
outbound_tx: mpsc::Sender<H3TunnelOutbound>,
inbound_rx: mpsc::Receiver<Result<H3TunnelEvent>>,
}
impl H3Tunnel {
pub fn new(
outbound_tx: mpsc::Sender<H3TunnelOutbound>,
inbound_rx: mpsc::Receiver<Result<H3TunnelEvent>>,
) -> Self {
Self {
outbound_tx,
inbound_rx,
}
}
pub async fn send_bytes(&self, bytes: Bytes, fin: bool) -> Result<()> {
self.outbound_tx
.send(H3TunnelOutbound { bytes, fin })
.await
.map_err(|_| Error::HttpProtocol("H3 tunnel outbound channel closed".into()))
}
pub async fn close_send(&self) -> Result<()> {
self.send_bytes(Bytes::new(), true).await
}
pub async fn recv_event(&mut self) -> Option<Result<H3TunnelEvent>> {
self.inbound_rx.recv().await
}
pub async fn recv_bytes(&mut self) -> Option<Result<Bytes>> {
loop {
match self.recv_event().await? {
Ok(H3TunnelEvent::Data(bytes)) => return Some(Ok(bytes)),
Ok(H3TunnelEvent::EndStream) => return None,
Ok(H3TunnelEvent::Reset(reason)) => {
return Some(Err(Error::HttpProtocol(format!(
"H3 tunnel reset: {reason}"
))));
}
Ok(H3TunnelEvent::GoAway { id }) => {
return Some(Err(Error::HttpProtocol(format!(
"H3 tunnel closed by GOAWAY id={id}"
))));
}
Err(err) => return Some(Err(err)),
}
}
}
}