salvo_http3/webtransport/
server.rs

1//! Provides the server side WebTransport session
2
3use std::{
4    pin::Pin,
5    sync::{Arc, Mutex},
6    task::{Context, Poll},
7};
8
9use bytes::Buf;
10use futures_util::{Future, future::poll_fn, ready};
11use h3::{
12    ConnectionState, SharedState,
13    error::{
14        Code, ConnectionError, StreamError, connection_error_creators::CloseStream,
15        internal_error::InternalConnectionError,
16    },
17    frame::FrameStream,
18    proto::frame::Frame,
19    quic::{self, OpenStreams, WriteBuf},
20    server::{Connection, RequestStream},
21};
22use h3::{
23    quic::SendStreamUnframed,
24    stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
25};
26use h3_datagram::{
27    datagram_handler::{DatagramReader, DatagramSender, HandleDatagramsExt},
28    quic_traits,
29};
30use http::{Request, Response, StatusCode};
31
32use h3::webtransport::SessionId;
33use pin_project_lite::pin_project;
34
35use super::stream::{BidiStream, RecvStream, SendStream};
36
37/// WebTransport session driver.
38///
39/// Maintains the session using the underlying HTTP/3 connection.
40///
41/// Similar to [`h3::server::Connection`](https://docs.rs/h3/latest/h3/server/struct.Connection.html) it is generic over the QUIC implementation and Buffer.
42pub struct WebTransportSession<C, B>
43where
44    C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
45    Connection<C, B>: HandleDatagramsExt<C, B>,
46    B: Buf,
47{
48    // See: https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-2-3
49    session_id: SessionId,
50    /// The underlying HTTP/3 connection
51    server_conn: Mutex<Connection<C, B>>,
52    connect_stream: RequestStream<C::BidiStream, B>,
53    opener: Mutex<C::OpenStreams>,
54    /// Shared State
55    ///
56    /// Shared state is already in server_conn, but with this it is not necessary to lock the mutex
57    shared: Arc<SharedState>,
58}
59
60impl<C, B> ConnectionState for WebTransportSession<C, B>
61where
62    C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
63    Connection<C, B>: HandleDatagramsExt<C, B>,
64    B: Buf,
65{
66    fn shared_state(&self) -> &SharedState {
67        &self.shared
68    }
69}
70
71impl<C, B> WebTransportSession<C, B>
72where
73    Connection<C, B>: HandleDatagramsExt<C, B>,
74    C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
75    B: Buf,
76{
77    /// Split the session into the underlying connection and stream.
78    #[allow(clippy::type_complexity)]
79    pub fn split(self) -> (Mutex<Connection<C, B>>, RequestStream<C::BidiStream, B>) {
80        let WebTransportSession {
81            server_conn,
82            connect_stream,
83            ..
84        } = self;
85
86        (server_conn, connect_stream)
87    }
88
89    /// Accepts a *CONNECT* request for establishing a WebTransport session.
90    ///
91    /// TODO: is the API or the user responsible for validating the CONNECT request?
92    pub async fn accept(
93        mut stream: RequestStream<C::BidiStream, B>,
94        mut conn: Connection<C, B>,
95    ) -> Result<Self, StreamError> {
96        let shared = conn.inner.shared.clone();
97        let config = shared.settings();
98
99        if !config.enable_webtransport() {
100            return Err(StreamError::ConnectionError(
101                conn.inner
102                    .handle_connection_error(InternalConnectionError::new(
103                        Code::H3_SETTINGS_ERROR,
104                        "webtransport is not supported by client".to_string(),
105                    )),
106            ));
107        }
108
109        if !config.enable_datagram() {
110            return Err(StreamError::ConnectionError(
111                conn.inner
112                    .handle_connection_error(InternalConnectionError::new(
113                        Code::H3_SETTINGS_ERROR,
114                        "datagrams are not supported by client".to_string(),
115                    )),
116            ));
117        }
118
119        // The peer is responsible for validating our side of the webtransport support.
120        //
121        // However, it is still advantageous to show a log on the server as (attempting) to
122        // establish a WebTransportSession without the proper h3 config is usually a mistake.
123        if !conn.inner.config.settings.enable_webtransport() {
124            tracing::warn!("Server does not support webtransport");
125        }
126
127        if !conn.inner.config.settings.enable_datagram() {
128            tracing::warn!("Server does not support datagrams");
129        }
130
131        if !conn.inner.config.settings.enable_extended_connect() {
132            tracing::warn!("Server does not support CONNECT");
133        }
134
135        // Respond to the CONNECT request.
136
137        //= https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.3
138        let response = Response::builder()
139            // This is the only header that chrome cares about.
140            .header("sec-webtransport-http3-draft", "draft02")
141            .status(StatusCode::OK)
142            .body(())
143            .unwrap();
144
145        stream.send_response(response).await?;
146
147        let session_id = stream.send_id().into();
148        let conn_inner = &mut conn.inner.conn;
149        let opener = Mutex::new(conn_inner.opener());
150
151        Ok(Self {
152            session_id,
153            opener,
154            server_conn: Mutex::new(conn),
155            connect_stream: stream,
156            shared,
157        })
158    }
159
160    /// Receive a datagram from the client
161    pub fn datagram_reader(&self) -> DatagramReader<C::RecvDatagramHandler> {
162        self.server_conn.lock().unwrap().get_datagram_reader()
163    }
164
165    /// Sends a datagram
166    ///
167    /// TODO: maybe make async. `quinn` does not require an async send
168    pub fn datagram_sender(&self) -> DatagramSender<C::SendDatagramHandler, B> {
169        self.server_conn
170            .lock()
171            .unwrap()
172            .get_datagram_sender(self.connect_stream.send_id())
173    }
174
175    /// Accept an incoming unidirectional stream from the client, it reads the stream until EOF.
176    pub fn accept_uni(&self) -> AcceptUni<'_, C, B> {
177        AcceptUni {
178            conn: &self.server_conn,
179        }
180    }
181
182    /// Accepts an incoming bidirectional stream or request
183    pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, StreamError> {
184        let stream = poll_fn(|cx| {
185            let mut conn = self.server_conn.lock().unwrap();
186            conn.poll_accept_request_stream(cx)
187        })
188        .await;
189
190        let stream = match stream {
191            Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
192            Ok(None) => {
193                // FIXME: is proper HTTP GoAway shutdown required?
194                return Ok(None);
195            }
196            Err(err) => return Err(StreamError::ConnectionError(err)),
197        };
198
199        let mut resolver = { self.server_conn.lock().unwrap().create_resolver(stream) };
200        // Read the first frame.
201        //
202        // This will determine if it is a webtransport bi-stream or a request stream
203        let frame = poll_fn(|cx| resolver.frame_stream.poll_next(cx)).await;
204
205        match frame {
206            Ok(None) => Ok(None),
207            Ok(Some(Frame::WebTransportStream(session_id))) => {
208                // Take the stream out of the framed reader and split it in half like Paul Allen
209                let stream = resolver.frame_stream.into_inner();
210                Ok(Some(AcceptedBi::BidiStream(
211                    session_id,
212                    BidiStream::new(stream),
213                )))
214            }
215            // Make the underlying HTTP/3 connection handle the rest
216            frame => {
217                let (req, resp) = resolver.accept_with_frame(frame)?.resolve().await?;
218                Ok(Some(AcceptedBi::Request(Box::new(req), resp)))
219            }
220        }
221    }
222
223    /// Open a new bidirectional stream
224    pub fn open_bi(&self, session_id: SessionId) -> OpenBi<'_, C, B> {
225        OpenBi {
226            opener: &self.opener,
227            stream: None,
228            session_id,
229            stream_handler: WTransportStreamHandler {
230                shared: self.shared.clone(),
231            },
232        }
233    }
234
235    /// Open a new unidirectional stream
236    pub fn open_uni(&self, session_id: SessionId) -> OpenUni<'_, C, B> {
237        OpenUni {
238            opener: &self.opener,
239            stream: None,
240            session_id,
241            stream_handler: WTransportStreamHandler {
242                shared: self.shared.clone(),
243            },
244        }
245    }
246
247    /// Returns the session id
248    pub fn session_id(&self) -> SessionId {
249        self.session_id
250    }
251}
252
253/// Streams are opened, but the initial webtransport header has not been sent
254type PendingStreams<C, B> = (
255    BidiStream<<C as quic::OpenStreams<B>>::BidiStream, B>,
256    WriteBuf<&'static [u8]>,
257);
258
259/// Streams are opened, but the initial webtransport header has not been sent
260type PendingUniStreams<C, B> = (
261    SendStream<<C as quic::OpenStreams<B>>::SendStream, B>,
262    WriteBuf<&'static [u8]>,
263);
264
265pin_project! {
266    /// Future for opening a bidi stream
267    pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
268        opener: &'a Mutex<C::OpenStreams>,
269        stream: Option<PendingStreams<C,B>>,
270        session_id: SessionId,
271        stream_handler: WTransportStreamHandler,
272    }
273}
274
275struct WTransportStreamHandler {
276    shared: Arc<SharedState>,
277}
278
279impl ConnectionState for WTransportStreamHandler {
280    fn shared_state(&self) -> &SharedState {
281        &self.shared
282    }
283}
284
285impl CloseStream for WTransportStreamHandler {}
286
287impl<'a, B, C> Future for OpenBi<'a, C, B>
288where
289    C: quic::Connection<B>,
290    B: Buf,
291    C::BidiStream: SendStreamUnframed<B>,
292{
293    type Output = Result<BidiStream<C::BidiStream, B>, StreamError>;
294
295    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296        let mut p = self.project();
297        loop {
298            match &mut p.stream {
299                Some((stream, buf)) => {
300                    while buf.has_remaining() {
301                        ready!(stream.poll_send(cx, buf))
302                            .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
303                    }
304
305                    let (stream, _) = p.stream.take().unwrap();
306                    return Poll::Ready(Ok(stream));
307                }
308                None => {
309                    let mut opener = (*p.opener).lock().unwrap();
310                    // Open the stream first
311                    let res = ready!(opener.poll_open_bidi(cx))
312                        .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
313                    let stream = BidiStream::new(BufRecvStream::new(res));
314
315                    let buf = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*p.session_id));
316                    *p.stream = Some((stream, buf));
317                }
318            }
319        }
320    }
321}
322
323pin_project! {
324    /// Opens a unidirectional stream
325    pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
326        opener: &'a Mutex<C::OpenStreams>,
327        stream: Option<PendingUniStreams<C, B>>,
328        // Future for opening a uni stream
329        session_id: SessionId,
330        stream_handler: WTransportStreamHandler
331    }
332}
333
334impl<'a, C, B> Future for OpenUni<'a, C, B>
335where
336    C: quic::Connection<B>,
337    B: Buf,
338    C::SendStream: SendStreamUnframed<B>,
339{
340    type Output = Result<SendStream<C::SendStream, B>, StreamError>;
341
342    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
343        let mut p = self.project();
344        loop {
345            match &mut p.stream {
346                Some((send, buf)) => {
347                    while buf.has_remaining() {
348                        ready!(send.poll_send(cx, buf))
349                            .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
350                    }
351                    let (send, buf) = p.stream.take().unwrap();
352                    assert!(!buf.has_remaining());
353                    return Poll::Ready(Ok(send));
354                }
355                None => {
356                    let mut opener = (*p.opener).lock().unwrap();
357                    let send = ready!(opener.poll_open_send(cx))
358                        .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
359                    let send = BufRecvStream::new(send);
360                    let send = SendStream::new(send);
361
362                    let buf = WriteBuf::from(UniStreamHeader::WebTransportUni(*p.session_id));
363                    *p.stream = Some((send, buf));
364                }
365            }
366        }
367    }
368}
369
370/// An accepted incoming bidirectional stream.
371///
372/// Since
373pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
374    /// An incoming bidirectional stream
375    BidiStream(SessionId, BidiStream<C::BidiStream, B>),
376    /// An incoming HTTP/3 request, passed through a webtransport session.
377    ///
378    /// This makes it possible to respond to multiple CONNECT requests
379    Request(Box<Request<()>>, RequestStream<C::BidiStream, B>),
380}
381
382/// Future for [`WebTransportSession::accept_uni`]
383pub struct AcceptUni<'a, C, B>
384where
385    C: quic::Connection<B>,
386    B: Buf,
387{
388    conn: &'a Mutex<Connection<C, B>>,
389}
390
391impl<'a, C, B> Future for AcceptUni<'a, C, B>
392where
393    C: quic::Connection<B>,
394    B: Buf,
395{
396    type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, ConnectionError>;
397
398    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
399        let mut conn = self.conn.lock().unwrap();
400        conn.inner.poll_accept_recv(cx)?;
401
402        // Get the currently available streams
403        let streams = conn.inner.accepted_streams_mut();
404        if let Some((id, stream)) = streams.wt_uni_streams.pop() {
405            return Poll::Ready(Ok(Some((id, RecvStream::new(stream)))));
406        }
407
408        Poll::Pending
409    }
410}