sec_http3/webtransport/
server.rs

1//! Provides the server side WebTransport session
2
3use std::{
4    marker::{PhantomData, Send},
5    pin::Pin,
6    sync::Mutex,
7    task::{Context, Poll},
8};
9
10use self::quic::SendStreamUnframed;
11use crate::{
12    connection::ConnectionState,
13    error::{Code, ErrorLevel},
14    ext::{Datagram, Protocol},
15    frame::FrameStream,
16    proto::frame::Frame,
17    quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf},
18    server::{self, Connection, RequestStream},
19    stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
20    webtransport::stream::{BidiStream, RecvStream, SendStream},
21    Error,
22};
23use bytes::Buf;
24use futures_util::{future::poll_fn, ready, Future};
25use http::{Method, Request, Response, StatusCode};
26
27use crate::webtransport::SessionId;
28use pin_project_lite::pin_project;
29
30/// WebTransport session driver.
31///
32/// Maintains the session using the underlying HTTP/3 connection.
33///
34/// Similar to [`crate::server::Connection`] it is generic over the QUIC implementation and Buffer.
35pub struct WebTransportSession<C, B>
36where
37    C: quic::Connection<B> + Send,
38    B: Buf + Send,
39{
40    // See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-2-3
41    session_id: SessionId,
42    /// The underlying HTTP/3 connection
43    server_conn: Mutex<Connection<C, B>>,
44    connect_stream: RequestStream<C::BidiStream, B>,
45    opener: Mutex<C::OpenStreams>,
46}
47
48impl<C, B> WebTransportSession<C, B>
49where
50    C: quic::Connection<B> + Send,
51    B: Buf + Send,
52{
53    /// Accepts a *CONNECT* request for establishing a WebTransport session.
54    ///
55    /// TODO: is the API or the user responsible for validating the CONNECT request?
56    pub async fn accept(
57        request: Request<()>,
58        mut stream: RequestStream<C::BidiStream, B>,
59        mut conn: Connection<C, B>,
60    ) -> Result<Self, Error> {
61        let shared = conn.shared_state().clone();
62        {
63            let config = shared.write("Read WebTransport support").peer_config;
64
65            if !config.enable_webtransport {
66                return Err(conn.close(
67                    Code::H3_SETTINGS_ERROR,
68                    "webtransport is not supported by client",
69                ));
70            }
71
72            if !config.enable_datagram {
73                return Err(conn.close(
74                    Code::H3_SETTINGS_ERROR,
75                    "datagrams are not supported by client",
76                ));
77            }
78        }
79
80        // The peer is responsible for validating our side of the webtransport support.
81        //
82        // However, it is still advantageous to show a log on the server as (attempting) to
83        // establish a WebTransportSession without the proper h3 config is usually a mistake.
84        if !conn.inner.config.enable_webtransport {
85            tracing::warn!("Server does not support webtransport");
86        }
87
88        if !conn.inner.config.enable_datagram {
89            tracing::warn!("Server does not support datagrams");
90        }
91
92        if !conn.inner.config.enable_extended_connect {
93            tracing::warn!("Server does not support CONNECT");
94        }
95
96        // Respond to the CONNECT request.
97
98        // See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.3
99        let response = if validate_wt_connect(&request) {
100            Response::builder()
101                // This is the only header that chrome cares about.
102                .header("sec-webtransport-http3-draft", "draft02")
103                .status(StatusCode::OK)
104                .body(())
105                .unwrap()
106        } else {
107            Response::builder()
108                .status(StatusCode::BAD_REQUEST)
109                .body(())
110                .unwrap()
111        };
112
113        stream.send_response(response).await?;
114
115        let session_id = stream.send_id().into();
116        let conn_inner = &mut conn.inner.conn;
117        let opener = Mutex::new(conn_inner.opener());
118
119        Ok(Self {
120            session_id,
121            opener,
122            server_conn: Mutex::new(conn),
123            connect_stream: stream,
124        })
125    }
126
127    /// Receive a datagram from the client
128    pub fn accept_datagram(&self) -> ReadDatagram<C, B> {
129        ReadDatagram {
130            conn: &self.server_conn,
131            _marker: PhantomData,
132        }
133    }
134
135    /// Sends a datagram
136    ///
137    /// TODO: maybe make async. `quinn` does not require an async send
138    pub fn send_datagram(&self, data: B) -> Result<(), Error>
139    where
140        C: SendDatagramExt<B>,
141    {
142        self.server_conn
143            .lock()
144            .unwrap()
145            .send_datagram(self.connect_stream.id(), data)?;
146
147        Ok(())
148    }
149
150    /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF.
151    pub fn accept_uni(&self) -> AcceptUni<C, B> {
152        AcceptUni {
153            conn: &self.server_conn,
154        }
155    }
156
157    /// Accepts an incoming bidirectional stream or request
158    pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, Error> {
159        // Get the next stream
160        // Accept the incoming stream
161        let stream = poll_fn(|cx| {
162            let mut conn = self.server_conn.lock().unwrap();
163            conn.poll_accept_request(cx)
164        })
165        .await;
166
167        let mut stream = match stream {
168            Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
169            Ok(None) => {
170                // FIXME: is proper HTTP GoAway shutdown required?
171                return Ok(None);
172            }
173            Err(err) => {
174                match err.kind() {
175                    crate::error::Kind::Closed => return Ok(None),
176                    crate::error::Kind::Application {
177                        code,
178                        reason,
179                        level: ErrorLevel::ConnectionError,
180                        ..
181                    } => {
182                        return Err(self.server_conn.lock().unwrap().close(
183                            code,
184                            reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
185                        ))
186                    }
187                    _ => return Err(err),
188                };
189            }
190        };
191
192        // Read the first frame.
193        //
194        // This will determine if it is a webtransport bi-stream or a request stream
195        let frame = poll_fn(|cx| stream.poll_next(cx)).await;
196
197        match frame {
198            Ok(None) => Ok(None),
199            Ok(Some(Frame::WebTransportStream(session_id))) => {
200                // Take the stream out of the framed reader and split it in half like Paul Allen
201                let stream = stream.into_inner();
202
203                Ok(Some(AcceptedBi::BidiStream(
204                    session_id,
205                    BidiStream::new(stream),
206                )))
207            }
208            // Make the underlying HTTP/3 connection handle the rest
209            frame => {
210                let req = {
211                    let mut conn = self.server_conn.lock().unwrap();
212                    conn.accept_with_frame(stream, frame)?
213                };
214                if let Some(req) = req {
215                    let (req, resp) = req.resolve().await?;
216                    Ok(Some(AcceptedBi::Request(req, resp)))
217                } else {
218                    Ok(None)
219                }
220            }
221        }
222    }
223
224    /// Open a new bidirectional stream
225    pub fn open_bi(&self, session_id: SessionId) -> OpenBi<C, B> {
226        OpenBi {
227            opener: &self.opener,
228            stream: None,
229            session_id,
230        }
231    }
232
233    /// Open a new unidirectional stream
234    pub fn open_uni(&self, session_id: SessionId) -> OpenUni<C, B> {
235        OpenUni {
236            opener: &self.opener,
237            stream: None,
238            session_id,
239        }
240    }
241
242    /// Returns the session id
243    pub fn session_id(&self) -> SessionId {
244        self.session_id
245    }
246}
247
248/// Streams are opened, but the initial webtransport header has not been sent
249type PendingStreams<C, B> = (
250    BidiStream<<C as quic::Connection<B>>::BidiStream, B>,
251    WriteBuf<&'static [u8]>,
252);
253
254/// Streams are opened, but the initial webtransport header has not been sent
255type PendingUniStreams<C, B> = (
256    SendStream<<C as quic::Connection<B>>::SendStream, B>,
257    WriteBuf<&'static [u8]>,
258);
259
260pin_project! {
261    /// Future for opening a bidi stream
262    pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
263        opener: &'a Mutex<C::OpenStreams>,
264        stream: Option<PendingStreams<C,B>>,
265        session_id: SessionId,
266    }
267}
268
269impl<'a, B, C> Future for OpenBi<'a, C, B>
270where
271    C: quic::Connection<B>,
272    B: Buf,
273    C::BidiStream: SendStreamUnframed<B>,
274{
275    type Output = Result<BidiStream<C::BidiStream, B>, Error>;
276
277    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
278        let mut p = self.project();
279        loop {
280            match &mut p.stream {
281                Some((stream, buf)) => {
282                    while buf.has_remaining() {
283                        ready!(stream.poll_send(cx, buf))?;
284                    }
285
286                    let (stream, _) = p.stream.take().unwrap();
287                    return Poll::Ready(Ok(stream));
288                }
289                None => {
290                    let mut opener = (*p.opener).lock().unwrap();
291                    // Open the stream first
292                    let res = ready!(opener.poll_open_bidi(cx))?;
293                    let stream = BidiStream::new(BufRecvStream::new(res));
294
295                    let buf = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*p.session_id));
296                    *p.stream = Some((stream, buf));
297                }
298            }
299        }
300    }
301}
302
303pin_project! {
304    /// Opens a unidirectional stream
305    pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
306        opener: &'a Mutex<C::OpenStreams>,
307        stream: Option<PendingUniStreams<C, B>>,
308        // Future for opening a uni stream
309        session_id: SessionId,
310    }
311}
312
313impl<'a, C, B> Future for OpenUni<'a, C, B>
314where
315    C: quic::Connection<B>,
316    B: Buf,
317    C::SendStream: SendStreamUnframed<B>,
318{
319    type Output = Result<SendStream<C::SendStream, B>, Error>;
320
321    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
322        let mut p = self.project();
323        loop {
324            match &mut p.stream {
325                Some((send, buf)) => {
326                    while buf.has_remaining() {
327                        ready!(send.poll_send(cx, buf))?;
328                    }
329                    let (send, buf) = p.stream.take().unwrap();
330                    assert!(!buf.has_remaining());
331                    return Poll::Ready(Ok(send));
332                }
333                None => {
334                    let mut opener = (*p.opener).lock().unwrap();
335                    let send = ready!(opener.poll_open_send(cx))?;
336                    let send = BufRecvStream::new(send);
337                    let send = SendStream::new(send);
338
339                    let buf = WriteBuf::from(UniStreamHeader::WebTransportUni(*p.session_id));
340                    *p.stream = Some((send, buf));
341                }
342            }
343        }
344    }
345}
346
347/// An accepted incoming bidirectional stream.
348///
349/// Since
350pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
351    /// An incoming bidirectional stream
352    BidiStream(SessionId, BidiStream<C::BidiStream, B>),
353    /// An incoming HTTP/3 request, passed through a webtransport session.
354    ///
355    /// This makes it possible to respond to multiple CONNECT requests
356    Request(Request<()>, RequestStream<C::BidiStream, B>),
357}
358
359/// Future for [`Connection::read_datagram`]
360pub struct ReadDatagram<'a, C, B>
361where
362    C: quic::Connection<B>,
363    B: Buf,
364{
365    conn: &'a Mutex<Connection<C, B>>,
366    _marker: PhantomData<B>,
367}
368
369impl<'a, C, B> Future for ReadDatagram<'a, C, B>
370where
371    C: quic::Connection<B> + RecvDatagramExt,
372    B: Buf,
373{
374    type Output = Result<Option<(SessionId, C::Buf)>, Error>;
375
376    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377        let mut conn = self.conn.lock().unwrap();
378        match ready!(conn.inner.conn.poll_accept_datagram(cx))? {
379            Some(v) => {
380                let datagram = Datagram::decode(v)?;
381                Poll::Ready(Ok(Some((
382                    datagram.stream_id().into(),
383                    datagram.into_payload(),
384                ))))
385            }
386            None => Poll::Ready(Ok(None)),
387        }
388    }
389}
390
391/// Future for [`WebTransportSession::accept_uni`]
392pub struct AcceptUni<'a, C, B>
393where
394    C: quic::Connection<B>,
395    B: Buf,
396{
397    conn: &'a Mutex<server::Connection<C, B>>,
398}
399
400impl<'a, C, B> Future for AcceptUni<'a, C, B>
401where
402    C: quic::Connection<B>,
403    B: Buf,
404{
405    type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, Error>;
406
407    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
408        let mut conn = self.conn.lock().unwrap();
409        conn.inner.poll_accept_recv(cx)?;
410
411        // Get the currently available streams
412        let streams = conn.inner.accepted_streams_mut();
413        if let Some((id, stream)) = streams.wt_uni_streams.pop() {
414            return Poll::Ready(Ok(Some((id, RecvStream::new(stream)))));
415        }
416
417        Poll::Pending
418    }
419}
420
421fn validate_wt_connect(request: &Request<()>) -> bool {
422    let protocol = request.extensions().get::<Protocol>();
423    matches!((request.method(), protocol), (&Method::CONNECT, Some(p)) if p == &Protocol::WEB_TRANSPORT)
424}