sec_http3/server.rs
1//! This module provides methods to create a http/3 Server.
2//!
3//! It allows to accept incoming requests, and send responses.
4//!
5//! # Examples
6//!
7//! ## Simple example
8//! ```rust
9//! async fn doc<C>(conn: C)
10//! where
11//! C: sec_http3::quic::Connection<bytes::Bytes>,
12//! <C as sec_http3::quic::Connection<bytes::Bytes>>::BidiStream: Send + 'static
13//! {
14//! let mut server_builder = sec_http3::server::builder();
15//! // Build the Connection
16//! let mut h3_conn = server_builder.build(conn).await.unwrap();
17//! loop {
18//! // Accept incoming requests
19//! match h3_conn.accept().await {
20//! Ok(Some((req, mut stream))) => {
21//! // spawn a new task to handle the request
22//! tokio::spawn(async move {
23//! // build a http response
24//! let response = http::Response::builder().status(http::StatusCode::OK).body(()).unwrap();
25//! // send the response to the wire
26//! stream.send_response(response).await.unwrap();
//! // send some data
28//! stream.send_data(bytes::Bytes::from("test")).await.unwrap();
//! // finish the stream
30//! stream.finish().await.unwrap();
31//! });
32//! }
33//! Ok(None) => {
34//! // break if no Request is accepted
35//! break;
36//! }
37//! Err(err) => {
38//! match err.get_error_level() {
39//! // break on connection errors
40//! sec_http3::error::ErrorLevel::ConnectionError => break,
41//! // continue on stream errors
42//! sec_http3::error::ErrorLevel::StreamError => continue,
43//! }
44//! }
45//! }
46//! }
47//! }
48//! ```
49//!
50//! ## File server
51//! A ready-to-use example of a file server is available [here](https://github.com/security-union/sec-http3/blob/master/examples/client.rs)
52
53use std::{
54 collections::HashSet,
55 marker::PhantomData,
56 option::Option,
57 result::Result,
58 sync::Arc,
59 task::{Context, Poll},
60};
61
62use bytes::{Buf, BytesMut};
63use futures_util::{
64 future::{self, Future},
65 ready,
66};
67use http::{response, HeaderMap, Request, Response};
68use pin_project_lite::pin_project;
69use quic::RecvStream;
70use quic::StreamId;
71use tokio::sync::mpsc;
72
73use crate::{
74 config::Config,
75 connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
76 error::{Code, Error, ErrorLevel},
77 ext::Datagram,
78 frame::{FrameStream, FrameStreamError},
79 proto::{
80 frame::{Frame, PayloadLen},
81 headers::Header,
82 push::PushId,
83 },
84 qpack,
85 quic::{self, RecvDatagramExt, SendDatagramExt, SendStream as _},
86 request::ResolveRequest,
87 stream::{self, BufRecvStream},
88};
89use tracing::{error, trace, warn};
90
91/// Create a builder of HTTP/3 server connections
92///
93/// This function creates a [`Builder`] that carries settings that can
94/// be shared between server connections.
95pub fn builder() -> Builder {
96 Builder::new()
97}
98
/// Server connection driver
///
/// The [`Connection`] struct manages a connection from the side of the HTTP/3 server.
///
/// Create a new instance with [`Connection::new()`].
/// Accept incoming requests with [`Connection::accept()`].
/// And shut down a connection with [`Connection::shutdown()`].
pub struct Connection<C, B>
where
    C: quic::Connection<B>,
    B: Buf,
{
    /// TODO: temporarily break encapsulation for `WebTransportSession`
    pub inner: ConnectionInner<C, B>,
    // Header size limit handed to the QPACK decoder and to every request stream.
    max_field_section_size: u64,
    // Set of all incoming streams that are currently running.
    ongoing_streams: HashSet<StreamId>,
    // Let the streams tell us when they are no longer running: each request's
    // `RequestEnd` sends its stream id here when it is dropped.
    request_end_recv: mpsc::UnboundedReceiver<StreamId>,
    request_end_send: mpsc::UnboundedSender<StreamId>,
    // Has a GOAWAY frame been sent? If so, this StreamId is the last we are willing to accept.
    sent_closing: Option<StreamId>,
    // Has a GOAWAY frame been received? If so, this is the last PushId the remote will accept.
    recv_closing: Option<PushId>,
    // The id of the last stream accepted by this connection.
    last_accepted_stream: Option<StreamId>,
}
126
127impl<C, B> ConnectionState for Connection<C, B>
128where
129 C: quic::Connection<B>,
130 B: Buf,
131{
132 fn shared_state(&self) -> &SharedStateRef {
133 &self.inner.shared
134 }
135}
136
137impl<C, B> Connection<C, B>
138where
139 C: quic::Connection<B>,
140 B: Buf,
141{
142 /// Create a new HTTP/3 server connection with default settings
143 ///
144 /// Use a custom [`Builder`] with [`builder()`] to create a connection
145 /// with different settings.
146 /// Provide a Connection which implements [`quic::Connection`].
147 pub async fn new(conn: C) -> Result<Self, Error> {
148 builder().build(conn).await
149 }
150
151 /// Closes the connection with a code and a reason.
152 pub fn close<T: AsRef<str>>(&mut self, code: Code, reason: T) -> Error {
153 self.inner.close(code, reason)
154 }
155}
156
157impl<C, B> Connection<C, B>
158where
159 C: quic::Connection<B>,
160 B: Buf,
161{
162 /// Accept an incoming request.
163 ///
164 /// It returns a tuple with a [`http::Request`] and an [`RequestStream`].
165 /// The [`http::Request`] is the received request from the client.
166 /// The [`RequestStream`] can be used to send the response.
167 pub async fn accept(
168 &mut self,
169 ) -> Result<Option<(Request<()>, RequestStream<C::BidiStream, B>)>, Error> {
170 // Accept the incoming stream
171 let mut stream = match future::poll_fn(|cx| self.poll_accept_request(cx)).await {
172 Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
173 Ok(None) => {
174 // We always send a last GoAway frame to the client, so it knows which was the last
175 // non-rejected request.
176 self.shutdown(0).await?;
177 return Ok(None);
178 }
179 Err(err) => {
180 match err.inner.kind {
181 crate::error::Kind::Closed => return Ok(None),
182 crate::error::Kind::Application {
183 code,
184 reason,
185 level: ErrorLevel::ConnectionError,
186 } => {
187 return Err(self.inner.close(
188 code,
189 reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
190 ))
191 }
192 _ => return Err(err),
193 };
194 }
195 };
196
197 let frame = future::poll_fn(|cx| stream.poll_next(cx)).await;
198 let req = self.accept_with_frame(stream, frame)?;
199 if let Some(req) = req {
200 Ok(Some(req.resolve().await?))
201 } else {
202 Ok(None)
203 }
204 }
205
206 /// Accepts an http request where the first frame has already been read and decoded.
207 ///
208 ///
209 /// This is needed as a bidirectional stream may be read as part of incoming webtransport
210 /// bi-streams. If it turns out that the stream is *not* a `WEBTRANSPORT_STREAM` the request
211 /// may still want to be handled and passed to the user.
212 pub fn accept_with_frame(
213 &mut self,
214 mut stream: FrameStream<C::BidiStream, B>,
215 frame: Result<Option<Frame<PayloadLen>>, FrameStreamError>,
216 ) -> Result<Option<ResolveRequest<C, B>>, Error> {
217 let mut encoded = match frame {
218 Ok(Some(Frame::Headers(h))) => h,
219
220 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
221 //# If a client-initiated
222 //# stream terminates without enough of the HTTP message to provide a
223 //# complete response, the server SHOULD abort its response stream with
224 //# the error code H3_REQUEST_INCOMPLETE.
225 Ok(None) => {
226 return Err(self.inner.close(
227 Code::H3_REQUEST_INCOMPLETE,
228 "request stream closed before headers",
229 ));
230 }
231
232 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
233 //# Receipt of an invalid sequence of frames MUST be treated as a
234 //# connection error of type H3_FRAME_UNEXPECTED.
235
236 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
237 //# A server MUST treat the
238 //# receipt of a PUSH_PROMISE frame as a connection error of type
239 //# H3_FRAME_UNEXPECTED.
240 Ok(Some(_)) => {
241 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
242 //# Receipt of an invalid sequence of frames MUST be treated as a
243 //# connection error of type H3_FRAME_UNEXPECTED.
244 // Close if the first frame is not a header frame
245 return Err(self.inner.close(
246 Code::H3_FRAME_UNEXPECTED,
247 "first request frame is not headers",
248 ));
249 }
250 Err(e) => {
251 let err: Error = e.into();
252 if err.is_closed() {
253 return Ok(None);
254 }
255 match err.inner.kind {
256 crate::error::Kind::Closed => return Ok(None),
257 crate::error::Kind::Application {
258 code,
259 reason,
260 level: ErrorLevel::ConnectionError,
261 } => {
262 return Err(self.inner.close(
263 code,
264 reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
265 ))
266 }
267 crate::error::Kind::Application {
268 code,
269 reason: _,
270 level: ErrorLevel::StreamError,
271 } => {
272 stream.reset(code.into());
273 return Err(err);
274 }
275 _ => return Err(err),
276 };
277 }
278 };
279
280 let mut request_stream = RequestStream {
281 request_end: Arc::new(RequestEnd {
282 request_end: self.request_end_send.clone(),
283 stream_id: stream.send_id(),
284 }),
285 inner: connection::RequestStream::new(
286 stream,
287 self.max_field_section_size,
288 self.inner.shared.clone(),
289 self.inner.send_grease_frame,
290 ),
291 };
292
293 let decoded = match qpack::decode_stateless(&mut encoded, self.max_field_section_size) {
294 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
295 //# An HTTP/3 implementation MAY impose a limit on the maximum size of
296 //# the message header it will accept on an individual HTTP message.
297 Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => Err(cancel_size),
298 Ok(decoded) => {
299 // send the grease frame only once
300 self.inner.send_grease_frame = false;
301 Ok(decoded)
302 }
303 Err(e) => {
304 let err: Error = e.into();
305 if err.is_closed() {
306 return Ok(None);
307 }
308 match err.inner.kind {
309 crate::error::Kind::Closed => return Ok(None),
310 crate::error::Kind::Application {
311 code,
312 reason,
313 level: ErrorLevel::ConnectionError,
314 } => {
315 return Err(self.inner.close(
316 code,
317 reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
318 ))
319 }
320 crate::error::Kind::Application {
321 code,
322 reason: _,
323 level: ErrorLevel::StreamError,
324 } => {
325 request_stream.stop_stream(code);
326 return Err(err);
327 }
328 _ => return Err(err),
329 };
330 }
331 };
332
333 Ok(Some(ResolveRequest::new(
334 request_stream,
335 decoded,
336 self.max_field_section_size,
337 )))
338 }
339
340 /// Initiate a graceful shutdown, accepting `max_request` potentially still in-flight
341 ///
342 /// See [connection shutdown](https://www.rfc-editor.org/rfc/rfc9114.html#connection-shutdown) for more information.
343 pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
344 let max_id = self
345 .last_accepted_stream
346 .map(|id| id + max_requests)
347 .unwrap_or(StreamId::FIRST_REQUEST);
348
349 self.inner.shutdown(&mut self.sent_closing, max_id).await
350 }
351
352 /// Accepts an incoming bidirectional stream.
353 ///
354 /// This could be either a *Request* or a *WebTransportBiStream*, the first frame's type
355 /// decides.
356 pub fn poll_accept_request(
357 &mut self,
358 cx: &mut Context<'_>,
359 ) -> Poll<Result<Option<C::BidiStream>, Error>> {
360 let _ = self.poll_control(cx)?;
361 let _ = self.poll_requests_completion(cx);
362 loop {
363 match self.inner.poll_accept_request(cx) {
364 Poll::Ready(Err(x)) => break Poll::Ready(Err(x)),
365 Poll::Ready(Ok(None)) => {
366 if self.poll_requests_completion(cx).is_ready() {
367 break Poll::Ready(Ok(None));
368 } else {
369 // Wait for all the requests to be finished, request_end_recv will wake
370 // us on each request completion.
371 break Poll::Pending;
372 }
373 }
374 Poll::Pending => {
375 if self.recv_closing.is_some() && self.poll_requests_completion(cx).is_ready() {
376 // The connection is now idle.
377 break Poll::Ready(Ok(None));
378 } else {
379 return Poll::Pending;
380 }
381 }
382 Poll::Ready(Ok(Some(mut s))) => {
383 // When the connection is in a graceful shutdown procedure, reject all
384 // incoming requests not belonging to the grace interval. It's possible that
385 // some acceptable request streams arrive after rejected requests.
386 if let Some(max_id) = self.sent_closing {
387 if s.send_id() > max_id {
388 s.stop_sending(Code::H3_REQUEST_REJECTED.value());
389 s.reset(Code::H3_REQUEST_REJECTED.value());
390 if self.poll_requests_completion(cx).is_ready() {
391 break Poll::Ready(Ok(None));
392 }
393 continue;
394 }
395 }
396 self.last_accepted_stream = Some(s.send_id());
397 self.ongoing_streams.insert(s.send_id());
398 break Poll::Ready(Ok(Some(s)));
399 }
400 };
401 }
402 }
403
404 pub(crate) fn poll_control(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
405 while (self.poll_next_control(cx)?).is_ready() {}
406 Poll::Pending
407 }
408
409 pub(crate) fn poll_next_control(
410 &mut self,
411 cx: &mut Context<'_>,
412 ) -> Poll<Result<Frame<PayloadLen>, Error>> {
413 let frame = ready!(self.inner.poll_control(cx))?;
414
415 match &frame {
416 Frame::Settings(w) => trace!("Got settings > {:?}", w),
417 &Frame::Goaway(id) => self.inner.process_goaway(&mut self.recv_closing, id)?,
418 f @ Frame::MaxPushId(_) | f @ Frame::CancelPush(_) => {
419 warn!("Control frame ignored {:?}", f);
420
421 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
422 //= type=TODO
423 //# If a server receives a CANCEL_PUSH frame for a push
424 //# ID that has not yet been mentioned by a PUSH_PROMISE frame, this MUST
425 //# be treated as a connection error of type H3_ID_ERROR.
426
427 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
428 //= type=TODO
429 //# A MAX_PUSH_ID frame cannot reduce the maximum push
430 //# ID; receipt of a MAX_PUSH_ID frame that contains a smaller value than
431 //# previously received MUST be treated as a connection error of type
432 //# H3_ID_ERROR.
433 }
434
435 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
436 //# A server MUST treat the
437 //# receipt of a PUSH_PROMISE frame as a connection error of type
438 //# H3_FRAME_UNEXPECTED.
439 frame => {
440 return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
441 format!("on server control stream: {:?}", frame),
442 ErrorLevel::ConnectionError,
443 )))
444 }
445 }
446 Poll::Ready(Ok(frame))
447 }
448
449 fn poll_requests_completion(&mut self, cx: &mut Context<'_>) -> Poll<()> {
450 loop {
451 match self.request_end_recv.poll_recv(cx) {
452 // The channel is closed
453 Poll::Ready(None) => return Poll::Ready(()),
454 // A request has completed
455 Poll::Ready(Some(id)) => {
456 self.ongoing_streams.remove(&id);
457 }
458 Poll::Pending => {
459 if self.ongoing_streams.is_empty() {
460 // Tell the caller there is not more ongoing requests.
461 // Still, the completion of future requests will wake us.
462 return Poll::Ready(());
463 } else {
464 return Poll::Pending;
465 }
466 }
467 }
468 }
469 }
470}
471
472impl<C, B> Connection<C, B>
473where
474 C: quic::Connection<B> + SendDatagramExt<B>,
475 B: Buf,
476{
477 /// Sends a datagram
478 pub fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> {
479 self.inner
480 .conn
481 .send_datagram(Datagram::new(stream_id, data))?;
482 Ok(())
483 }
484}
485
486impl<C, B> Connection<C, B>
487where
488 C: quic::Connection<B> + RecvDatagramExt,
489 B: Buf,
490{
491 /// Reads an incoming datagram
492 pub fn read_datagram(&mut self) -> ReadDatagram<C, B> {
493 ReadDatagram {
494 conn: self,
495 _marker: PhantomData,
496 }
497 }
498}
499
500impl<C, B> Drop for Connection<C, B>
501where
502 C: quic::Connection<B>,
503 B: Buf,
504{
505 fn drop(&mut self) {
506 self.inner.close(Code::H3_NO_ERROR, "");
507 }
508}
509
510//= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
511//= type=TODO
512//# In order to
513//# permit these streams to open, an HTTP/3 server SHOULD configure non-
514//# zero minimum values for the number of permitted streams and the
515//# initial stream flow-control window.
516
517//= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
518//= type=TODO
519//# So as to not unnecessarily limit
520//# parallelism, at least 100 request streams SHOULD be permitted at a
521//# time.
522
/// Builder of HTTP/3 server connections.
///
/// Use this struct to create a new [`Connection`].
/// Settings for the [`Connection`] can be provided here.
///
/// # Example
///
/// ```rust
/// fn doc<C,B>(conn: C)
/// where
/// C: sec_http3::quic::Connection<B>,
/// B: bytes::Buf,
/// {
/// let mut server_builder = sec_http3::server::builder();
/// // Set the maximum header size
/// server_builder.max_field_section_size(1000);
/// // do not send grease types
/// server_builder.send_grease(false);
/// // Build the Connection
/// let mut h3_conn = server_builder.build(conn);
/// }
/// ```
pub struct Builder {
    // Settings handed to every connection created by this builder.
    pub(crate) config: Config,
}
548
549impl Builder {
550 /// Creates a new [`Builder`] with default settings.
551 pub(super) fn new() -> Self {
552 Builder {
553 config: Default::default(),
554 }
555 }
556
557 /// Set the maximum header size this client is willing to accept
558 ///
559 /// See [header size constraints] section of the specification for details.
560 ///
561 /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
562 pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
563 self.config.max_field_section_size = value;
564 self
565 }
566
567 /// Send grease values to the Client.
568 /// See [setting](https://www.rfc-editor.org/rfc/rfc9114.html#settings-parameters), [frame](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) and [stream](https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease) for more information.
569 #[inline]
570 pub fn send_grease(&mut self, value: bool) -> &mut Self {
571 self.config.send_grease = value;
572 self
573 }
574
575 /// Indicates to the peer that WebTransport is supported.
576 ///
577 /// See: [establishing a webtransport session](https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.1)
578 ///
579 ///
580 /// **Server**:
581 /// Supporting for webtransport also requires setting `enable_connect` `enable_datagram`
582 /// and `max_webtransport_sessions`.
583 #[inline]
584 pub fn enable_webtransport(&mut self, value: bool) -> &mut Self {
585 self.config.enable_webtransport = value;
586 self
587 }
588
589 /// Enables the CONNECT protocol
590 pub fn enable_connect(&mut self, value: bool) -> &mut Self {
591 self.config.enable_extended_connect = value;
592 self
593 }
594
595 /// Limits the maximum number of WebTransport sessions
596 pub fn max_webtransport_sessions(&mut self, value: u64) -> &mut Self {
597 self.config.max_webtransport_sessions = value;
598 self
599 }
600
601 /// Indicates that the client or server supports HTTP/3 datagrams
602 ///
603 /// See: <https://www.rfc-editor.org/rfc/rfc9297#section-2.1.1>
604 pub fn enable_datagram(&mut self, value: bool) -> &mut Self {
605 self.config.enable_datagram = value;
606 self
607 }
608}
609
610impl Builder {
611 /// Build an HTTP/3 connection from a QUIC connection
612 ///
613 /// This method creates a [`Connection`] instance with the settings in the [`Builder`].
614 pub async fn build<C, B>(&self, conn: C) -> Result<Connection<C, B>, Error>
615 where
616 C: quic::Connection<B>,
617 B: Buf,
618 {
619 let (sender, receiver) = mpsc::unbounded_channel();
620 Ok(Connection {
621 inner: ConnectionInner::new(conn, SharedStateRef::default(), self.config).await?,
622 max_field_section_size: self.config.max_field_section_size,
623 request_end_send: sender,
624 request_end_recv: receiver,
625 ongoing_streams: HashSet::new(),
626 sent_closing: None,
627 recv_closing: None,
628 last_accepted_stream: None,
629 })
630 }
631}
632
// Notifier shared by the halves of a request stream: when it is dropped, it
// sends `stream_id` over `request_end` so the connection learns the request ended.
struct RequestEnd {
    // Sending side of the connection's request-completion channel.
    request_end: mpsc::UnboundedSender<StreamId>,
    // Id of the stream this notifier belongs to.
    stream_id: StreamId,
}
637
/// Manage request and response transfer for an incoming request
///
/// The [`RequestStream`] struct is used to send and/or receive
/// information from the client.
pub struct RequestStream<S, B> {
    // The underlying request stream that performs the actual transfers.
    inner: connection::RequestStream<S, B>,
    // Shared notifier; the connection is told the request ended once the last
    // reference to it is dropped.
    request_end: Arc<RequestEnd>,
}
646
647impl<S, B> AsMut<connection::RequestStream<S, B>> for RequestStream<S, B> {
648 fn as_mut(&mut self) -> &mut connection::RequestStream<S, B> {
649 &mut self.inner
650 }
651}
652
653impl<S, B> ConnectionState for RequestStream<S, B> {
654 fn shared_state(&self) -> &SharedStateRef {
655 &self.inner.conn_state
656 }
657}
658
659impl<S, B> RequestStream<S, B>
660where
661 S: quic::RecvStream,
662 B: Buf,
663{
664 /// Receive data sent from the client
665 pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
666 self.inner.recv_data().await
667 }
668
669 /// Receive an optional set of trailers for the request
670 pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
671 self.inner.recv_trailers().await
672 }
673
674 /// Tell the peer to stop sending into the underlying QUIC stream
675 pub fn stop_sending(&mut self, error_code: crate::error::Code) {
676 self.inner.stream.stop_sending(error_code)
677 }
678
679 /// Returns the underlying stream id
680 pub fn id(&self) -> StreamId {
681 self.inner.stream.id()
682 }
683}
684
685impl<S, B> RequestStream<S, B>
686where
687 S: quic::SendStream<B>,
688 B: Buf,
689{
690 /// Send the HTTP/3 response
691 ///
692 /// This should be called before trying to send any data with
693 /// [`RequestStream::send_data`].
694 pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> {
695 let (parts, _) = resp.into_parts();
696 let response::Parts {
697 status, headers, ..
698 } = parts;
699 let headers = Header::response(status, headers);
700
701 let mut block = BytesMut::new();
702 let mem_size = qpack::encode_stateless(&mut block, headers)?;
703
704 let max_mem_size = self
705 .inner
706 .conn_state
707 .read("send_response")
708 .peer_config
709 .max_field_section_size;
710
711 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
712 //# An implementation that
713 //# has received this parameter SHOULD NOT send an HTTP message header
714 //# that exceeds the indicated size, as the peer will likely refuse to
715 //# process it.
716 if mem_size > max_mem_size {
717 return Err(Error::header_too_big(mem_size, max_mem_size));
718 }
719
720 stream::write(&mut self.inner.stream, Frame::Headers(block.freeze()))
721 .await
722 .map_err(|e| self.maybe_conn_err(e))?;
723
724 Ok(())
725 }
726
727 /// Send some data on the response body.
728 pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
729 self.inner.send_data(buf).await
730 }
731
732 /// Stop a stream with an error code
733 ///
734 /// The code can be [`Code::H3_NO_ERROR`].
735 pub fn stop_stream(&mut self, error_code: Code) {
736 self.inner.stop_stream(error_code);
737 }
738
739 /// Send a set of trailers to end the response.
740 ///
741 /// Either [`RequestStream::finish`] or
742 /// [`RequestStream::send_trailers`] must be called to finalize a
743 /// request.
744 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
745 self.inner.send_trailers(trailers).await
746 }
747
748 /// End the response without trailers.
749 ///
750 /// Either [`RequestStream::finish`] or
751 /// [`RequestStream::send_trailers`] must be called to finalize a
752 /// request.
753 pub async fn finish(&mut self) -> Result<(), Error> {
754 self.inner.finish().await
755 }
756
757 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
758 //= type=TODO
759 //# Implementations SHOULD cancel requests by abruptly terminating any
760 //# directions of a stream that are still open. To do so, an
761 //# implementation resets the sending parts of streams and aborts reading
762 //# on the receiving parts of streams; see Section 2.4 of
763 //# [QUIC-TRANSPORT].
764
765 /// Returns the underlying stream id
766 pub fn send_id(&self) -> StreamId {
767 self.inner.stream.send_id()
768 }
769}
770
771impl<S, B> RequestStream<S, B>
772where
773 S: quic::BidiStream<B>,
774 B: Buf,
775{
776 /// Splits the Request-Stream into send and receive.
777 /// This can be used the send and receive data on different tasks.
778 pub fn split(
779 self,
780 ) -> (
781 RequestStream<S::SendStream, B>,
782 RequestStream<S::RecvStream, B>,
783 ) {
784 let (send, recv) = self.inner.split();
785 (
786 RequestStream {
787 inner: send,
788 request_end: self.request_end.clone(),
789 },
790 RequestStream {
791 inner: recv,
792 request_end: self.request_end,
793 },
794 )
795 }
796}
797
798impl Drop for RequestEnd {
799 fn drop(&mut self) {
800 if let Err(e) = self.request_end.send(self.stream_id) {
801 error!(
802 "failed to notify connection of request end: {} {}",
803 self.stream_id, e
804 );
805 }
806 }
807}
808
pin_project! {
    /// Future for [`Connection::read_datagram`]
    ///
    /// It holds a mutable borrow of the connection and resolves to the next
    /// incoming datagram, if any.
    pub struct ReadDatagram<'a, C, B>
    where
        C: quic::Connection<B>,
        B: Buf,
    {
        conn: &'a mut Connection<C, B>,
        _marker: PhantomData<B>,
    }
}
820
821impl<'a, C, B> Future for ReadDatagram<'a, C, B>
822where
823 C: quic::Connection<B> + RecvDatagramExt,
824 B: Buf,
825{
826 type Output = Result<Option<Datagram<C::Buf>>, Error>;
827
828 fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
829 tracing::trace!("poll: read_datagram");
830 match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? {
831 Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))),
832 None => Poll::Ready(Ok(None)),
833 }
834 }
835}