salvo_quinn/client.rs
1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4 convert::TryFrom,
5 marker::PhantomData,
6 sync::{atomic::AtomicUsize, Arc},
7 task::{Context, Poll, Waker},
8};
9
10use bytes::{Buf, Bytes, BytesMut};
11use futures_util::future;
12use http::{request, HeaderMap, Response};
13use tracing::{info, trace};
14
15use crate::{
16 connection::{self, ConnectionInner, ConnectionState, SharedStateRef},
17 error::{Code, Error, ErrorLevel},
18 frame::FrameStream,
19 proto::{frame::Frame, headers::Header, varint::VarInt},
20 qpack, quic, stream,
21};
22
23/// Start building a new HTTP/3 client
24pub fn builder() -> Builder {
25 Builder::new()
26}
27
28/// Create a new HTTP/3 client with default settings
29pub async fn new<C, O>(conn: C) -> Result<(Connection<C, Bytes>, SendRequest<O, Bytes>), Error>
30where
31 C: quic::Connection<Bytes, OpenStreams = O>,
32 O: quic::OpenStreams<Bytes>,
33{
34 //= https://www.rfc-editor.org/rfc/rfc9114#section-3.3
35 //= type=implication
36 //# Clients SHOULD NOT open more than one HTTP/3 connection to a given IP
37 //# address and UDP port, where the IP address and port might be derived
38 //# from a URI, a selected alternative service ([ALTSVC]), a configured
39 //# proxy, or name resolution of any of these.
40 Builder::new().build(conn).await
41}
42
43/// HTTP/3 request sender
44///
45/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
46/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
47/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
48/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
49/// This includes the cases where no body is sent at all.
50///
51/// This struct is cloneable so multiple requests can be sent concurrently.
52///
53/// Existing instances are atomically counted internally, so whenever all of them have been
54/// dropped, the connection will be automatically closed whith HTTP/3 connection error code
55/// `HTTP_NO_ERROR = 0`.
56///
57/// # Examples
58///
59/// ## Sending a request with no body
60///
61/// ```rust
62/// # use h3::{quic, client::*};
63/// # use http::{Request, Response};
64/// # use bytes::Buf;
65/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
66/// # where
67/// # T: quic::OpenStreams<B>,
68/// # B: Buf,
69/// # {
70/// // Prepare the HTTP request to send to the server
71/// let request = Request::get("https://www.example.com/").body(())?;
72///
73/// // Send the request to the server
74/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
75/// // Don't forget to end up the request by finishing the send stream.
76/// req_stream.finish().await?;
77/// // Receive the response
78/// let response: Response<()> = req_stream.recv_response().await?;
79/// // Process the response...
80/// # Ok(())
81/// # }
82/// # pub fn main() {}
83/// ```
84///
85/// ## Sending a request with a body and trailers
86///
87/// ```rust
88/// # use h3::{quic, client::*};
89/// # use http::{Request, Response, HeaderMap};
90/// # use bytes::{Buf, Bytes};
91/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
92/// # where
93/// # T: quic::OpenStreams<Bytes>,
94/// # {
95/// // Prepare the HTTP request to send to the server
96/// let request = Request::get("https://www.example.com/").body(())?;
97///
98/// // Send the request to the server
99/// let mut req_stream = send_request.send_request(request).await?;
100/// // Send some data
101/// req_stream.send_data("body".into()).await?;
102/// // Prepare the trailers
103/// let mut trailers = HeaderMap::new();
104/// trailers.insert("trailer", "value".parse()?);
105/// // Send them and finish the send stream
106/// req_stream.send_trailers(trailers).await?;
107/// // We don't need to finish the send stream, as `send_trailers()` did it for us
108///
109/// // Receive the response.
110/// let response = req_stream.recv_response().await?;
111/// // Process the response...
112/// # Ok(())
113/// # }
114/// # pub fn main() {}
115/// ```
116///
117/// [`send_request()`]: struct.SendRequest.html#method.send_request
118/// [`RequestStream`]: struct.RequestStream.html
119/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
120pub struct SendRequest<T, B>
121where
122 T: quic::OpenStreams<B>,
123 B: Buf,
124{
125 open: T,
126 conn_state: SharedStateRef,
127 max_field_section_size: u64, // maximum size for a header we receive
128 // counts instances of SendRequest to close the connection when the last is dropped.
129 sender_count: Arc<AtomicUsize>,
130 conn_waker: Option<Waker>,
131 _buf: PhantomData<fn(B)>,
132 send_grease_frame: bool,
133}
134
135impl<T, B> SendRequest<T, B>
136where
137 T: quic::OpenStreams<B>,
138 B: Buf,
139{
140 /// Send a HTTP/3 request to the server
141 pub async fn send_request(
142 &mut self,
143 req: http::Request<()>,
144 ) -> Result<RequestStream<T::BidiStream, B>, Error> {
145 let (peer_max_field_section_size, closing) = {
146 let state = self.conn_state.read("send request lock state");
147 (state.peer_max_field_section_size, state.closing)
148 };
149
150 if closing.is_some() {
151 return Err(Error::closing());
152 }
153
154 let (parts, _) = req.into_parts();
155 let request::Parts {
156 method,
157 uri,
158 headers,
159 ..
160 } = parts;
161 let headers = Header::request(method, uri, headers)?;
162
163 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
164 //= type=implication
165 //# A
166 //# client MUST send only a single request on a given stream.
167 let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
168 .await
169 .map_err(|e| self.maybe_conn_err(e))?;
170
171 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
172 //= type=TODO
173 //# Characters in field names MUST be
174 //# converted to lowercase prior to their encoding.
175
176 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
177 //= type=TODO
178 //# To allow for better compression efficiency, the Cookie header field
179 //# ([COOKIES]) MAY be split into separate field lines, each with one or
180 //# more cookie-pairs, before compression.
181
182 let mut block = BytesMut::new();
183 let mem_size = qpack::encode_stateless(&mut block, headers)?;
184
185 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
186 //# An implementation that
187 //# has received this parameter SHOULD NOT send an HTTP message header
188 //# that exceeds the indicated size, as the peer will likely refuse to
189 //# process it.
190 if mem_size > peer_max_field_section_size {
191 return Err(Error::header_too_big(mem_size, peer_max_field_section_size));
192 }
193
194 stream::write(&mut stream, Frame::Headers(block.freeze()))
195 .await
196 .map_err(|e| self.maybe_conn_err(e))?;
197
198 let request_stream = RequestStream {
199 inner: connection::RequestStream::new(
200 FrameStream::new(stream),
201 self.max_field_section_size,
202 self.conn_state.clone(),
203 self.send_grease_frame,
204 ),
205 };
206 // send the grease frame only once
207 self.send_grease_frame = false;
208 Ok(request_stream)
209 }
210}
211
212impl<T, B> ConnectionState for SendRequest<T, B>
213where
214 T: quic::OpenStreams<B>,
215 B: Buf,
216{
217 fn shared_state(&self) -> &SharedStateRef {
218 &self.conn_state
219 }
220}
221
222impl<T, B> Clone for SendRequest<T, B>
223where
224 T: quic::OpenStreams<B> + Clone,
225 B: Buf,
226{
227 fn clone(&self) -> Self {
228 self.sender_count
229 .fetch_add(1, std::sync::atomic::Ordering::Release);
230
231 Self {
232 open: self.open.clone(),
233 conn_state: self.conn_state.clone(),
234 max_field_section_size: self.max_field_section_size,
235 sender_count: self.sender_count.clone(),
236 conn_waker: self.conn_waker.clone(),
237 _buf: PhantomData,
238 send_grease_frame: self.send_grease_frame,
239 }
240 }
241}
242
243impl<T, B> Drop for SendRequest<T, B>
244where
245 T: quic::OpenStreams<B>,
246 B: Buf,
247{
248 fn drop(&mut self) {
249 if self
250 .sender_count
251 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
252 == 1
253 {
254 if let Some(w) = self.conn_waker.take() {
255 w.wake()
256 }
257 self.shared_state().write("SendRequest drop").error = Some(Error::closed());
258 self.open.close(Code::H3_NO_ERROR, b"");
259 }
260 }
261}
262
263/// Client connection driver
264///
265/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
266/// It needs to be polled continously via [`poll_close()`]. On connection closure, this
267/// will resolve to `Ok(())` if the peer sent `HTTP_NO_ERROR`, or `Err()` if a connection-level
268/// error occured.
269///
270/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no request
271/// initiation will be further allowed. Then [`poll_close()`] will resolve when all ongoing requests
272/// and push streams complete. Finally, a connection closure with `HTTP_NO_ERROR` code will be
273/// sent to the server.
274///
275/// # Examples
276///
277/// ## Drive a connection concurrenty
278///
279/// ```rust
280/// # use bytes::Buf;
281/// # use futures_util::future;
282/// # use h3::{client::*, quic};
283/// # use tokio::task::JoinHandle;
284/// # async fn doc<C, B>(mut connection: Connection<C, B>)
285/// # -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
286/// # where
287/// # C: quic::Connection<B> + Send + 'static,
288/// # C::SendStream: Send + 'static,
289/// # C::RecvStream: Send + 'static,
290/// # B: Buf + Send + 'static,
291/// # {
292/// // Run the driver on a different task
293/// tokio::spawn(async move {
294/// future::poll_fn(|cx| connection.poll_close(cx)).await?;
295/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
296/// })
297/// # }
298/// ```
299///
300/// ## Shutdown a connection gracefully
301///
302/// ```rust
303/// # use bytes::Buf;
304/// # use futures_util::future;
305/// # use h3::{client::*, quic};
306/// # use tokio::{self, sync::oneshot, task::JoinHandle};
307/// # async fn doc<C, B>(mut connection: Connection<C, B>)
308/// # -> Result<(), Box<dyn std::error::Error + Send + Sync>>
309/// # where
310/// # C: quic::Connection<B> + Send + 'static,
311/// # C::SendStream: Send + 'static,
312/// # C::RecvStream: Send + 'static,
313/// # B: Buf + Send + 'static,
314/// # {
315/// // Prepare a channel to stop the driver thread
316/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
317///
318/// // Run the driver on a different task
319/// let driver = tokio::spawn(async move {
320/// tokio::select! {
321/// // Drive the connection
322/// closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed?,
323/// // Listen for shutdown condition
324/// max_streams = shutdown_rx => {
325/// // Initiate shutdown
326/// connection.shutdown(max_streams?);
327/// // Wait for ongoing work to complete
328/// future::poll_fn(|cx| connection.poll_close(cx)).await?;
329/// }
330/// };
331///
332/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
333/// });
334///
335/// // Do client things, wait for close contition...
336///
337/// // Initiate shutdown
338/// shutdown_tx.send(2);
339/// // Wait for the connection to be closed
340/// driver.await?
341/// # }
342/// ```
343/// [`poll_close()`]: struct.Connection.html#method.poll_close
344/// [`shutdown()`]: struct.Connection.html#method.shutdown
345pub struct Connection<C, B>
346where
347 C: quic::Connection<B>,
348 B: Buf,
349{
350 inner: ConnectionInner<C, B>,
351}
352
353impl<C, B> Connection<C, B>
354where
355 C: quic::Connection<B>,
356 B: Buf,
357{
358 /// Itiniate a graceful shutdown, accepting `max_request` potentially in-flight server push
359 pub async fn shutdown(&mut self, max_requests: usize) -> Result<(), Error> {
360 self.inner.shutdown(max_requests).await
361 }
362
363 /// Wait until the connection is closed
364 pub async fn wait_idle(&mut self) -> Result<(), Error> {
365 future::poll_fn(|cx| self.poll_close(cx)).await
366 }
367
368 /// Maintain the connection state until it is closed
369 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
370 while let Poll::Ready(result) = self.inner.poll_control(cx) {
371 match result {
372 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
373 //= type=TODO
374 //# When a 0-RTT QUIC connection is being used, the initial value of each
375 //# server setting is the value used in the previous session. Clients
376 //# SHOULD store the settings the server provided in the HTTP/3
377 //# connection where resumption information was provided, but they MAY
378 //# opt not to store settings in certain cases (e.g., if the session
379 //# ticket is received before the SETTINGS frame).
380
381 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
382 //= type=TODO
383 //# A client MUST comply
384 //# with stored settings -- or default values if no values are stored --
385 //# when attempting 0-RTT.
386
387 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
388 //= type=TODO
389 //# Once a server has provided new settings,
390 //# clients MUST comply with those values.
391 Ok(Frame::Settings(_)) => trace!("Got settings"),
392 Ok(Frame::Goaway(id)) => {
393 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
394 //# The GOAWAY frame is always sent on the control stream. In the
395 //# server-to-client direction, it carries a QUIC stream ID for a client-
396 //# initiated bidirectional stream encoded as a variable-length integer.
397 //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
398 //# of any other type as a connection error of type H3_ID_ERROR.
399 if !id.is_request() {
400 return Poll::Ready(Err(Code::H3_ID_ERROR.with_reason(
401 format!("non-request StreamId in a GoAway frame: {}", id),
402 ErrorLevel::ConnectionError,
403 )));
404 }
405 info!("Server initiated graceful shutdown, last: StreamId({})", id);
406 }
407
408 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
409 //# If a PUSH_PROMISE frame is received on the control stream, the client
410 //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
411
412 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
413 //# A client MUST treat the
414 //# receipt of a MAX_PUSH_ID frame as a connection error of type
415 //# H3_FRAME_UNEXPECTED.
416 Ok(frame) => {
417 return Poll::Ready(Err(Code::H3_FRAME_UNEXPECTED.with_reason(
418 format!("on client control stream: {:?}", frame),
419 ErrorLevel::ConnectionError,
420 )))
421 }
422 Err(e) => {
423 let connection_error = self
424 .inner
425 .shared
426 .read("poll_close error read")
427 .error
428 .as_ref()
429 .cloned();
430
431 match connection_error {
432 Some(e) if e.is_closed() => return Poll::Ready(Ok(())),
433 Some(e) => return Poll::Ready(Err(e)),
434 None => {
435 self.inner.shared.write("poll_close error").error = e.clone().into();
436 return Poll::Ready(Err(e));
437 }
438 }
439 }
440 }
441 }
442
443 //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
444 //# Clients MUST treat
445 //# receipt of a server-initiated bidirectional stream as a connection
446 //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
447 //# been negotiated.
448 if self.inner.poll_accept_request(cx).is_ready() {
449 return Poll::Ready(Err(self.inner.close(
450 Code::H3_STREAM_CREATION_ERROR,
451 "client received a bidirectional stream",
452 )));
453 }
454
455 Poll::Pending
456 }
457}
458
459/// HTTP/3 client builder
460///
461/// Set the configuration for a new client.
462///
463/// # Examples
464/// ```rust
465/// # use h3::quic;
466/// # async fn doc<C, O, B>(quic: C)
467/// # where
468/// # C: quic::Connection<B, OpenStreams = O>,
469/// # O: quic::OpenStreams<B>,
470/// # B: bytes::Buf,
471/// # {
472/// let h3_conn = h3::client::builder()
473/// .max_field_section_size(8192)
474/// .build(quic)
475/// .await
476/// .expect("Failed to build connection");
477/// # }
478/// ```
479pub struct Builder {
480 max_field_section_size: u64,
481 send_grease: bool,
482}
483
484impl Builder {
485 pub(super) fn new() -> Self {
486 Builder {
487 max_field_section_size: VarInt::MAX.0,
488 send_grease: true,
489 }
490 }
491
492 /// Set the maximum header size this client is willing to accept
493 ///
494 /// See [header size constraints] section of the specification for details.
495 ///
496 /// [header size constraints]: https://www.rfc-editor.org/rfc/rfc9114.html#name-header-size-constraints
497 pub fn max_field_section_size(&mut self, value: u64) -> &mut Self {
498 self.max_field_section_size = value;
499 self
500 }
501
502 /// Create a new HTTP/3 client from a `quic` connection
503 pub async fn build<C, O, B>(
504 &mut self,
505 quic: C,
506 ) -> Result<(Connection<C, B>, SendRequest<O, B>), Error>
507 where
508 C: quic::Connection<B, OpenStreams = O>,
509 O: quic::OpenStreams<B>,
510 B: Buf,
511 {
512 let open = quic.opener();
513 let conn_state = SharedStateRef::default();
514
515 let conn_waker = Some(future::poll_fn(|cx| Poll::Ready(cx.waker().clone())).await);
516
517 Ok((
518 Connection {
519 inner: ConnectionInner::new(
520 quic,
521 self.max_field_section_size,
522 conn_state.clone(),
523 self.send_grease,
524 )
525 .await?,
526 },
527 SendRequest {
528 open,
529 conn_state,
530 conn_waker,
531 max_field_section_size: self.max_field_section_size,
532 sender_count: Arc::new(AtomicUsize::new(1)),
533 _buf: PhantomData,
534 send_grease_frame: self.send_grease,
535 },
536 ))
537 }
538}
539
540/// Manage request bodies transfer, response and trailers.
541///
542/// Once a request has been sent via [`send_request()`], a response can be awaited by calling
543/// [`recv_response()`]. A body for this request can be sent with [`send_data()`], then the request
544/// shall be completed by either sending trailers with [`send_trailers()`], or [`finish()`].
545///
546/// After receiving the response's headers, it's body can be read by [`recv_data()`] until it returns
547/// `None`. Then the trailers will eventually be available via [`recv_trailers()`].
548///
549/// TODO: If data is polled before the response has been received, an error will be thrown.
550///
551/// TODO: If trailers are polled but the body hasn't been fully received, an UNEXPECT_FRAME error will be
552/// thrown
553///
554/// Whenever the client wants to cancel this request, it can call [`stop_sending()`], which will
555/// put an end to any transfer concerning it.
556///
557/// # Examples
558///
559/// ```rust
560/// # use h3::{quic, client::*};
561/// # use http::{Request, Response};
562/// # use bytes::Buf;
563/// # use tokio::io::AsyncWriteExt;
564/// # async fn doc<T,B>(mut req_stream: RequestStream<T, B>) -> Result<(), Box<dyn std::error::Error>>
565/// # where
566/// # T: quic::RecvStream,
567/// # B: Buf,
568/// # {
569/// // Prepare the HTTP request to send to the server
570/// let request = Request::get("https://www.example.com/").body(())?;
571///
572/// // Receive the response
573/// let response = req_stream.recv_response().await?;
574/// // Receive the body
575/// while let Some(mut chunk) = req_stream.recv_data().await? {
576/// let mut out = tokio::io::stdout();
577/// out.write_all_buf(&mut chunk).await?;
578/// out.flush().await?;
579/// }
580/// # Ok(())
581/// # }
582/// # pub fn main() {}
583/// ```
584///
585/// [`send_request()`]: struct.SendRequest.html#method.send_request
586/// [`recv_response()`]: #method.recv_response
587/// [`recv_data()`]: #method.recv_data
588/// [`send_data()`]: #method.send_data
589/// [`send_trailers()`]: #method.send_trailers
590/// [`recv_trailers()`]: #method.recv_trailers
591/// [`finish()`]: #method.finish
592/// [`stop_sending()`]: #method.stop_sending
593pub struct RequestStream<S, B> {
594 inner: connection::RequestStream<S, B>,
595}
596
597impl<S, B> ConnectionState for RequestStream<S, B> {
598 fn shared_state(&self) -> &SharedStateRef {
599 &self.inner.conn_state
600 }
601}
602
603impl<S, B> RequestStream<S, B>
604where
605 S: quic::RecvStream,
606{
607 /// Receive the HTTP/3 response
608 ///
609 /// This should be called before trying to receive any data with [`recv_data()`].
610 ///
611 /// [`recv_data()`]: #method.recv_data
612 pub async fn recv_response(&mut self) -> Result<Response<()>, Error> {
613 let mut frame = future::poll_fn(|cx| self.inner.stream.poll_next(cx))
614 .await
615 .map_err(|e| self.maybe_conn_err(e))?
616 .ok_or_else(|| {
617 Code::H3_GENERAL_PROTOCOL_ERROR.with_reason(
618 "Did not receive response headers",
619 ErrorLevel::ConnectionError,
620 )
621 })?;
622
623 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
624 //= type=TODO
625 //# A client MUST treat
626 //# receipt of a PUSH_PROMISE frame that contains a larger push ID than
627 //# the client has advertised as a connection error of H3_ID_ERROR.
628
629 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
630 //= type=TODO
631 //# If a client
632 //# receives a push ID that has already been promised and detects a
633 //# mismatch, it MUST respond with a connection error of type
634 //# H3_GENERAL_PROTOCOL_ERROR.
635
636 let decoded = if let Frame::Headers(ref mut encoded) = frame {
637 match qpack::decode_stateless(encoded, self.inner.max_field_section_size) {
638 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
639 //# An HTTP/3 implementation MAY impose a limit on the maximum size of
640 //# the message header it will accept on an individual HTTP message.
641 Err(qpack::DecoderError::HeaderTooLong(cancel_size)) => {
642 self.inner.stop_sending(Code::H3_REQUEST_CANCELLED);
643 return Err(Error::header_too_big(
644 cancel_size,
645 self.inner.max_field_section_size,
646 ));
647 }
648 Ok(decoded) => decoded,
649 Err(e) => return Err(e.into()),
650 }
651 } else {
652 return Err(Code::H3_FRAME_UNEXPECTED.with_reason(
653 "First response frame is not headers",
654 ErrorLevel::ConnectionError,
655 ));
656 };
657
658 let qpack::Decoded { fields, .. } = decoded;
659
660 let (status, headers) = Header::try_from(fields)?.into_response_parts()?;
661 let mut resp = Response::new(());
662 *resp.status_mut() = status;
663 *resp.headers_mut() = headers;
664 *resp.version_mut() = http::Version::HTTP_3;
665
666 Ok(resp)
667 }
668
669 /// Receive some of the request body.
670 // TODO what if called before recv_response ?
671 pub async fn recv_data(&mut self) -> Result<Option<impl Buf>, Error> {
672 self.inner.recv_data().await
673 }
674
675 /// Receive an optional set of trailers for the response.
676 pub async fn recv_trailers(&mut self) -> Result<Option<HeaderMap>, Error> {
677 let res = self.inner.recv_trailers().await;
678 if let Err(ref e) = res {
679 if e.is_header_too_big() {
680 self.inner.stream.stop_sending(Code::H3_REQUEST_CANCELLED);
681 }
682 }
683 res
684 }
685
686 /// Tell the peer to stop sending into the underlying QUIC stream
687 pub fn stop_sending(&mut self, error_code: crate::error::Code) {
688 // TODO take by value to prevent any further call as this request is cancelled
689 // rename `cancel()` ?
690 self.inner.stream.stop_sending(error_code)
691 }
692}
693
694impl<S, B> RequestStream<S, B>
695where
696 S: quic::SendStream<B>,
697 B: Buf,
698{
699 /// Send some data on the request body.
700 pub async fn send_data(&mut self, buf: B) -> Result<(), Error> {
701 self.inner.send_data(buf).await
702 }
703
704 /// Send a set of trailers to end the request.
705 ///
706 /// Either [`RequestStream::finish`] or
707 /// [`RequestStream::send_trailers`] must be called to finalize a
708 /// request.
709 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> Result<(), Error> {
710 self.inner.send_trailers(trailers).await
711 }
712
713 /// End the request without trailers.
714 ///
715 /// Either [`RequestStream::finish`] or
716 /// [`RequestStream::send_trailers`] must be called to finalize a
717 /// request.
718 pub async fn finish(&mut self) -> Result<(), Error> {
719 self.inner.finish().await
720 }
721
722 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1.1
723 //= type=TODO
724 //# Implementations SHOULD cancel requests by abruptly terminating any
725 //# directions of a stream that are still open. To do so, an
726 //# implementation resets the sending parts of streams and aborts reading
727 //# on the receiving parts of streams; see Section 2.4 of
728 //# [QUIC-TRANSPORT].
729}
730
731impl<S, B> RequestStream<S, B>
732where
733 S: quic::BidiStream<B>,
734 B: Buf,
735{
736 /// Split this stream into two halves that can be driven independently.
737 pub fn split(
738 self,
739 ) -> (
740 RequestStream<S::SendStream, B>,
741 RequestStream<S::RecvStream, B>,
742 ) {
743 let (send, recv) = self.inner.split();
744 (RequestStream { inner: send }, RequestStream { inner: recv })
745 }
746}