miku_hyper/proto/h2/
client.rs

1use std::{
2    convert::Infallible,
3    future::Future,
4    marker::PhantomData,
5    pin::Pin,
6    task::{Context, Poll},
7    time::Duration,
8};
9
10use crate::rt::{Read, Write};
11use bytes::Bytes;
12use futures_channel::mpsc::{Receiver, Sender};
13use futures_channel::{mpsc, oneshot};
14use futures_util::future::{Either, FusedFuture, FutureExt as _};
15use futures_util::ready;
16use futures_util::stream::{StreamExt as _, StreamFuture};
17use h2::client::{Builder, Connection, SendRequest};
18use h2::SendStream;
19use http::{Method, StatusCode};
20use pin_project_lite::pin_project;
21
22use super::ping::{Ponger, Recorder};
23use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
24use crate::body::{Body, Incoming as IncomingBody};
25use crate::client::dispatch::{Callback, SendWhen, TrySendError};
26use crate::common::io::Compat;
27use crate::common::time::Time;
28use crate::ext::{FramePriority, FrameStreamDependency, Protocol, PseudoType};
29use crate::headers;
30use crate::proto::h2::UpgradedSendStream;
31use crate::proto::Dispatched;
32use crate::rt::bounds::Http2ClientConnExec;
33use crate::upgrade::Upgraded;
34use crate::{Request, Response};
35use h2::client::ResponseFuture;
36
/// Receiving half of the client dispatch channel.
///
/// Each item received from it is a `(Request<B>, callback)` pair; the callback
/// is what eventually carries the `Response<IncomingBody>` (or the send error)
/// back to the caller.
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;
38
/// An mpsc channel is used to help notify the `Connection` task when *all*
/// other handles to it have been dropped, so that it can shut down.
///
/// The item type is `Infallible`: nothing is ever sent, only the dropping of
/// every sender is observed.
type ConnDropRef = mpsc::Sender<Infallible>;
42
/// A oneshot channel watches the `Connection` task, and when it completes,
/// the "dispatch" task will be notified and can shut down sooner.
///
/// The item type is `Infallible`: the receiver can only ever resolve with the
/// "sender dropped" error.
type ConnEof = oneshot::Receiver<Infallible>;
46
// Our defaults are chosen for the "majority" case, which is usually not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.

// Default connection-level flow-control window.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
// Default per-stream flow-control window.
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
// Not referenced by the code in this file as shown: `Config::default` leaves
// `max_frame_size` as `None`. The leading underscore silences the unused warning.
const _DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
// Default for `Config::max_send_buffer_size`.
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
// Default for `Config::max_header_list_size`.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb
55
// The maximum number of concurrent streams that the client is allowed to open
// before it receives the initial SETTINGS frame from the server.
//
// This default is derived from what the HTTP/2 spec recommends as the minimum
// value that endpoints advertise to their peers. Using it minimizes the chance
// that the local endpoint attempts to open too many streams and has them
// rejected by the remote peer with the `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
64
/// HTTP/2 client connection settings.
///
/// `new_builder` turns these values into an `h2` client `Builder`, and
/// `new_ping_config` turns the keep-alive / adaptive-window values into a
/// `ping::Config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    /// When `true`, `initial_stream_window_size` is handed to the ping
    /// machinery as `bdp_initial_window`; when `false`, that value is `None`.
    pub(crate) adaptive_window: bool,
    /// Passed to `Builder::initial_connection_window_size`.
    pub(crate) initial_conn_window_size: u32,
    /// Passed to `Builder::initial_window_size`.
    pub(crate) initial_stream_window_size: u32,
    /// Passed to `Builder::initial_max_send_streams`.
    pub(crate) initial_max_send_streams: usize,
    /// Passed to `Builder::max_frame_size` when `Some`; otherwise not set.
    pub(crate) max_frame_size: Option<u32>,
    /// Passed to `Builder::max_header_list_size`.
    pub(crate) max_header_list_size: u32,
    /// Copied into `ping::Config::keep_alive_interval`.
    pub(crate) keep_alive_interval: Option<Duration>,
    /// Copied into `ping::Config::keep_alive_timeout`.
    pub(crate) keep_alive_timeout: Duration,
    /// Copied into `ping::Config::keep_alive_while_idle`.
    pub(crate) keep_alive_while_idle: bool,
    /// Passed to `Builder::max_concurrent_reset_streams` when `Some`.
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    /// Passed to `Builder::max_send_buffer_size`.
    pub(crate) max_send_buffer_size: usize,
    /// Passed to `Builder::max_pending_accept_reset_streams` when `Some`.
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    /// Passed to `Builder::header_table_size` when `Some`.
    pub(crate) header_table_size: Option<u32>,
    /// Passed to `Builder::max_concurrent_streams` when `Some`.
    pub(crate) max_concurrent_streams: Option<u32>,

    // Extend configuration controlling h2
    /// Passed to `Builder::enable_push`.
    pub(crate) enable_push: bool,
    /// Passed to `Builder::headers_frame_pseudo_order`.
    pub(crate) headers_frame_pseudo_order: Option<&'static [PseudoType; 4]>,
    /// Passed to `Builder::headers_frame_priority`.
    pub(crate) headers_frame_priority: Option<FrameStreamDependency>,
    /// Passed to `Builder::virtual_streams_priorities`.
    pub(crate) virtual_streams_priorities: Option<&'static [FramePriority]>,
}
88
89impl Default for Config {
90    fn default() -> Config {
91        Config {
92            adaptive_window: false,
93            initial_conn_window_size: DEFAULT_CONN_WINDOW,
94            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
95            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
96            max_frame_size: None,
97            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
98            keep_alive_interval: None,
99            keep_alive_timeout: Duration::from_secs(20),
100            keep_alive_while_idle: false,
101            max_concurrent_reset_streams: None,
102            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
103            max_pending_accept_reset_streams: None,
104            header_table_size: None,
105            max_concurrent_streams: None,
106
107            // Extend configuration controlling h2
108            enable_push: true,
109            headers_frame_pseudo_order: None,
110            headers_frame_priority: None,
111            virtual_streams_priorities: None,
112        }
113    }
114}
115
116fn new_builder(config: &Config) -> Builder {
117    let mut builder = Builder::default();
118    builder
119        .initial_max_send_streams(config.initial_max_send_streams)
120        .initial_window_size(config.initial_stream_window_size)
121        .initial_connection_window_size(config.initial_conn_window_size)
122        .max_header_list_size(config.max_header_list_size)
123        .max_send_buffer_size(config.max_send_buffer_size)
124        .enable_push(config.enable_push)
125        .headers_frame_pseudo_order(config.headers_frame_pseudo_order)
126        .headers_frame_priority(config.headers_frame_priority)
127        .virtual_streams_priorities(config.virtual_streams_priorities);
128    if let Some(max) = config.max_frame_size {
129        builder.max_frame_size(max);
130    }
131    if let Some(max) = config.max_concurrent_reset_streams {
132        builder.max_concurrent_reset_streams(max);
133    }
134    if let Some(max) = config.max_pending_accept_reset_streams {
135        builder.max_pending_accept_reset_streams(max);
136    }
137    if let Some(size) = config.header_table_size {
138        builder.header_table_size(size);
139    }
140    if let Some(max) = config.max_concurrent_streams {
141        builder.max_concurrent_streams(max);
142    }
143    builder
144}
145
146fn new_ping_config(config: &Config) -> ping::Config {
147    ping::Config {
148        bdp_initial_window: if config.adaptive_window {
149            Some(config.initial_stream_window_size)
150        } else {
151            None
152        },
153        keep_alive_interval: config.keep_alive_interval,
154        keep_alive_timeout: config.keep_alive_timeout,
155        keep_alive_while_idle: config.keep_alive_while_idle,
156    }
157}
158
159pub(crate) async fn handshake<T, B, E>(
160    io: T,
161    req_rx: ClientRx<B>,
162    config: &Config,
163    mut exec: E,
164    timer: Time,
165) -> crate::Result<ClientTask<B, E, T>>
166where
167    T: Read + Write + Unpin,
168    B: Body + 'static,
169    B::Data: Send + 'static,
170    E: Http2ClientConnExec<B, T> + Unpin,
171    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
172{
173    let (h2_tx, mut conn) = new_builder(config)
174        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
175        .await
176        .map_err(crate::Error::new_h2)?;
177
178    // An mpsc channel is used entirely to detect when the
179    // 'Client' has been dropped. This is to get around a bug
180    // in h2 where dropping all SendRequests won't notify a
181    // parked Connection.
182    let (conn_drop_ref, rx) = mpsc::channel(1);
183    let (cancel_tx, conn_eof) = oneshot::channel();
184
185    let conn_drop_rx = rx.into_future();
186
187    let ping_config = new_ping_config(config);
188
189    let (conn, ping) = if ping_config.is_enabled() {
190        let pp = conn.ping_pong().expect("conn.ping_pong");
191        let (recorder, ponger) = ping::channel(pp, ping_config, timer);
192
193        let conn: Conn<_, B> = Conn::new(ponger, conn);
194        (Either::Left(conn), recorder)
195    } else {
196        (Either::Right(conn), ping::disabled())
197    };
198    let conn: ConnMapErr<T, B> = ConnMapErr {
199        conn,
200        is_terminated: false,
201    };
202
203    exec.execute_h2_future(H2ClientFuture::Task {
204        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
205    });
206
207    Ok(ClientTask {
208        ping,
209        conn_drop_ref,
210        conn_eof,
211        executor: exec,
212        h2_tx,
213        req_rx,
214        fut_ctx: None,
215        marker: PhantomData,
216    })
217}
218
// Pairs the h2 `Connection` with the ping `Ponger`. `handshake` builds this
// only when the ping configuration is enabled; otherwise the bare
// `Connection` is used.
pin_project! {
    struct Conn<T, B>
    where
        B: Body,
    {
        // Polled before the connection on every `poll`; may request a window
        // size update or report a keep-alive timeout.
        #[pin]
        ponger: Ponger,
        // The h2 connection itself, speaking over the `Compat`-wrapped IO.
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
230
231impl<T, B> Conn<T, B>
232where
233    B: Body,
234    T: Read + Write + Unpin,
235{
236    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
237        Conn { ponger, conn }
238    }
239}
240
241impl<T, B> Future for Conn<T, B>
242where
243    B: Body,
244    T: Read + Write + Unpin,
245{
246    type Output = Result<(), h2::Error>;
247
248    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
249        let mut this = self.project();
250        match this.ponger.poll(cx) {
251            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
252                this.conn.set_target_window_size(wnd);
253                this.conn.set_initial_window_size(wnd)?;
254            }
255            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
256                debug!("connection keep-alive timed out");
257                return Poll::Ready(Ok(()));
258            }
259            Poll::Pending => {}
260        }
261
262        Pin::new(&mut this.conn).poll(cx)
263    }
264}
265
// Wraps the connection future (with or without the ping `Ponger`), maps its
// error to `()`, and remembers completion so it can implement `FusedFuture`.
pin_project! {
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // `Left` is the ping-enabled `Conn`, `Right` the bare h2 `Connection`.
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set once `conn` has returned `Poll::Ready`; later polls return
        // `Poll::Pending` without touching `conn`.
        #[pin]
        is_terminated: bool,
    }
}
280
281impl<T, B> Future for ConnMapErr<T, B>
282where
283    B: Body,
284    T: Read + Write + Unpin,
285{
286    type Output = Result<(), ()>;
287
288    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289        let mut this = self.project();
290
291        if *this.is_terminated {
292            return Poll::Pending;
293        }
294        let polled = this.conn.poll(cx);
295        if polled.is_ready() {
296            *this.is_terminated = true;
297        }
298        polled.map_err(|_e| {
299            debug!(error = %_e, "connection error");
300        })
301    }
302}
303
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    /// Returns `true` once the wrapped connection future has completed
    /// (the flag is set by `poll` when the inner poll returns `Ready`).
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
313
// The spawned task that drives the connection and watches for all
// `ConnDropRef` senders going away.
pin_project! {
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Resolves when the drop-detection channel yields. Its item type is
        // `Infallible`, so that can only mean every sender has been dropped.
        #[pin]
        drop_rx: StreamFuture<Receiver<Infallible>>,
        // Taken and dropped once `drop_rx` resolves, which resolves the
        // matching `ConnEof` receiver.
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        // The connection future being driven.
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
330
331impl<T, B> ConnTask<T, B>
332where
333    B: Body,
334    T: Read + Write + Unpin,
335{
336    fn new(
337        conn: ConnMapErr<T, B>,
338        drop_rx: StreamFuture<Receiver<Infallible>>,
339        cancel_tx: oneshot::Sender<Infallible>,
340    ) -> Self {
341        Self {
342            drop_rx,
343            cancel_tx: Some(cancel_tx),
344            conn,
345        }
346    }
347}
348
349impl<T, B> Future for ConnTask<T, B>
350where
351    B: Body,
352    T: Read + Write + Unpin,
353{
354    type Output = ();
355
356    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
357        let mut this = self.project();
358
359        if !this.conn.is_terminated() && this.conn.poll_unpin(cx).is_ready() {
360            // ok or err, the `conn` has finished.
361            return Poll::Ready(());
362        }
363
364        if !this.drop_rx.is_terminated() && this.drop_rx.poll_unpin(cx).is_ready() {
365            // mpsc has been dropped, hopefully polling
366            // the connection some more should start shutdown
367            // and then close.
368            trace!("send_request dropped, starting conn shutdown");
369            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
370        }
371
372        Poll::Pending
373    }
374}
375
// The single future type handed to the executor by this module. Each variant
// wraps one kind of background work.
pin_project! {
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        // Streams a request body into its h2 send stream (spawned by
        // `poll_pipe` when the body is not finished on the first poll).
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        // Waits for the response and delivers it through the request's
        // callback (spawned by `poll_pipe` for every request).
        Send {
            #[pin]
            send_when: SendWhen<B>,
        },
        // Drives the h2 connection (spawned once by `handshake`).
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
401
402impl<B, T> Future for H2ClientFuture<B, T>
403where
404    B: http_body::Body + 'static,
405    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
406    T: Read + Write + Unpin,
407{
408    type Output = ();
409
410    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
411        let this = self.project();
412
413        match this {
414            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
415            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
416            H2ClientFutureProject::Task { task } => task.poll(cx),
417        }
418    }
419}
420
/// Everything needed to finish dispatching one request after
/// `SendRequest::send_request` has been called for it.
///
/// `ClientTask::poll` builds one per request and either passes it straight to
/// `poll_pipe` or parks it in `fut_ctx` while the h2 handle is not ready.
struct FutCtx<B>
where
    B: Body,
{
    /// Whether the request method is `CONNECT`.
    is_connect: bool,
    /// Whether the request body reported end-of-stream before sending.
    eos: bool,
    /// The h2 future that resolves to the response.
    fut: ResponseFuture,
    /// The h2 stream used to send the request body (or, for `CONNECT`, kept
    /// for the upgraded connection).
    body_tx: SendStream<SendBuf<B::Data>>,
    /// The request body.
    body: B,
    /// Callback through which the response or error is delivered.
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// `FutCtx` is declared `Unpin` for every `B: Body`, whether or not `B` itself
// is `Unpin`.
impl<B: Body> Unpin for FutCtx<B> {}
434
/// The client-side dispatch task for one HTTP/2 connection.
///
/// Created by `handshake`. As a `Future` it receives requests from `req_rx`,
/// sends them over `h2_tx`, and spawns the body-pipe and response futures on
/// `executor`.
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    /// Ping recorder; cloned into every spawned body-pipe and response future.
    ping: ping::Recorder,
    /// Sender of the drop-detection channel; cloned into every spawned body pipe.
    conn_drop_ref: ConnDropRef,
    /// Resolves (with an error) when the connection task drops its sender.
    conn_eof: ConnEof,
    /// Executor used to spawn `H2ClientFuture`s.
    executor: E,
    /// The h2 handle used to open new request streams.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    /// Incoming requests paired with their response callbacks.
    req_rx: ClientRx<B>,
    /// A request already sent to h2 but parked because `h2_tx.poll_ready`
    /// returned `Pending` afterwards; resumed on the next ready poll.
    fut_ctx: Option<FutCtx<B>>,
    /// Ties the IO type `T` to this task without storing a value of it.
    marker: PhantomData<T>,
}
449
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Reports whether the extended CONNECT protocol is enabled, as answered
    /// by the underlying h2 `SendRequest` handle.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
}
461
// A spawned request-body pipe, bundled with the handles that must stay alive
// for as long as the body is still being sent.
pin_project! {
    pub struct PipeMap<S>
    where
        S: Body,
    {
        // Copies the request body into the h2 send stream.
        #[pin]
        pipe: PipeToSendStream<S>,
        // Clone of the drop-detection sender; dropped when the pipe finishes.
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        // Clone of the ping recorder; dropped when the pipe finishes.
        #[pin]
        ping: Option<Recorder>,
    }
}
475
476impl<B> Future for PipeMap<B>
477where
478    B: http_body::Body,
479    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
480{
481    type Output = ();
482
483    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
484        let mut this = self.project();
485
486        match this.pipe.poll_unpin(cx) {
487            Poll::Ready(result) => {
488                if let Err(_e) = result {
489                    debug!("client request body error: {}", _e);
490                }
491                drop(this.conn_drop_ref.take().expect("Future polled twice"));
492                drop(this.ping.take().expect("Future polled twice"));
493                return Poll::Ready(());
494            }
495            Poll::Pending => (),
496        };
497        Poll::Pending
498    }
499}
500
501impl<B, E, T> ClientTask<B, E, T>
502where
503    B: Body + 'static + Unpin,
504    B::Data: Send,
505    E: Http2ClientConnExec<B, T> + Unpin,
506    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
507    T: Read + Write + Unpin,
508{
509    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
510        let ping = self.ping.clone();
511
512        let send_stream = if !f.is_connect {
513            if !f.eos {
514                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);
515
516                // eagerly see if the body pipe is ready and
517                // can thus skip allocating in the executor
518                match Pin::new(&mut pipe).poll(cx) {
519                    Poll::Ready(_) => (),
520                    Poll::Pending => {
521                        let conn_drop_ref = self.conn_drop_ref.clone();
522                        // keep the ping recorder's knowledge of an
523                        // "open stream" alive while this body is
524                        // still sending...
525                        let ping = ping.clone();
526
527                        let pipe = PipeMap {
528                            pipe,
529                            conn_drop_ref: Some(conn_drop_ref),
530                            ping: Some(ping),
531                        };
532                        // Clear send task
533                        self.executor
534                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
535                    }
536                }
537            }
538
539            None
540        } else {
541            Some(f.body_tx)
542        };
543
544        self.executor.execute_h2_future(H2ClientFuture::Send {
545            send_when: SendWhen {
546                when: ResponseFutMap {
547                    fut: f.fut,
548                    ping: Some(ping),
549                    send_stream: Some(send_stream),
550                },
551                call_back: Some(f.cb),
552            },
553        });
554    }
555}
556
// Wraps the h2 `ResponseFuture` and converts its output into the crate's
// `Response<Incoming>` (or an error paired with an optional request).
pin_project! {
    pub(crate) struct ResponseFutMap<B>
    where
        B: Body,
        B: 'static,
    {
        // The h2 future resolving to the response head and receive stream.
        #[pin]
        fut: ResponseFuture,
        // Ping recorder; taken when `fut` completes.
        #[pin]
        ping: Option<Recorder>,
        // Outer `Option`: taken when `fut` completes. Inner `Option`: `Some`
        // only for `CONNECT` requests (see `poll_pipe`), where the send stream
        // is needed to build the upgraded IO.
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
    }
}
571
impl<B> Future for ResponseFutMap<B>
where
    B: Body + 'static,
{
    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;

    /// Waits for the h2 response and converts it.
    ///
    /// * A response to a `CONNECT` request (send stream present) with status
    ///   `200 OK` becomes an empty-bodied response carrying an already
    ///   fulfilled upgrade in its extensions; a non-zero `content-length` on
    ///   such a response resets the stream and yields an `INTERNAL_ERROR`.
    /// * Any other response has its receive stream wrapped as an h2
    ///   `IncomingBody`.
    /// * An h2 error is returned as `crate::Error`, unless
    ///   `ensure_not_timed_out` reports an error, which is returned instead.
    ///
    /// # Panics
    ///
    /// Panics with "Future polled twice" if polled again after completing.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        let result = ready!(this.fut.poll(cx));

        // Both are `Some` until the first completion; taking them makes a
        // second completion panic instead of silently misbehaving.
        let ping = this.ping.take().expect("Future polled twice");
        let send_stream = this.send_stream.take().expect("Future polled twice");

        match result {
            Ok(res) => {
                // record that we got the response headers
                ping.record_non_data();

                let content_length = headers::content_length_parse_all(res.headers());
                // Upgrade path: only for CONNECT requests answered with 200 OK.
                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
                    if content_length.map_or(false, |len| len != 0) {
                        warn!("h2 connect response with non-zero body not supported");

                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                        return Poll::Ready(Err((
                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            None::<Request<B>>,
                        )));
                    }
                    // The response body is replaced by an empty one; the
                    // receive stream moves into the upgraded IO instead.
                    let (parts, recv_stream) = res.into_parts();
                    let mut res = Response::from_parts(parts, IncomingBody::empty());

                    let (pending, on_upgrade) = crate::upgrade::pending();
                    let io = H2Upgraded {
                        ping,
                        // SAFETY: NOTE(review) — the safety contract of
                        // `UpgradedSendStream::new` is defined outside this
                        // file; confirm it is upheld here.
                        send_stream: unsafe { UpgradedSendStream::new(send_stream) },
                        recv_stream,
                        buf: Bytes::new(),
                    };
                    let upgraded = Upgraded::new(io, Bytes::new());

                    // Fulfil immediately so `on_upgrade` is ready as soon as
                    // the caller extracts it from the response extensions.
                    pending.fulfill(upgraded);
                    res.extensions_mut().insert(on_upgrade);

                    Poll::Ready(Ok(res))
                } else {
                    let res = res.map(|stream| {
                        let ping = ping.for_stream(&stream);
                        IncomingBody::h2(stream, content_length.into(), ping)
                    });
                    Poll::Ready(Ok(res))
                }
            }
            Err(err) => {
                // An error from the ping recorder is returned in preference
                // to the h2 error.
                ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                debug!("client response error: {}", err);
                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
            }
        }
    }
}
635
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// The dispatch loop for one HTTP/2 connection.
    ///
    /// Each iteration waits for the h2 handle to be ready, resumes a parked
    /// request if there is one, and otherwise takes the next request from
    /// `req_rx`, sends it over h2, and hands it to `poll_pipe`.
    ///
    /// Completes with `Dispatched::Shutdown` when h2 reports a `NO_ERROR`
    /// shutdown, when the request sender is dropped, or when the connection
    /// task has ended. Completes with an error for any other h2 readiness
    /// error or when `ensure_not_timed_out` reports one.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Wait until h2 can accept a new stream.
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // An error from the ping recorder is returned in
                    // preference to the h2 error.
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // If we were waiting on pending open
            // continue where we left off.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    // Split off the body; h2 is given a body-less request head.
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // With an exactly known body size, fill in content-length
                    // if missing. A zero length is only written when the
                    // method has defined payload semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    // CONNECT requests declaring a non-zero content-length are
                    // rejected before anything is sent.
                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        warn!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            message: None,
                        }));
                        continue;
                    }

                    // Replace the crate's `Protocol` extension with its inner
                    // value, which is what is left in the request given to h2.
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // The second argument is true only for non-CONNECT
                    // requests whose body is already at end-of-stream.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check poll_ready() again.
                    // If the call to send_request() resulted in the new stream being pending open
                    // we have to wait for the open to complete before accepting new requests.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            // Save Context
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            // Report the failure to this request's caller; the
                            // next loop iteration polls readiness again.
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request is waiting: also watch for the connection task
                // ending, so the dispatch task can shut down with it.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
                    // But we cannot remove it as long as MSRV is less than that.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}