salvo_quinn/
client.rs

1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4    convert::TryFrom,
5    marker::PhantomData,
6    sync::{atomic::AtomicUsize, Arc},
7    task::{Context, Poll, Waker},
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use futures_util::future;
12use http::{request, HeaderMap, Response};
13use tracing::{info, trace};
14
15use crate::{
16    connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
17    error::{Code, Error, ErrorLevel},
18    frame::FrameStream,
19    proto::{frame::Frame, headers::Header, varint::VarInt},
20    qpack, quic, stream,
21};
22
23/// Start building a new HTTP/3 client
24pub fn builder() -> Builder {
25    Builder::new()
26}
27
28/// Create a new HTTP/3 client with default settings
29pub async fn new<C, O>(conn: C) -> Result<(Connection<C, Bytes>, SendRequest<O, Bytes>), Error>
30where
31    C: quic::Connection<Bytes, OpenStreams = O>,
32    O: quic::OpenStreams<Bytes>,
33{
34    //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
35    //= type=implication
36    //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP
37    //# address and UDP port, where the IP address and port might be derived
38    //# from a URI, a selected alternative service ([ALTSVC]), a configured
39    //# proxy, or name resolution of any of these.
40    Builder::new().build(conn).await
41}
42
43/// HTTP/3 request sender
44///
45/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
46/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
47/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
48/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
49/// This includes the cases where no body is sent at all.
50///
51/// This struct is cloneable so multiple requests can be sent concurrently.
52///
53/// Existing instances are atomically counted internally, so whenever all of them have been
54/// dropped, the connection will be automatically closed whith HTTP/3 connection error code
55/// `HTTP_NO_ERROR = 0`.
56///
57/// # Examples
58///
59/// ## Sending a request with no body
60///
61/// ```rust
62/// # use h3::{quic, client::*};
63/// # use http::{Request, Response};
64/// # use bytes::Buf;
65/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
66/// # where
67/// #     T: quic::OpenStreams<B>,
68/// #     B: Buf,
69/// # {
70/// // Prepare the HTTP request to send to the server
71/// let request = Request::get("https://www.example.com/").body(())?;
72///
73/// // Send the request to the server
74/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
75/// // Don't forget to end up the request by finishing the send stream.
76/// req_stream.finish().await?;
77/// // Receive the response
78/// let response: Response<()> = req_stream.recv_response().await?;
79/// // Process the response...
80/// # Ok(())
81/// # }
82/// # pub fn main() {}
83/// ```
84///
85/// ## Sending a request with a body and trailers
86///
87/// ```rust
88/// # use h3::{quic, client::*};
89/// # use http::{Request, Response, HeaderMap};
90/// # use bytes::{Buf, Bytes};
91/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
92/// # where
93/// #     T: quic::OpenStreams<Bytes>,
94/// # {
95/// // Prepare the HTTP request to send to the server
96/// let request = Request::get("https://www.example.com/").body(())?;
97///
98/// // Send the request to the server
99/// let mut req_stream = send_request.send_request(request).await?;
100/// // Send some data
101/// req_stream.send_data("body".into()).await?;
102/// // Prepare the trailers
103/// let mut trailers = HeaderMap::new();
104/// trailers.insert("trailer", "value".parse()?);
105/// // Send them and finish the send stream
106/// req_stream.send_trailers(trailers).await?;
107/// // We don't need to finish the send stream, as `send_trailers()` did it for us
108///
109/// // Receive the response.
110/// let response = req_stream.recv_response().await?;
111/// // Process the response...
112/// # Ok(())
113/// # }
114/// # pub fn main() {}
115/// ```
116///
117/// [`send_request()`]: struct.SendRequest.html#method.send_request
118/// [`RequestStream`]: struct.RequestStream.html
119/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
120pub struct SendRequest<T, B>
121where
122    T: quic::OpenStreams<B>,
123    B: Buf,
124{
125    open: T,
126    conn_state: SharedStateRef,
127    max_field_section_size: u64, // maximum size for a header we receive
128    // counts instances of SendRequest to close the connection when the last is dropped.
129    sender_count: Arc<AtomicUsize>,
130    conn_waker: Option<Waker>,
131    _buf: PhantomData<fn(B)>,
132    send_grease_frame: bool,
133}
134
135impl<T, B> SendRequest<T, B>
136where
137    T: quic::OpenStreams<B>,
138    B: Buf,
139{
140    /// Send a HTTP/3 request to the server
141    pub async fn send_request(
142        &mut self,
143        req: http::Request<()>,
144    ) -> Result<RequestStream<T::BidiStream, B>, Error> {
145        let (peer_max_field_section_size, closing) = {
146            let state = self.conn_state.read("send request lock state");
147            (state.peer_max_field_section_size, state.closing)
148        };
149
150        if closing.is_some() {
151            return Err(Error::closing());
152        }
153
154        let (parts, _) = req.into_parts();
155        let request::Parts {
156            method,
157            uri,
158            headers,
159            ..
160        } = parts;
161        let headers = Header::request(method, uri, headers)?;
162
163        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
164        //= type=implication
165        //# A
166        //# client MUST send only a single request on a given stream.
167        let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
168            .await
169            .map_err(|e| self.maybe_conn_err(e))?;
170
171        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
172        //= type=TODO
173        //# Characters in field names MUST be
174        //# converted to lowercase prior to their encoding.
175
176        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
177        //= type=TODO
178        //# To allow for better compression efficiency, the Cookie header field
179        //# ([COOKIES]) MAY be split into separate field lines, each with one or
180        //# more cookie-pairs, before compression.
181
182        let mut block = BytesMut::new();
183        let mem_size = qpack::encode_stateless(&mut block, headers)?;
184
185        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
186        //# An implementation that
187        //# has received this parameter SHOULD NOT send an HTTP message header
188        //# that exceeds the indicated size, as the peer will likely refuse to
189        //# process it.
190        if mem_size > peer_max_field_section_size {
191            return Err(Error::header_too_big(mem_size, peer_max_field_section_size));
192        }
193
194        stream::write(&mut stream, Frame::Headers(block.freeze()))
195            .await
196            .map_err(|e| self.maybe_conn_err(e))?;
197
198        let request_stream = RequestStream {
199            inner: connection::RequestStream::new(
200                FrameStream::new(stream),
201                self.max_field_section_size,
202                self.conn_state.clone(),
203                self.send_grease_frame,
204            ),
205        };
206        // send the grease frame only once
207        self.send_grease_frame = false;
208        Ok(request_stream)
209    }
210}
211
212impl<T, B> ConnectionState for SendRequest<T, B>
213where
214    T: quic::OpenStreams<B>,
215    B: Buf,
216{
217    fn shared_state(&self) -> &SharedStateRef {
218        &self.conn_state
219    }
220}
221
222impl<T, B> Clone for SendRequest<T, B>
223where
224    T: quic::OpenStreams<B> + Clone,
225    B: Buf,
226{
227    fn clone(&self) -> Self {
228        self.sender_count
229            .fetch_add(1, std::sync::atomic::Ordering::Release);
230
231        Self {
232            open: self.open.clone(),
233            conn_state: self.conn_state.clone(),
234            max_field_section_size: self.max_field_section_size,
235            sender_count: self.sender_count.clone(),
236            conn_waker: self.conn_waker.clone(),
237            _buf: PhantomData,
238            send_grease_frame: self.send_grease_frame,
239        }
240    }
241}
242
243impl<T, B> Drop for SendRequest<T, B>
244where
245    T: quic::OpenStreams<B>,
246    B: Buf,
247{
248    fn drop(&mut self) {
249        if self
250            .sender_count
251            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
252            == 1
253        {
254            if let Some(w) = self.conn_waker.take() {
255                w.wake()
256            }
257            self.shared_state().write("SendRequest drop").error = Some(Error::closed());
258            self.open.close(Code::H3_NO_ERROR, b"");
259        }
260    }
261}
262
263/// Client connection driver
264///
265/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
266/// It needs to be polled continously via [`poll_close()`]. On connection closure, this
267/// will resolve to `Ok(())` if the peer sent `HTTP_NO_ERROR`, or `Err()` if a connection-level
268/// error occured.
269///
270/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no request
271/// initiation will be further allowed. Then [`poll_close()`] will resolve when all ongoing requests
272/// and push streams complete. Finally, a connection closure with `HTTP_NO_ERROR` code will be
273/// sent to the server.
274///
275/// # Examples
276///
277/// ## Drive a connection concurrenty
278///
279/// ```rust
280/// # use bytes::Buf;
281/// # use futures_util::future;
282/// # use h3::{client::*, quic};
283/// # use tokio::task::JoinHandle;
284/// # async fn doc<C, B>(mut connection: Connection<C, B>)
285/// #    -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
286/// # where
287/// #    C: quic::Connection<B> + Send + 'static,
288/// #    C::SendStream: Send + 'static,
289/// #    C::RecvStream: Send + 'static,
290/// #    B: Buf + Send + 'static,
291/// # {
292/// // Run the driver on a different task
293/// tokio::spawn(async move {
294///     future::poll_fn(|cx| connection.poll_close(cx)).await?;
295///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
296/// })
297/// # }
298/// ```
299///
300/// ## Shutdown a connection gracefully
301///
302/// ```rust
303/// # use bytes::Buf;
304/// # use futures_util::future;
305/// # use h3::{client::*, quic};
306/// # use tokio::{self, sync::oneshot, task::JoinHandle};
307/// # async fn doc<C, B>(mut connection: Connection<C, B>)
308/// #    -> Result<(), Box<dyn std::error::Error + Send + Sync>>
309/// # where
310/// #    C: quic::Connection<B> + Send + 'static,
311/// #    C::SendStream: Send + 'static,
312/// #    C::RecvStream: Send + 'static,
313/// #    B: Buf + Send + 'static,
314/// # {
315/// // Prepare a channel to stop the driver thread
316/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
317///
318/// // Run the driver on a different task
319/// let driver = tokio::spawn(async move {
320///     tokio::select! {
321///         // Drive the connection
322///         closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed?,
323///         // Listen for shutdown condition
324///         max_streams = shutdown_rx => {
325///             // Initiate shutdown
326///             connection.shutdown(max_streams?);
327///             // Wait for ongoing work to complete
328///             future::poll_fn(|cx| connection.poll_close(cx)).await?;
329///         }
330///     };
331///
332///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
333/// });
334///
335/// // Do client things, wait for close contition...
336///
337/// // Initiate shutdown
338/// shutdown_tx.send(2);
339/// // Wait for the connection to be closed
340/// driver.await?
341/// # }
342/// ```
343/// [`poll_close()`]: struct.Connection.html#method.poll_close
344/// [`shutdown()`]: struct.Connection.html#method.shutdown
345pub struct Connection<C, B>
346where
347    C: quic::Connection<B>,
348    B: Buf,
349{
350    inner: ConnectionInner<C, B>,
351}
352
353impl<C, B> Connection<C, B>
354where
355    C: quic::Connection<B>,
356    B: Buf,
357{
358    /// Itiniate a graceful shutdown, accepting `max_request` potentially in-flight server push
359    pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
360        self.inner.shutdown(max_requests).await
361    }
362
363    /// Wait until the connection is closed
364    pub async fn wait_idle(&mut self) -> Result<(), Error> {
365        future::poll_fn(|cx| self.poll_close(cx)).await
366    }
367
368    /// Maintain the connection state until it is closed
369    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
370        while let Poll::Ready(result) = self.inner.poll_control(cx) {
371            match result {
372                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
373                //= type=TODO
374                //# When a 0-RTT QUIC connection is being used, the initial value of each
375                //# server setting is the value used in the previous session.  Clients
376                //# SHOULD store the settings the server provided in the HTTP/3
377                //# connection where resumption information was provided, but they MAY
378                //# opt not to store settings in certain cases (e.g., if the session
379                //# ticket is received before the SETTINGS frame).
380
381                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
382                //= type=TODO
383                //# A client MUST comply
384                //# with stored settings -- or default values if no values are stored --
385                //# when attempting 0-RTT.
386
387                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
388                //= type=TODO
389                //# Once a server has provided new settings,
390                //# clients MUST comply with those values.
391                Ok(Frame::Settings(_)) => trace!("Got settings"),
392                Ok(Frame::Goaway(id)) => {
393                    //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
394                    //# The GOAWAY frame is always sent on the control stream.  In the
395                    //# server-to-client direction, it carries a QUIC stream ID for a client-
396                    //# initiated bidirectional stream encoded as a variable-length integer.
397                    //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
398                    //# of any other type as a connection error of type H3_ID_ERROR.
399                    if !id.is_request() {
400                        return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
401                            format!("non-request StreamId in a GoAway frame: {}", id),
402                            ErrorLevel::ConnectionError,
403                        )));
404                    }
405                    info!("Server initiated graceful shutdown, last: StreamId({})", id);
406                }
407
408                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
409                //# If a PUSH_PROMISE frame is received on the control stream, the client
410                //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
411
412                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
413                //# A client MUST treat the
414                //# receipt of a MAX_PUSH_ID frame as a connection error of type
415                //# H3_FRAME_UNEXPECTED.
416                Ok(frame) => {
417                    return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
418                        format!("on client control stream: {:?}", frame),
419                        ErrorLevel::ConnectionError,
420                    )))
421                }
422                Err(e) => {
423                    let connection_error = self
424                        .inner
425                        .shared
426                        .read("poll_close error read")
427                        .error
428                        .as_ref()
429                        .cloned();
430
431                    match connection_error {
432                        Some(e) if e.is_closed() => return Poll::Ready(Ok(())),
433                        Some(e) => return Poll::Ready(Err(e)),
434                        None => {
435                            self.inner.shared.write("poll_close error").error = e.clone().into();
436                            return Poll::Ready(Err(e));
437                        }
438                    }
439                }
440            }
441        }
442
443        //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
444        //# Clients MUST treat
445        //# receipt of a server-initiated bidirectional stream as a connection
446        //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
447        //# been negotiated.
448        if self.inner.poll_accept_request(cx).is_ready() {
449            return Poll::Ready(Err(self.inner.close(
450                Code::H3_STREAM_CREATION_ERROR,
451                "client received a bidirectional stream",
452            )));
453        }
454
455        Poll::Pending
456    }
457}
458
459/// HTTP/3 client builder
460///
461/// Set the configuration for a new client.
462///
463/// # Examples
464/// ```rust
465/// # use h3::quic;
466/// # async fn doc<C, O, B>(quic: C)
467/// # where
468/// #   C: quic::Connection<B, OpenStreams = O>,
469/// #   O: quic::OpenStreams<B>,
470/// #   B: bytes::Buf,
471/// # {
472/// let h3_conn = h3::client::builder()
473///     .max_field_section_size(8192)
474///     .build(quic)
475///     .await
476///     .expect("Failed to build connection");
477/// # }
478/// ```
479pub struct Builder {
480    max_field_section_size: u64,
481    send_grease: bool,
482}
483
484impl Builder {
485    pub(super) fn new() -> Self {
486        Builder {
487            max_field_section_size: VarInt::MAX.0,
488            send_grease: true,
489        }
490    }
491
492    /// Set the maximum header size this client is willing to accept
493    ///
494    /// See [header size constraints] section of the specification for details.
495    ///
496    /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
497    pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
498        self.max_field_section_size = value;
499        self
500    }
501
502    /// Create a new HTTP/3 client from a `quic` connection
503    pub async fn build<C, O, B>(
504        &mut self,
505        quic: C,
506    ) -> Result<(Connection<C, B>, SendRequest<O, B>), Error>
507    where
508        C: quic::Connection<B, OpenStreams = O>,
509        O: quic::OpenStreams<B>,
510        B: Buf,
511    {
512        let open = quic.opener();
513        let conn_state = SharedStateRef::default();
514
515        let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);
516
517        Ok((
518            Connection {
519                inner: ConnectionInner::new(
520                    quic,
521                    self.max_field_section_size,
522                    conn_state.clone(),
523                    self.send_grease,
524                )
525                .await?,
526            },
527            SendRequest {
528                open,
529                conn_state,
530                conn_waker,
531                max_field_section_size: self.max_field_section_size,
532                sender_count: Arc::new(AtomicUsize::new(1)),
533                _buf: PhantomData,
534                send_grease_frame: self.send_grease,
535            },
536        ))
537    }
538}
539
540/// Manage request bodies transfer, response and trailers.
541///
542/// Once a request has been sent via [`send_request()`], a response can be awaited by calling
543/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request
544/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`].
545///
546/// After receiving the response's headers, it's body can be read by [`recv_data()`] until it returns
547/// `None`. Then the trailers will eventually be available via [`recv_trailers()`].
548///
549/// TODO: If data is polled before the response has been received, an error will be thrown.
550///
551/// TODO: If trailers are polled but the body hasn't been fully received, an UNEXPECT_FRAME error will be
552/// thrown
553///
554/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will
555/// put an end to any transfer concerning it.
556///
557/// # Examples
558///
559/// ```rust
560/// # use h3::{quic, client::*};
561/// # use http::{Request, Response};
562/// # use bytes::Buf;
563/// # use tokio::io::AsyncWriteExt;
564/// # async fn doc<T,B>(mut req_stream: RequestStream<T, B>) -> Result<(), Box<dyn std::error::Error>>
565/// # where
566/// #     T: quic::RecvStream,
567/// #     B: Buf,
568/// # {
569/// // Prepare the HTTP request to send to the server
570/// let request = Request::get("https://www.example.com/").body(())?;
571///
572/// // Receive the response
573/// let response = req_stream.recv_response().await?;
574/// // Receive the body
575/// while let Some(mut chunk) = req_stream.recv_data().await? {
576///     let mut out = tokio::io::stdout();
577///     out.write_all_buf(&mut chunk).await?;
578///     out.flush().await?;
579/// }
580/// # Ok(())
581/// # }
582/// # pub fn main() {}
583/// ```
584///
585/// [`send_request()`]: struct.SendRequest.html#method.send_request
586/// [`recv_response()`]: #method.recv_response
587/// [`recv_data()`]: #method.recv_data
588/// [`send_data()`]: #method.send_data
589/// [`send_trailers()`]: #method.send_trailers
590/// [`recv_trailers()`]: #method.recv_trailers
591/// [`finish()`]: #method.finish
592/// [`stop_sending()`]: #method.stop_sending
593pub struct RequestStream<S, B> {
594    inner: connection::RequestStream<S, B>,
595}
596
597impl<S, B> ConnectionState for RequestStream<S, B> {
598    fn shared_state(&self) -> &SharedStateRef {
599        &self.inner.conn_state
600    }
601}
602
603impl<S, B> RequestStream<S, B>
604where
605    S: quic::RecvStream,
606{
607    /// Receive the HTTP/3 response
608    ///
609    /// This should be called before trying to receive any data with [`recv_data()`].
610    ///
611    /// [`recv_data()`]: #method.recv_data
612    pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
613        let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
614            .await
615            .map_err(|e| self.maybe_conn_err(e))?
616            .ok_or_else(|| {
617                Code::H3_GENERAL_PROTOCOL_ERROR.with_reason(
618                    "Did not receive response headers",
619                    ErrorLevel::ConnectionError,
620                )
621            })?;
622
623        //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
624        //= type=TODO
625        //# A client MUST treat
626        //# receipt of a PUSH_PROMISE frame that contains a larger push ID than
627        //# the client has advertised as a connection error of H3_ID_ERROR.
628
629        //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
630        //= type=TODO
631        //# If a client
632        //# receives a push ID that has already been promised and detects a
633        //# mismatch, it MUST respond with a connection error of type
634        //# H3_GENERAL_PROTOCOL_ERROR.
635
636        let decoded = if let Frame::Headers(ref mut encoded) = frame {
637            match qpack::decode_stateless(encoded, self.inner.max_field_section_size) {
638                //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
639                //# An HTTP/3 implementation MAY impose a limit on the maximum size of
640                //# the message header it will accept on an individual HTTP message.
641                Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => {
642                    self.inner.stop_sending(Code::H3_REQUEST_CANCELLED);
643                    return Err(Error::header_too_big(
644                        cancel_size,
645                        self.inner.max_field_section_size,
646                    ));
647                }
648                Ok(decoded) => decoded,
649                Err(e) => return Err(e.into()),
650            }
651        } else {
652            return Err(Code::H3_FRAME_UNEXPECTED.with_reason(
653                "First response frame is not headers",
654                ErrorLevel::ConnectionError,
655            ));
656        };
657
658        let qpack::Decoded { fields, .. } = decoded;
659
660        let (status, headers) = Header::try_from(fields)?.into_response_parts()?;
661        let mut resp = Response::new(());
662        *resp.status_mut() = status;
663        *resp.headers_mut() = headers;
664        *resp.version_mut() = http::Version::HTTP_3;
665
666        Ok(resp)
667    }
668
669    /// Receive some of the request body.
670    // TODO what if called before recv_response ?
671    pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
672        self.inner.recv_data().await
673    }
674
675    /// Receive an optional set of trailers for the response.
676    pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
677        let res = self.inner.recv_trailers().await;
678        if let Err(ref e) = res {
679            if e.is_header_too_big() {
680                self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED);
681            }
682        }
683        res
684    }
685
686    /// Tell the peer to stop sending into the underlying QUIC stream
687    pub fn stop_sending(&mut self, error_code: crate::error::Code) {
688        // TODO take by value to prevent any further call as this request is cancelled
689        // rename `cancel()` ?
690        self.inner.stream.stop_sending(error_code)
691    }
692}
693
694impl<S, B> RequestStream<S, B>
695where
696    S: quic::SendStream<B>,
697    B: Buf,
698{
699    /// Send some data on the request body.
700    pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
701        self.inner.send_data(buf).await
702    }
703
704    /// Send a set of trailers to end the request.
705    ///
706    /// Either [`RequestStream::finish`] or
707    /// [`RequestStream::send_trailers`] must be called to finalize a
708    /// request.
709    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
710        self.inner.send_trailers(trailers).await
711    }
712
713    /// End the request without trailers.
714    ///
715    /// Either [`RequestStream::finish`] or
716    /// [`RequestStream::send_trailers`] must be called to finalize a
717    /// request.
718    pub async fn finish(&mut self) -> Result<(), Error> {
719        self.inner.finish().await
720    }
721
722    //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
723    //= type=TODO
724    //# Implementations SHOULD cancel requests by abruptly terminating any
725    //# directions of a stream that are still open.  To do so, an
726    //# implementation resets the sending parts of streams and aborts reading
727    //# on the receiving parts of streams; see Section 2.4 of
728    //# [QUIC-TRANSPORT].
729}
730
731impl<S, B> RequestStream<S, B>
732where
733    S: quic::BidiStream<B>,
734    B: Buf,
735{
736    /// Split this stream into two halves that can be driven independently.
737    pub fn split(
738        self,
739    ) -> (
740        RequestStream<S::SendStream, B>,
741        RequestStream<S::RecvStream, B>,
742    ) {
743        let (send, recv) = self.inner.split();
744        (RequestStream { inner: send }, RequestStream { inner: recv })
745    }
746}