sec_http3/
server.rs

1//! This module provides methods to create a http/3 Server.
2//!
3//! It allows to accept incoming requests, and send responses.
4//!
5//! # Examples
6//!
7//! ## Simple example
8//! ```rust
9//! async fn doc<C>(conn: C)
10//! where
11//! C: sec_http3::quic::Connection<bytes::Bytes>,
12//! <C as sec_http3::quic::Connection<bytes::Bytes>>::BidiStream: Send + 'static
13//! {
14//!     let mut server_builder = sec_http3::server::builder();
15//!     // Build the Connection
16//!     let mut h3_conn = server_builder.build(conn).await.unwrap();
17//!     loop {
18//!         // Accept incoming requests
19//!         match h3_conn.accept().await {
20//!             Ok(Some((req, mut stream))) => {
21//!                 // spawn a new task to handle the request
22//!                 tokio::spawn(async move {
23//!                     // build a http response
24//!                     let response = http::Response::builder().status(http::StatusCode::OK).body(()).unwrap();
25//!                     // send the response to the wire
26//!                     stream.send_response(response).await.unwrap();
27//!                     // send some date
28//!                     stream.send_data(bytes::Bytes::from("test")).await.unwrap();
29//!                     // finnish the stream
30//!                     stream.finish().await.unwrap();
31//!                 });
32//!             }
33//!             Ok(None) => {
34//!                 // break if no Request is accepted
35//!                 break;
36//!             }
37//!             Err(err) => {
38//!                 match err.get_error_level() {
39//!                     // break on connection errors
40//!                     sec_http3::error::ErrorLevel::ConnectionError => break,
41//!                     // continue on stream errors
42//!                     sec_http3::error::ErrorLevel::StreamError => continue,
43//!                 }
44//!             }
45//!         }
46//!     }
47//! }
48//! ```
49//!
50//! ## File server
51//! A ready-to-use example of a file server is available [here](https://github.com/security-union/sec-http3/blob/master/examples/client.rs)
52
53use std::{
54    collections::HashSet,
55    marker::PhantomData,
56    option::Option,
57    result::Result,
58    sync::Arc,
59    task::{Context, Poll},
60};
61
62use bytes::{Buf, BytesMut};
63use futures_util::{
64    future::{self, Future},
65    ready,
66};
67use http::{response, HeaderMap, Request, Response};
68use pin_project_lite::pin_project;
69use quic::RecvStream;
70use quic::StreamId;
71use tokio::sync::mpsc;
72
73use crate::{
74    config::Config,
75    connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
76    error::{Code, Error, ErrorLevel},
77    ext::Datagram,
78    frame::{FrameStream, FrameStreamError},
79    proto::{
80        frame::{Frame, PayloadLen},
81        headers::Header,
82        push::PushId,
83    },
84    qpack,
85    quic::{self, RecvDatagramExt, SendDatagramExt, SendStream as _},
86    request::ResolveRequest,
87    stream::{self, BufRecvStream},
88};
89use tracing::{error, trace, warn};
90
91/// Create a builder of HTTP/3 server connections
92///
93/// This function creates a [`Builder`] that carries settings that can
94/// be shared between server connections.
95pub fn builder() -> Builder {
96    Builder::new()
97}
98
99/// Server connection driver
100///
101/// The [`Connection`] struct manages a connection from the side of the HTTP/3 server
102///
103/// Create a new Instance with [`Connection::new()`].
104/// Accept incoming requests with [`Connection::accept()`].
105/// And shutdown a connection with [`Connection::shutdown()`].
106pub struct Connection<C, B>
107where
108    C: quic::Connection<B>,
109    B: Buf,
110{
111    /// TODO: temporarily break encapsulation for `WebTransportSession`
112    pub inner: ConnectionInner<C, B>,
113    max_field_section_size: u64,
114    // List of all incoming streams that are currently running.
115    ongoing_streams: HashSet<StreamId>,
116    // Let the streams tell us when they are no longer running.
117    request_end_recv: mpsc::UnboundedReceiver<StreamId>,
118    request_end_send: mpsc::UnboundedSender<StreamId>,
119    // Has a GOAWAY frame been sent? If so, this StreamId is the last we are willing to accept.
120    sent_closing: Option<StreamId>,
121    // Has a GOAWAY frame been received? If so, this is PushId the last the remote will accept.
122    recv_closing: Option<PushId>,
123    // The id of the last stream received by this connection.
124    last_accepted_stream: Option<StreamId>,
125}
126
127impl<C, B> ConnectionState for Connection<C, B>
128where
129    C: quic::Connection<B>,
130    B: Buf,
131{
132    fn shared_state(&self) -> &SharedStateRef {
133        &self.inner.shared
134    }
135}
136
137impl<C, B> Connection<C, B>
138where
139    C: quic::Connection<B>,
140    B: Buf,
141{
142    /// Create a new HTTP/3 server connection with default settings
143    ///
144    /// Use a custom [`Builder`] with [`builder()`] to create a connection
145    /// with different settings.
146    /// Provide a Connection which implements [`quic::Connection`].
147    pub async fn new(conn: C) -> Result<Self, Error> {
148        builder().build(conn).await
149    }
150
151    /// Closes the connection with a code and a reason.
152    pub fn close<T: AsRef<str>>(&mut self, code: Code, reason: T) -> Error {
153        self.inner.close(code, reason)
154    }
155}
156
157impl<C, B> Connection<C, B>
158where
159    C: quic::Connection<B>,
160    B: Buf,
161{
162    /// Accept an incoming request.
163    ///
164    /// It returns a tuple with a [`http::Request`] and an [`RequestStream`].
165    /// The [`http::Request`] is the received request from the client.
166    /// The [`RequestStream`] can be used to send the response.
167    pub async fn accept(
168        &mut self,
169    ) -> Result<Option<(Request<()>, RequestStream<C::BidiStream, B>)>, Error> {
170        // Accept the incoming stream
171        let mut stream = match future::poll_fn(|cx| self.poll_accept_request(cx)).await {
172            Ok(Some(s)) => FrameStream::new(BufRecvStream::new(s)),
173            Ok(None) => {
174                // We always send a last GoAway frame to the client, so it knows which was the last
175                // non-rejected request.
176                self.shutdown(0).await?;
177                return Ok(None);
178            }
179            Err(err) => {
180                match err.inner.kind {
181                    crate::error::Kind::Closed => return Ok(None),
182                    crate::error::Kind::Application {
183                        code,
184                        reason,
185                        level: ErrorLevel::ConnectionError,
186                    } => {
187                        return Err(self.inner.close(
188                            code,
189                            reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
190                        ))
191                    }
192                    _ => return Err(err),
193                };
194            }
195        };
196
197        let frame = future::poll_fn(|cx| stream.poll_next(cx)).await;
198        let req = self.accept_with_frame(stream, frame)?;
199        if let Some(req) = req {
200            Ok(Some(req.resolve().await?))
201        } else {
202            Ok(None)
203        }
204    }
205
206    /// Accepts an http request where the first frame has already been read and decoded.
207    ///
208    ///
209    /// This is needed as a bidirectional stream may be read as part of incoming webtransport
210    /// bi-streams. If it turns out that the stream is *not* a `WEBTRANSPORT_STREAM` the request
211    /// may still want to be handled and passed to the user.
212    pub fn accept_with_frame(
213        &mut self,
214        mut stream: FrameStream<C::BidiStream, B>,
215        frame: Result<Option<Frame<PayloadLen>>, FrameStreamError>,
216    ) -> Result<Option<ResolveRequest<C, B>>, Error> {
217        let mut encoded = match frame {
218            Ok(Some(Frame::Headers(h))) => h,
219
220            //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
221            //# If a client-initiated
222            //# stream terminates without enough of the HTTP message to provide a
223            //# complete response, the server SHOULD abort its response stream with
224            //# the error code H3_REQUEST_INCOMPLETE.
225            Ok(None) => {
226                return Err(self.inner.close(
227                    Code::H3_REQUEST_INCOMPLETE,
228                    "request stream closed before headers",
229                ));
230            }
231
232            //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
233            //# Receipt of an invalid sequence of frames MUST be treated as a
234            //# connection error of type H3_FRAME_UNEXPECTED.
235
236            //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
237            //# A server MUST treat the
238            //# receipt of a PUSH_PROMISE frame as a connection error of type
239            //# H3_FRAME_UNEXPECTED.
240            Ok(Some(_)) => {
241                //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
242                //# Receipt of an invalid sequence of frames MUST be treated as a
243                //# connection error of type H3_FRAME_UNEXPECTED.
244                // Close if the first frame is not a header frame
245                return Err(self.inner.close(
246                    Code::H3_FRAME_UNEXPECTED,
247                    "first request frame is not headers",
248                ));
249            }
250            Err(e) => {
251                let err: Error = e.into();
252                if err.is_closed() {
253                    return Ok(None);
254                }
255                match err.inner.kind {
256                    crate::error::Kind::Closed => return Ok(None),
257                    crate::error::Kind::Application {
258                        code,
259                        reason,
260                        level: ErrorLevel::ConnectionError,
261                    } => {
262                        return Err(self.inner.close(
263                            code,
264                            reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
265                        ))
266                    }
267                    crate::error::Kind::Application {
268                        code,
269                        reason: _,
270                        level: ErrorLevel::StreamError,
271                    } => {
272                        stream.reset(code.into());
273                        return Err(err);
274                    }
275                    _ => return Err(err),
276                };
277            }
278        };
279
280        let mut request_stream = RequestStream {
281            request_end: Arc::new(RequestEnd {
282                request_end: self.request_end_send.clone(),
283                stream_id: stream.send_id(),
284            }),
285            inner: connection::RequestStream::new(
286                stream,
287                self.max_field_section_size,
288                self.inner.shared.clone(),
289                self.inner.send_grease_frame,
290            ),
291        };
292
293        let decoded = match qpack::decode_stateless(&mut encoded, self.max_field_section_size) {
294            //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
295            //# An HTTP/3 implementation MAY impose a limit on the maximum size of
296            //# the message header it will accept on an individual HTTP message.
297            Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => Err(cancel_size),
298            Ok(decoded) => {
299                // send the grease frame only once
300                self.inner.send_grease_frame = false;
301                Ok(decoded)
302            }
303            Err(e) => {
304                let err: Error = e.into();
305                if err.is_closed() {
306                    return Ok(None);
307                }
308                match err.inner.kind {
309                    crate::error::Kind::Closed => return Ok(None),
310                    crate::error::Kind::Application {
311                        code,
312                        reason,
313                        level: ErrorLevel::ConnectionError,
314                    } => {
315                        return Err(self.inner.close(
316                            code,
317                            reason.unwrap_or_else(|| String::into_boxed_str(String::from(""))),
318                        ))
319                    }
320                    crate::error::Kind::Application {
321                        code,
322                        reason: _,
323                        level: ErrorLevel::StreamError,
324                    } => {
325                        request_stream.stop_stream(code);
326                        return Err(err);
327                    }
328                    _ => return Err(err),
329                };
330            }
331        };
332
333        Ok(Some(ResolveRequest::new(
334            request_stream,
335            decoded,
336            self.max_field_section_size,
337        )))
338    }
339
340    /// Initiate a graceful shutdown, accepting `max_request` potentially still in-flight
341    ///
342    /// See [connection shutdown](https://www.rfc-editor.org/rfc/rfc9114.html#connection-shutdown) for more information.
343    pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
344        let max_id = self
345            .last_accepted_stream
346            .map(|id| id + max_requests)
347            .unwrap_or(StreamId::FIRST_REQUEST);
348
349        self.inner.shutdown(&mut self.sent_closing, max_id).await
350    }
351
352    /// Accepts an incoming bidirectional stream.
353    ///
354    /// This could be either a *Request* or a *WebTransportBiStream*, the first frame's type
355    /// decides.
356    pub fn poll_accept_request(
357        &mut self,
358        cx: &mut Context<'_>,
359    ) -> Poll<Result<Option<C::BidiStream>, Error>> {
360        let _ = self.poll_control(cx)?;
361        let _ = self.poll_requests_completion(cx);
362        loop {
363            match self.inner.poll_accept_request(cx) {
364                Poll::Ready(Err(x)) => break Poll::Ready(Err(x)),
365                Poll::Ready(Ok(None)) => {
366                    if self.poll_requests_completion(cx).is_ready() {
367                        break Poll::Ready(Ok(None));
368                    } else {
369                        // Wait for all the requests to be finished, request_end_recv will wake
370                        // us on each request completion.
371                        break Poll::Pending;
372                    }
373                }
374                Poll::Pending => {
375                    if self.recv_closing.is_some() && self.poll_requests_completion(cx).is_ready() {
376                        // The connection is now idle.
377                        break Poll::Ready(Ok(None));
378                    } else {
379                        return Poll::Pending;
380                    }
381                }
382                Poll::Ready(Ok(Some(mut s))) => {
383                    // When the connection is in a graceful shutdown procedure, reject all
384                    // incoming requests not belonging to the grace interval. It's possible that
385                    // some acceptable request streams arrive after rejected requests.
386                    if let Some(max_id) = self.sent_closing {
387                        if s.send_id() > max_id {
388                            s.stop_sending(Code::H3_REQUEST_REJECTED.value());
389                            s.reset(Code::H3_REQUEST_REJECTED.value());
390                            if self.poll_requests_completion(cx).is_ready() {
391                                break Poll::Ready(Ok(None));
392                            }
393                            continue;
394                        }
395                    }
396                    self.last_accepted_stream = Some(s.send_id());
397                    self.ongoing_streams.insert(s.send_id());
398                    break Poll::Ready(Ok(Some(s)));
399                }
400            };
401        }
402    }
403
404    pub(crate) fn poll_control(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
405        while (self.poll_next_control(cx)?).is_ready() {}
406        Poll::Pending
407    }
408
409    pub(crate) fn poll_next_control(
410        &mut self,
411        cx: &mut Context<'_>,
412    ) -> Poll<Result<Frame<PayloadLen>, Error>> {
413        let frame = ready!(self.inner.poll_control(cx))?;
414
415        match &frame {
416            Frame::Settings(w) => trace!("Got settings > {:?}", w),
417            &Frame::Goaway(id) => self.inner.process_goaway(&mut self.recv_closing, id)?,
418            f @ Frame::MaxPushId(_) | f @ Frame::CancelPush(_) => {
419                warn!("Control frame ignored {:?}", f);
420
421                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.3
422                //= type=TODO
423                //# If a server receives a CANCEL_PUSH frame for a push
424                //# ID that has not yet been mentioned by a PUSH_PROMISE frame, this MUST
425                //# be treated as a connection error of type H3_ID_ERROR.
426
427                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
428                //= type=TODO
429                //# A MAX_PUSH_ID frame cannot reduce the maximum push
430                //# ID; receipt of a MAX_PUSH_ID frame that contains a smaller value than
431                //# previously received MUST be treated as a connection error of type
432                //# H3_ID_ERROR.
433            }
434
435            //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
436            //# A server MUST treat the
437            //# receipt of a PUSH_PROMISE frame as a connection error of type
438            //# H3_FRAME_UNEXPECTED.
439            frame => {
440                return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
441                    format!("on server control stream: {:?}", frame),
442                    ErrorLevel::ConnectionError,
443                )))
444            }
445        }
446        Poll::Ready(Ok(frame))
447    }
448
449    fn poll_requests_completion(&mut self, cx: &mut Context<'_>) -> Poll<()> {
450        loop {
451            match self.request_end_recv.poll_recv(cx) {
452                // The channel is closed
453                Poll::Ready(None) => return Poll::Ready(()),
454                // A request has completed
455                Poll::Ready(Some(id)) => {
456                    self.ongoing_streams.remove(&id);
457                }
458                Poll::Pending => {
459                    if self.ongoing_streams.is_empty() {
460                        // Tell the caller there is not more ongoing requests.
461                        // Still, the completion of future requests will wake us.
462                        return Poll::Ready(());
463                    } else {
464                        return Poll::Pending;
465                    }
466                }
467            }
468        }
469    }
470}
471
472impl<C, B> Connection<C, B>
473where
474    C: quic::Connection<B> + SendDatagramExt<B>,
475    B: Buf,
476{
477    /// Sends a datagram
478    pub fn send_datagram(&mut self, stream_id: StreamId, data: B) -> Result<(), Error> {
479        self.inner
480            .conn
481            .send_datagram(Datagram::new(stream_id, data))?;
482        Ok(())
483    }
484}
485
486impl<C, B> Connection<C, B>
487where
488    C: quic::Connection<B> + RecvDatagramExt,
489    B: Buf,
490{
491    /// Reads an incoming datagram
492    pub fn read_datagram(&mut self) -> ReadDatagram<C, B> {
493        ReadDatagram {
494            conn: self,
495            _marker: PhantomData,
496        }
497    }
498}
499
500impl<C, B> Drop for Connection<C, B>
501where
502    C: quic::Connection<B>,
503    B: Buf,
504{
505    fn drop(&mut self) {
506        self.inner.close(Code::H3_NO_ERROR, "");
507    }
508}
509
510//= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
511//= type=TODO
512//# In order to
513//# permit these streams to open, an HTTP/3 server SHOULD configure non-
514//# zero minimum values for the number of permitted streams and the
515//# initial stream flow-control window.
516
517//= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
518//= type=TODO
519//# So as to not unnecessarily limit
520//# parallelism, at least 100 request streams SHOULD be permitted at a
521//# time.
522
523/// Builder of HTTP/3 server connections.
524///
525/// Use this struct to create a new [`Connection`].
526/// Settings for the [`Connection`] can be provided here.
527///
528/// # Example
529///
530/// ```rust
531/// fn doc<C,B>(conn: C)
532/// where
533/// C: sec_http3::quic::Connection<B>,
534/// B: bytes::Buf,
535/// {
536///     let mut server_builder = sec_http3::server::builder();
537///     // Set the maximum header size
538///     server_builder.max_field_section_size(1000);
539///     // do not send grease types
540///     server_builder.send_grease(false);
541///     // Build the Connection
542///     let mut h3_conn = server_builder.build(conn);
543/// }
544/// ```
545pub struct Builder {
546    pub(crate) config: Config,
547}
548
549impl Builder {
550    /// Creates a new [`Builder`] with default settings.
551    pub(super) fn new() -> Self {
552        Builder {
553            config: Default::default(),
554        }
555    }
556
557    /// Set the maximum header size this client is willing to accept
558    ///
559    /// See [header size constraints] section of the specification for details.
560    ///
561    /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
562    pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
563        self.config.max_field_section_size = value;
564        self
565    }
566
567    /// Send grease values to the Client.
568    /// See [setting](https://www.rfc-editor.org/rfc/rfc9114.html#settings-parameters), [frame](https://www.rfc-editor.org/rfc/rfc9114.html#frame-reserved) and [stream](https://www.rfc-editor.org/rfc/rfc9114.html#stream-grease) for more information.
569    #[inline]
570    pub fn send_grease(&mut self, value: bool) -> &mut Self {
571        self.config.send_grease = value;
572        self
573    }
574
575    /// Indicates to the peer that WebTransport is supported.
576    ///
577    /// See: [establishing a webtransport session](https://datatracker.ietf.org/doc/html/draft-ietf-webtrans-http3/#section-3.1)
578    ///
579    ///
580    /// **Server**:
581    /// Supporting for webtransport also requires setting `enable_connect` `enable_datagram`
582    /// and `max_webtransport_sessions`.
583    #[inline]
584    pub fn enable_webtransport(&mut self, value: bool) -> &mut Self {
585        self.config.enable_webtransport = value;
586        self
587    }
588
589    /// Enables the CONNECT protocol
590    pub fn enable_connect(&mut self, value: bool) -> &mut Self {
591        self.config.enable_extended_connect = value;
592        self
593    }
594
595    /// Limits the maximum number of WebTransport sessions
596    pub fn max_webtransport_sessions(&mut self, value: u64) -> &mut Self {
597        self.config.max_webtransport_sessions = value;
598        self
599    }
600
601    /// Indicates that the client or server supports HTTP/3 datagrams
602    ///
603    /// See: <https://www.rfc-editor.org/rfc/rfc9297#section-2.1.1>
604    pub fn enable_datagram(&mut self, value: bool) -> &mut Self {
605        self.config.enable_datagram = value;
606        self
607    }
608}
609
610impl Builder {
611    /// Build an HTTP/3 connection from a QUIC connection
612    ///
613    /// This method creates a [`Connection`] instance with the settings in the [`Builder`].
614    pub async fn build<C, B>(&self, conn: C) -> Result<Connection<C, B>, Error>
615    where
616        C: quic::Connection<B>,
617        B: Buf,
618    {
619        let (sender, receiver) = mpsc::unbounded_channel();
620        Ok(Connection {
621            inner: ConnectionInner::new(conn, SharedStateRef::default(), self.config).await?,
622            max_field_section_size: self.config.max_field_section_size,
623            request_end_send: sender,
624            request_end_recv: receiver,
625            ongoing_streams: HashSet::new(),
626            sent_closing: None,
627            recv_closing: None,
628            last_accepted_stream: None,
629        })
630    }
631}
632
633struct RequestEnd {
634    request_end: mpsc::UnboundedSender<StreamId>,
635    stream_id: StreamId,
636}
637
638/// Manage request and response transfer for an incoming request
639///
640/// The [`RequestStream`] struct is used to send and/or receive
641/// information from the client.
642pub struct RequestStream<S, B> {
643    inner: connection::RequestStream<S, B>,
644    request_end: Arc<RequestEnd>,
645}
646
647impl<S, B> AsMut<connection::RequestStream<S, B>> for RequestStream<S, B> {
648    fn as_mut(&mut self) -> &mut connection::RequestStream<S, B> {
649        &mut self.inner
650    }
651}
652
653impl<S, B> ConnectionState for RequestStream<S, B> {
654    fn shared_state(&self) -> &SharedStateRef {
655        &self.inner.conn_state
656    }
657}
658
659impl<S, B> RequestStream<S, B>
660where
661    S: quic::RecvStream,
662    B: Buf,
663{
664    /// Receive data sent from the client
665    pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
666        self.inner.recv_data().await
667    }
668
669    /// Receive an optional set of trailers for the request
670    pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
671        self.inner.recv_trailers().await
672    }
673
674    /// Tell the peer to stop sending into the underlying QUIC stream
675    pub fn stop_sending(&mut self, error_code: crate::error::Code) {
676        self.inner.stream.stop_sending(error_code)
677    }
678
679    /// Returns the underlying stream id
680    pub fn id(&self) -> StreamId {
681        self.inner.stream.id()
682    }
683}
684
685impl<S, B> RequestStream<S, B>
686where
687    S: quic::SendStream<B>,
688    B: Buf,
689{
690    /// Send the HTTP/3 response
691    ///
692    /// This should be called before trying to send any data with
693    /// [`RequestStream::send_data`].
694    pub async fn send_response(&mut self, resp: Response<()>) -> Result<(), Error> {
695        let (parts, _) = resp.into_parts();
696        let response::Parts {
697            status, headers, ..
698        } = parts;
699        let headers = Header::response(status, headers);
700
701        let mut block = BytesMut::new();
702        let mem_size = qpack::encode_stateless(&mut block, headers)?;
703
704        let max_mem_size = self
705            .inner
706            .conn_state
707            .read("send_response")
708            .peer_config
709            .max_field_section_size;
710
711        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
712        //# An implementation that
713        //# has received this parameter SHOULD NOT send an HTTP message header
714        //# that exceeds the indicated size, as the peer will likely refuse to
715        //# process it.
716        if mem_size > max_mem_size {
717            return Err(Error::header_too_big(mem_size, max_mem_size));
718        }
719
720        stream::write(&mut self.inner.stream, Frame::Headers(block.freeze()))
721            .await
722            .map_err(|e| self.maybe_conn_err(e))?;
723
724        Ok(())
725    }
726
727    /// Send some data on the response body.
728    pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
729        self.inner.send_data(buf).await
730    }
731
732    /// Stop a stream with an error code
733    ///
734    /// The code can be [`Code::H3_NO_ERROR`].
735    pub fn stop_stream(&mut self, error_code: Code) {
736        self.inner.stop_stream(error_code);
737    }
738
739    /// Send a set of trailers to end the response.
740    ///
741    /// Either [`RequestStream::finish`] or
742    /// [`RequestStream::send_trailers`] must be called to finalize a
743    /// request.
744    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
745        self.inner.send_trailers(trailers).await
746    }
747
748    /// End the response without trailers.
749    ///
750    /// Either [`RequestStream::finish`] or
751    /// [`RequestStream::send_trailers`] must be called to finalize a
752    /// request.
753    pub async fn finish(&mut self) -> Result<(), Error> {
754        self.inner.finish().await
755    }
756
757    //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
758    //= type=TODO
759    //# Implementations SHOULD cancel requests by abruptly terminating any
760    //# directions of a stream that are still open.  To do so, an
761    //# implementation resets the sending parts of streams and aborts reading
762    //# on the receiving parts of streams; see Section 2.4 of
763    //# [QUIC-TRANSPORT].
764
765    /// Returns the underlying stream id
766    pub fn send_id(&self) -> StreamId {
767        self.inner.stream.send_id()
768    }
769}
770
771impl<S, B> RequestStream<S, B>
772where
773    S: quic::BidiStream<B>,
774    B: Buf,
775{
776    /// Splits the Request-Stream into send and receive.
777    /// This can be used the send and receive data on different tasks.
778    pub fn split(
779        self,
780    ) -> (
781        RequestStream<S::SendStream, B>,
782        RequestStream<S::RecvStream, B>,
783    ) {
784        let (send, recv) = self.inner.split();
785        (
786            RequestStream {
787                inner: send,
788                request_end: self.request_end.clone(),
789            },
790            RequestStream {
791                inner: recv,
792                request_end: self.request_end,
793            },
794        )
795    }
796}
797
798impl Drop for RequestEnd {
799    fn drop(&mut self) {
800        if let Err(e) = self.request_end.send(self.stream_id) {
801            error!(
802                "failed to notify connection of request end: {} {}",
803                self.stream_id, e
804            );
805        }
806    }
807}
808
809pin_project! {
810    /// Future for [`Connection::read_datagram`]
811    pub struct ReadDatagram<'a, C, B>
812    where
813            C: quic::Connection<B>,
814            B: Buf,
815        {
816            conn: &'a mut Connection<C, B>,
817            _marker: PhantomData<B>,
818        }
819}
820
821impl<'a, C, B> Future for ReadDatagram<'a, C, B>
822where
823    C: quic::Connection<B> + RecvDatagramExt,
824    B: Buf,
825{
826    type Output = Result<Option<Datagram<C::Buf>>, Error>;
827
828    fn poll(mut self: std::pin::Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
829        tracing::trace!("poll: read_datagram");
830        match ready!(self.conn.inner.conn.poll_accept_datagram(cx))? {
831            Some(v) => Poll::Ready(Ok(Some(Datagram::decode(v)?))),
832            None => Poll::Ready(Ok(None)),
833        }
834    }
835}