h3/client/connection.rs
1//! Client implementation of the HTTP/3 protocol
2
3use std::{
4 marker::PhantomData,
5 sync::{atomic::AtomicUsize, Arc},
6 task::{Context, Poll},
7};
8
9use bytes::{Buf, BytesMut};
10use futures_util::future;
11use http::request;
12
13#[cfg(feature = "tracing")]
14use tracing::{info, instrument, trace};
15
16use crate::{
17 connection::{self, ConnectionInner},
18 error::{
19 connection_error_creators::CloseStream, internal_error::InternalConnectionError, Code,
20 ConnectionError, StreamError,
21 },
22 frame::FrameStream,
23 proto::{frame::Frame, headers::Header, push::PushId},
24 qpack,
25 quic::{self, StreamId},
26 shared_state::{ConnectionState, SharedState},
27 stream::{self, BufRecvStream},
28};
29
30use super::stream::RequestStream;
31
32/// HTTP/3 request sender
33///
34/// [`send_request()`] initiates a new request and will resolve when it is ready to be sent
35/// to the server. Then a [`RequestStream`] will be returned to send a request body (for
36/// POST, PUT methods) and receive a response. After the whole body is sent, it is necessary
37/// to call [`RequestStream::finish()`] to let the server know the request transfer is complete.
38/// This includes the cases where no body is sent at all.
39///
40/// This struct is cloneable so multiple requests can be sent concurrently.
41///
42/// Existing instances are atomically counted internally, so whenever all of them have been
43/// dropped, the connection will be automatically closed with HTTP/3 connection error code
44/// `HTTP_NO_ERROR = 0`.
45///
46/// # Examples
47///
48/// ## Sending a request with no body
49///
50/// ```rust
51/// # use h3::{quic, client::*};
52/// # use http::{Request, Response};
53/// # use bytes::Buf;
54/// # async fn doc<T,B>(mut send_request: SendRequest<T, B>) -> Result<(), Box<dyn std::error::Error>>
55/// # where
56/// # T: quic::OpenStreams<B>,
57/// # B: Buf,
58/// # {
59/// // Prepare the HTTP request to send to the server
60/// let request = Request::get("https://www.example.com/").body(())?;
61///
62/// // Send the request to the server
63/// let mut req_stream: RequestStream<_, _> = send_request.send_request(request).await?;
64/// // Don't forget to end up the request by finishing the send stream.
65/// req_stream.finish().await?;
66/// // Receive the response
67/// let response: Response<()> = req_stream.recv_response().await?;
68/// // Process the response...
69/// # Ok(())
70/// # }
71/// # pub fn main() {}
72/// ```
73///
74/// ## Sending a request with a body and trailers
75///
76/// ```rust
77/// # use h3::{quic, client::*};
78/// # use http::{Request, Response, HeaderMap};
79/// # use bytes::{Buf, Bytes};
80/// # async fn doc<T,B>(mut send_request: SendRequest<T, Bytes>) -> Result<(), Box<dyn std::error::Error>>
81/// # where
82/// # T: quic::OpenStreams<Bytes>,
83/// # {
84/// // Prepare the HTTP request to send to the server
85/// let request = Request::get("https://www.example.com/").body(())?;
86///
87/// // Send the request to the server
88/// let mut req_stream = send_request.send_request(request).await?;
89/// // Send some data
90/// req_stream.send_data("body".into()).await?;
91/// // Prepare the trailers
92/// let mut trailers = HeaderMap::new();
93/// trailers.insert("trailer", "value".parse()?);
94/// // Send them and finish the send stream
95/// req_stream.send_trailers(trailers).await?;
96/// // We don't need to finish the send stream, as `send_trailers()` did it for us
97///
98/// // Receive the response.
99/// let response = req_stream.recv_response().await?;
100/// // Process the response...
101/// # Ok(())
102/// # }
103/// # pub fn main() {}
104/// ```
105///
106/// [`send_request()`]: struct.SendRequest.html#method.send_request
107/// [`RequestStream`]: struct.RequestStream.html
108/// [`RequestStream::finish()`]: struct.RequestStream.html#method.finish
109pub struct SendRequest<T, B>
110where
111 T: quic::OpenStreams<B>,
112 B: Buf,
113{
114 pub(super) open: T,
115 pub(super) conn_state: Arc<SharedState>,
116 pub(super) max_field_section_size: u64, // maximum size for a header we receive
117 // counts instances of SendRequest to close the connection when the last is dropped.
118 pub(super) sender_count: Arc<AtomicUsize>,
119 pub(super) _buf: PhantomData<fn(B)>,
120 pub(super) send_grease_frame: bool,
121}
122
123impl<T, B> ConnectionState for SendRequest<T, B>
124where
125 T: quic::OpenStreams<B>,
126 B: Buf,
127{
128 fn shared_state(&self) -> &SharedState {
129 &self.conn_state
130 }
131}
132
133impl<T, B> CloseStream for SendRequest<T, B>
134where
135 T: quic::OpenStreams<B>,
136 B: Buf,
137{
138}
139
140impl<T, B> SendRequest<T, B>
141where
142 T: quic::OpenStreams<B>,
143 B: Buf,
144{
145 /// Send an HTTP/3 request to the server
146 #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
147 pub async fn send_request(
148 &mut self,
149 req: http::Request<()>,
150 ) -> Result<RequestStream<T::BidiStream, B>, StreamError> {
151 if let Some(error) = self.check_peer_connection_closing() {
152 return Err(error);
153 };
154
155 let (parts, _) = req.into_parts();
156 let request::Parts {
157 method,
158 uri,
159 headers,
160 extensions,
161 ..
162 } = parts;
163 let headers = Header::request(method, uri, headers, extensions).map_err(|_e| {
164 self.handle_connection_error_on_stream(InternalConnectionError {
165 code: Code::H3_INTERNAL_ERROR,
166 message: "Failed to build request headers".to_string(),
167 })
168 })?;
169
170 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.1
171 //= type=implication
172 //# A
173 //# client MUST send only a single request on a given stream.
174 let mut stream = future::poll_fn(|cx| self.open.poll_open_bidi(cx))
175 .await
176 .map_err(|e| self.handle_quic_stream_error(e))?;
177
178 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2
179 //= type=TODO
180 //# Characters in field names MUST be
181 //# converted to lowercase prior to their encoding.
182
183 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.1
184 //= type=TODO
185 //# To allow for better compression efficiency, the Cookie header field
186 //# ([COOKIES]) MAY be split into separate field lines, each with one or
187 //# more cookie-pairs, before compression.
188
189 let mut block = BytesMut::new();
190 let mem_size = qpack::encode_stateless(&mut block, headers).map_err(|_e| {
191 self.handle_connection_error_on_stream(InternalConnectionError {
192 code: Code::H3_INTERNAL_ERROR,
193 message: "Failed to encode headers".to_string(),
194 })
195 })?;
196
197 //= https://www.rfc-editor.org/rfc/rfc9114#section-4.2.2
198 //# An implementation that
199 //# has received this parameter SHOULD NOT send an HTTP message header
200 //# that exceeds the indicated size, as the peer will likely refuse to
201 //# process it.
202 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
203 //# An HTTP implementation MUST NOT send frames or requests that would be
204 //# invalid based on its current understanding of the peer's settings.
205 let peer_max_field_section_size = self.settings().max_field_section_size;
206 if mem_size > peer_max_field_section_size {
207 return Err(StreamError::HeaderTooBig {
208 actual_size: mem_size,
209 max_size: peer_max_field_section_size,
210 });
211 }
212
213 stream::write(&mut stream, Frame::Headers(block.freeze()))
214 .await
215 .map_err(|e| self.handle_quic_stream_error(e))?;
216
217 let request_stream = RequestStream {
218 inner: connection::RequestStream::new(
219 FrameStream::new(BufRecvStream::new(stream)),
220 self.max_field_section_size,
221 self.conn_state.clone(),
222 self.send_grease_frame,
223 ),
224 };
225 // send the grease frame only once
226 self.send_grease_frame = false;
227 Ok(request_stream)
228 }
229}
230
231impl<T, B> Clone for SendRequest<T, B>
232where
233 T: quic::OpenStreams<B> + Clone,
234 B: Buf,
235{
236 fn clone(&self) -> Self {
237 self.sender_count
238 .fetch_add(1, std::sync::atomic::Ordering::Release);
239
240 Self {
241 conn_state: self.conn_state.clone(),
242 open: self.open.clone(),
243 max_field_section_size: self.max_field_section_size,
244 sender_count: self.sender_count.clone(),
245 _buf: PhantomData,
246 send_grease_frame: self.send_grease_frame,
247 }
248 }
249}
250
251impl<T, B> Drop for SendRequest<T, B>
252where
253 T: quic::OpenStreams<B>,
254 B: Buf,
255{
256 fn drop(&mut self) {
257 if self
258 .sender_count
259 .fetch_sub(1, std::sync::atomic::Ordering::AcqRel)
260 == 1
261 {
262 self.handle_connection_error_on_stream(InternalConnectionError::new(
263 Code::H3_NO_ERROR,
264 "Connection closed by client".to_string(),
265 ));
266 }
267 }
268}
269
270/// Client connection driver
271///
272/// Maintains the internal state of an HTTP/3 connection, including control and QPACK.
273/// It needs to be polled continuously via [`poll_close()`]. On connection closure, this
274/// will resolve to `Ok(())` if the peer sent `HTTP_NO_ERROR`, or `Err()` if a connection-level
275/// error occurred.
276///
277/// [`shutdown()`] initiates a graceful shutdown of this connection. After calling it, no request
278/// initiation will be further allowed. Then [`poll_close()`] will resolve when all ongoing requests
279/// and push streams complete. Finally, a connection closure with `HTTP_NO_ERROR` code will be
280/// sent to the server.
281///
282/// # Examples
283///
284/// ## Drive a connection concurrently
285///
286/// ```rust
287/// # use bytes::Buf;
288/// # use futures_util::future;
289/// # use h3::{client::*, quic};
290/// # use tokio::task::JoinHandle;
291/// # async fn doc<C, B>(mut connection: Connection<C, B>)
292/// # -> JoinHandle<Result<(), Box<dyn std::error::Error + Send + Sync>>>
293/// # where
294/// # C: quic::Connection<B> + Send + 'static,
295/// # C::SendStream: Send + 'static,
296/// # C::RecvStream: Send + 'static,
297/// # B: Buf + Send + 'static,
298/// # {
299/// // Run the driver on a different task
300/// tokio::spawn(async move {
301/// future::poll_fn(|cx| connection.poll_close(cx)).await;
302/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
303/// })
304/// # }
305/// ```
306///
307/// ## Shutdown a connection gracefully
308///
309/// ```rust
310/// # use bytes::Buf;
311/// # use futures_util::future;
312/// # use h3::quic;
313/// # use h3::client::Connection;
314/// # use h3::client::SendRequest;
315/// # use tokio::{self, sync::oneshot, task::JoinHandle};
316/// # async fn doc<C, B>(mut connection: Connection<C, B>)
317/// # -> Result<(), Box<dyn std::error::Error + Send + Sync>>
318/// # where
319/// # C: quic::Connection<B> + Send + 'static,
320/// # C::SendStream: Send + 'static,
321/// # C::RecvStream: Send + 'static,
322/// # B: Buf + Send + 'static,
323/// # {
324/// // Prepare a channel to stop the driver thread
325/// let (shutdown_tx, shutdown_rx) = oneshot::channel();
326///
327/// // Run the driver on a different task
328/// let driver = tokio::spawn(async move {
329/// tokio::select! {
330/// // Drive the connection
331/// closed = future::poll_fn(|cx| connection.poll_close(cx)) => closed,
332/// // Listen for shutdown condition
333/// max_streams = shutdown_rx => {
334/// // Initiate shutdown
335/// connection.shutdown(max_streams?);
336/// // Wait for ongoing work to complete
337/// future::poll_fn(|cx| connection.poll_close(cx)).await
338/// }
339/// };
340///
341/// Ok::<(), Box<dyn std::error::Error + Send + Sync>>(())
342/// });
343///
344/// // Do client things, wait for close condition...
345///
346/// // Initiate shutdown
347/// shutdown_tx.send(2);
348/// // Wait for the connection to be closed
349/// driver.await?
350/// # }
351/// ```
352/// [`poll_close()`]: struct.Connection.html#method.poll_close
353/// [`shutdown()`]: struct.Connection.html#method.shutdown
354pub struct Connection<C, B>
355where
356 C: quic::Connection<B>,
357 B: Buf,
358{
359 /// TODO: breaking encapsulation for RFC9298.
360 pub inner: ConnectionInner<C, B>,
361 // Has a GOAWAY frame been sent? If so, this PushId is the last we are willing to accept.
362 pub(super) sent_closing: Option<PushId>,
363 // Has a GOAWAY frame been received? If so, this is StreamId the last the remote will accept.
364 pub(super) recv_closing: Option<StreamId>,
365}
366
367impl<C, B> ConnectionState for Connection<C, B>
368where
369 C: quic::Connection<B>,
370 B: Buf,
371{
372 fn shared_state(&self) -> &SharedState {
373 &self.inner.shared
374 }
375}
376
377impl<C, B> Connection<C, B>
378where
379 C: quic::Connection<B>,
380 B: Buf,
381{
382 /// Initiate a graceful shutdown, accepting `max_push` potentially in-flight server pushes
383 #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
384 pub async fn shutdown(&mut self, _max_push: usize) -> Result<(), ConnectionError> {
385 // TODO: Calculate remaining pushes once server push is implemented.
386 self.inner.shutdown(&mut self.sent_closing, PushId(0)).await
387 }
388
389 /// Wait until the connection is closed
390 #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
391 pub async fn wait_idle(&mut self) -> ConnectionError {
392 future::poll_fn(|cx| self.poll_close(cx)).await
393 }
394
395 /// Maintain the connection state until it is closed
396 #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
397 pub fn poll_close(&mut self, cx: &mut Context<'_>) -> Poll<ConnectionError> {
398 while let Poll::Ready(result) = self.inner.poll_control(cx) {
399 match result {
400 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
401 //= type=TODO
402 //# When a 0-RTT QUIC connection is being used, the initial value of each
403 //# server setting is the value used in the previous session. Clients
404 //# SHOULD store the settings the server provided in the HTTP/3
405 //# connection where resumption information was provided, but they MAY
406 //# opt not to store settings in certain cases (e.g., if the session
407 //# ticket is received before the SETTINGS frame).
408
409 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
410 //= type=TODO
411 //# A client MUST comply
412 //# with stored settings -- or default values if no values are stored --
413 //# when attempting 0-RTT.
414
415 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.4.2
416 //= type=TODO
417 //# Once a server has provided new settings,
418 //# clients MUST comply with those values.
419 Ok(Frame::Settings(_)) => {
420 #[cfg(feature = "tracing")]
421 trace!("Got settings");
422 ()
423 }
424
425 Ok(Frame::Goaway(id)) => {
426 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.6
427 //# The GOAWAY frame is always sent on the control stream. In the
428 //# server-to-client direction, it carries a QUIC stream ID for a client-
429 //# initiated bidirectional stream encoded as a variable-length integer.
430 //# A client MUST treat receipt of a GOAWAY frame containing a stream ID
431 //# of any other type as a connection error of type H3_ID_ERROR.
432 if !StreamId::from(id).is_request() {
433 return Poll::Ready(self.inner.handle_connection_error(
434 InternalConnectionError::new(
435 Code::H3_ID_ERROR,
436 format!("non-request StreamId in a GoAway frame: {}", id),
437 ),
438 ));
439 }
440 if let Err(err) = self.inner.process_goaway(&mut self.recv_closing, id) {
441 return Poll::Ready(err);
442 }
443
444 #[cfg(feature = "tracing")]
445 info!("Server initiated graceful shutdown, last: StreamId({})", id);
446 }
447
448 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.5
449 //# If a PUSH_PROMISE frame is received on the control stream, the client
450 //# MUST respond with a connection error of type H3_FRAME_UNEXPECTED.
451
452 //= https://www.rfc-editor.org/rfc/rfc9114#section-7.2.7
453 //# A client MUST treat the
454 //# receipt of a MAX_PUSH_ID frame as a connection error of type
455 //# H3_FRAME_UNEXPECTED.
456 Ok(frame) => {
457 return Poll::Ready(self.inner.handle_connection_error(
458 InternalConnectionError::new(
459 Code::H3_FRAME_UNEXPECTED,
460 format!("on client control stream: {:?}", frame),
461 ),
462 ));
463 }
464 Err(connection_error) => {
465 return Poll::Ready(connection_error);
466 }
467 }
468 }
469
470 //= https://www.rfc-editor.org/rfc/rfc9114#section-6.1
471 //# Clients MUST treat
472 //# receipt of a server-initiated bidirectional stream as a connection
473 //# error of type H3_STREAM_CREATION_ERROR unless such an extension has
474 //# been negotiated.
475 if self.inner.poll_accept_bi(cx).is_ready() {
476 return Poll::Ready(
477 self.inner
478 .handle_connection_error(InternalConnectionError::new(
479 Code::H3_STREAM_CREATION_ERROR,
480 "client received a server-initiated bidirectional stream".to_string(),
481 )),
482 );
483 }
484
485 Poll::Pending
486 }
487}