extern crate bytes as bytes_crate;
extern crate http as http_crate;
use bytes_crate::Bytes;
use h3::client::{Connection as H3Connection, RequestStream, SendRequest};
use h3::error::{Code, ConnectionError};
use crate::cx::Cx;
use crate::net::quic::QuicConnection;
use super::body::H3Body;
use super::error::H3Error;
use http_crate::{Request, Response};
/// HTTP/3 client handle.
///
/// Bundles the underlying QUIC connection with the h3 request-opening handle
/// that `request` / `request_with_body` use to start new request streams.
pub struct H3Client {
/// The QUIC connection this client was built on; exposed via `quic()`.
quic: QuicConnection,
/// h3 handle used to open a new request stream for each outgoing request.
send_request: H3SendRequest,
}
/// Owner of the h3 connection object returned alongside the client by
/// `H3Client::new`.
///
/// Its methods (`run`, `shutdown`, `wait_idle`) forward to that connection.
pub struct H3Driver {
/// The h3 client connection produced by `h3::client::new`.
connection: H3DriverInner,
}
/// The `h3_quinn` connection adapter type handed to `h3::client::new`.
type H3QuinnConnection = h3_quinn::Connection;
/// h3 `SendRequest` handle specialised to `h3_quinn` streams carrying `Bytes`.
type H3SendRequest = SendRequest<h3_quinn::OpenStreams, Bytes>;
/// h3 request stream over an `h3_quinn` bidirectional stream carrying `Bytes`.
pub(crate) type H3RequestStream = RequestStream<h3_quinn::BidiStream<Bytes>, Bytes>;
/// h3 client connection specialised to the `h3_quinn` transport and `Bytes`.
type H3DriverInner = H3Connection<H3QuinnConnection, Bytes>;
impl H3Client {
pub async fn new(cx: &Cx, quic: QuicConnection) -> Result<(Self, H3Driver), H3Error> {
cx.checkpoint()?;
let h3_conn = h3_quinn::Connection::new(quic.inner().clone());
let (connection, send_request) = h3::client::new(h3_conn).await?;
Ok((Self { quic, send_request }, H3Driver { connection }))
}
#[must_use]
pub fn quic(&self) -> &QuicConnection {
&self.quic
}
pub async fn request(
&mut self,
cx: &Cx,
request: Request<()>,
) -> Result<Response<H3Body>, H3Error> {
cx.checkpoint()?;
let mut stream = self.send_request.send_request(request).await?;
if let Err(err) = cx.checkpoint() {
cancel_request(&mut stream);
return Err(err.into());
}
stream.finish().await?;
if let Err(err) = cx.checkpoint() {
cancel_request(&mut stream);
return Err(err.into());
}
let response = stream.recv_response().await?;
Ok(response.map(|_| H3Body::new(stream)))
}
pub async fn request_with_body(
&mut self,
cx: &Cx,
request: Request<Bytes>,
) -> Result<Response<H3Body>, H3Error> {
cx.checkpoint()?;
let (parts, body) = request.into_parts();
let request = Request::from_parts(parts, ());
let mut stream = self.send_request.send_request(request).await?;
if let Err(err) = cx.checkpoint() {
cancel_request(&mut stream);
return Err(err.into());
}
if !body.is_empty() {
stream.send_data(body).await?;
}
if let Err(err) = cx.checkpoint() {
cancel_request(&mut stream);
return Err(err.into());
}
stream.finish().await?;
if let Err(err) = cx.checkpoint() {
cancel_request(&mut stream);
return Err(err.into());
}
let response = stream.recv_response().await?;
Ok(response.map(|_| H3Body::new(stream)))
}
}
impl H3Driver {
pub async fn run(mut self) -> Result<(), H3Error> {
let close = std::future::poll_fn(|cx| self.connection.poll_close(cx)).await;
map_close_result(close)
}
pub async fn shutdown(&mut self, max_push_id: usize) -> Result<(), H3Error> {
self.connection.shutdown(max_push_id).await?;
Ok(())
}
pub async fn wait_idle(&mut self) -> Result<(), H3Error> {
let close = self.connection.wait_idle().await;
map_close_result(close)
}
}
fn cancel_request(stream: &mut H3RequestStream) {
let _ = stream.stop_stream(Code::H3_REQUEST_CANCELLED);
let _ = stream.stop_sending(Code::H3_REQUEST_CANCELLED);
}
/// Converts a connection close reason into a `Result`.
///
/// A reason for which `is_h3_no_error()` is true maps to `Ok(())`; any other
/// reason is converted into [`H3Error`] and returned as `Err`.
fn map_close_result(err: ConnectionError) -> Result<(), H3Error> {
    if !err.is_h3_no_error() {
        return Err(err.into());
    }
    Ok(())
}