rama_http_core/client/conn/
http2.rs

1//! HTTP/2 client connections
2
3use std::borrow::Cow;
4use std::fmt;
5use std::marker::PhantomData;
6use std::pin::Pin;
7use std::task::{Context, Poll};
8use std::time::Duration;
9
10use futures_util::ready;
11use rama_core::error::BoxError;
12use rama_core::rt::Executor;
13use rama_http_types::proto::h2::PseudoHeaderOrder;
14use rama_http_types::proto::h2::frame::{SettingOrder, SettingsConfig};
15use rama_http_types::{Request, Response};
16use tokio::io::{AsyncRead, AsyncWrite};
17use tracing::{debug, trace};
18
19use super::super::dispatch::{self, TrySendError};
20use crate::body::{Body, Incoming as IncomingBody};
21use crate::h2::frame::{Priority, StreamDependency};
22use crate::proto;
23
24/// The sender side of an established connection.
25pub struct SendRequest<B> {
26    dispatch: dispatch::UnboundedSender<Request<B>, Response<IncomingBody>>,
27}
28
29impl<B> Clone for SendRequest<B> {
30    fn clone(&self) -> SendRequest<B> {
31        SendRequest {
32            dispatch: self.dispatch.clone(),
33        }
34    }
35}
36
37/// A future that processes all HTTP state for the IO object.
38///
39/// In most cases, this should just be spawned into an executor, so that it
40/// can process incoming and outgoing messages, notice hangups, and the like.
41///
42/// Instances of this type are typically created via the [`handshake`] function
43#[must_use = "futures do nothing unless polled"]
44pub struct Connection<T, B>
45where
46    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
47    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
48{
49    inner: (PhantomData<T>, proto::h2::ClientTask<B, T>),
50}
51
52/// A builder to configure an HTTP connection.
53///
54/// After setting options, the builder is used to create a handshake future.
55///
56/// **Note**: The default values of options are *not considered stable*. They
57/// are subject to change at any time.
58#[derive(Clone, Debug)]
59pub struct Builder {
60    pub(super) exec: Executor,
61    h2_builder: proto::h2::client::Config,
62    headers_pseudo_order: Option<PseudoHeaderOrder>,
63    headers_priority: Option<StreamDependency>,
64    priority: Option<Cow<'static, [Priority]>>,
65}
66
67/// Returns a handshake future over some IO.
68///
69/// This is a shortcut for `Builder::new(exec).handshake(io)`.
70/// See [`client::conn`](crate::client::conn) for more.
71pub async fn handshake<T, B>(
72    exec: Executor,
73    io: T,
74) -> crate::Result<(SendRequest<B>, Connection<T, B>)>
75where
76    T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
77    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
78{
79    Builder::new(exec).handshake(io).await
80}
81
82// ===== impl SendRequest
83
84impl<B> SendRequest<B> {
85    /// Polls to determine whether this sender can be used yet for a request.
86    ///
87    /// If the associated connection is closed, this returns an Error.
88    pub fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
89        if self.is_closed() {
90            Poll::Ready(Err(crate::Error::new_closed()))
91        } else {
92            Poll::Ready(Ok(()))
93        }
94    }
95
96    /// Waits until the dispatcher is ready
97    ///
98    /// If the associated connection is closed, this returns an Error.
99    pub async fn ready(&mut self) -> crate::Result<()> {
100        futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
101    }
102
103    /// Checks if the connection is currently ready to send a request.
104    ///
105    /// # Note
106    ///
107    /// This is mostly a hint. Due to inherent latency of networks, it is
108    /// possible that even after checking this is ready, sending a request
109    /// may still fail because the connection was closed in the meantime.
110    pub fn is_ready(&self) -> bool {
111        self.dispatch.is_ready()
112    }
113
114    /// Checks if the connection side has been closed.
115    pub fn is_closed(&self) -> bool {
116        self.dispatch.is_closed()
117    }
118}
119
120impl<B> SendRequest<B>
121where
122    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
123{
124    /// Sends a `Request` on the associated connection.
125    ///
126    /// Returns a future that if successful, yields the `Response`.
127    ///
128    /// `req` must have a `Host` header.
129    ///
130    /// Absolute-form `Uri`s are not required. If received, they will be serialized
131    /// as-is.
132    pub fn send_request(
133        &mut self,
134        req: Request<B>,
135    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
136        let sent = self.dispatch.send(req);
137
138        async move {
139            match sent {
140                Ok(rx) => match rx.await {
141                    Ok(Ok(resp)) => Ok(resp),
142                    Ok(Err(err)) => Err(err),
143                    // this is definite bug if it happens, but it shouldn't happen!
144                    Err(_canceled) => panic!("dispatch dropped without returning error"),
145                },
146                Err(_req) => {
147                    debug!("connection was not ready");
148                    Err(crate::Error::new_canceled().with("connection was not ready"))
149                }
150            }
151        }
152    }
153
154    /// Sends a `Request` on the associated connection.
155    ///
156    /// Returns a future that if successful, yields the `Response`.
157    ///
158    /// # Error
159    ///
160    /// If there was an error before trying to serialize the request to the
161    /// connection, the message will be returned as part of this error.
162    pub fn try_send_request(
163        &mut self,
164        req: Request<B>,
165    ) -> impl Future<Output = Result<Response<IncomingBody>, TrySendError<Request<B>>>> {
166        let sent = self.dispatch.try_send(req);
167        async move {
168            match sent {
169                Ok(rx) => match rx.await {
170                    Ok(Ok(res)) => Ok(res),
171                    Ok(Err(err)) => Err(err),
172                    // this is definite bug if it happens, but it shouldn't happen!
173                    Err(_) => panic!("dispatch dropped without returning error"),
174                },
175                Err(req) => {
176                    debug!("connection was not ready");
177                    let error = crate::Error::new_canceled().with("connection was not ready");
178                    Err(TrySendError {
179                        error,
180                        message: Some(req),
181                    })
182                }
183            }
184        }
185    }
186}
187
188impl<B> fmt::Debug for SendRequest<B> {
189    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
190        f.debug_struct("SendRequest").finish()
191    }
192}
193
194// ===== impl Connection
195
196impl<T, B> Connection<T, B>
197where
198    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
199    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
200{
201    /// Returns whether the [extended CONNECT protocol][1] is enabled or not.
202    ///
203    /// This setting is configured by the server peer by sending the
204    /// [`SETTINGS_ENABLE_CONNECT_PROTOCOL` parameter][2] in a `SETTINGS` frame.
205    /// This method returns the currently acknowledged value received from the
206    /// remote.
207    ///
208    /// [1]: https://datatracker.ietf.org/doc/html/rfc8441#section-4
209    /// [2]: https://datatracker.ietf.org/doc/html/rfc8441#section-3
210    pub fn is_extended_connect_protocol_enabled(&self) -> bool {
211        self.inner.1.is_extended_connect_protocol_enabled()
212    }
213}
214
215impl<T, B> fmt::Debug for Connection<T, B>
216where
217    T: AsyncRead + AsyncWrite + fmt::Debug + Send + 'static + Unpin,
218    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
219{
220    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
221        f.debug_struct("Connection").finish()
222    }
223}
224
225impl<T, B> Future for Connection<T, B>
226where
227    T: AsyncRead + AsyncWrite + Unpin + Send + 'static,
228    B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
229{
230    type Output = crate::Result<()>;
231
232    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
233        match ready!(Pin::new(&mut self.inner.1).poll(cx))? {
234            proto::Dispatched::Shutdown => Poll::Ready(Ok(())),
235            proto::Dispatched::Upgrade(_pending) => unreachable!("http2 cannot upgrade"),
236        }
237    }
238}
239
240// ===== impl Builder
241
242impl Builder {
243    /// Creates a new connection builder.
244    #[inline]
245    pub fn new(exec: Executor) -> Builder {
246        Builder {
247            exec,
248            h2_builder: Default::default(),
249            headers_pseudo_order: None,
250            headers_priority: None,
251            priority: None,
252        }
253    }
254
255    pub fn apply_setting_config(&mut self, config: &SettingsConfig) -> &mut Self {
256        self.header_table_size(config.header_table_size)
257            .max_concurrent_streams(config.max_concurrent_streams)
258            .initial_stream_window_size(config.initial_window_size)
259            .max_frame_size(config.max_frame_size);
260
261        if let Some(value) = config.enable_push {
262            self.enable_push(value != 0);
263        }
264
265        if let Some(value) = config.max_header_list_size {
266            self.max_header_list_size(value);
267        }
268
269        if let Some(value) = config.enable_connect_protocol {
270            self.enable_connect_protocol(value);
271        }
272
273        if let Some(value) = config.unknown_setting_9 {
274            self.unknown_setting_9(value);
275        }
276
277        if let Some(order) = config.setting_order.clone() {
278            self.setting_order(order);
279        }
280
281        self
282    }
283
284    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
285    /// stream-level flow control.
286    ///
287    /// Passing `None` will do nothing.
288    ///
289    /// If not set, rama_http_core will use a default.
290    ///
291    /// [spec]: https://httpwg.org/specs/rfc9113.html#SETTINGS_INITIAL_WINDOW_SIZE
292    pub fn initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
293        if let Some(sz) = sz.into() {
294            self.h2_builder.adaptive_window = false;
295            self.h2_builder.initial_stream_window_size = sz;
296        }
297        self
298    }
299
300    /// Sets the max connection-level flow control for HTTP2
301    ///
302    /// Passing `None` will do nothing.
303    ///
304    /// If not set, rama_http_core will use a default.
305    pub fn initial_connection_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
306        if let Some(sz) = sz.into() {
307            self.h2_builder.adaptive_window = false;
308            self.h2_builder.initial_conn_window_size = sz;
309        }
310        self
311    }
312
313    /// Sets the initial maximum of locally initiated (send) streams.
314    ///
315    /// This value will be overwritten by the value included in the initial
316    /// SETTINGS frame received from the peer as part of a [connection preface].
317    ///
318    /// Passing `None` will do nothing.
319    ///
320    /// If not set, rama_http_core will use a default.
321    ///
322    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
323    pub fn initial_max_send_streams(&mut self, initial: impl Into<Option<usize>>) -> &mut Self {
324        if let Some(initial) = initial.into() {
325            self.h2_builder.initial_max_send_streams = initial;
326        }
327        self
328    }
329
330    /// Sets whether to use an adaptive flow control.
331    ///
332    /// Enabling this will override the limits set in
333    /// `initial_stream_window_size` and
334    /// `initial_connection_window_size`.
335    pub fn adaptive_window(&mut self, enabled: bool) -> &mut Self {
336        use proto::h2::SPEC_WINDOW_SIZE;
337
338        self.h2_builder.adaptive_window = enabled;
339        if enabled {
340            self.h2_builder.initial_conn_window_size = SPEC_WINDOW_SIZE;
341            self.h2_builder.initial_stream_window_size = SPEC_WINDOW_SIZE;
342        }
343        self
344    }
345
346    /// Sets the maximum frame size to use for HTTP2.
347    ///
348    /// Default is currently 16KB, but can change.
349    pub fn max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
350        self.h2_builder.max_frame_size = sz.into();
351        self
352    }
353
354    /// Sets the max size of received header frames.
355    ///
356    /// Default is currently 16KB, but can change.
357    pub fn max_header_list_size(&mut self, max: u32) -> &mut Self {
358        self.h2_builder.max_header_list_size = max;
359        self
360    }
361
362    /// Sets the header table size.
363    ///
364    /// This setting informs the peer of the maximum size of the header compression
365    /// table used to encode header blocks, in octets. The encoder may select any value
366    /// equal to or less than the header table size specified by the sender.
367    ///
368    /// The default value of crate `h2` is 4,096.
369    pub fn header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self {
370        self.h2_builder.header_table_size = size.into();
371        self
372    }
373
374    /// Sets the maximum number of concurrent streams.
375    ///
376    /// The maximum concurrent streams setting only controls the maximum number
377    /// of streams that can be initiated by the remote peer. In other words,
378    /// when this setting is set to 100, this does not limit the number of
379    /// concurrent streams that can be created by the caller.
380    ///
381    /// It is recommended that this value be no smaller than 100, so as to not
382    /// unnecessarily limit parallelism. However, any value is legal, including
383    /// 0. If `max` is set to 0, then the remote will not be permitted to
384    /// initiate streams.
385    ///
386    /// Note that streams in the reserved state, i.e., push promises that have
387    /// been reserved but the stream has not started, do not count against this
388    /// setting.
389    ///
390    /// Also note that if the remote *does* exceed the value set here, it is not
391    /// a protocol level error. Instead, the `h2` library will immediately reset
392    /// the stream.
393    ///
394    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
395    ///
396    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
397    pub fn max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
398        self.h2_builder.max_concurrent_streams = max.into();
399        self
400    }
401
402    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
403    /// connection alive.
404    ///
405    /// Pass `None` to disable HTTP2 keep-alive.
406    ///
407    /// Default is currently disabled.
408    pub fn keep_alive_interval(&mut self, interval: impl Into<Option<Duration>>) -> &mut Self {
409        self.h2_builder.keep_alive_interval = interval.into();
410        self
411    }
412
413    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
414    ///
415    /// If the ping is not acknowledged within the timeout, the connection will
416    /// be closed. Does nothing if `keep_alive_interval` is disabled.
417    ///
418    /// Default is 20 seconds.
419    pub fn keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
420        self.h2_builder.keep_alive_timeout = timeout;
421        self
422    }
423
424    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
425    ///
426    /// If disabled, keep-alive pings are only sent while there are open
427    /// request/responses streams. If enabled, pings are also sent when no
428    /// streams are active. Does nothing if `keep_alive_interval` is
429    /// disabled.
430    ///
431    /// Default is `false`.
432    pub fn keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
433        self.h2_builder.keep_alive_while_idle = enabled;
434        self
435    }
436
437    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
438    ///
439    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
440    /// details.
441    ///
442    /// The default value is determined by the `h2` crate.
443    ///
444    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
445    pub fn max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
446        self.h2_builder.max_concurrent_reset_streams = Some(max);
447        self
448    }
449
450    /// Set the maximum write buffer size for each HTTP/2 stream.
451    ///
452    /// Default is currently 1MB, but may change.
453    ///
454    /// # Panics
455    ///
456    /// The value must be no larger than `u32::MAX`.
457    pub fn max_send_buf_size(&mut self, max: usize) -> &mut Self {
458        assert!(max <= u32::MAX as usize);
459        self.h2_builder.max_send_buffer_size = max;
460        self
461    }
462
463    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
464    ///
465    /// This will default to the default value set by the `h2` module. For now this is `20`.
466    pub fn max_pending_accept_reset_streams(&mut self, max: impl Into<Option<usize>>) -> &mut Self {
467        self.h2_builder.max_pending_accept_reset_streams = max.into();
468        self
469    }
470
471    pub fn enable_push(&mut self, enable: bool) -> &mut Self {
472        self.h2_builder.enable_push = enable;
473        self
474    }
475
476    pub fn enable_connect_protocol(&mut self, value: u32) -> &mut Self {
477        self.h2_builder.enable_connect_protocol = Some(value);
478        self
479    }
480
481    pub fn unknown_setting_9(&mut self, value: u32) -> &mut Self {
482        self.h2_builder.unknown_setting_9 = Some(value);
483        self
484    }
485
486    pub fn setting_order(&mut self, order: SettingOrder) -> &mut Self {
487        self.h2_builder.setting_order = Some(order);
488        self
489    }
490
491    pub fn headers_pseudo_order(&mut self, order: PseudoHeaderOrder) -> &mut Self {
492        self.headers_pseudo_order = Some(order);
493        self
494    }
495
496    pub fn headers_priority(&mut self, headers_priority: StreamDependency) -> &mut Self {
497        self.headers_priority = Some(headers_priority);
498        self
499    }
500
501    pub fn priority(&mut self, priority: impl Into<Cow<'static, [Priority]>>) -> &mut Self {
502        self.priority = Some(priority.into());
503        self
504    }
505
506    /// Constructs a connection with the configured options and IO.
507    /// See [`client::conn`](crate::client::conn) for more.
508    ///
509    /// Note, if [`Connection`] is not `await`-ed, [`SendRequest`] will
510    /// do nothing.
511    pub fn handshake<T, B>(
512        &self,
513        io: T,
514    ) -> impl Future<Output = crate::Result<(SendRequest<B>, Connection<T, B>)>>
515    where
516        T: AsyncRead + AsyncWrite + Send + Unpin + 'static,
517        B: Body<Data: Send + 'static, Error: Into<BoxError>> + Send + 'static + Unpin,
518    {
519        let opts = self.clone();
520
521        async move {
522            trace!("client handshake HTTP/2");
523
524            let mut client_builder = proto::h2::client::new_builder(&self.h2_builder);
525            if let Some(order) = self.headers_pseudo_order.clone() {
526                client_builder.headers_pseudo_order(order);
527            }
528            if let Some(priority) = self.headers_priority.clone() {
529                client_builder.headers_priority(priority);
530            }
531            if let Some(priority) = self.priority.clone() {
532                client_builder.priority(priority);
533            }
534
535            let (tx, rx) = dispatch::channel();
536
537            let h2 = proto::h2::client::handshake_with_builder(
538                client_builder,
539                io,
540                rx,
541                &opts.h2_builder,
542                opts.exec,
543            )
544            .await?;
545
546            Ok((
547                SendRequest {
548                    dispatch: tx.unbound(),
549                },
550                Connection {
551                    inner: (PhantomData, h2),
552                },
553            ))
554        }
555    }
556}
557
#[cfg(test)]
mod tests {
    use rama_core::rt::Executor;
    use rama_http_types::dep::http_body_util;
    use tokio::io::{AsyncRead, AsyncWrite};

    /// Compile-time check only: the connection returned by `handshake` must be
    /// accepted by `tokio::task::spawn`. The inner `run` is never called.
    #[tokio::test]
    #[ignore] // only compilation is checked
    async fn send_sync_executor_of_send_futures() {
        #[allow(unused)]
        async fn run(io: impl AsyncRead + AsyncWrite + Send + Unpin + 'static) {
            type EmptyBody = http_body_util::Empty<bytes::Bytes>;

            let handshake =
                crate::client::conn::http2::handshake::<_, EmptyBody>(Executor::default(), io);
            let (_sender, conn) = handshake.await.unwrap();

            tokio::task::spawn(async move {
                conn.await.unwrap();
            });
        }
    }
}