use std::{
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll},
};
use bytes::Buf;
use futures_util::{Future, future::poll_fn, ready};
use h3::{
ConnectionState, SharedState,
error::{
Code, ConnectionError, StreamError, connection_error_creators::CloseStream,
internal_error::InternalConnectionError,
},
frame::FrameStream,
proto::frame::Frame,
quic::{self, OpenStreams, WriteBuf},
server::{Connection, RequestStream},
};
use h3::{
quic::SendStreamUnframed,
stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
};
use h3_datagram::{
datagram_handler::{DatagramReader, DatagramSender, HandleDatagramsExt},
quic_traits,
};
use http::{Request, Response, StatusCode};
use h3::webtransport::SessionId;
use pin_project_lite::pin_project;
use super::stream::{BidiStream, RecvStream, SendStream};
pub struct WebTransportSession<C, B>
where
C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
Connection<C, B>: HandleDatagramsExt<C, B>,
B: Buf,
{
session_id: SessionId,
server_conn: Mutex<Connection<C, B>>,
connect_stream: RequestStream<C::BidiStream, B>,
opener: Mutex<C::OpenStreams>,
shared: Arc<SharedState>,
}
impl<C, B> ConnectionState for WebTransportSession<C, B>
where
C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
Connection<C, B>: HandleDatagramsExt<C, B>,
B: Buf,
{
fn shared_state(&self) -> &SharedState {
&self.shared
}
}
impl<C, B> WebTransportSession<C, B>
where
Connection<C, B>: HandleDatagramsExt<C, B>,
C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
B: Buf,
{
#[allow(clippy::type_complexity)]
pub fn split(self) -> (Mutex<Connection<C, B>>, RequestStream<C::BidiStream, B>) {
let WebTransportSession {
server_conn,
connect_stream,
..
} = self;
(server_conn, connect_stream)
}
pub async fn accept(
mut stream: RequestStream<C::BidiStream, B>,
mut conn: Connection<C, B>,
) -> Result<Self, StreamError> {
let shared = conn.inner.shared.clone();
let config = shared.settings();
if !config.enable_webtransport() {
return Err(StreamError::ConnectionError(
conn.inner
.handle_connection_error(InternalConnectionError::new(
Code::H3_SETTINGS_ERROR,
"webtransport is not supported by client".to_string(),
)),
));
}
if !config.enable_datagram() {
return Err(StreamError::ConnectionError(
conn.inner
.handle_connection_error(InternalConnectionError::new(
Code::H3_SETTINGS_ERROR,
"datagrams are not supported by client".to_string(),
)),
));
}
if !conn.inner.config.settings.enable_webtransport() {
tracing::warn!("Server does not support webtransport");
}
if !conn.inner.config.settings.enable_datagram() {
tracing::warn!("Server does not support datagrams");
}
if !conn.inner.config.settings.enable_extended_connect() {
tracing::warn!("Server does not support CONNECT");
}
let response = Response::builder()
.header("sec-webtransport-http3-draft", "draft02")
.status(StatusCode::OK)
.body(())
.unwrap();
stream.send_response(response).await?;
let session_id = stream.send_id().into();
let conn_inner = &mut conn.inner.conn;
let opener = Mutex::new(conn_inner.opener());
Ok(Self {
session_id,
opener,
server_conn: Mutex::new(conn),
connect_stream: stream,
shared,
})
}
pub fn datagram_reader(&self) -> DatagramReader<C::RecvDatagramHandler> {
self.server_conn.lock().unwrap().get_datagram_reader()
}
pub fn datagram_sender(&self) -> DatagramSender<C::SendDatagramHandler, B> {
self.server_conn
.lock()
.unwrap()
.get_datagram_sender(self.connect_stream.send_id())
}
pub fn accept_uni(&self) -> AcceptUni<'_, C, B> {
AcceptUni {
conn: &self.server_conn,
}
}
pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, StreamError> {
let stream = poll_fn(|cx| {
let mut conn = self.server_conn.lock().unwrap();
conn.poll_accept_request_stream(cx)
})
.await;
let stream = match stream {
Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
Ok(None) => {
return Ok(None);
}
Err(err) => return Err(StreamError::ConnectionError(err)),
};
let mut resolver = { self.server_conn.lock().unwrap().create_resolver(stream) };
let frame = poll_fn(|cx| resolver.frame_stream.poll_next(cx)).await;
match frame {
Ok(None) => Ok(None),
Ok(Some(Frame::WebTransportStream(session_id))) => {
let stream = resolver.frame_stream.into_inner();
Ok(Some(AcceptedBi::BidiStream(
session_id,
BidiStream::new(stream),
)))
}
frame => {
let (req, resp) = resolver.accept_with_frame(frame)?.resolve().await?;
Ok(Some(AcceptedBi::Request(Box::new(req), resp)))
}
}
}
pub fn open_bi(&self, session_id: SessionId) -> OpenBi<'_, C, B> {
OpenBi {
opener: &self.opener,
stream: None,
session_id,
stream_handler: WTransportStreamHandler {
shared: self.shared.clone(),
},
}
}
pub fn open_uni(&self, session_id: SessionId) -> OpenUni<'_, C, B> {
OpenUni {
opener: &self.opener,
stream: None,
session_id,
stream_handler: WTransportStreamHandler {
shared: self.shared.clone(),
},
}
}
pub fn session_id(&self) -> SessionId {
self.session_id
}
}
type PendingStreams<C, B> = (
BidiStream<<C as quic::OpenStreams<B>>::BidiStream, B>,
WriteBuf<&'static [u8]>,
);
type PendingUniStreams<C, B> = (
SendStream<<C as quic::OpenStreams<B>>::SendStream, B>,
WriteBuf<&'static [u8]>,
);
pin_project! {
pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
opener: &'a Mutex<C::OpenStreams>,
stream: Option<PendingStreams<C,B>>,
session_id: SessionId,
stream_handler: WTransportStreamHandler,
}
}
struct WTransportStreamHandler {
shared: Arc<SharedState>,
}
impl ConnectionState for WTransportStreamHandler {
fn shared_state(&self) -> &SharedState {
&self.shared
}
}
impl CloseStream for WTransportStreamHandler {}
impl<'a, B, C> Future for OpenBi<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
C::BidiStream: SendStreamUnframed<B>,
{
type Output = Result<BidiStream<C::BidiStream, B>, StreamError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut p = self.project();
loop {
match &mut p.stream {
Some((stream, buf)) => {
while buf.has_remaining() {
ready!(stream.poll_send(cx, buf))
.map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
}
let (stream, _) = p.stream.take().unwrap();
return Poll::Ready(Ok(stream));
}
None => {
let mut opener = (*p.opener).lock().unwrap();
let res = ready!(opener.poll_open_bidi(cx))
.map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
let stream = BidiStream::new(BufRecvStream::new(res));
let buf = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*p.session_id));
*p.stream = Some((stream, buf));
}
}
}
}
}
pin_project! {
pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
opener: &'a Mutex<C::OpenStreams>,
stream: Option<PendingUniStreams<C, B>>,
session_id: SessionId,
stream_handler: WTransportStreamHandler
}
}
impl<'a, C, B> Future for OpenUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
C::SendStream: SendStreamUnframed<B>,
{
type Output = Result<SendStream<C::SendStream, B>, StreamError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut p = self.project();
loop {
match &mut p.stream {
Some((send, buf)) => {
while buf.has_remaining() {
ready!(send.poll_send(cx, buf))
.map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
}
let (send, buf) = p.stream.take().unwrap();
assert!(!buf.has_remaining());
return Poll::Ready(Ok(send));
}
None => {
let mut opener = (*p.opener).lock().unwrap();
let send = ready!(opener.poll_open_send(cx))
.map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
let send = BufRecvStream::new(send);
let send = SendStream::new(send);
let buf = WriteBuf::from(UniStreamHeader::WebTransportUni(*p.session_id));
*p.stream = Some((send, buf));
}
}
}
}
}
pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
BidiStream(SessionId, BidiStream<C::BidiStream, B>),
Request(Box<Request<()>>, RequestStream<C::BidiStream, B>),
}
pub struct AcceptUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
conn: &'a Mutex<Connection<C, B>>,
}
impl<'a, C, B> Future for AcceptUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, ConnectionError>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut conn = self.conn.lock().unwrap();
conn.inner.poll_accept_recv(cx)?;
let streams = conn.inner.accepted_streams_mut();
if let Some((id, stream)) = streams.wt_uni_streams.pop() {
return Poll::Ready(Ok(Some((id, RecvStream::new(stream)))));
}
Poll::Pending
}
}