h3/client/
connection.rs

1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4    marker::PhantomData,
5    sync::{atomic::AtomicUsize, Arc},
6    task::{Context, Poll},
7};
8
9use bytes::{Buf, BytesMut};
10use futures_util::future;
11use http::request;
12
13#[cfg(feature = "tracing")]
14use tracing::{info, instrument, trace};
15
16use crate::{
17    connection::{self, ConnectionInner},
18    error::{
19        connection_error_creators::CloseStream, internal_error::InternalConnectionError, Code,
20        ConnectionError, StreamError,
21    },
22    frame::FrameStream,
23    proto::{frame::Frame, headers::Header, push::PushId},
24    qpack,
25    quic::{self, StreamId},
26    shared_state::{ConnectionState, SharedState},
27    stream::{self, BufRecvStream},
28};
29
30use super::stream::RequestStream;
31
32/// HTTP/3 request sender
33///
34/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
35/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
36/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
37/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
38/// This includes the cases where no body is sent at all.
39///
40/// This struct is cloneable so multiple requests can be sent concurrently.
41///
/// Existing instances are atomically counted internally, so whenever all of them have been
/// dropped, the connection will be automatically closed with HTTP/3 connection error code
/// `H3_NO_ERROR`.
45///
46/// # Examples
47///
48/// ## Sending a request with no body
49///
50/// ```rust
51/// # use h3::{quic, client::*};
52/// # use http::{Request, Response};
53/// # use bytes::Buf;
54/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
55/// # where
56/// #     T: quic::OpenStreams<B>,
57/// #     B: Buf,
58/// # {
59/// // Prepare the HTTP request to send to the server
60/// let request = Request::get("https://www.example.com/").body(())?;
61///
62/// // Send the request to the server
63/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
/// // Don't forget to end the request by finishing the send stream.
65/// req_stream.finish().await?;
66/// // Receive the response
67/// let response: Response<()> = req_stream.recv_response().await?;
68/// // Process the response...
69/// # Ok(())
70/// # }
71/// # pub fn main() {}
72/// ```
73///
74/// ## Sending a request with a body and trailers
75///
76/// ```rust
77/// # use h3::{quic, client::*};
78/// # use http::{Request, Response, HeaderMap};
79/// # use bytes::{Buf, Bytes};
80/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
81/// # where
82/// #     T: quic::OpenStreams<Bytes>,
83/// # {
84/// // Prepare the HTTP request to send to the server
85/// let request = Request::get("https://www.example.com/").body(())?;
86///
87/// // Send the request to the server
88/// let mut req_stream = send_request.send_request(request).await?;
89/// // Send some data
90/// req_stream.send_data("body".into()).await?;
91/// // Prepare the trailers
92/// let mut trailers = HeaderMap::new();
93/// trailers.insert("trailer", "value".parse()?);
94/// // Send them and finish the send stream
95/// req_stream.send_trailers(trailers).await?;
96/// // We don't need to finish the send stream, as `send_trailers()` did it for us
97///
98/// // Receive the response.
99/// let response = req_stream.recv_response().await?;
100/// // Process the response...
101/// # Ok(())
102/// # }
103/// # pub fn main() {}
104/// ```
105///
106/// [`send_request()`]: struct.SendRequest.html#method.send_request
107/// [`RequestStream`]: struct.RequestStream.html
108/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
109pub struct SendRequest<T, B>
110where
111    T: quic::OpenStreams<B>,
112    B: Buf,
113{
114    pub(super) open: T,
115    pub(super) conn_state: Arc<SharedState>,
116    pub(super) max_field_section_size: u64, // maximum size for a header we receive
117    // counts instances of SendRequest to close the connection when the last is dropped.
118    pub(super) sender_count: Arc<AtomicUsize>,
119    pub(super) _buf: PhantomData<fn(B)>,
120    pub(super) send_grease_frame: bool,
121}
122
123impl<T, B> ConnectionState for SendRequest<T, B>
124where
125    T: quic::OpenStreams<B>,
126    B: Buf,
127{
128    fn shared_state(&self) -> &SharedState {
129        &self.conn_state
130    }
131}
132
133impl<T, B> CloseStream for SendRequest<T, B>
134where
135    T: quic::OpenStreams<B>,
136    B: Buf,
137{
138}
139
140impl<T, B> SendRequest<T, B>
141where
142    T: quic::OpenStreams<B>,
143    B: Buf,
144{
145    /// Send an HTTP/3 request to the server
146    #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
147    pub async fn send_request(
148        &mut self,
149        req: http::Request<()>,
150    ) -> Result<RequestStream<T::BidiStream, B>, StreamError> {
151        if let Some(error) = self.check_peer_connection_closing() {
152            return Err(error);
153        };
154
155        let (parts, _) = req.into_parts();
156        let request::Parts {
157            method,
158            uri,
159            headers,
160            extensions,
161            ..
162        } = parts;
163        let headers = Header::request(method, uri, headers, extensions).map_err(|_e| {
164            self.handle_connection_error_on_stream(InternalConnectionError {
165                code: Code::H3_INTERNAL_ERROR,
166                message: "Failed to build request headers".to_string(),
167            })
168        })?;
169
170        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
171        //= type=implication
172        //# A
173        //# client MUST send only a single request on a given stream.
174        let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
175            .await
176            .map_err(|e| self.handle_quic_stream_error(e))?;
177
178        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
179        //= type=TODO
180        //# Characters in field names MUST be
181        //# converted to lowercase prior to their encoding.
182
183        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
184        //= type=TODO
185        //# To allow for better compression efficiency, the Cookie header field
186        //# ([COOKIES]) MAY be split into separate field lines, each with one or
187        //# more cookie-pairs, before compression.
188
189        let mut block = BytesMut::new();
190        let mem_size = qpack::encode_stateless(&mut block, headers).map_err(|_e| {
191            self.handle_connection_error_on_stream(InternalConnectionError {
192                code: Code::H3_INTERNAL_ERROR,
193                message: "Failed to encode headers".to_string(),
194            })
195        })?;
196
197        //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
198        //# An implementation that
199        //# has received this parameter SHOULD NOT send an HTTP message header
200        //# that exceeds the indicated size, as the peer will likely refuse to
201        //# process it.
202        //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
203        //# An HTTP implementation MUST NOT send frames or requests that would be
204        //# invalid based on its current understanding of the peer's settings.
205        let peer_max_field_section_size = self.settings().max_field_section_size;
206        if mem_size > peer_max_field_section_size {
207            return Err(StreamError::HeaderTooBig {
208                actual_size: mem_size,
209                max_size: peer_max_field_section_size,
210            });
211        }
212
213        stream::write(&mut stream, Frame::Headers(block.freeze()))
214            .await
215            .map_err(|e| self.handle_quic_stream_error(e))?;
216
217        let request_stream = RequestStream {
218            inner: connection::RequestStream::new(
219                FrameStream::new(BufRecvStream::new(stream)),
220                self.max_field_section_size,
221                self.conn_state.clone(),
222                self.send_grease_frame,
223            ),
224        };
225        // send the grease frame only once
226        self.send_grease_frame = false;
227        Ok(request_stream)
228    }
229}
230
231impl<T, B> Clone for SendRequest<T, B>
232where
233    T: quic::OpenStreams<B> + Clone,
234    B: Buf,
235{
236    fn clone(&self) -> Self {
237        self.sender_count
238            .fetch_add(1, std::sync::atomic::Ordering::Release);
239
240        Self {
241            conn_state: self.conn_state.clone(),
242            open: self.open.clone(),
243            max_field_section_size: self.max_field_section_size,
244            sender_count: self.sender_count.clone(),
245            _buf: PhantomData,
246            send_grease_frame: self.send_grease_frame,
247        }
248    }
249}
250
251impl<T, B> Drop for SendRequest<T, B>
252where
253    T: quic::OpenStreams<B>,
254    B: Buf,
255{
256    fn drop(&mut self) {
257        if self
258            .sender_count
259            .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
260            == 1
261        {
262            self.handle_connection_error_on_stream(InternalConnectionError::new(
263                Code::H3_NO_ERROR,
264                "Connection closed by client".to_string(),
265            ));
266        }
267    }
268}
269
270/// Client connection driver
271///
272/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
/// It needs to be polled continuously via [`poll_close()`]. On connection closure, this
/// will resolve to a `ConnectionError` describing why the connection was closed; there is no
/// separate `Ok(())` outcome, so every closure is reported through that value.
276///
/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no
/// further requests can be initiated. Then [`poll_close()`] will resolve when all ongoing
/// requests and push streams complete. Finally, a connection closure with the `H3_NO_ERROR`
/// code will be sent to the server.
281///
282/// # Examples
283///
284/// ## Drive a connection concurrently
285///
286/// ```rust
287/// # use bytes::Buf;
288/// # use futures_util::future;
289/// # use h3::{client::*, quic};
290/// # use tokio::task::JoinHandle;
291/// # async fn doc<C, B>(mut connection: Connection<C, B>)
292/// #    -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
293/// # where
294/// #    C: quic::Connection<B> + Send + 'static,
295/// #    C::SendStream: Send + 'static,
296/// #    C::RecvStream: Send + 'static,
297/// #    B: Buf + Send + 'static,
298/// # {
299/// // Run the driver on a different task
300/// tokio::spawn(async move {
301///     future::poll_fn(|cx| connection.poll_close(cx)).await;
302///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
303/// })
304/// # }
305/// ```
306///
307/// ## Shutdown a connection gracefully
308///
309/// ```rust
310/// # use bytes::Buf;
311/// # use futures_util::future;
312/// # use h3::quic;
313/// # use h3::client::Connection;
314/// # use h3::client::SendRequest;
315/// # use tokio::{self, sync::oneshot, task::JoinHandle};
316/// # async fn doc<C, B>(mut connection: Connection<C, B>)
317/// #    -> Result<(), Box<dyn std::error::Error + Send + Sync>>
318/// # where
319/// #    C: quic::Connection<B> + Send + 'static,
320/// #    C::SendStream: Send + 'static,
321/// #    C::RecvStream: Send + 'static,
322/// #    B: Buf + Send + 'static,
323/// # {
324/// // Prepare a channel to stop the driver thread
325/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
326///
327/// // Run the driver on a different task
328/// let driver = tokio::spawn(async move {
329///     tokio::select! {
330///         // Drive the connection
331///         closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed,
332///         // Listen for shutdown condition
333///         max_streams = shutdown_rx => {
334///             // Initiate shutdown
///             connection.shutdown(max_streams?).await?;
336///             // Wait for ongoing work to complete
337///             future::poll_fn(|cx| connection.poll_close(cx)).await
338///         }
339///     };
340///
341///     Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
342/// });
343///
344/// // Do client things, wait for close condition...
345///
346/// // Initiate shutdown
347/// shutdown_tx.send(2);
348/// // Wait for the connection to be closed
349/// driver.await?
350/// # }
351/// ```
352/// [`poll_close()`]: struct.Connection.html#method.poll_close
353/// [`shutdown()`]: struct.Connection.html#method.shutdown
354pub struct Connection<C, B>
355where
356    C: quic::Connection<B>,
357    B: Buf,
358{
359    /// TODO: breaking encapsulation for RFC9298.
360    pub inner: ConnectionInner<C, B>,
361    // Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
362    pub(super) sent_closing: Option<PushId>,
363    // Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept.
364    pub(super) recv_closing: Option<StreamId>,
365}
366
367impl<C, B> ConnectionState for Connection<C, B>
368where
369    C: quic::Connection<B>,
370    B: Buf,
371{
372    fn shared_state(&self) -> &SharedState {
373        &self.inner.shared
374    }
375}
376
377impl<C, B> Connection<C, B>
378where
379    C: quic::Connection<B>,
380    B: Buf,
381{
382    /// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
383    #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
384    pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), ConnectionError> {
385        // TODO: Calculate remaining pushes once server push is implemented.
386        self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
387    }
388
389    /// Wait until the connection is closed
390    #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
391    pub async fn wait_idle(&mut self) -> ConnectionError {
392        future::poll_fn(|cx| self.poll_close(cx)).await
393    }
394
395    /// Maintain the connection state until it is closed
396    #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
397    pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionError> {
398        while let Poll::Ready(result) = self.inner.poll_control(cx) {
399            match result {
400                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
401                //= type=TODO
402                //# When a 0-RTT QUIC connection is being used, the initial value of each
403                //# server setting is the value used in the previous session.  Clients
404                //# SHOULD store the settings the server provided in the HTTP/3
405                //# connection where resumption information was provided, but they MAY
406                //# opt not to store settings in certain cases (e.g., if the session
407                //# ticket is received before the SETTINGS frame).
408
409                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
410                //= type=TODO
411                //# A client MUST comply
412                //# with stored settings -- or default values if no values are stored --
413                //# when attempting 0-RTT.
414
415                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
416                //= type=TODO
417                //# Once a server has provided new settings,
418                //# clients MUST comply with those values.
419                Ok(Frame::Settings(_)) => {
420                    #[cfg(feature = "tracing")]
421                    trace!("Got settings");
422                    ()
423                }
424
425                Ok(Frame::Goaway(id)) => {
426                    //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
427                    //# The GOAWAY frame is always sent on the control stream.  In the
428                    //# server-to-client direction, it carries a QUIC stream ID for a client-
429                    //# initiated bidirectional stream encoded as a variable-length integer.
430                    //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
431                    //# of any other type as a connection error of type H3_ID_ERROR.
432                    if !StreamId::from(id).is_request() {
433                        return Poll::Ready(self.inner.handle_connection_error(
434                            InternalConnectionError::new(
435                                Code::H3_ID_ERROR,
436                                format!("non-request StreamId in a GoAway frame: {}", id),
437                            ),
438                        ));
439                    }
440                    if let Err(err) = self.inner.process_goaway(&mut self.recv_closing, id) {
441                        return Poll::Ready(err);
442                    }
443
444                    #[cfg(feature = "tracing")]
445                    info!("Server initiated graceful shutdown, last: StreamId({})", id);
446                }
447
448                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
449                //# If a PUSH_PROMISE frame is received on the control stream, the client
450                //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
451
452                //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
453                //# A client MUST treat the
454                //# receipt of a MAX_PUSH_ID frame as a connection error of type
455                //# H3_FRAME_UNEXPECTED.
456                Ok(frame) => {
457                    return Poll::Ready(self.inner.handle_connection_error(
458                        InternalConnectionError::new(
459                            Code::H3_FRAME_UNEXPECTED,
460                            format!("on client control stream: {:?}", frame),
461                        ),
462                    ));
463                }
464                Err(connection_error) => {
465                    return Poll::Ready(connection_error);
466                }
467            }
468        }
469
470        //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
471        //# Clients MUST treat
472        //# receipt of a server-initiated bidirectional stream as a connection
473        //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
474        //# been negotiated.
475        if self.inner.poll_accept_bi(cx).is_ready() {
476            return Poll::Ready(
477                self.inner
478                    .handle_connection_error(InternalConnectionError::new(
479                        Code::H3_STREAM_CREATION_ERROR,
480                        "client received a server-initiated bidirectional stream".to_string(),
481                    )),
482            );
483        }
484
485        Poll::Pending
486    }
487}