1use std::{
4 marker::{PhantomData, Send},
5 pin::Pin,
6 sync::Mutex,
7 task::{Context, Poll},
8};
9
10use self::quic::SendStreamUnframed;
11use crate::{
12 connection::ConnectionState,
13 error::{Code, ErrorLevel},
14 ext::{Datagram, Protocol},
15 frame::FrameStream,
16 proto::frame::Frame,
17 quic::{self, OpenStreams, RecvDatagramExt, SendDatagramExt, WriteBuf},
18 server::{self, Connection, RequestStream},
19 stream::{BidiStreamHeader, BufRecvStream, UniStreamHeader},
20 webtransport::stream::{BidiStream, RecvStream, SendStream},
21 Error,
22};
23use bytes::Buf;
24use futures_util::{future::poll_fn, ready, Future};
25use http::{Method, Request, Response, StatusCode};
26
27use crate::webtransport::SessionId;
28use pin_project_lite::pin_project;
29
30pub struct WebTransportSession<C, B>
36where
37 C: quic::Connection<B> + Send,
38 B: Buf + Send,
39{
40 session_id: SessionId,
42 server_conn: Mutex<Connection<C, B>>,
44 connect_stream: RequestStream<C::BidiStream, B>,
45 opener: Mutex<C::OpenStreams>,
46}
47
48impl<C, B> WebTransportSession<C, B>
49where
50 C: quic::Connection<B> + Send,
51 B: Buf + Send,
52{
53 pub async fn accept(
57 request: Request<()>,
58 mut stream: RequestStream<C::BidiStream, B>,
59 mut conn: Connection<C, B>,
60 ) -> Result<Self, Error> {
61 let shared = conn.shared_state().clone();
62 {
63 let config = shared.write("Read WebTransport support").peer_config;
64
65 if !config.enable_webtransport {
66 return Err(conn.close(
67 Code::H3_SETTINGS_ERROR,
68 "webtransport is not supported by client",
69 ));
70 }
71
72 if !config.enable_datagram {
73 return Err(conn.close(
74 Code::H3_SETTINGS_ERROR,
75 "datagrams are not supported by client",
76 ));
77 }
78 }
79
80 if !conn.inner.config.enable_webtransport {
85 tracing::warn!("Server does not support webtransport");
86 }
87
88 if !conn.inner.config.enable_datagram {
89 tracing::warn!("Server does not support datagrams");
90 }
91
92 if !conn.inner.config.enable_extended_connect {
93 tracing::warn!("Server does not support CONNECT");
94 }
95
96 let response = if validate_wt_connect(&request) {
100 Response::builder()
101 .header("sec-webtransport-http3-draft", "draft02")
103 .status(StatusCode::OK)
104 .body(())
105 .unwrap()
106 } else {
107 Response::builder()
108 .status(StatusCode::BAD_REQUEST)
109 .body(())
110 .unwrap()
111 };
112
113 stream.send_response(response).await?;
114
115 let session_id = stream.send_id().into();
116 let conn_inner = &mut conn.inner.conn;
117 let opener = Mutex::new(conn_inner.opener());
118
119 Ok(Self {
120 session_id,
121 opener,
122 server_conn: Mutex::new(conn),
123 connect_stream: stream,
124 })
125 }
126
127 pub fn accept_datagram(&self) -> ReadDatagram<C, B> {
129 ReadDatagram {
130 conn: &self.server_conn,
131 _marker: PhantomData,
132 }
133 }
134
135 pub fn send_datagram(&self, data: B) -> Result<(), Error>
139 where
140 C: SendDatagramExt<B>,
141 {
142 self.server_conn
143 .lock()
144 .unwrap()
145 .send_datagram(self.connect_stream.id(), data)?;
146
147 Ok(())
148 }
149
150 pub fn accept_uni(&self) -> AcceptUni<C, B> {
152 AcceptUni {
153 conn: &self.server_conn,
154 }
155 }
156
157 pub async fn accept_bi(&self) -> Result<Option<AcceptedBi<C, B>>, Error> {
159 let stream = poll_fn(|cx| {
162 let mut conn = self.server_conn.lock().unwrap();
163 conn.poll_accept_request(cx)
164 })
165 .await;
166
167 let mut stream = match stream {
168 Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
169 Ok(None) => {
170 return Ok(None);
172 }
173 Err(err) => {
174 match err.kind() {
175 crate::error::Kind::Closed => return Ok(None),
176 crate::error::Kind::Application {
177 code,
178 reason,
179 level: ErrorLevel::ConnectionError,
180 ..
181 } => {
182 return Err(self.server_conn.lock().unwrap().close(
183 code,
184 reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
185 ))
186 }
187 _ => return Err(err),
188 };
189 }
190 };
191
192 let frame = poll_fn(|cx| stream.poll_next(cx)).await;
196
197 match frame {
198 Ok(None) => Ok(None),
199 Ok(Some(Frame::WebTransportStream(session_id))) => {
200 let stream = stream.into_inner();
202
203 Ok(Some(AcceptedBi::BidiStream(
204 session_id,
205 BidiStream::new(stream),
206 )))
207 }
208 frame => {
210 let req = {
211 let mut conn = self.server_conn.lock().unwrap();
212 conn.accept_with_frame(stream, frame)?
213 };
214 if let Some(req) = req {
215 let (req, resp) = req.resolve().await?;
216 Ok(Some(AcceptedBi::Request(req, resp)))
217 } else {
218 Ok(None)
219 }
220 }
221 }
222 }
223
224 pub fn open_bi(&self, session_id: SessionId) -> OpenBi<C, B> {
226 OpenBi {
227 opener: &self.opener,
228 stream: None,
229 session_id,
230 }
231 }
232
233 pub fn open_uni(&self, session_id: SessionId) -> OpenUni<C, B> {
235 OpenUni {
236 opener: &self.opener,
237 stream: None,
238 session_id,
239 }
240 }
241
242 pub fn session_id(&self) -> SessionId {
244 self.session_id
245 }
246}
247
/// State held by `OpenBi` after the stream has been opened: the bidirectional
/// stream together with the header bytes that still have to be written to it.
type PendingStreams<C, B> = (
    BidiStream<<C as quic::Connection<B>>::BidiStream, B>,
    WriteBuf<&'static [u8]>,
);
253
/// State held by `OpenUni` after the stream has been opened: the send stream
/// together with the header bytes that still have to be written to it.
type PendingUniStreams<C, B> = (
    SendStream<<C as quic::Connection<B>>::SendStream, B>,
    WriteBuf<&'static [u8]>,
);
259
pin_project! {
    /// Future that opens a bidirectional stream and writes its WebTransport
    /// header before handing the stream out.
    pub struct OpenBi<'a, C:quic::Connection<B>, B:Buf> {
        // Shared handle used to open the underlying QUIC stream.
        opener: &'a Mutex<C::OpenStreams>,
        // `None` until the stream has been opened; afterwards the stream and
        // the header bytes still pending.
        stream: Option<PendingStreams<C,B>>,
        // Session id encoded into the stream header.
        session_id: SessionId,
    }
}
268
269impl<'a, B, C> Future for OpenBi<'a, C, B>
270where
271 C: quic::Connection<B>,
272 B: Buf,
273 C::BidiStream: SendStreamUnframed<B>,
274{
275 type Output = Result<BidiStream<C::BidiStream, B>, Error>;
276
277 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
278 let mut p = self.project();
279 loop {
280 match &mut p.stream {
281 Some((stream, buf)) => {
282 while buf.has_remaining() {
283 ready!(stream.poll_send(cx, buf))?;
284 }
285
286 let (stream, _) = p.stream.take().unwrap();
287 return Poll::Ready(Ok(stream));
288 }
289 None => {
290 let mut opener = (*p.opener).lock().unwrap();
291 let res = ready!(opener.poll_open_bidi(cx))?;
293 let stream = BidiStream::new(BufRecvStream::new(res));
294
295 let buf = WriteBuf::from(BidiStreamHeader::WebTransportBidi(*p.session_id));
296 *p.stream = Some((stream, buf));
297 }
298 }
299 }
300 }
301}
302
pin_project! {
    /// Future that opens a unidirectional send stream and writes its
    /// WebTransport header before handing the stream out.
    pub struct OpenUni<'a, C: quic::Connection<B>, B:Buf> {
        // Shared handle used to open the underlying QUIC stream.
        opener: &'a Mutex<C::OpenStreams>,
        // `None` until the stream has been opened; afterwards the stream and
        // the header bytes still pending.
        stream: Option<PendingUniStreams<C, B>>,
        // Session id encoded into the stream header.
        session_id: SessionId,
    }
}
312
313impl<'a, C, B> Future for OpenUni<'a, C, B>
314where
315 C: quic::Connection<B>,
316 B: Buf,
317 C::SendStream: SendStreamUnframed<B>,
318{
319 type Output = Result<SendStream<C::SendStream, B>, Error>;
320
321 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
322 let mut p = self.project();
323 loop {
324 match &mut p.stream {
325 Some((send, buf)) => {
326 while buf.has_remaining() {
327 ready!(send.poll_send(cx, buf))?;
328 }
329 let (send, buf) = p.stream.take().unwrap();
330 assert!(!buf.has_remaining());
331 return Poll::Ready(Ok(send));
332 }
333 None => {
334 let mut opener = (*p.opener).lock().unwrap();
335 let send = ready!(opener.poll_open_send(cx))?;
336 let send = BufRecvStream::new(send);
337 let send = SendStream::new(send);
338
339 let buf = WriteBuf::from(UniStreamHeader::WebTransportUni(*p.session_id));
340 *p.stream = Some((send, buf));
341 }
342 }
343 }
344 }
345}
346
347pub enum AcceptedBi<C: quic::Connection<B>, B: Buf> {
351 BidiStream(SessionId, BidiStream<C::BidiStream, B>),
353 Request(Request<()>, RequestStream<C::BidiStream, B>),
357}
358
359pub struct ReadDatagram<'a, C, B>
361where
362 C: quic::Connection<B>,
363 B: Buf,
364{
365 conn: &'a Mutex<Connection<C, B>>,
366 _marker: PhantomData<B>,
367}
368
369impl<'a, C, B> Future for ReadDatagram<'a, C, B>
370where
371 C: quic::Connection<B> + RecvDatagramExt,
372 B: Buf,
373{
374 type Output = Result<Option<(SessionId, C::Buf)>, Error>;
375
376 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
377 let mut conn = self.conn.lock().unwrap();
378 match ready!(conn.inner.conn.poll_accept_datagram(cx))? {
379 Some(v) => {
380 let datagram = Datagram::decode(v)?;
381 Poll::Ready(Ok(Some((
382 datagram.stream_id().into(),
383 datagram.into_payload(),
384 ))))
385 }
386 None => Poll::Ready(Ok(None)),
387 }
388 }
389}
390
391pub struct AcceptUni<'a, C, B>
393where
394 C: quic::Connection<B>,
395 B: Buf,
396{
397 conn: &'a Mutex<server::Connection<C, B>>,
398}
399
400impl<'a, C, B> Future for AcceptUni<'a, C, B>
401where
402 C: quic::Connection<B>,
403 B: Buf,
404{
405 type Output = Result<Option<(SessionId, RecvStream<C::RecvStream, B>)>, Error>;
406
407 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
408 let mut conn = self.conn.lock().unwrap();
409 conn.inner.poll_accept_recv(cx)?;
410
411 let streams = conn.inner.accepted_streams_mut();
413 if let Some((id, stream)) = streams.wt_uni_streams.pop() {
414 return Poll::Ready(Ok(Some((id, RecvStream::new(stream)))));
415 }
416
417 Poll::Pending
418 }
419}
420
421fn validate_wt_connect(request: &Request<()>) -> bool {
422 let protocol = request.extensions().get::<Protocol>();
423 matches!((request.method(), protocol), (&Method::CONNECT, Some(p)) if p == &Protocol::WEB_TRANSPORT)
424}