1use std::{
4 pin::Pin,
5 sync::{Arc, Mutex},
6 task::{Context, Poll},
7};
8
9use bytes::Buf;
10use futures_util::{Future, future::poll_fn, ready};
11use h3::{
12 ConnectionState, SharedState,
13 error::{
14 Code, ConnectionError, StreamError, connection_error_creators::CloseStream,
15 internal_error::InternalConnectionError,
16 },
17 frame::FrameStream,
18 proto::frame::Frame,
19 quic::{self, OpenStreams, WriteBuf},
20 server::{Connection, RequestStream},
21};
22use h3::{
23 quic::SendStreamUnframed,
24 stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
25};
26use h3_datagram::{
27 datagram_handler::{DatagramReader, DatagramSender, HandleDatagramsExt},
28 quic_traits,
29};
30use http::{Request, Response, StatusCode};
31
32use h3::webtransport::SessionId;
33use pin_project_lite::pin_project;
34
35use super::stream::{BidiStream, RecvStream, SendStream};
36
37pub struct WebTransportSession<C, B>
43where
44 C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
45 Connection<C, B>: HandleDatagramsExt<C, B>,
46 B: Buf,
47{
48 session_id: SessionId,
50 server_conn: Mutex<Connection<C, B>>,
52 connect_stream: RequestStream<C::BidiStream, B>,
53 opener: Mutex<C::OpenStreams>,
54 shared: Arc<SharedState>,
58}
59
60impl<C, B> ConnectionState for WebTransportSession<C, B>
61where
62 C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
63 Connection<C, B>: HandleDatagramsExt<C, B>,
64 B: Buf,
65{
66 fn shared_state(&self) -> &SharedState {
67 &self.shared
68 }
69}
70
71impl<C, B> WebTransportSession<C, B>
72where
73 Connection<C, B>: HandleDatagramsExt<C, B>,
74 C: quic::Connection<B> + quic_traits::DatagramConnectionExt<B>,
75 B: Buf,
76{
77 #[allow(clippy::type_complexity)]
79 pub fn split(self) -> (Mutex<Connection<C, B>>, RequestStream<C::BidiStream, B>) {
80 let WebTransportSession {
81 server_conn,
82 connect_stream,
83 ..
84 } = self;
85
86 (server_conn, connect_stream)
87 }
88
89 pub async fn accept(
93 mut stream: RequestStream<C::BidiStream, B>,
94 mut conn: Connection<C, B>,
95 ) -> Result<Self, StreamError> {
96 let shared = conn.inner.shared.clone();
97 let config = shared.settings();
98
99 if !config.enable_webtransport() {
100 return Err(StreamError::ConnectionError(
101 conn.inner
102 .handle_connection_error(InternalConnectionError::new(
103 Code::H3_SETTINGS_ERROR,
104 "webtransport is not supported by client".to_string(),
105 )),
106 ));
107 }
108
109 if !config.enable_datagram() {
110 return Err(StreamError::ConnectionError(
111 conn.inner
112 .handle_connection_error(InternalConnectionError::new(
113 Code::H3_SETTINGS_ERROR,
114 "datagrams are not supported by client".to_string(),
115 )),
116 ));
117 }
118
119 if !conn.inner.config.settings.enable_webtransport() {
124 tracing::warn!("Server does not support webtransport");
125 }
126
127 if !conn.inner.config.settings.enable_datagram() {
128 tracing::warn!("Server does not support datagrams");
129 }
130
131 if !conn.inner.config.settings.enable_extended_connect() {
132 tracing::warn!("Server does not support CONNECT");
133 }
134
135 let response = Response::builder()
139 .header("sec-webtransport-http3-draft", "draft02")
141 .status(StatusCode::OK)
142 .body(())
143 .unwrap();
144
145 stream.send_response(response).await?;
146
147 let session_id = stream.send_id().into();
148 let conn_inner = &mut conn.inner.conn;
149 let opener = Mutex::new(conn_inner.opener());
150
151 Ok(Self {
152 session_id,
153 opener,
154 server_conn: Mutex::new(conn),
155 connect_stream: stream,
156 shared,
157 })
158 }
159
160 pub fn datagram_reader(&self) -> DatagramReader<C::RecvDatagramHandler> {
162 self.server_conn.lock().unwrap().get_datagram_reader()
163 }
164
165 pub fn datagram_sender(&self) -> DatagramSender<C::SendDatagramHandler, B> {
169 self.server_conn
170 .lock()
171 .unwrap()
172 .get_datagram_sender(self.connect_stream.send_id())
173 }
174
175 pub fn accept_uni(&self) -> AcceptUni<'_, C, B> {
177 AcceptUni {
178 conn: &self.server_conn,
179 }
180 }
181
182 pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, StreamError> {
184 let stream = poll_fn(|cx| {
185 let mut conn = self.server_conn.lock().unwrap();
186 conn.poll_accept_request_stream(cx)
187 })
188 .await;
189
190 let stream = match stream {
191 Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
192 Ok(None) => {
193 return Ok(None);
195 }
196 Err(err) => return Err(StreamError::ConnectionError(err)),
197 };
198
199 let mut resolver = { self.server_conn.lock().unwrap().create_resolver(stream) };
200 let frame = poll_fn(|cx| resolver.frame_stream.poll_next(cx)).await;
204
205 match frame {
206 Ok(None) => Ok(None),
207 Ok(Some(Frame::WebTransportStream(session_id))) => {
208 let stream = resolver.frame_stream.into_inner();
210 Ok(Some(AcceptedBi::BidiStream(
211 session_id,
212 BidiStream::new(stream),
213 )))
214 }
215 frame => {
217 let (req, resp) = resolver.accept_with_frame(frame)?.resolve().await?;
218 Ok(Some(AcceptedBi::Request(Box::new(req), resp)))
219 }
220 }
221 }
222
223 pub fn open_bi(&self, session_id: SessionId) -> OpenBi<'_, C, B> {
225 OpenBi {
226 opener: &self.opener,
227 stream: None,
228 session_id,
229 stream_handler: WTransportStreamHandler {
230 shared: self.shared.clone(),
231 },
232 }
233 }
234
235 pub fn open_uni(&self, session_id: SessionId) -> OpenUni<'_, C, B> {
237 OpenUni {
238 opener: &self.opener,
239 stream: None,
240 session_id,
241 stream_handler: WTransportStreamHandler {
242 shared: self.shared.clone(),
243 },
244 }
245 }
246
247 pub fn session_id(&self) -> SessionId {
249 self.session_id
250 }
251}
252
253type PendingStreams<C, B> = (
255 BidiStream<<C as quic::OpenStreams<B>>::BidiStream, B>,
256 WriteBuf<&'static [u8]>,
257);
258
259type PendingUniStreams<C, B> = (
261 SendStream<<C as quic::OpenStreams<B>>::SendStream, B>,
262 WriteBuf<&'static [u8]>,
263);
264
265pin_project! {
266 pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
268 opener: &'a Mutex<C::OpenStreams>,
269 stream: Option<PendingStreams<C,B>>,
270 session_id: SessionId,
271 stream_handler: WTransportStreamHandler,
272 }
273}
274
275struct WTransportStreamHandler {
276 shared: Arc<SharedState>,
277}
278
279impl ConnectionState for WTransportStreamHandler {
280 fn shared_state(&self) -> &SharedState {
281 &self.shared
282 }
283}
284
285impl CloseStream for WTransportStreamHandler {}
286
287impl<'a, B, C> Future for OpenBi<'a, C, B>
288where
289 C: quic::Connection<B>,
290 B: Buf,
291 C::BidiStream: SendStreamUnframed<B>,
292{
293 type Output = Result<BidiStream<C::BidiStream, B>, StreamError>;
294
295 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
296 let mut p = self.project();
297 loop {
298 match &mut p.stream {
299 Some((stream, buf)) => {
300 while buf.has_remaining() {
301 ready!(stream.poll_send(cx, buf))
302 .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
303 }
304
305 let (stream, _) = p.stream.take().unwrap();
306 return Poll::Ready(Ok(stream));
307 }
308 None => {
309 let mut opener = (*p.opener).lock().unwrap();
310 let res = ready!(opener.poll_open_bidi(cx))
312 .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
313 let stream = BidiStream::new(BufRecvStream::new(res));
314
315 let buf = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*p.session_id));
316 *p.stream = Some((stream, buf));
317 }
318 }
319 }
320 }
321}
322
323pin_project! {
324 pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
326 opener: &'a Mutex<C::OpenStreams>,
327 stream: Option<PendingUniStreams<C, B>>,
328 session_id: SessionId,
330 stream_handler: WTransportStreamHandler
331 }
332}
333
334impl<'a, C, B> Future for OpenUni<'a, C, B>
335where
336 C: quic::Connection<B>,
337 B: Buf,
338 C::SendStream: SendStreamUnframed<B>,
339{
340 type Output = Result<SendStream<C::SendStream, B>, StreamError>;
341
342 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
343 let mut p = self.project();
344 loop {
345 match &mut p.stream {
346 Some((send, buf)) => {
347 while buf.has_remaining() {
348 ready!(send.poll_send(cx, buf))
349 .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
350 }
351 let (send, buf) = p.stream.take().unwrap();
352 assert!(!buf.has_remaining());
353 return Poll::Ready(Ok(send));
354 }
355 None => {
356 let mut opener = (*p.opener).lock().unwrap();
357 let send = ready!(opener.poll_open_send(cx))
358 .map_err(|err| p.stream_handler.handle_quic_stream_error(err))?;
359 let send = BufRecvStream::new(send);
360 let send = SendStream::new(send);
361
362 let buf = WriteBuf::from(UniStreamHeader::WebTransportUni(*p.session_id));
363 *p.stream = Some((send, buf));
364 }
365 }
366 }
367 }
368}
369
370pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
374 BidiStream(SessionId, BidiStream<C::BidiStream, B>),
376 Request(Box<Request<()>>, RequestStream<C::BidiStream, B>),
380}
381
382pub struct AcceptUni<'a, C, B>
384where
385 C: quic::Connection<B>,
386 B: Buf,
387{
388 conn: &'a Mutex<Connection<C, B>>,
389}
390
391impl<'a, C, B> Future for AcceptUni<'a, C, B>
392where
393 C: quic::Connection<B>,
394 B: Buf,
395{
396 type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, ConnectionError>;
397
398 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
399 let mut conn = self.conn.lock().unwrap();
400 conn.inner.poll_accept_recv(cx)?;
401
402 let streams = conn.inner.accepted_streams_mut();
404 if let Some((id, stream)) = streams.wt_uni_streams.pop() {
405 return Poll::Ready(Ok(Some((id, RecvStream::new(stream)))));
406 }
407
408 Poll::Pending
409 }
410}