use core::{
fmt,
future::{Future, poll_fn},
marker::PhantomData,
net::SocketAddr,
pin::pin,
};
use ::h3::{
quic::SendStream,
server::{self, RequestStream},
};
use xitca_io::net::QuicStream;
use xitca_service::Service;
use xitca_unsafe_collection::futures::{Select, SelectOutput};
use crate::{
body::{Body, Frame},
bytes::Bytes,
error::HttpServiceError,
h3::{body::RequestBody, error::Error},
http::{Extension, Request, RequestExt, Response},
util::futures::Queue,
};
/// Per-connection HTTP/3 dispatcher.
///
/// Holds everything `run` needs: the QUIC stream the h3 connection is built
/// from, the socket address attached to every request, and a borrowed service
/// that is called once per accepted request.
pub(crate) struct Dispatcher<'a, S, ReqB> {
// QUIC stream; `run` awaits `connecting()` on it to obtain the connection.
io: QuicStream,
// Socket address placed into each request's `Extension`.
addr: SocketAddr,
// Borrowed service invoked with every resolved request.
service: &'a S,
// Zero-sized marker tying the dispatcher to the request body type `ReqB`.
_req_body: PhantomData<ReqB>,
}
impl<'a, S, ReqB, ResB, BE> Dispatcher<'a, S, ReqB>
where
S: Service<Request<RequestExt<ReqB>>, Response = Response<ResB>>,
S::Error: fmt::Debug,
ResB: Body<Data = Bytes, Error = BE>,
BE: fmt::Debug,
ReqB: From<RequestBody>,
{
pub(crate) fn new(io: QuicStream, addr: SocketAddr, service: &'a S) -> Self {
Self {
io,
addr,
service,
_req_body: PhantomData,
}
}
pub(crate) async fn run(self) -> Result<(), Error<S::Error, BE>> {
let conn = self.io.connecting().await?;
let conn = h3_quinn::Connection::new(conn);
let mut conn = server::Connection::new(conn).await?;
let mut queue = Queue::new();
loop {
match conn.accept().select(queue.next()).await {
SelectOutput::A(Ok(Some(req))) => {
queue.push(async move {
let (req, stream) = req.resolve_request().await?;
let (tx, rx) = stream.split();
let req = req.map(|_| {
let body = ReqB::from(RequestBody(rx));
RequestExt::from_parts(body, Extension::new(self.addr))
});
let fut = self.service.call(req);
h3_handler(fut, tx).await
});
}
SelectOutput::A(Ok(None)) => break,
SelectOutput::A(Err(e)) => return Err(e.into()),
SelectOutput::B(res) => {
if let Err(e) = res {
HttpServiceError::from(e).log("h3_dispatcher");
}
}
}
}
queue.drain().await;
Ok(())
}
}
async fn h3_handler<'a, Fut, C, ResB, SE, BE>(
fut: Fut,
mut stream: RequestStream<C, Bytes>,
) -> Result<(), Error<SE, BE>>
where
Fut: Future<Output = Result<Response<ResB>, SE>> + 'a,
C: SendStream<Bytes>,
ResB: Body<Data = Bytes, Error = BE>,
{
let (parts, body) = fut.await.map_err(Error::Service)?.into_parts();
let res = Response::from_parts(parts, ());
stream.send_response(res).await?;
let mut body = pin!(body);
while let Some(res) = poll_fn(|cx| body.as_mut().poll_frame(cx)).await {
match res.map_err(Error::Body)? {
Frame::Data(bytes) => stream.send_data(bytes).await?,
Frame::Trailers(trailers) => stream.send_trailers(trailers).await?,
}
}
stream.finish().await?;
Ok(())
}