sec_http3/client.rs
1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4 convert::TryFrom,
5 marker::PhantomData,
6 sync::{atomic::AtomicUsize, Arc},
7 task::{Context, Poll, Waker},
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use futures_util::future;
12use http::{request, HeaderMap, Response};
13use tracing::{info, trace};
14
15use crate::{
16 config::Config,
17 connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
18 error::{Code, Error, ErrorLevel},
19 frame::FrameStream,
20 proto::{frame::Frame, headers::Header, push::PushId},
21 qpack,
22 quic::{self, StreamId},
23 stream::{self, BufRecvStream},
24};
25
26/// Start building a new HTTP/3 client
27pub fn builder() -> Builder {
28 Builder::new()
29}
30
31/// Create a new HTTP/3 client with default settings
32pub async fn new<C, O>(conn: C) -> Result<(Connection<C, Bytes>, SendRequest<O, Bytes>), Error>
33where
34 C: quic::Connection<Bytes, OpenStreams = O>,
35 O: quic::OpenStreams<Bytes>,
36{
37 //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
38 //= type=implication
39 //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP
40 //# address and UDP port, where the IP address and port might be derived
41 //# from a URI, a selected alternative service ([ALTSVC]), a configured
42 //# proxy, or name resolution of any of these.
43 Builder::new().build(conn).await
44}
45
46/// HTTP/3 request sender
47///
48/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
49/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
50/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
51/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
52/// This includes the cases where no body is sent at all.
53///
54/// This struct is cloneable so multiple requests can be sent concurrently.
55///
56/// Existing instances are atomically counted internally, so whenever all of them have been
57/// dropped, the connection will be automatically closed whith HTTP/3 connection error code
58/// `HTTP_NO_ERROR = 0`.
59///
60/// # Examples
61///
62/// ## Sending a request with no body
63///
64/// ```rust
65/// # use sec_http3::{quic, client::*};
66/// # use http::{Request, Response};
67/// # use bytes::Buf;
68/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
69/// # where
70/// # T: quic::OpenStreams<B>,
71/// # B: Buf,
72/// # {
73/// // Prepare the HTTP request to send to the server
74/// let request = Request::get("https://www.example.com/").body(())?;
75///
76/// // Send the request to the server
77/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
78/// // Don't forget to end up the request by finishing the send stream.
79/// req_stream.finish().await?;
80/// // Receive the response
81/// let response: Response<()> = req_stream.recv_response().await?;
82/// // Process the response...
83/// # Ok(())
84/// # }
85/// # pub fn main() {}
86/// ```
87///
88/// ## Sending a request with a body and trailers
89///
90/// ```rust
91/// # use sec_http3::{quic, client::*};
92/// # use http::{Request, Response, HeaderMap};
93/// # use bytes::{Buf, Bytes};
94/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
95/// # where
96/// # T: quic::OpenStreams<Bytes>,
97/// # {
98/// // Prepare the HTTP request to send to the server
99/// let request = Request::get("https://www.example.com/").body(())?;
100///
101/// // Send the request to the server
102/// let mut req_stream = send_request.send_request(request).await?;
103/// // Send some data
104/// req_stream.send_data("body".into()).await?;
105/// // Prepare the trailers
106/// let mut trailers = HeaderMap::new();
107/// trailers.insert("trailer", "value".parse()?);
108/// // Send them and finish the send stream
109/// req_stream.send_trailers(trailers).await?;
110/// // We don't need to finish the send stream, as `send_trailers()` did it for us
111///
112/// // Receive the response.
113/// let response = req_stream.recv_response().await?;
114/// // Process the response...
115/// # Ok(())
116/// # }
117/// # pub fn main() {}
118/// ```
119///
120/// [`send_request()`]: struct.SendRequest.html#method.send_request
121/// [`RequestStream`]: struct.RequestStream.html
122/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
123pub struct SendRequest<T, B>
124where
125 T: quic::OpenStreams<B>,
126 B: Buf,
127{
128 open: T,
129 conn_state: SharedStateRef,
130 max_field_section_size: u64, // maximum size for a header we receive
131 // counts instances of SendRequest to close the connection when the last is dropped.
132 sender_count: Arc<AtomicUsize>,
133 conn_waker: Option<Waker>,
134 _buf: PhantomData<fn(B)>,
135 send_grease_frame: bool,
136}
137
138impl<T, B> SendRequest<T, B>
139where
140 T: quic::OpenStreams<B>,
141 B: Buf,
142{
143 /// Send a HTTP/3 request to the server
144 pub async fn send_request(
145 &mut self,
146 req: http::Request<()>,
147 ) -> Result<RequestStream<T::BidiStream, B>, Error> {
148 let (peer_max_field_section_size, closing) = {
149 let state = self.conn_state.read("send request lock state");
150 (state.peer_config.max_field_section_size, state.closing)
151 };
152
153 if closing {
154 return Err(Error::closing());
155 }
156
157 let (parts, _) = req.into_parts();
158 let request::Parts {
159 method,
160 uri,
161 headers,
162 ..
163 } = parts;
164 let headers = Header::request(method, uri, headers, Default::default())?;
165
166 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
167 //= type=implication
168 //# A
169 //# client MUST send only a single request on a given stream.
170 let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
171 .await
172 .map_err(|e| self.maybe_conn_err(e))?;
173
174 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
175 //= type=TODO
176 //# Characters in field names MUST be
177 //# converted to lowercase prior to their encoding.
178
179 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
180 //= type=TODO
181 //# To allow for better compression efficiency, the Cookie header field
182 //# ([COOKIES]) MAY be split into separate field lines, each with one or
183 //# more cookie-pairs, before compression.
184
185 let mut block = BytesMut::new();
186 let mem_size = qpack::encode_stateless(&mut block, headers)?;
187
188 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
189 //# An implementation that
190 //# has received this parameter SHOULD NOT send an HTTP message header
191 //# that exceeds the indicated size, as the peer will likely refuse to
192 //# process it.
193 if mem_size > peer_max_field_section_size {
194 return Err(Error::header_too_big(mem_size, peer_max_field_section_size));
195 }
196
197 stream::write(&mut stream, Frame::Headers(block.freeze()))
198 .await
199 .map_err(|e| self.maybe_conn_err(e))?;
200
201 let request_stream = RequestStream {
202 inner: connection::RequestStream::new(
203 FrameStream::new(BufRecvStream::new(stream)),
204 self.max_field_section_size,
205 self.conn_state.clone(),
206 self.send_grease_frame,
207 ),
208 };
209 // send the grease frame only once
210 self.send_grease_frame = false;
211 Ok(request_stream)
212 }
213}
214
215impl<T, B> ConnectionState for SendRequest<T, B>
216where
217 T: quic::OpenStreams<B>,
218 B: Buf,
219{
220 fn shared_state(&self) -> &SharedStateRef {
221 &self.conn_state
222 }
223}
224
225impl<T, B> Clone for SendRequest<T, B>
226where
227 T: quic::OpenStreams<B> + Clone,
228 B: Buf,
229{
230 fn clone(&self) -> Self {
231 self.sender_count
232 .fetch_add(1, std::sync::atomic::Ordering::Release);
233
234 Self {
235 open: self.open.clone(),
236 conn_state: self.conn_state.clone(),
237 max_field_section_size: self.max_field_section_size,
238 sender_count: self.sender_count.clone(),
239 conn_waker: self.conn_waker.clone(),
240 _buf: PhantomData,
241 send_grease_frame: self.send_grease_frame,
242 }
243 }
244}
245
246impl<T, B> Drop for SendRequest<T, B>
247where
248 T: quic::OpenStreams<B>,
249 B: Buf,
250{
251 fn drop(&mut self) {
252 if self
253 .sender_count
254 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
255 == 1
256 {
257 if let Some(w) = Option::take(&mut self.conn_waker) {
258 w.wake()
259 }
260 self.shared_state().write("SendRequest drop").error = Some(Error::closed());
261 self.open.close(Code::H3_NO_ERROR, b"");
262 }
263 }
264}
265
266/// Client connection driver
267///
268/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
269/// It needs to be polled continously via [`poll_close()`]. On connection closure, this
270/// will resolve to `Ok(())` if the peer sent `HTTP_NO_ERROR`, or `Err()` if a connection-level
271/// error occured.
272///
273/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no request
274/// initiation will be further allowed. Then [`poll_close()`] will resolve when all ongoing requests
275/// and push streams complete. Finally, a connection closure with `HTTP_NO_ERROR` code will be
276/// sent to the server.
277///
278/// # Examples
279///
280/// ## Drive a connection concurrenty
281///
282/// ```rust
283/// # use bytes::Buf;
284/// # use futures_util::future;
285/// # use sec_http3::{client::*, quic};
286/// # use tokio::task::JoinHandle;
287/// # async fn doc<C, B>(mut connection: Connection<C, B>)
288/// # -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
289/// # where
290/// # C: quic::Connection<B> + Send + 'static,
291/// # C::SendStream: Send + 'static,
292/// # C::RecvStream: Send + 'static,
293/// # B: Buf + Send + 'static,
294/// # {
295/// // Run the driver on a different task
296/// tokio::spawn(async move {
297/// future::poll_fn(|cx| connection.poll_close(cx)).await?;
298/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
299/// })
300/// # }
301/// ```
302///
303/// ## Shutdown a connection gracefully
304///
305/// ```rust
306/// # use bytes::Buf;
307/// # use futures_util::future;
308/// # use sec_http3::{client::*, quic};
309/// # use tokio::{self, sync::oneshot, task::JoinHandle};
310/// # async fn doc<C, B>(mut connection: Connection<C, B>)
311/// # -> Result<(), Box<dyn std::error::Error + Send + Sync>>
312/// # where
313/// # C: quic::Connection<B> + Send + 'static,
314/// # C::SendStream: Send + 'static,
315/// # C::RecvStream: Send + 'static,
316/// # B: Buf + Send + 'static,
317/// # {
318/// // Prepare a channel to stop the driver thread
319/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
320///
321/// // Run the driver on a different task
322/// let driver = tokio::spawn(async move {
323/// tokio::select! {
324/// // Drive the connection
325/// closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed?,
326/// // Listen for shutdown condition
327/// max_streams = shutdown_rx => {
328/// // Initiate shutdown
329/// connection.shutdown(max_streams?);
330/// // Wait for ongoing work to complete
331/// future::poll_fn(|cx| connection.poll_close(cx)).await?;
332/// }
333/// };
334///
335/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
336/// });
337///
338/// // Do client things, wait for close contition...
339///
340/// // Initiate shutdown
341/// shutdown_tx.send(2);
342/// // Wait for the connection to be closed
343/// driver.await?
344/// # }
345/// ```
346/// [`poll_close()`]: struct.Connection.html#method.poll_close
347/// [`shutdown()`]: struct.Connection.html#method.shutdown
348pub struct Connection<C, B>
349where
350 C: quic::Connection<B>,
351 B: Buf,
352{
353 inner: ConnectionInner<C, B>,
354 // Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
355 sent_closing: Option<PushId>,
356 // Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept.
357 recv_closing: Option<StreamId>,
358}
359
360impl<C, B> Connection<C, B>
361where
362 C: quic::Connection<B>,
363 B: Buf,
364{
365 /// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
366 pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), Error> {
367 // TODO: Calculate remaining pushes once server push is implemented.
368 self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
369 }
370
371 /// Wait until the connection is closed
372 pub async fn wait_idle(&mut self) -> Result<(), Error> {
373 future::poll_fn(|cx| self.poll_close(cx)).await
374 }
375
376 /// Maintain the connection state until it is closed
377 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
378 while let Poll::Ready(result) = self.inner.poll_control(cx) {
379 match result {
380 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
381 //= type=TODO
382 //# When a 0-RTT QUIC connection is being used, the initial value of each
383 //# server setting is the value used in the previous session. Clients
384 //# SHOULD store the settings the server provided in the HTTP/3
385 //# connection where resumption information was provided, but they MAY
386 //# opt not to store settings in certain cases (e.g., if the session
387 //# ticket is received before the SETTINGS frame).
388
389 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
390 //= type=TODO
391 //# A client MUST comply
392 //# with stored settings -- or default values if no values are stored --
393 //# when attempting 0-RTT.
394
395 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
396 //= type=TODO
397 //# Once a server has provided new settings,
398 //# clients MUST comply with those values.
399 Ok(Frame::Settings(_)) => trace!("Got settings"),
400 Ok(Frame::Goaway(id)) => {
401 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
402 //# The GOAWAY frame is always sent on the control stream. In the
403 //# server-to-client direction, it carries a QUIC stream ID for a client-
404 //# initiated bidirectional stream encoded as a variable-length integer.
405 //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
406 //# of any other type as a connection error of type H3_ID_ERROR.
407 if !StreamId::from(id).is_request() {
408 return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
409 format!("non-request StreamId in a GoAway frame: {}", id),
410 ErrorLevel::ConnectionError,
411 )));
412 }
413 self.inner.process_goaway(&mut self.recv_closing, id)?;
414
415 info!("Server initiated graceful shutdown, last: StreamId({})", id);
416 }
417
418 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
419 //# If a PUSH_PROMISE frame is received on the control stream, the client
420 //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
421
422 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
423 //# A client MUST treat the
424 //# receipt of a MAX_PUSH_ID frame as a connection error of type
425 //# H3_FRAME_UNEXPECTED.
426 Ok(frame) => {
427 return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
428 format!("on client control stream: {:?}", frame),
429 ErrorLevel::ConnectionError,
430 )))
431 }
432 Err(e) => {
433 let connection_error = self.inner.shared.read("poll_close").error.clone();
434 let connection_error = match connection_error {
435 Some(e) => e,
436 None => {
437 self.inner.shared.write("poll_close error").error = Some(e.clone());
438 e
439 }
440 };
441 if connection_error.is_closed() {
442 return Poll::Ready(Ok(()));
443 }
444 return Poll::Ready(Err(connection_error));
445 }
446 }
447 }
448
449 //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
450 //# Clients MUST treat
451 //# receipt of a server-initiated bidirectional stream as a connection
452 //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
453 //# been negotiated.
454 if self.inner.poll_accept_request(cx).is_ready() {
455 return Poll::Ready(Err(self.inner.close(
456 Code::H3_STREAM_CREATION_ERROR,
457 "client received a bidirectional stream",
458 )));
459 }
460
461 Poll::Pending
462 }
463}
464
465/// HTTP/3 client builder
466///
467/// Set the configuration for a new client.
468///
469/// # Examples
470/// ```rust
471/// # use sec_http3::quic;
472/// # async fn doc<C, O, B>(quic: C)
473/// # where
474/// # C: quic::Connection<B, OpenStreams = O>,
475/// # O: quic::OpenStreams<B>,
476/// # B: bytes::Buf,
477/// # {
478/// let h3_conn = sec_http3::client::builder()
479/// .max_field_section_size(8192)
480/// .build(quic)
481/// .await
482/// .expect("Failed to build connection");
483/// # }
484/// ```
485pub struct Builder {
486 config: Config,
487}
488
489impl Builder {
490 pub(super) fn new() -> Self {
491 Builder {
492 config: Default::default(),
493 }
494 }
495
496 /// Set the maximum header size this client is willing to accept
497 ///
498 /// See [header size constraints] section of the specification for details.
499 ///
500 /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
501 pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
502 self.config.max_field_section_size = value;
503 self
504 }
505
506 /// Create a new HTTP/3 client from a `quic` connection
507 pub async fn build<C, O, B>(
508 &mut self,
509 quic: C,
510 ) -> Result<(Connection<C, B>, SendRequest<O, B>), Error>
511 where
512 C: quic::Connection<B, OpenStreams = O>,
513 O: quic::OpenStreams<B>,
514 B: Buf,
515 {
516 let open = quic.opener();
517 let conn_state = SharedStateRef::default();
518
519 let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);
520
521 Ok((
522 Connection {
523 inner: ConnectionInner::new(quic, conn_state.clone(), self.config).await?,
524 sent_closing: None,
525 recv_closing: None,
526 },
527 SendRequest {
528 open,
529 conn_state,
530 conn_waker,
531 max_field_section_size: self.config.max_field_section_size,
532 sender_count: Arc::new(AtomicUsize::new(1)),
533 send_grease_frame: self.config.send_grease,
534 _buf: PhantomData,
535 },
536 ))
537 }
538}
539
540/// Manage request bodies transfer, response and trailers.
541///
542/// Once a request has been sent via [`send_request()`], a response can be awaited by calling
543/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request
544/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`].
545///
546/// After receiving the response's headers, it's body can be read by [`recv_data()`] until it returns
547/// `None`. Then the trailers will eventually be available via [`recv_trailers()`].
548///
549/// TODO: If data is polled before the response has been received, an error will be thrown.
550///
551/// TODO: If trailers are polled but the body hasn't been fully received, an UNEXPECT_FRAME error will be
552/// thrown
553///
554/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will
555/// put an end to any transfer concerning it.
556///
557/// # Examples
558///
559/// ```rust
560/// # use sec_http3::{quic, client::*};
561/// # use http::{Request, Response};
562/// # use bytes::Buf;
563/// # use tokio::io::AsyncWriteExt;
564/// # async fn doc<T,B>(mut req_stream: RequestStream<T, B>) -> Result<(), Box<dyn std::error::Error>>
565/// # where
566/// # T: quic::RecvStream,
567/// # {
568/// // Prepare the HTTP request to send to the server
569/// let request = Request::get("https://www.example.com/").body(())?;
570///
571/// // Receive the response
572/// let response = req_stream.recv_response().await?;
573/// // Receive the body
574/// while let Some(mut chunk) = req_stream.recv_data().await? {
575/// let mut out = tokio::io::stdout();
576/// out.write_all_buf(&mut chunk).await?;
577/// out.flush().await?;
578/// }
579/// # Ok(())
580/// # }
581/// # pub fn main() {}
582/// ```
583///
584/// [`send_request()`]: struct.SendRequest.html#method.send_request
585/// [`recv_response()`]: #method.recv_response
586/// [`recv_data()`]: #method.recv_data
587/// [`send_data()`]: #method.send_data
588/// [`send_trailers()`]: #method.send_trailers
589/// [`recv_trailers()`]: #method.recv_trailers
590/// [`finish()`]: #method.finish
591/// [`stop_sending()`]: #method.stop_sending
592pub struct RequestStream<S, B> {
593 inner: connection::RequestStream<S, B>,
594}
595
596impl<S, B> ConnectionState for RequestStream<S, B> {
597 fn shared_state(&self) -> &SharedStateRef {
598 &self.inner.conn_state
599 }
600}
601
602impl<S, B> RequestStream<S, B>
603where
604 S: quic::RecvStream,
605{
606 /// Receive the HTTP/3 response
607 ///
608 /// This should be called before trying to receive any data with [`recv_data()`].
609 ///
610 /// [`recv_data()`]: #method.recv_data
611 pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
612 let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
613 .await
614 .map_err(|e| self.maybe_conn_err(e))?
615 .ok_or_else(|| {
616 Code::H3_GENERAL_PROTOCOL_ERROR.with_reason(
617 "Did not receive response headers",
618 ErrorLevel::ConnectionError,
619 )
620 })?;
621
622 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
623 //= type=TODO
624 //# A client MUST treat
625 //# receipt of a PUSH_PROMISE frame that contains a larger push ID than
626 //# the client has advertised as a connection error of H3_ID_ERROR.
627
628 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
629 //= type=TODO
630 //# If a client
631 //# receives a push ID that has already been promised and detects a
632 //# mismatch, it MUST respond with a connection error of type
633 //# H3_GENERAL_PROTOCOL_ERROR.
634
635 let decoded = if let Frame::Headers(ref mut encoded) = frame {
636 match qpack::decode_stateless(encoded, self.inner.max_field_section_size) {
637 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
638 //# An HTTP/3 implementation MAY impose a limit on the maximum size of
639 //# the message header it will accept on an individual HTTP message.
640 Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => {
641 self.inner.stop_sending(Code::H3_REQUEST_CANCELLED);
642 return Err(Error::header_too_big(
643 cancel_size,
644 self.inner.max_field_section_size,
645 ));
646 }
647 Ok(decoded) => decoded,
648 Err(e) => return Err(e.into()),
649 }
650 } else {
651 return Err(Code::H3_FRAME_UNEXPECTED.with_reason(
652 "First response frame is not headers",
653 ErrorLevel::ConnectionError,
654 ));
655 };
656
657 let qpack::Decoded { fields, .. } = decoded;
658
659 let (status, headers) = Header::try_from(fields)?.into_response_parts()?;
660 let mut resp = Response::new(());
661 *resp.status_mut() = status;
662 *resp.headers_mut() = headers;
663 *resp.version_mut() = http::Version::HTTP_3;
664
665 Ok(resp)
666 }
667
668 /// Receive some of the request body.
669 // TODO what if called before recv_response ?
670 pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
671 self.inner.recv_data().await
672 }
673
674 /// Receive an optional set of trailers for the response.
675 pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
676 let res = self.inner.recv_trailers().await;
677 if let Err(ref e) = res {
678 if e.is_header_too_big() {
679 self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED);
680 }
681 }
682 res
683 }
684
685 /// Tell the peer to stop sending into the underlying QUIC stream
686 pub fn stop_sending(&mut self, error_code: crate::error::Code) {
687 // TODO take by value to prevent any further call as this request is cancelled
688 // rename `cancel()` ?
689 self.inner.stream.stop_sending(error_code)
690 }
691}
692
693impl<S, B> RequestStream<S, B>
694where
695 S: quic::SendStream<B>,
696 B: Buf,
697{
698 /// Send some data on the request body.
699 pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
700 self.inner.send_data(buf).await
701 }
702
703 /// Send a set of trailers to end the request.
704 ///
705 /// Either [`RequestStream::finish`] or
706 /// [`RequestStream::send_trailers`] must be called to finalize a
707 /// request.
708 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
709 self.inner.send_trailers(trailers).await
710 }
711
712 /// End the request without trailers.
713 ///
714 /// Either [`RequestStream::finish`] or
715 /// [`RequestStream::send_trailers`] must be called to finalize a
716 /// request.
717 pub async fn finish(&mut self) -> Result<(), Error> {
718 self.inner.finish().await
719 }
720
721 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
722 //= type=TODO
723 //# Implementations SHOULD cancel requests by abruptly terminating any
724 //# directions of a stream that are still open. To do so, an
725 //# implementation resets the sending parts of streams and aborts reading
726 //# on the receiving parts of streams; see Section 2.4 of
727 //# [QUIC-TRANSPORT].
728}
729
730impl<S, B> RequestStream<S, B>
731where
732 S: quic::BidiStream<B>,
733 B: Buf,
734{
735 /// Split this stream into two halves that can be driven independently.
736 pub fn split(
737 self,
738 ) -> (
739 RequestStream<S::SendStream, B>,
740 RequestStream<S::RecvStream, B>,
741 ) {
742 let (send, recv) = self.inner.split();
743 (RequestStream { inner: send }, RequestStream { inner: recv })
744 }
745}