hyper2/proto/h2/
client.rs

1use std::{
2    borrow::Cow, convert::Infallible, future::Future, marker::PhantomData, pin::Pin, task::{Context, Poll}, time::Duration
3};
4
5use crate::rt::{Read, Write};
6use bytes::Bytes;
7use futures_channel::mpsc::{Receiver, Sender};
8use futures_channel::{mpsc, oneshot};
9use futures_util::future::{Either, FusedFuture, FutureExt as _};
10use futures_util::ready;
11use futures_util::stream::{StreamExt as _, StreamFuture};
12use h2::{client::{Builder, Connection, SendRequest}, frame::Priority};
13use h2::SendStream;
14use http::{Method, StatusCode};
15use pin_project_lite::pin_project;
16
17use super::ping::{Ponger, Recorder};
18use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
19use crate::body::{Body, Incoming as IncomingBody};
20use crate::client::dispatch::{Callback, SendWhen, TrySendError};
21use crate::common::io::Compat;
22use crate::common::time::Time;
23use crate::ext::Protocol;
24use crate::headers;
25use crate::proto::h2::UpgradedSendStream;
26use crate::proto::Dispatched;
27use crate::rt::bounds::Http2ClientConnExec;
28use crate::upgrade::Upgraded;
29use crate::{Request, Response};
30use h2::client::ResponseFuture;
31use h2::frame::{PseudoOrder, SettingsOrder, StreamDependency};
32
33
34type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
35
36///// An mpsc channel is used to help notify the `Connection` task when *all*
37///// other handles to it have been dropped, so that it can shutdown.
38type ConnDropRef = mpsc::Sender<Infallible>;
39
40///// A oneshot channel watches the `Connection` task, and when it completes,
41///// the "dispatch" task will be notified and can shutdown sooner.
42type ConnEof = oneshot::Receiver<Infallible>;
43
44// Our defaults are chosen for the "majority" case, which usually are not
45// resource constrained, and so the spec default of 64kb can be too limiting
46// for performance.
47const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
48const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
49const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
50
51// The maximum number of concurrent streams that the client is allowed to open
52// before it receives the initial SETTINGS frame from the server.
53// This default value is derived from what the HTTP/2 spec recommends as the
54// minimum value that endpoints advertise to their peers. It means that using
55// this value will minimize the chance of the failure where the local endpoint
56// attempts to open too many streams and gets rejected by the remote peer with
57// the `REFUSED_STREAM` error.
58const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
59
60#[derive(Clone, Debug)]
61pub(crate) struct Config {
62    pub(crate) adaptive_window: bool,
63    pub(crate) initial_stream_id: Option<u32>,
64    pub(crate) initial_conn_window_size: u32,
65    pub(crate) initial_stream_window_size: u32,
66    pub(crate) initial_max_send_streams: usize,
67    pub(crate) max_frame_size: Option<u32>,
68    pub(crate) keep_alive_interval: Option<Duration>,
69    pub(crate) keep_alive_timeout: Duration,
70    pub(crate) keep_alive_while_idle: bool,
71    pub(crate) max_concurrent_reset_streams: Option<usize>,
72    pub(crate) max_send_buffer_size: usize,
73    pub(crate) max_concurrent_streams: Option<u32>,
74    pub(crate) max_header_list_size: Option<u32>,
75    pub(crate) max_pending_accept_reset_streams: Option<usize>,
76    pub(crate) enable_push: Option<bool>,
77    pub(crate) header_table_size: Option<u32>,
78    pub(crate) unknown_setting8: Option<bool>,
79    pub(crate) unknown_setting9: Option<bool>,
80    pub(crate) headers_pseudo_order: Option<[PseudoOrder; 4]>,
81    pub(crate) headers_priority: Option<StreamDependency>,
82    pub(crate) settings_order: Option<[SettingsOrder; 8]>,
83    pub(crate) priority: Option<Cow<'static, [Priority]>>
84}
85
86impl Default for Config {
87    fn default() -> Config {
88        Config {
89            adaptive_window: false,
90            initial_stream_id: None,
91            initial_conn_window_size: DEFAULT_CONN_WINDOW,
92            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
93            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
94            max_frame_size: None,
95            max_header_list_size: None,
96            keep_alive_interval: None,
97            keep_alive_timeout: Duration::from_secs(20),
98            keep_alive_while_idle: false,
99            max_concurrent_reset_streams: None,
100            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
101            max_pending_accept_reset_streams: None,
102            header_table_size: None,
103            max_concurrent_streams: None,
104            enable_push: None,
105            unknown_setting8: None,
106            unknown_setting9: None,
107            headers_pseudo_order: None,
108            headers_priority: None,
109            settings_order: None,
110            priority: None,
111        }
112    }
113}
114
115fn new_builder(config: &Config) -> Builder {
116    let mut builder = Builder::default();
117    builder
118        .initial_max_send_streams(config.initial_max_send_streams)
119        .initial_window_size(config.initial_stream_window_size)
120        .initial_connection_window_size(config.initial_conn_window_size)
121        .max_send_buffer_size(config.max_send_buffer_size);
122    if let Some(id) = config.initial_stream_id {
123        builder.initial_stream_id(id);
124    }
125    if let Some(max) = config.max_pending_accept_reset_streams {
126        builder.max_pending_accept_reset_streams(max);
127    }
128    if let Some(max) = config.max_concurrent_reset_streams {
129        builder.max_concurrent_reset_streams(max);
130    }
131    if let Some(max) = config.max_concurrent_streams {
132        builder.max_concurrent_streams(max);
133    }
134    if let Some(max) = config.max_header_list_size {
135        builder.max_header_list_size(max);
136    }
137    if let Some(opt) = config.enable_push {
138        builder.enable_push(opt);
139    }
140    if let Some(max) = config.max_frame_size {
141        builder.max_frame_size(max);
142    }
143    if let Some(max) = config.header_table_size {
144        builder.header_table_size(max);
145    }
146    if let Some(v) = config.unknown_setting8 {
147        builder.unknown_setting8(v);
148    }
149    if let Some(v) = config.unknown_setting9 {
150        builder.unknown_setting9(v);
151    }
152    if let Some(priority) = config.headers_priority {
153        builder.headers_priority(priority);
154    }
155    if let Some(order) = config.headers_pseudo_order {
156        builder.headers_psuedo(order);
157    }
158    if let Some(order) = config.settings_order {
159        builder.settings_order(order);
160    }
161    if let Some(ref priority) = config.priority {
162        builder.priority(priority.clone());
163    }
164    builder
165}
166
167fn new_ping_config(config: &Config) -> ping::Config {
168    ping::Config {
169        bdp_initial_window: if config.adaptive_window {
170            Some(config.initial_stream_window_size)
171        } else {
172            None
173        },
174        keep_alive_interval: config.keep_alive_interval,
175        keep_alive_timeout: config.keep_alive_timeout,
176        keep_alive_while_idle: config.keep_alive_while_idle,
177    }
178}
179
180pub(crate) async fn handshake<T, B, E>(
181    io: T,
182    req_rx: ClientRx<B>,
183    config: &Config,
184    mut exec: E,
185    timer: Time,
186) -> crate::Result<ClientTask<B, E, T>>
187where
188    T: Read + Write + Unpin,
189    B: Body + 'static,
190    B::Data: Send + 'static,
191    E: Http2ClientConnExec<B, T> + Unpin,
192    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
193{
194    let (h2_tx, mut conn) = new_builder(config)
195        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
196        .await
197        .map_err(crate::Error::new_h2)?;
198
199    // An mpsc channel is used entirely to detect when the
200    // 'Client' has been dropped. This is to get around a bug
201    // in h2 where dropping all SendRequests won't notify a
202    // parked Connection.
203    let (conn_drop_ref, rx) = mpsc::channel(1);
204    let (cancel_tx, conn_eof) = oneshot::channel();
205
206    let conn_drop_rx = rx.into_future();
207
208    let ping_config = new_ping_config(config);
209
210    let (conn, ping) = if ping_config.is_enabled() {
211        let pp = conn.ping_pong().expect("conn.ping_pong");
212        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
213
214        let conn: Conn<_, B> = Conn::new(ponger, conn);
215        (Either::Left(conn), recorder)
216    } else {
217        (Either::Right(conn), ping::disabled())
218    };
219    let conn: ConnMapErr<T, B> = ConnMapErr {
220        conn,
221        is_terminated: false,
222    };
223
224    exec.execute_h2_future(H2ClientFuture::Task {
225        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
226    });
227
228    Ok(ClientTask {
229        ping,
230        conn_drop_ref,
231        conn_eof,
232        executor: exec,
233        h2_tx,
234        req_rx,
235        fut_ctx: None,
236        marker: PhantomData,
237    })
238}
239
240pin_project! {
241    struct Conn<T, B>
242    where
243        B: Body,
244    {
245        #[pin]
246        ponger: Ponger,
247        #[pin]
248        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
249    }
250}
251
252impl<T, B> Conn<T, B>
253where
254    B: Body,
255    T: Read + Write + Unpin,
256{
257    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
258        Conn { ponger, conn }
259    }
260}
261
262impl<T, B> Future for Conn<T, B>
263where
264    B: Body,
265    T: Read + Write + Unpin,
266{
267    type Output = Result<(), h2::Error>;
268
269    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
270        let mut this = self.project();
271        match this.ponger.poll(cx) {
272            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
273                this.conn.set_target_window_size(wnd);
274                this.conn.set_initial_window_size(wnd)?;
275            }
276            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
277                debug!("connection keep-alive timed out");
278                return Poll::Ready(Ok(()));
279            }
280            Poll::Pending => {}
281        }
282
283        Pin::new(&mut this.conn).poll(cx)
284    }
285}
286
287pin_project! {
288    struct ConnMapErr<T, B>
289    where
290        B: Body,
291        T: Read,
292        T: Write,
293        T: Unpin,
294    {
295        #[pin]
296        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
297        #[pin]
298        is_terminated: bool,
299    }
300}
301
302impl<T, B> Future for ConnMapErr<T, B>
303where
304    B: Body,
305    T: Read + Write + Unpin,
306{
307    type Output = Result<(), ()>;
308
309    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
310        let mut this = self.project();
311
312        if *this.is_terminated {
313            return Poll::Pending;
314        }
315        let polled = this.conn.poll(cx);
316        if polled.is_ready() {
317            *this.is_terminated = true;
318        }
319        polled.map_err(|_e| {
320            debug!(error = %_e, "connection error");
321        })
322    }
323}
324
325impl<T, B> FusedFuture for ConnMapErr<T, B>
326where
327    B: Body,
328    T: Read + Write + Unpin,
329{
330    fn is_terminated(&self) -> bool {
331        self.is_terminated
332    }
333}
334
335pin_project! {
336    pub struct ConnTask<T, B>
337    where
338        B: Body,
339        T: Read,
340        T: Write,
341        T: Unpin,
342    {
343        #[pin]
344        drop_rx: StreamFuture<Receiver<Infallible>>,
345        #[pin]
346        cancel_tx: Option<oneshot::Sender<Infallible>>,
347        #[pin]
348        conn: ConnMapErr<T, B>,
349    }
350}
351
352impl<T, B> ConnTask<T, B>
353where
354    B: Body,
355    T: Read + Write + Unpin,
356{
357    fn new(
358        conn: ConnMapErr<T, B>,
359        drop_rx: StreamFuture<Receiver<Infallible>>,
360        cancel_tx: oneshot::Sender<Infallible>,
361    ) -> Self {
362        Self {
363            drop_rx,
364            cancel_tx: Some(cancel_tx),
365            conn,
366        }
367    }
368}
369
370impl<T, B> Future for ConnTask<T, B>
371where
372    B: Body,
373    T: Read + Write + Unpin,
374{
375    type Output = ();
376
377    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
378        let mut this = self.project();
379
380        if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
381            // ok or err, the `conn` has finished.
382            return Poll::Ready(());
383        }
384
385        if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
386            // mpsc has been dropped, hopefully polling
387            // the connection some more should start shutdown
388            // and then close.
389            trace!("send_request dropped, starting conn shutdown");
390            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
391        }
392
393        Poll::Pending
394    }
395}
396
397pin_project! {
398    #[project = H2ClientFutureProject]
399    pub enum H2ClientFuture<B, T>
400    where
401        B: http_body::Body,
402        B: 'static,
403        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
404        T: Read,
405        T: Write,
406        T: Unpin,
407    {
408        Pipe {
409            #[pin]
410            pipe: PipeMap<B>,
411        },
412        Send {
413            #[pin]
414            send_when: SendWhen<B>,
415        },
416        Task {
417            #[pin]
418            task: ConnTask<T, B>,
419        },
420    }
421}
422
423impl<B, T> Future for H2ClientFuture<B, T>
424where
425    B: http_body::Body + 'static,
426    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
427    T: Read + Write + Unpin,
428{
429    type Output = ();
430
431    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
432        let this = self.project();
433
434        match this {
435            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
436            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
437            H2ClientFutureProject::Task { task } => task.poll(cx),
438        }
439    }
440}
441
442struct FutCtx<B>
443where
444    B: Body,
445{
446    is_connect: bool,
447    eos: bool,
448    fut: ResponseFuture,
449    body_tx: SendStream<SendBuf<B::Data>>,
450    body: B,
451    cb: Callback<Request<B>, Response<IncomingBody>>,
452}
453
454impl<B: Body> Unpin for FutCtx<B> {}
455
456pub(crate) struct ClientTask<B, E, T>
457where
458    B: Body,
459    E: Unpin,
460{
461    ping: ping::Recorder,
462    conn_drop_ref: ConnDropRef,
463    conn_eof: ConnEof,
464    executor: E,
465    h2_tx: SendRequest<SendBuf<B::Data>>,
466    req_rx: ClientRx<B>,
467    fut_ctx: Option<FutCtx<B>>,
468    marker: PhantomData<T>,
469}
470
471impl<B, E, T> ClientTask<B, E, T>
472where
473    B: Body + 'static,
474    E: Http2ClientConnExec<B, T> + Unpin,
475    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
476    T: Read + Write + Unpin,
477{
478    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
479        self.h2_tx.is_extended_connect_protocol_enabled()
480    }
481}
482
483pin_project! {
484    pub struct PipeMap<S>
485    where
486        S: Body,
487    {
488        #[pin]
489        pipe: PipeToSendStream<S>,
490        #[pin]
491        conn_drop_ref: Option<Sender<Infallible>>,
492        #[pin]
493        ping: Option<Recorder>,
494    }
495}
496
497impl<B> Future for PipeMap<B>
498where
499    B: http_body::Body,
500    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
501{
502    type Output = ();
503
504    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
505        let mut this = self.project();
506
507        match this.pipe.poll_unpin(cx) {
508            Poll::Ready(result) => {
509                if let Err(_e) = result {
510                    debug!("client request body error: {}", _e);
511                }
512                drop(this.conn_drop_ref.take().expect("Future polled twice"));
513                drop(this.ping.take().expect("Future polled twice"));
514                return Poll::Ready(());
515            }
516            Poll::Pending => (),
517        };
518        Poll::Pending
519    }
520}
521
522impl<B, E, T> ClientTask<B, E, T>
523where
524    B: Body + 'static + Unpin,
525    B::Data: Send,
526    E: Http2ClientConnExec<B, T> + Unpin,
527    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
528    T: Read + Write + Unpin,
529{
530    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
531        let ping = self.ping.clone();
532
533        let send_stream = if !f.is_connect {
534            if !f.eos {
535                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
536
537                // eagerly see if the body pipe is ready and
538                // can thus skip allocating in the executor
539                match Pin::new(&mut pipe).poll(cx) {
540                    Poll::Ready(_) => (),
541                    Poll::Pending => {
542                        let conn_drop_ref = self.conn_drop_ref.clone();
543                        // keep the ping recorder's knowledge of an
544                        // "open stream" alive while this body is
545                        // still sending...
546                        let ping = ping.clone();
547
548                        let pipe = PipeMap {
549                            pipe,
550                            conn_drop_ref: Some(conn_drop_ref),
551                            ping: Some(ping),
552                        };
553                        // Clear send task
554                        self.executor
555                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
556                    }
557                }
558            }
559
560            None
561        } else {
562            Some(f.body_tx)
563        };
564
565        self.executor.execute_h2_future(H2ClientFuture::Send {
566            send_when: SendWhen {
567                when: ResponseFutMap {
568                    fut: f.fut,
569                    ping: Some(ping),
570                    send_stream: Some(send_stream),
571                },
572                call_back: Some(f.cb),
573            },
574        });
575    }
576}
577
578pin_project! {
579    pub(crate) struct ResponseFutMap<B>
580    where
581        B: Body,
582        B: 'static,
583    {
584        #[pin]
585        fut: ResponseFuture,
586        #[pin]
587        ping: Option<Recorder>,
588        #[pin]
589        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
590    }
591}
592
593impl<B> Future for ResponseFutMap<B>
594where
595    B: Body + 'static,
596{
597    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;
598
599    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
600        let mut this = self.project();
601
602        let result = ready!(this.fut.poll(cx));
603
604        let ping = this.ping.take().expect("Future polled twice");
605        let send_stream = this.send_stream.take().expect("Future polled twice");
606
607        match result {
608            Ok(res) => {
609                // record that we got the response headers
610                ping.record_non_data();
611
612                let content_length = headers::content_length_parse_all(res.headers());
613                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
614                    if content_length.map_or(false, |len| len != 0) {
615                        warn!("h2 connect response with non-zero body not supported");
616
617                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
618                        return Poll::Ready(Err((
619                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
620                            None::<Request<B>>,
621                        )));
622                    }
623                    let (parts, recv_stream) = res.into_parts();
624                    let mut res = Response::from_parts(parts, IncomingBody::empty());
625
626                    let (pending, on_upgrade) = crate::upgrade::pending();
627                    let io = H2Upgraded {
628                        ping,
629                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
630                        recv_stream,
631                        buf: Bytes::new(),
632                    };
633                    let upgraded = Upgraded::new(io, Bytes::new());
634
635                    pending.fulfill(upgraded);
636                    res.extensions_mut().insert(on_upgrade);
637
638                    Poll::Ready(Ok(res))
639                } else {
640                    let res = res.map(|stream| {
641                        let ping = ping.for_stream(&stream);
642                        IncomingBody::h2(stream, content_length.into(), ping)
643                    });
644                    Poll::Ready(Ok(res))
645                }
646            }
647            Err(err) => {
648                ping.ensure_not_timed_out().map_err(|e| (e, None))?;
649
650                debug!("client response error: {}", err);
651                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
652            }
653        }
654    }
655}
656
657impl<B, E, T> Future for ClientTask<B, E, T>
658where
659    B: Body + 'static + Unpin,
660    B::Data: Send,
661    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
662    E: Http2ClientConnExec<B, T> + Unpin,
663    T: Read + Write + Unpin,
664{
665    type Output = crate::Result<Dispatched>;
666
667    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
668        loop {
669            match ready!(self.h2_tx.poll_ready(cx)) {
670                Ok(()) => (),
671                Err(err) => {
672                    self.ping.ensure_not_timed_out()?;
673                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
674                        trace!("connection gracefully shutdown");
675                        Poll::Ready(Ok(Dispatched::Shutdown))
676                    } else {
677                        Poll::Ready(Err(crate::Error::new_h2(err)))
678                    };
679                }
680            };
681
682            // If we were waiting on pending open
683            // continue where we left off.
684            if let Some(f) = self.fut_ctx.take() {
685                self.poll_pipe(f, cx);
686                continue;
687            }
688
689            match self.req_rx.poll_recv(cx) {
690                Poll::Ready(Some((req, cb))) => {
691                    // check that future hasn't been canceled already
692                    if cb.is_canceled() {
693                        trace!("request callback is canceled");
694                        continue;
695                    }
696                    let (head, body) = req.into_parts();
697                    let mut req = ::http::Request::from_parts(head, ());
698                    super::strip_connection_headers(req.headers_mut(), true);
699                    if let Some(len) = body.size_hint().exact() {
700                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
701                            headers::set_content_length_if_missing(req.headers_mut(), len);
702                        }
703                    }
704
705                    let is_connect = req.method() == Method::CONNECT;
706                    let eos = body.is_end_stream();
707
708                    if is_connect
709                        && headers::content_length_parse_all(req.headers())
710                            .map_or(false, |len| len != 0)
711                    {
712                        warn!("h2 connect request with non-zero body not supported");
713                        cb.send(Err(TrySendError {
714                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
715                            message: None,
716                        }));
717                        continue;
718                    }
719
720                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
721                        req.extensions_mut().insert(protocol.into_inner());
722                    }
723
724                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
725                        Ok(ok) => ok,
726                        Err(err) => {
727                            debug!("client send request error: {}", err);
728                            cb.send(Err(TrySendError {
729                                error: crate::Error::new_h2(err),
730                                message: None,
731                            }));
732                            continue;
733                        }
734                    };
735
736                    let f = FutCtx {
737                        is_connect,
738                        eos,
739                        fut,
740                        body_tx,
741                        body,
742                        cb,
743                    };
744
745                    // Check poll_ready() again.
746                    // If the call to send_request() resulted in the new stream being pending open
747                    // we have to wait for the open to complete before accepting new requests.
748                    match self.h2_tx.poll_ready(cx) {
749                        Poll::Pending => {
750                            // Save Context
751                            self.fut_ctx = Some(f);
752                            return Poll::Pending;
753                        }
754                        Poll::Ready(Ok(())) => (),
755                        Poll::Ready(Err(err)) => {
756                            f.cb.send(Err(TrySendError {
757                                error: crate::Error::new_h2(err),
758                                message: None,
759                            }));
760                            continue;
761                        }
762                    }
763                    self.poll_pipe(f, cx);
764                    continue;
765                }
766
767                Poll::Ready(None) => {
768                    trace!("client::dispatch::Sender dropped");
769                    return Poll::Ready(Ok(Dispatched::Shutdown));
770                }
771
772                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
773                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
774                    // But we cannot remove it as long as MSRV is less than that.
775                    #[allow(unused)]
776                    Ok(never) => match never {},
777                    Err(_conn_is_eof) => {
778                        trace!("connection task is closed, closing dispatch task");
779                        return Poll::Ready(Ok(Dispatched::Shutdown));
780                    }
781                },
782            }
783        }
784    }
785}