sec_http3/
client.rs

1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4    convert::TryFrom,
5    marker::PhantomData,
6    sync::{atomic::AtomicUsize, Arc},
7    task::{Context, Poll, Waker},
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use futures_util::future;
12use http::{request, HeaderMap, Response};
13use tracing::{info, trace};
14
15use crate::{
16    config::Config,
17    connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
18    error::{Code, Error, ErrorLevel},
19    frame::FrameStream,
20    proto::{frame::Frame, headers::Header, push::PushId},
21    qpack,
22    quic::{self, StreamId},
23    stream::{self, BufRecvStream},
24};
25
26/// Start building a new HTTP/3 client
27pub fn builder() -> Builder {
28    Builder::new()
29}
30
31/// Create a new HTTP/3 client with default settings
32pub async fn new<C, O>(conn: C) -> Result<(Connection<C, Bytes>, SendRequest<O, Bytes>), Error>
33where
34    C: quic::Connection<Bytes, OpenStreams = O>,
35    O: quic::OpenStreams<Bytes>,
36{
37    //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
38    //= type=implication
39    //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP
40    //# address and UDP port, where the IP address and port might be derived
41    //# from a URI, a selected alternative service ([ALTSVC]), a configured
42    //# proxy, or name resolution of any of these.
43    Builder::new().build(conn).await
44}
45
46/// HTTP/3 request sender
47///
48/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
49/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
50/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
51/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
52/// This includes the cases where no body is sent at all.
53///
54/// This struct is cloneable so multiple requests can be sent concurrently.
55///
56/// Existing instances are atomically counted internally, so whenever all of them have been
57/// dropped, the connection will be automatically closed with HTTP/3 connection error code
58/// `HTTP_NO_ERROR = 0`.
59///
60/// # Examples
61///
62/// ## Sending a request with no body
63///
64/// ```rust
65/// # use sec_http3::{quic, client::*};
66/// # use http::{Request, Response};
67/// # use bytes::Buf;
68/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
69/// # where
70/// #     T: quic::OpenStreams<B>,
71/// #     B: Buf,
72/// # {
73/// // Prepare the HTTP request to send to the server
74/// let request = Request::get("https://www.example.com/").body(())?;
75///
76/// // Send the request to the server
77/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
78/// // Don't forget to end the request by finishing the send stream.
79/// req_stream.finish().await?;
80/// // Receive the response
81/// let response: Response<()> = req_stream.recv_response().await?;
82/// // Process the response...
83/// # Ok(())
84/// # }
85/// # pub fn main() {}
86/// ```
87///
88/// ## Sending a request with a body and trailers
89///
90/// ```rust
91/// # use sec_http3::{quic, client::*};
92/// # use http::{Request, Response, HeaderMap};
93/// # use bytes::{Buf, Bytes};
94/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
95/// # where
96/// #     T: quic::OpenStreams<Bytes>,
97/// # {
98/// // Prepare the HTTP request to send to the server
99/// let request = Request::get("https://www.example.com/").body(())?;
100///
101/// // Send the request to the server
102/// let mut req_stream = send_request.send_request(request).await?;
103/// // Send some data
104/// req_stream.send_data("body".into()).await?;
105/// // Prepare the trailers
106/// let mut trailers = HeaderMap::new();
107/// trailers.insert("trailer", "value".parse()?);
108/// // Send them and finish the send stream
109/// req_stream.send_trailers(trailers).await?;
110/// // We don't need to finish the send stream, as `send_trailers()` did it for us
111///
112/// // Receive the response.
113/// let response = req_stream.recv_response().await?;
114/// // Process the response...
115/// # Ok(())
116/// # }
117/// # pub fn main() {}
118/// ```
119///
120/// [`send_request()`]: struct.SendRequest.html#method.send_request
121/// [`RequestStream`]: struct.RequestStream.html
122/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
123pub struct SendRequest<T, B>
124where
125    T: quic::OpenStreams<B>,
126    B: Buf,
127{
128    open: T, // opens the bidirectional request streams; also used to close the connection on drop
129    conn_state: SharedStateRef, // shared connection state: peer config, closing flag, stored error
130    max_field_section_size: u64, // maximum size for a header we receive
131    // counts instances of SendRequest to close the connection when the last is dropped.
132    sender_count: Arc<AtomicUsize>,
133    conn_waker: Option<Waker>, // woken when the last SendRequest instance is dropped
134    _buf: PhantomData<fn(B)>, // marker for the buffer type `B`; no value of `B` is stored
135    send_grease_frame: bool, // handed to the next request stream, then cleared so it is used once
136}
137
138impl<T, B> SendRequest<T, B>
139where
140    T: quic::OpenStreams<B>,
141    B: Buf,
142{
143    /// Send a HTTP/3 request to the server
144    pub async fn send_request(
145        &mut self,
146        req: http::Request<()>,
147    ) -> Result<RequestStream<T::BidiStream, B>, Error> {
148        let (peer_max_field_section_size, closing) = {
149            let state = self.conn_state.read("send request lock state");
150            (state.peer_config.max_field_section_size, state.closing)
151        };
152
153        if closing {
154            return Err(Error::closing());
155        }
156
157        let (parts, _) = req.into_parts();
158        let request::Parts {
159            method,
160            uri,
161            headers,
162            ..
163        } = parts;
164        let headers = Header::request(method, uri, headers, Default::default())?;
165
166        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
167        //= type=implication
168        //# A
169        //# client MUST send only a single request on a given stream.
170        let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
171            .await
172            .map_err(|e| self.maybe_conn_err(e))?;
173
174        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
175        //= type=TODO
176        //# Characters in field names MUST be
177        //# converted to lowercase prior to their encoding.
178
179        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
180        //= type=TODO
181        //# To allow for better compression efficiency, the Cookie header field
182        //# ([COOKIES]) MAY be split into separate field lines, each with one or
183        //# more cookie-pairs, before compression.
184
185        let mut block = BytesMut::new();
186        let mem_size = qpack::encode_stateless(&mut block, headers)?;
187
188        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
189        //# An implementation that
190        //# has received this parameter SHOULD NOT send an HTTP message header
191        //# that exceeds the indicated size, as the peer will likely refuse to
192        //# process it.
193        if mem_size > peer_max_field_section_size {
194            return Err(Error::header_too_big(mem_size, peer_max_field_section_size));
195        }
196
197        stream::write(&mut stream, Frame::Headers(block.freeze()))
198            .await
199            .map_err(|e| self.maybe_conn_err(e))?;
200
201        let request_stream = RequestStream {
202            inner: connection::RequestStream::new(
203                FrameStream::new(BufRecvStream::new(stream)),
204                self.max_field_section_size,
205                self.conn_state.clone(),
206                self.send_grease_frame,
207            ),
208        };
209        // send the grease frame only once
210        self.send_grease_frame = false;
211        Ok(request_stream)
212    }
213}
214
215impl<T, B> ConnectionState for SendRequest<T, B>
216where
217    T: quic::OpenStreams<B>,
218    B: Buf,
219{
220    fn shared_state(&self) -> &SharedStateRef {
221        &self.conn_state
222    }
223}
224
225impl<T, B> Clone for SendRequest<T, B>
226where
227    T: quic::OpenStreams<B> + Clone,
228    B: Buf,
229{
230    fn clone(&self) -> Self {
231        self.sender_count
232            .fetch_add(1, std::sync::atomic::Ordering::Release);
233
234        Self {
235            open: self.open.clone(),
236            conn_state: self.conn_state.clone(),
237            max_field_section_size: self.max_field_section_size,
238            sender_count: self.sender_count.clone(),
239            conn_waker: self.conn_waker.clone(),
240            _buf: PhantomData,
241            send_grease_frame: self.send_grease_frame,
242        }
243    }
244}
245
246impl<T, B> Drop for SendRequest<T, B>
247where
248    T: quic::OpenStreams<B>,
249    B: Buf,
250{
251    fn drop(&mut self) {
252        if self
253            .sender_count
254            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
255            == 1
256        {
257            if let Some(w) = Option::take(&mut self.conn_waker) {
258                w.wake()
259            }
260            self.shared_state().write("SendRequest drop").error = Some(Error::closed());
261            self.open.close(Code::H3_NO_ERROR, b"");
262        }
263    }
264}
265
266/// Client connection driver
267///
268/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
269/// It needs to be polled continuously via [`poll_close()`]. On connection closure, this
270/// will resolve to `Ok(())` if the peer sent `HTTP_NO_ERROR`, or `Err()` if a connection-level
271/// error occurred.
272///
273/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no request
274/// initiation will be further allowed. Then [`poll_close()`] will resolve when all ongoing requests
275/// and push streams complete. Finally, a connection closure with `HTTP_NO_ERROR` code will be
276/// sent to the server.
277///
278/// # Examples
279///
280/// ## Drive a connection concurrently
281///
282/// ```rust
283/// # use bytes::Buf;
284/// # use futures_util::future;
285/// # use sec_http3::{client::*, quic};
286/// # use tokio::task::JoinHandle;
287/// # async fn doc<C, B>(mut connection: Connection<C, B>)
288/// #    -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
289/// # where
290/// #    C: quic::Connection<B> + Send + 'static,
291/// #    C::SendStream: Send + 'static,
292/// #    C::RecvStream: Send + 'static,
293/// #    B: Buf + Send + 'static,
294/// # {
295/// // Run the driver on a different task
296/// tokio::spawn(async move {
297///     future::poll_fn(|cx| connection.poll_close(cx)).await?;
298///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
299/// })
300/// # }
301/// ```
302///
303/// ## Shutdown a connection gracefully
304///
305/// ```rust
306/// # use bytes::Buf;
307/// # use futures_util::future;
308/// # use sec_http3::{client::*, quic};
309/// # use tokio::{self, sync::oneshot, task::JoinHandle};
310/// # async fn doc<C, B>(mut connection: Connection<C, B>)
311/// #    -> Result<(), Box<dyn std::error::Error + Send + Sync>>
312/// # where
313/// #    C: quic::Connection<B> + Send + 'static,
314/// #    C::SendStream: Send + 'static,
315/// #    C::RecvStream: Send + 'static,
316/// #    B: Buf + Send + 'static,
317/// # {
318/// // Prepare a channel to stop the driver thread
319/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
320///
321/// // Run the driver on a different task
322/// let driver = tokio::spawn(async move {
323///     tokio::select! {
324///         // Drive the connection
325///         closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed?,
326///         // Listen for shutdown condition
327///         max_streams = shutdown_rx => {
328///             // Initiate shutdown
329///             connection.shutdown(max_streams?).await?;
330///             // Wait for ongoing work to complete
331///             future::poll_fn(|cx| connection.poll_close(cx)).await?;
332///         }
333///     };
334///
335///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
336/// });
337///
338/// // Do client things, wait for close condition...
339///
340/// // Initiate shutdown
341/// shutdown_tx.send(2);
342/// // Wait for the connection to be closed
343/// driver.await?
344/// # }
345/// ```
346/// [`poll_close()`]: struct.Connection.html#method.poll_close
347/// [`shutdown()`]: struct.Connection.html#method.shutdown
348pub struct Connection<C, B>
349where
350    C: quic::Connection<B>,
351    B: Buf,
352{
353    inner: ConnectionInner<C, B>, // polled by `poll_close()` for control frames and incoming streams
354    // Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
355    sent_closing: Option<PushId>,
356    // Has a GOAWAY frame been received? If so, this is the last StreamId the remote will accept.
357    recv_closing: Option<StreamId>,
358}
359
360impl<C, B> Connection<C, B>
361where
362    C: quic::Connection<B>,
363    B: Buf,
364{
365    /// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
366    pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> {
367        // TODO: Calculate remaining pushes once server push is implemented.
368        self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
369    }
370
371    /// Wait until the connection is closed
372    pub async fn wait_idle(&mut self) -> Result<(), Error> {
373        future::poll_fn(|cx| self.poll_close(cx)).await
374    }
375
376    /// Maintain the connection state until it is closed
377    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
378        while let Poll::Ready(result) = self.inner.poll_control(cx) {
379            match result {
380                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
381                //= type=TODO
382                //# When a 0-RTT QUIC connection is being used, the initial value of each
383                //# server setting is the value used in the previous session.  Clients
384                //# SHOULD store the settings the server provided in the HTTP/3
385                //# connection where resumption information was provided, but they MAY
386                //# opt not to store settings in certain cases (e.g., if the session
387                //# ticket is received before the SETTINGS frame).
388
389                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
390                //= type=TODO
391                //# A client MUST comply
392                //# with stored settings -- or default values if no values are stored --
393                //# when attempting 0-RTT.
394
395                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
396                //= type=TODO
397                //# Once a server has provided new settings,
398                //# clients MUST comply with those values.
399                Ok(Frame::Settings(_)) => trace!("Got settings"),
400                Ok(Frame::Goaway(id)) => {
401                    //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
402                    //# The GOAWAY frame is always sent on the control stream.  In the
403                    //# server-to-client direction, it carries a QUIC stream ID for a client-
404                    //# initiated bidirectional stream encoded as a variable-length integer.
405                    //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
406                    //# of any other type as a connection error of type H3_ID_ERROR.
407                    if !StreamId::from(id).is_request() {
408                        return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
409                            format!("non-request StreamId in a GoAway frame: {}", id),
410                            ErrorLevel::ConnectionError,
411                        )));
412                    }
413                    self.inner.process_goaway(&mut self.recv_closing, id)?;
414
415                    info!("Server initiated graceful shutdown, last: StreamId({})", id);
416                }
417
418                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
419                //# If a PUSH_PROMISE frame is received on the control stream, the client
420                //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
421
422                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
423                //# A client MUST treat the
424                //# receipt of a MAX_PUSH_ID frame as a connection error of type
425                //# H3_FRAME_UNEXPECTED.
426                Ok(frame) => {
427                    return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
428                        format!("on client control stream: {:?}", frame),
429                        ErrorLevel::ConnectionError,
430                    )))
431                }
432                Err(e) => {
433                    let connection_error = self.inner.shared.read("poll_close").error.clone();
434                    let connection_error = match connection_error {
435                        Some(e) => e,
436                        None => {
437                            self.inner.shared.write("poll_close error").error = Some(e.clone());
438                            e
439                        }
440                    };
441                    if connection_error.is_closed() {
442                        return Poll::Ready(Ok(()));
443                    }
444                    return Poll::Ready(Err(connection_error));
445                }
446            }
447        }
448
449        //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
450        //# Clients MUST treat
451        //# receipt of a server-initiated bidirectional stream as a connection
452        //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
453        //# been negotiated.
454        if self.inner.poll_accept_request(cx).is_ready() {
455            return Poll::Ready(Err(self.inner.close(
456                Code::H3_STREAM_CREATION_ERROR,
457                "client received a bidirectional stream",
458            )));
459        }
460
461        Poll::Pending
462    }
463}
464
465/// HTTP/3 client builder
466///
467/// Set the configuration for a new client.
468///
469/// # Examples
470/// ```rust
471/// # use sec_http3::quic;
472/// # async fn doc<C, O, B>(quic: C)
473/// # where
474/// #   C: quic::Connection<B, OpenStreams = O>,
475/// #   O: quic::OpenStreams<B>,
476/// #   B: bytes::Buf,
477/// # {
478/// let h3_conn = sec_http3::client::builder()
479///     .max_field_section_size(8192)
480///     .build(quic)
481///     .await
482///     .expect("Failed to build connection");
483/// # }
484/// ```
485pub struct Builder {
486    config: Config, // settings handed to the connection created by `build()`
487}
488
489impl Builder {
490    pub(super) fn new() -> Self {
491        Builder {
492            config: Default::default(),
493        }
494    }
495
496    /// Set the maximum header size this client is willing to accept
497    ///
498    /// See [header size constraints] section of the specification for details.
499    ///
500    /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
501    pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
502        self.config.max_field_section_size = value;
503        self
504    }
505
506    /// Create a new HTTP/3 client from a `quic` connection
507    pub async fn build<C, O, B>(
508        &mut self,
509        quic: C,
510    ) -> Result<(Connection<C, B>, SendRequest<O, B>), Error>
511    where
512        C: quic::Connection<B, OpenStreams = O>,
513        O: quic::OpenStreams<B>,
514        B: Buf,
515    {
516        let open = quic.opener();
517        let conn_state = SharedStateRef::default();
518
519        let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);
520
521        Ok((
522            Connection {
523                inner: ConnectionInner::new(quic, conn_state.clone(), self.config).await?,
524                sent_closing: None,
525                recv_closing: None,
526            },
527            SendRequest {
528                open,
529                conn_state,
530                conn_waker,
531                max_field_section_size: self.config.max_field_section_size,
532                sender_count: Arc::new(AtomicUsize::new(1)),
533                send_grease_frame: self.config.send_grease,
534                _buf: PhantomData,
535            },
536        ))
537    }
538}
539
540/// Manage request bodies transfer, response and trailers.
541///
542/// Once a request has been sent via [`send_request()`], a response can be awaited by calling
543/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request
544/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`].
545///
546/// After receiving the response's headers, its body can be read by [`recv_data()`] until it returns
547/// `None`. Then the trailers will eventually be available via [`recv_trailers()`].
548///
549/// TODO: If data is polled before the response has been received, an error will be thrown.
550///
551/// TODO: If trailers are polled but the body hasn't been fully received, an `H3_FRAME_UNEXPECTED` error will be
552/// thrown
553///
554/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will
555/// put an end to any transfer concerning it.
556///
557/// # Examples
558///
559/// ```rust
560/// # use sec_http3::{quic, client::*};
561/// # use http::{Request, Response};
562/// # use bytes::Buf;
563/// # use tokio::io::AsyncWriteExt;
564/// # async fn doc<T,B>(mut req_stream: RequestStream<T, B>) -> Result<(), Box<dyn std::error::Error>>
565/// # where
566/// #     T: quic::RecvStream,
567/// # {
568/// // Prepare the HTTP request to send to the server
569/// let request = Request::get("https://www.example.com/").body(())?;
570///
571/// // Receive the response
572/// let response = req_stream.recv_response().await?;
573/// // Receive the body
574/// while let Some(mut chunk) = req_stream.recv_data().await? {
575///     let mut out = tokio::io::stdout();
576///     out.write_all_buf(&mut chunk).await?;
577///     out.flush().await?;
578/// }
579/// # Ok(())
580/// # }
581/// # pub fn main() {}
582/// ```
583///
584/// [`send_request()`]: struct.SendRequest.html#method.send_request
585/// [`recv_response()`]: #method.recv_response
586/// [`recv_data()`]: #method.recv_data
587/// [`send_data()`]: #method.send_data
588/// [`send_trailers()`]: #method.send_trailers
589/// [`recv_trailers()`]: #method.recv_trailers
590/// [`finish()`]: #method.finish
591/// [`stop_sending()`]: #method.stop_sending
592pub struct RequestStream<S, B> {
593    inner: connection::RequestStream<S, B>, // underlying request stream the client methods operate on
594}
595
596impl<S, B> ConnectionState for RequestStream<S, B> {
597    fn shared_state(&self) -> &SharedStateRef {
598        &self.inner.conn_state
599    }
600}
601
602impl<S, B> RequestStream<S, B>
603where
604    S: quic::RecvStream,
605{
606    /// Receive the HTTP/3 response
607    ///
608    /// This should be called before trying to receive any data with [`recv_data()`].
609    ///
610    /// [`recv_data()`]: #method.recv_data
611    pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
612        let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
613            .await
614            .map_err(|e| self.maybe_conn_err(e))?
615            .ok_or_else(|| {
616                Code::H3_GENERAL_PROTOCOL_ERROR.with_reason(
617                    "Did not receive response headers",
618                    ErrorLevel::ConnectionError,
619                )
620            })?;
621
622        //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
623        //= type=TODO
624        //# A client MUST treat
625        //# receipt of a PUSH_PROMISE frame that contains a larger push ID than
626        //# the client has advertised as a connection error of H3_ID_ERROR.
627
628        //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
629        //= type=TODO
630        //# If a client
631        //# receives a push ID that has already been promised and detects a
632        //# mismatch, it MUST respond with a connection error of type
633        //# H3_GENERAL_PROTOCOL_ERROR.
634
635        let decoded = if let Frame::Headers(ref mut encoded) = frame {
636            match qpack::decode_stateless(encoded, self.inner.max_field_section_size) {
637                //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
638                //# An HTTP/3 implementation MAY impose a limit on the maximum size of
639                //# the message header it will accept on an individual HTTP message.
640                Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => {
641                    self.inner.stop_sending(Code::H3_REQUEST_CANCELLED);
642                    return Err(Error::header_too_big(
643                        cancel_size,
644                        self.inner.max_field_section_size,
645                    ));
646                }
647                Ok(decoded) => decoded,
648                Err(e) => return Err(e.into()),
649            }
650        } else {
651            return Err(Code::H3_FRAME_UNEXPECTED.with_reason(
652                "First response frame is not headers",
653                ErrorLevel::ConnectionError,
654            ));
655        };
656
657        let qpack::Decoded { fields, .. } = decoded;
658
659        let (status, headers) = Header::try_from(fields)?.into_response_parts()?;
660        let mut resp = Response::new(());
661        *resp.status_mut() = status;
662        *resp.headers_mut() = headers;
663        *resp.version_mut() = http::Version::HTTP_3;
664
665        Ok(resp)
666    }
667
668    /// Receive some of the request body.
669    // TODO what if called before recv_response ?
670    pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
671        self.inner.recv_data().await
672    }
673
674    /// Receive an optional set of trailers for the response.
675    pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
676        let res = self.inner.recv_trailers().await;
677        if let Err(ref e) = res {
678            if e.is_header_too_big() {
679                self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED);
680            }
681        }
682        res
683    }
684
685    /// Tell the peer to stop sending into the underlying QUIC stream
686    pub fn stop_sending(&mut self, error_code: crate::error::Code) {
687        // TODO take by value to prevent any further call as this request is cancelled
688        // rename `cancel()` ?
689        self.inner.stream.stop_sending(error_code)
690    }
691}
692
693impl<S, B> RequestStream<S, B>
694where
695    S: quic::SendStream<B>,
696    B: Buf,
697{
698    /// Send some data on the request body.
699    pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
700        self.inner.send_data(buf).await
701    }
702
703    /// Send a set of trailers to end the request.
704    ///
705    /// Either [`RequestStream::finish`] or
706    /// [`RequestStream::send_trailers`] must be called to finalize a
707    /// request.
708    pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
709        self.inner.send_trailers(trailers).await
710    }
711
712    /// End the request without trailers.
713    ///
714    /// Either [`RequestStream::finish`] or
715    /// [`RequestStream::send_trailers`] must be called to finalize a
716    /// request.
717    pub async fn finish(&mut self) -> Result<(), Error> {
718        self.inner.finish().await
719    }
720
721    //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
722    //= type=TODO
723    //# Implementations SHOULD cancel requests by abruptly terminating any
724    //# directions of a stream that are still open.  To do so, an
725    //# implementation resets the sending parts of streams and aborts reading
726    //# on the receiving parts of streams; see Section 2.4 of
727    //# [QUIC-TRANSPORT].
728}
729
730impl<S, B> RequestStream<S, B>
731where
732    S: quic::BidiStream<B>,
733    B: Buf,
734{
735    /// Split this stream into two halves that can be driven independently.
736    pub fn split(
737        self,
738    ) -> (
739        RequestStream<S::SendStream, B>,
740        RequestStream<S::RecvStream, B>,
741    ) {
742        let (send, recv) = self.inner.split();
743        (RequestStream { inner: send }, RequestStream { inner: recv })
744    }
745}