use std::{
marker::{PhantomData, Send},
pin::Pin,
sync::Mutex,
task::{Context, Poll},
};
use bytes::Buf;
use futures_util::{future::poll_fn, ready, Future};
use http::{Request, Response, StatusCode};
use pin_project_lite::pin_project;
use crate::{
connection::ConnectionState,
error::{Code, ErrorLevel},
ext::Datagram,
frame::FrameStream,
proto::frame::Frame,
quic::SendStreamUnframed,
quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf},
server::{self, Connection, RequestStream},
stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
Error,
};
use super::{
stream::{BidiStream, RecvStream, SendStream},
SessionId,
};
/// Server-side state of one WebTransport session.
///
/// Bundles the session id, the server connection, the request stream the
/// session was accepted on, and a handle used to open new streams. The
/// connection and the opener sit behind mutexes so the `&self` methods of the
/// session can use them.
pub struct WebTransportSession<C, B>
where
C: quic::Connection<B> + Send,
B: Buf,
{
// Identifier of this session; `accept` derives it from the request stream's send id.
session_id: SessionId,
// The server connection, shared by the accept/datagram futures borrowed from the session.
server_conn: Mutex<Connection<C, B>>,
// The request stream on which the session was accepted; its id is used when sending datagrams.
connect_stream: RequestStream<C::BidiStream, B>,
// Stream opener obtained from the underlying connection, used by `open_bi` / `open_uni`.
opener: Mutex<C::OpenStreams>,
}
#[allow(clippy::future_not_send)]
impl<C, B> WebTransportSession<C, B>
where
C: quic::Connection<B> + Send,
B: Buf,
{
#[allow(clippy::type_complexity)]
pub fn split(self) -> (Mutex<Connection<C, B>>, RequestStream<C::BidiStream, B>) {
let WebTransportSession {
server_conn,
connect_stream,
..
} = self;
(server_conn, connect_stream)
}
pub async fn accept(
mut stream: RequestStream<C::BidiStream, B>,
mut conn: Connection<C, B>,
) -> Result<Self, Error> {
let shared = conn.shared_state().clone();
{
let config = shared.write("Read WebTransport support").peer_config;
if !config.enable_webtransport() {
return Err(conn.close(
Code::H3_SETTINGS_ERROR,
"webtransport is not supported by client",
));
}
if !config.enable_datagram() {
return Err(conn.close(
Code::H3_SETTINGS_ERROR,
"datagrams are not supported by client",
));
}
}
if !conn.inner.config.settings.enable_webtransport() {
tracing::warn!("Server does not support webtransport");
}
if !conn.inner.config.settings.enable_datagram() {
tracing::warn!("Server does not support datagrams");
}
if !conn.inner.config.settings.enable_extended_connect() {
tracing::warn!("Server does not support CONNECT");
}
let response = Response::builder()
.header("sec-webtransport-http3-draft", "draft02")
.status(StatusCode::OK)
.body(())
.unwrap();
stream.send_response(response).await?;
let session_id = stream.send_id().into();
let conn_inner = &mut conn.inner.conn;
let opener = Mutex::new(conn_inner.opener());
Ok(Self {
session_id,
opener,
server_conn: Mutex::new(conn),
connect_stream: stream,
})
}
pub fn accept_datagram(&self) -> ReadDatagram<C, B> {
ReadDatagram::new(&self.server_conn)
}
pub fn send_datagram(&self, data: B) -> Result<(), Error>
where
C: SendDatagramExt<B>,
{
self.server_conn
.lock()
.unwrap()
.send_datagram(self.connect_stream.id(), data)?;
Ok(())
}
pub fn accept_uni(&self) -> AcceptUni<C, B> {
AcceptUni::new(&self.server_conn)
}
pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, Error> {
let stream = poll_fn(|cx| {
let mut conn = self.server_conn.lock().unwrap();
conn.poll_accept_request(cx)
})
.await;
let mut stream = match stream {
Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
Ok(None) => {
return Ok(None);
}
Err(err) => {
match err.kind() {
crate::error::Kind::Closed => return Ok(None),
crate::error::Kind::Application {
code,
reason,
level: ErrorLevel::ConnectionError,
..
} => {
return Err(self.server_conn.lock().unwrap().close(
code,
reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
))
}
_ => return Err(err),
};
}
};
let frame = poll_fn(|cx| stream.poll_next(cx)).await;
match frame {
Ok(None) => Ok(None),
Ok(Some(Frame::WebTransportStream(session_id))) => {
let stream = stream.into_inner();
Ok(Some(AcceptedBi::BidiStream(
session_id,
BidiStream::new(stream),
)))
}
frame => {
let req = {
let mut conn = self.server_conn.lock().unwrap();
conn.accept_with_frame(stream, frame)?
};
if let Some(req) = req {
let (req, resp) = req.resolve().await?;
Ok(Some(AcceptedBi::Request(req, resp)))
} else {
Ok(None)
}
}
}
}
pub fn open_bi(&self, session_id: SessionId) -> OpenBi<C, B> {
OpenBi::new(&self.opener, session_id)
}
pub fn open_uni(&self, session_id: SessionId) -> OpenUni<C, B> {
OpenUni::new(&self.opener, session_id)
}
pub fn session_id(&self) -> SessionId {
self.session_id
}
}
/// State kept by `OpenBi` between polls: the freshly opened bidirectional
/// stream and the buffer holding the stream header bytes still to be written.
type PendingStreams<C, B> = (
BidiStream<<C as quic::Connection<B>>::BidiStream, B>,
WriteBuf<&'static [u8]>,
);
/// State kept by `OpenUni` between polls: the freshly opened send stream and
/// the buffer holding the stream header bytes still to be written.
type PendingUniStreams<C, B> = (
SendStream<<C as quic::Connection<B>>::SendStream, B>,
WriteBuf<&'static [u8]>,
);
pin_project! {
/// Future that opens a bidirectional stream and writes the WebTransport
/// bidi stream header for a session before handing the stream out.
pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
// Shared handle used to open the underlying stream.
opener: &'a Mutex<C::OpenStreams>,
// `None` until a stream has been opened; afterwards the stream plus the
// header bytes that still have to be sent.
stream: Option<PendingStreams<C,B>>,
// Session the new stream is associated with; encoded into the header.
session_id: SessionId,
}
}
impl<'a, C: quic::Connection<B>, B: Buf> OpenBi<'a, C, B> {
    /// Creates a future that has not opened a stream yet; the first poll will
    /// ask `opener` for one.
    #[allow(missing_docs)]
    pub fn new(opener: &'a Mutex<C::OpenStreams>, session_id: SessionId) -> Self {
        let stream = None;
        Self {
            opener,
            stream,
            session_id,
        }
    }

    /// Replaces the pending state with `stream`.
    ///
    /// Passing `Some((stream, buf))` makes the future skip opening and go
    /// straight to flushing `buf`; passing `None` resets it to "not opened".
    #[allow(missing_docs)]
    pub fn with_stream(self, stream: impl Into<Option<PendingStreams<C, B>>>) -> Self {
        Self {
            stream: stream.into(),
            ..self
        }
    }
}
impl<'a, B, C> Future for OpenBi<'a, C, B>
where
    C: quic::Connection<B>,
    B: Buf,
    C::BidiStream: SendStreamUnframed<B>,
{
    type Output = Result<BidiStream<C::BidiStream, B>, Error>;

    /// Drives the two phases of opening a WebTransport bidirectional stream.
    ///
    /// While no stream is stored, the opener is locked and polled for a new
    /// bidirectional stream; once one arrives it is stored together with the
    /// encoded `WebTransportBidi` header. With a stream stored, the header is
    /// written until the buffer is empty, and the stream is then returned.
    ///
    /// Panics if the opener mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            // Phase 2: a stream exists, flush whatever is left of the header.
            if let Some((stream, header)) = &mut this.stream {
                while header.has_remaining() {
                    ready!(stream.poll_send(cx, header))?;
                }
                let (opened, _) = this.stream.take().unwrap();
                return Poll::Ready(Ok(opened));
            }

            // Phase 1: nothing opened yet. The guard lives only for this
            // iteration, so it is released on `Pending` and before phase 2.
            let mut opener = this.opener.lock().unwrap();
            let raw = ready!(opener.poll_open_bidi(cx))?;
            let stream = BidiStream::new(BufRecvStream::new(raw));
            let header = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*this.session_id));
            *this.stream = Some((stream, header));
        }
    }
}
pin_project! {
/// Future that opens a unidirectional send stream and writes the
/// WebTransport uni stream header for a session before handing the stream out.
pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
// Shared handle used to open the underlying stream.
opener: &'a Mutex<C::OpenStreams>,
// `None` until a stream has been opened; afterwards the send stream plus
// the header bytes that still have to be sent.
stream: Option<PendingUniStreams<C, B>>,
// Session the new stream is associated with; encoded into the header.
session_id: SessionId,
}
}
impl<'a, C: quic::Connection<B>, B: Buf> OpenUni<'a, C, B> {
    /// Creates a future that has not opened a stream yet; the first poll will
    /// ask `opener` for one.
    #[allow(missing_docs)]
    pub fn new(opener: &'a Mutex<C::OpenStreams>, session_id: SessionId) -> Self {
        let stream = None;
        Self {
            opener,
            stream,
            session_id,
        }
    }

    /// Replaces the pending state with `stream`.
    ///
    /// Passing `Some((stream, buf))` makes the future skip opening and go
    /// straight to flushing `buf`; passing `None` resets it to "not opened".
    #[allow(missing_docs)]
    pub fn with_stream(self, stream: impl Into<Option<PendingUniStreams<C, B>>>) -> Self {
        Self {
            stream: stream.into(),
            ..self
        }
    }
}
impl<'a, C, B> Future for OpenUni<'a, C, B>
where
    C: quic::Connection<B>,
    B: Buf,
    C::SendStream: SendStreamUnframed<B>,
{
    type Output = Result<SendStream<C::SendStream, B>, Error>;

    /// Drives the two phases of opening a WebTransport unidirectional stream.
    ///
    /// While no stream is stored, the opener is locked and polled for a new
    /// send stream; once one arrives it is wrapped and stored together with
    /// the encoded `WebTransportUni` header. With a stream stored, the header
    /// is written until the buffer is empty, and the stream is then returned.
    ///
    /// Panics if the opener mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        loop {
            // Phase 2: a stream exists, flush whatever is left of the header.
            if let Some((send, header)) = &mut this.stream {
                while header.has_remaining() {
                    ready!(send.poll_send(cx, header))?;
                }
                let (opened, flushed) = this.stream.take().unwrap();
                assert!(!flushed.has_remaining());
                return Poll::Ready(Ok(opened));
            }

            // Phase 1: nothing opened yet. The guard lives only for this
            // iteration, so it is released on `Pending` and before phase 2.
            let mut opener = this.opener.lock().unwrap();
            let raw = ready!(opener.poll_open_send(cx))?;
            let send = SendStream::new(BufRecvStream::new(raw));
            let header = WriteBuf::from(UniStreamHeader::WebTransportUni(*this.session_id));
            *this.stream = Some((send, header));
        }
    }
}
/// What `WebTransportSession::accept_bi` found on an incoming bidirectional
/// stream.
pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
/// The stream started with a `WebTransportStream` frame: the session id it
/// carried and the stream itself.
BidiStream(SessionId, BidiStream<C::BidiStream, B>),
/// The stream carried a request instead: the resolved request and the
/// stream on which to answer it.
Request(Request<()>, RequestStream<C::BidiStream, B>),
}
/// Future that resolves to the next datagram received on the connection.
pub struct ReadDatagram<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
// Connection to poll for datagrams; locked on every poll.
conn: &'a Mutex<Connection<C, B>>,
// Zero-sized marker for the `B` type parameter.
_marker: PhantomData<B>,
}
impl<'a, C, B> ReadDatagram<'a, C, B>
where
    C: quic::Connection<B>,
    B: Buf,
{
    /// Creates a datagram-reading future that borrows `conn`.
    #[allow(missing_docs)]
    pub fn new(conn: &'a Mutex<Connection<C, B>>) -> Self {
        let _marker = PhantomData;
        Self { conn, _marker }
    }
}
impl<'a, C, B> Future for ReadDatagram<'a, C, B>
where
    C: quic::Connection<B> + RecvDatagramExt,
    B: Buf,
{
    type Output = Result<Option<(SessionId, C::Buf)>, Error>;

    /// Locks the connection and polls it for a datagram.
    ///
    /// A received datagram is decoded and returned as its stream id (converted
    /// to a session id) plus its payload. `Ok(None)` is returned when the
    /// connection reports that there is no datagram to deliver. Polling and
    /// decoding errors are propagated.
    ///
    /// Panics if the connection mutex is poisoned.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut conn = self.conn.lock().unwrap();
        let received = ready!(conn.inner.conn.poll_accept_datagram(cx))?;

        let datagram = match received {
            None => return Poll::Ready(Ok(None)),
            Some(raw) => Datagram::decode(raw)?,
        };

        // Read the id first: `into_payload` consumes the datagram.
        let session_id: SessionId = datagram.stream_id().into();
        Poll::Ready(Ok(Some((session_id, datagram.into_payload()))))
    }
}
/// Future that resolves to the next incoming WebTransport unidirectional
/// stream, together with the session id it belongs to.
pub struct AcceptUni<'a, C, B>
where
C: quic::Connection<B>,
B: Buf,
{
// Connection to poll for incoming streams; locked on every poll.
conn: &'a Mutex<Connection<C, B>>,
}
impl<'a, C, B> AcceptUni<'a, C, B>
where
    C: quic::Connection<B>,
    B: Buf,
{
    /// Creates a stream-accepting future that borrows `conn`.
    #[allow(missing_docs)]
    pub fn new(conn: &'a Mutex<server::Connection<C, B>>) -> Self {
        let accept = Self { conn };
        accept
    }
}
impl<'a, C, B> Future for AcceptUni<'a, C, B>
where
    C: quic::Connection<B>,
    B: Buf,
{
    type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, Error>;

    /// Locks the connection, lets it process newly received streams, then
    /// takes one entry from its queue of WebTransport unidirectional streams.
    ///
    /// Resolves with `Ok(Some(..))` when an entry was available and returns
    /// `Pending` otherwise; as written it never resolves with `Ok(None)`.
    /// Errors from processing received streams are propagated.
    ///
    /// Panics if the connection mutex is poisoned.
    ///
    /// NOTE(review): `pop()` presumably removes the most recently queued entry
    /// (if `wt_uni_streams` is a `Vec`), so streams may be handed out newest
    /// first; confirm this ordering is intended. Waking this future again
    /// presumably relies on `poll_accept_recv` registering the waker; verify.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut conn = self.conn.lock().unwrap();
        conn.inner.poll_accept_recv(cx)?;

        let next = conn.inner.accepted_streams_mut().wt_uni_streams.pop();
        match next {
            Some((id, stream)) => Poll::Ready(Ok(Some((id, RecvStream::new(stream))))),
            None => Poll::Pending,
        }
    }
}