// hyper/proto/h2/client.rs — HTTP/2 client connection and dispatch tasks.
use std::{
    convert::Infallible,
    future::Future,
    marker::PhantomData,
    pin::Pin,
    task::{Context, Poll},
    time::Duration,
};

use bytes::Bytes;
use futures_channel::mpsc::{Receiver, Sender};
use futures_channel::{mpsc, oneshot};
use futures_core::{ready, FusedFuture, FusedStream, Stream};
use h2::client::{Builder, Connection, ResponseFuture, SendRequest};
use h2::SendStream;
use http::{Method, StatusCode};
use pin_project_lite::pin_project;

use super::ping::{Ponger, Recorder};
use super::{ping, PipeToSendStream, SendBuf};
use crate::body::{Body, Incoming as IncomingBody};
use crate::client::dispatch::{Callback, SendWhen, TrySendError};
use crate::common::either::Either;
use crate::common::io::Compat;
use crate::common::time::Time;
use crate::ext::Protocol;
use crate::headers;
use crate::proto::Dispatched;
use crate::rt::bounds::{Http2ClientConnExec, Http2UpgradedExec};
use crate::rt::{Read, Write};
use crate::upgrade::Upgraded;
use crate::{Request, Response};
34
/// Channel on which the dispatch task receives `(Request, Callback)` pairs
/// from client handles wanting to send on this connection.
type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<IncomingBody>>;

/// An mpsc channel is used to help notify the `Connection` task when *all*
/// other handles to it have been dropped, so that it can shutdown.
type ConnDropRef = mpsc::Sender<Infallible>;

/// A oneshot channel watches the `Connection` task, and when it completes,
/// the "dispatch" task will be notified and can shutdown sooner.
type ConnEof = oneshot::Receiver<Infallible>;
44
// Our defaults are chosen for the "majority" case, which usually are not
// resource constrained, and so the spec default of 64kb can be too limiting
// for performance.
const DEFAULT_CONN_WINDOW: u32 = 1024 * 1024 * 5; // 5mb
const DEFAULT_STREAM_WINDOW: u32 = 1024 * 1024 * 2; // 2mb
const DEFAULT_MAX_FRAME_SIZE: u32 = 1024 * 16; // 16kb
const DEFAULT_MAX_SEND_BUF_SIZE: usize = 1024 * 1024; // 1mb
// Limit on the total (HPACK-decoded) size of a received header block.
const DEFAULT_MAX_HEADER_LIST_SIZE: u32 = 1024 * 16; // 16kb

// The maximum number of concurrent streams that the client is allowed to open
// before it receives the initial SETTINGS frame from the server.
// This default value is derived from what the HTTP/2 spec recommends as the
// minimum value that endpoints advertise to their peers. It means that using
// this value will minimize the chance of the failure where the local endpoint
// attempts to open too many streams and gets rejected by the remote peer with
// the `REFUSED_STREAM` error.
const DEFAULT_INITIAL_MAX_SEND_STREAMS: usize = 100;
62
/// Tunables for an HTTP/2 client connection.
///
/// Most fields are forwarded verbatim to the `h2` [`Builder`] in
/// `new_builder`; `adaptive_window` and the `keep_alive_*` fields instead
/// feed the ping-based machinery built by `new_ping_config`.
#[derive(Clone, Debug)]
pub(crate) struct Config {
    // When true, flow-control windows are resized from a BDP estimate
    // (see `new_ping_config`) instead of staying fixed.
    pub(crate) adaptive_window: bool,
    pub(crate) initial_conn_window_size: u32,
    pub(crate) initial_stream_window_size: u32,
    pub(crate) initial_max_send_streams: usize,
    pub(crate) max_frame_size: Option<u32>,
    pub(crate) max_header_list_size: u32,
    // `None` is expected to disable keep-alive pings — gated via
    // `ping::Config::is_enabled` in `handshake` (definition not in view).
    pub(crate) keep_alive_interval: Option<Duration>,
    pub(crate) keep_alive_timeout: Duration,
    pub(crate) keep_alive_while_idle: bool,
    pub(crate) max_concurrent_reset_streams: Option<usize>,
    pub(crate) max_send_buffer_size: usize,
    pub(crate) max_pending_accept_reset_streams: Option<usize>,
    pub(crate) max_local_error_reset_streams: Option<usize>,
    pub(crate) header_table_size: Option<u32>,
    pub(crate) max_concurrent_streams: Option<u32>,
}
81
82impl Default for Config {
83    fn default() -> Config {
84        Config {
85            adaptive_window: false,
86            initial_conn_window_size: DEFAULT_CONN_WINDOW,
87            initial_stream_window_size: DEFAULT_STREAM_WINDOW,
88            initial_max_send_streams: DEFAULT_INITIAL_MAX_SEND_STREAMS,
89            max_frame_size: Some(DEFAULT_MAX_FRAME_SIZE),
90            max_header_list_size: DEFAULT_MAX_HEADER_LIST_SIZE,
91            keep_alive_interval: None,
92            keep_alive_timeout: Duration::from_secs(20),
93            keep_alive_while_idle: false,
94            max_concurrent_reset_streams: None,
95            max_send_buffer_size: DEFAULT_MAX_SEND_BUF_SIZE,
96            max_pending_accept_reset_streams: None,
97            max_local_error_reset_streams: Some(1024),
98            header_table_size: None,
99            max_concurrent_streams: None,
100        }
101    }
102}
103
104fn new_builder(config: &Config) -> Builder {
105    let mut builder = Builder::default();
106    builder
107        .initial_max_send_streams(config.initial_max_send_streams)
108        .initial_window_size(config.initial_stream_window_size)
109        .initial_connection_window_size(config.initial_conn_window_size)
110        .max_header_list_size(config.max_header_list_size)
111        .max_send_buffer_size(config.max_send_buffer_size)
112        .max_local_error_reset_streams(config.max_local_error_reset_streams)
113        .enable_push(false);
114    if let Some(max) = config.max_frame_size {
115        builder.max_frame_size(max);
116    }
117    if let Some(max) = config.max_concurrent_reset_streams {
118        builder.max_concurrent_reset_streams(max);
119    }
120    if let Some(max) = config.max_pending_accept_reset_streams {
121        builder.max_pending_accept_reset_streams(max);
122    }
123    if let Some(size) = config.header_table_size {
124        builder.header_table_size(size);
125    }
126    if let Some(max) = config.max_concurrent_streams {
127        builder.max_concurrent_streams(max);
128    }
129    builder
130}
131
132fn new_ping_config(config: &Config) -> ping::Config {
133    ping::Config {
134        bdp_initial_window: if config.adaptive_window {
135            Some(config.initial_stream_window_size)
136        } else {
137            None
138        },
139        keep_alive_interval: config.keep_alive_interval,
140        keep_alive_timeout: config.keep_alive_timeout,
141        keep_alive_while_idle: config.keep_alive_while_idle,
142    }
143}
144
/// Performs the HTTP/2 handshake over `io` and spawns the connection
/// driver on `exec`.
///
/// Returns a [`ClientTask`] that the caller must poll to dispatch requests
/// arriving on `req_rx`; the connection I/O (plus keep-alive pings, when
/// enabled) is driven by a separately executed [`ConnTask`].
pub(crate) async fn handshake<T, B, E>(
    io: T,
    req_rx: ClientRx<B>,
    config: &Config,
    mut exec: E,
    timer: Time,
) -> crate::Result<ClientTask<B, E, T>>
where
    T: Read + Write + Unpin,
    B: Body + 'static,
    B::Data: Send + 'static,
    E: Http2ClientConnExec<B, T> + Clone + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    // h2 performs the actual protocol handshake (preface + SETTINGS).
    let (h2_tx, mut conn) = new_builder(config)
        .handshake::<_, SendBuf<B::Data>>(Compat::new(io))
        .await
        .map_err(crate::Error::new_h2)?;

    // An mpsc channel is used entirely to detect when the
    // 'Client' has been dropped. This is to get around a bug
    // in h2 where dropping all SendRequests won't notify a
    // parked Connection.
    let (conn_drop_ref, conn_drop_rx) = mpsc::channel(1);
    // When the connection task finishes it drops `cancel_tx`, which wakes
    // the dispatch task waiting on `conn_eof` so it can shut down early.
    let (cancel_tx, conn_eof) = oneshot::channel();

    let ping_config = new_ping_config(config);

    // Only wrap the connection with the ping/BDP machinery when keep-alive
    // or adaptive windowing is actually configured.
    let (conn, ping) = if ping_config.is_enabled() {
        let pp = conn.ping_pong().expect("conn.ping_pong");
        let (recorder, ponger) = ping::channel(pp, ping_config, timer);

        let conn: Conn<_, B> = Conn::new(ponger, conn);
        (Either::left(conn), recorder)
    } else {
        (Either::right(conn), ping::disabled())
    };
    let conn: ConnMapErr<T, B> = ConnMapErr {
        conn,
        is_terminated: false,
    };

    // Drive the connection on the caller-provided executor.
    exec.execute_h2_future(H2ClientFuture::Task {
        task: ConnTask::new(conn, conn_drop_rx, cancel_tx),
    });

    Ok(ClientTask {
        ping,
        conn_drop_ref,
        conn_eof,
        executor: exec,
        h2_tx,
        req_rx,
        fut_ctx: None,
        marker: PhantomData,
    })
}
202
pin_project! {
    // Joint future driving the keep-alive/BDP `Ponger` alongside the h2
    // `Connection` it observes, so a single task powers both.
    struct Conn<T, B>
    where
        B: Body,
    {
        #[pin]
        ponger: Ponger,
        #[pin]
        conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>,
    }
}
214
215impl<T, B> Conn<T, B>
216where
217    B: Body,
218    T: Read + Write + Unpin,
219{
220    fn new(ponger: Ponger, conn: Connection<Compat<T>, SendBuf<<B as Body>::Data>>) -> Self {
221        Conn { ponger, conn }
222    }
223}
224
impl<T, B> Future for Conn<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    type Output = Result<(), h2::Error>;

    /// Drives the ping state machine first, then the underlying h2
    /// connection, every time this task is polled.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();
        match this.ponger.poll(cx) {
            Poll::Ready(ping::Ponged::SizeUpdate(wnd)) => {
                // New BDP estimate: apply it to the connection window and
                // to the initial window of new streams.
                this.conn.set_target_window_size(wnd);
                this.conn.set_initial_window_size(wnd)?;
            }
            Poll::Ready(ping::Ponged::KeepAliveTimedOut) => {
                // Keep-alive expired: resolve as a clean close; the
                // timeout itself is surfaced elsewhere via the Recorder.
                debug!("connection keep-alive timed out");
                return Poll::Ready(Ok(()));
            }
            Poll::Pending => {}
        }

        Pin::new(&mut this.conn).poll(cx)
    }
}
249
pin_project! {
    // Adapter that drives either a ping-wrapped `Conn` or a bare h2
    // `Connection`, logging and discarding the error value so `ConnTask`
    // only sees "finished".
    struct ConnMapErr<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        #[pin]
        conn: Either<Conn<T, B>, Connection<Compat<T>, SendBuf<<B as Body>::Data>>>,
        // Set once `conn` resolves; later polls return Pending and the
        // `FusedFuture` impl reports terminated.
        #[pin]
        is_terminated: bool,
    }
}
264
265impl<T, B> Future for ConnMapErr<T, B>
266where
267    B: Body,
268    T: Read + Write + Unpin,
269{
270    type Output = Result<(), ()>;
271
272    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
273        let mut this = self.project();
274
275        if *this.is_terminated {
276            return Poll::Pending;
277        }
278        let polled = this.conn.poll(cx);
279        if polled.is_ready() {
280            *this.is_terminated = true;
281        }
282        polled.map_err(|_e| {
283            debug!(error = %_e, "connection error");
284        })
285    }
286}
287
impl<T, B> FusedFuture for ConnMapErr<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    // Reports whether the inner connection future already resolved, so
    // `ConnTask` can skip polling it again.
    fn is_terminated(&self) -> bool {
        self.is_terminated
    }
}
297
pin_project! {
    // Background task that owns the connection I/O. It also watches
    // `drop_rx` (all request handles dropped => start shutdown) and holds
    // `cancel_tx`, whose drop tells the dispatch task the connection ended.
    pub struct ConnTask<T, B>
    where
        B: Body,
        T: Read,
        T: Write,
        T: Unpin,
    {
        #[pin]
        drop_rx: Receiver<Infallible>,
        #[pin]
        cancel_tx: Option<oneshot::Sender<Infallible>>,
        #[pin]
        conn: ConnMapErr<T, B>,
    }
}
314
315impl<T, B> ConnTask<T, B>
316where
317    B: Body,
318    T: Read + Write + Unpin,
319{
320    fn new(
321        conn: ConnMapErr<T, B>,
322        drop_rx: Receiver<Infallible>,
323        cancel_tx: oneshot::Sender<Infallible>,
324    ) -> Self {
325        Self {
326            drop_rx,
327            cancel_tx: Some(cancel_tx),
328            conn,
329        }
330    }
331}
332
impl<T, B> Future for ConnTask<T, B>
where
    B: Body,
    T: Read + Write + Unpin,
{
    type Output = ();

    /// Drives the connection; additionally notices when all client handles
    /// have been dropped (`drop_rx` yields) and drops `cancel_tx` so the
    /// dispatch task can observe the shutdown.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.project();

        // `is_terminated` guards keep us from polling fused futures/streams
        // after completion.
        if !this.conn.is_terminated() && Pin::new(&mut this.conn).poll(cx).is_ready() {
            // ok or err, the `conn` has finished.
            return Poll::Ready(());
        }

        if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
            // mpsc has been dropped, hopefully polling
            // the connection some more should start shutdown
            // and then close.
            trace!("send_request dropped, starting conn shutdown");
            drop(this.cancel_tx.take().expect("ConnTask Future polled twice"));
        }

        Poll::Pending
    }
}
359
pin_project! {
    // Umbrella future for everything this module hands to the executor —
    // a request-body pipe, a response wait, or the connection task itself —
    // so `execute_h2_future` only needs to accept one future type.
    #[project = H2ClientFutureProject]
    pub enum H2ClientFuture<B, T, E>
    where
        B: http_body::Body,
        B: 'static,
        B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
        T: Read,
        T: Write,
        T: Unpin,
    {
        Pipe {
            #[pin]
            pipe: PipeMap<B>,
        },
        Send {
            #[pin]
            send_when: SendWhen<B, E>,
        },
        Task {
            #[pin]
            task: ConnTask<T, B>,
        },
    }
}
385
386impl<B, T, E> Future for H2ClientFuture<B, T, E>
387where
388    B: http_body::Body + 'static,
389    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
390    T: Read + Write + Unpin,
391    E: Http2UpgradedExec<B::Data>,
392{
393    type Output = ();
394
395    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
396        let this = self.project();
397
398        match this {
399            H2ClientFutureProject::Pipe { pipe } => pipe.poll(cx),
400            H2ClientFutureProject::Send { send_when } => send_when.poll(cx),
401            H2ClientFutureProject::Task { task } => task.poll(cx),
402        }
403    }
404}
405
// Everything needed to finish dispatching one request once the connection
// has capacity: the h2 response future, the request body and its send
// stream, and the caller's callback.
struct FutCtx<B>
where
    B: Body,
{
    // CONNECT requests keep `body_tx` around for the upgrade path.
    is_connect: bool,
    // True if the request body was already at end-of-stream at dispatch.
    eos: bool,
    fut: ResponseFuture,
    body_tx: SendStream<SendBuf<B::Data>>,
    body: B,
    cb: Callback<Request<B>, Response<IncomingBody>>,
}

// FutCtx is only stored and moved as a whole (`ClientTask::fut_ctx`), never
// pin-projected, so an unconditional Unpin impl is fine here.
impl<B: Body> Unpin for FutCtx<B> {}
419
// The "dispatch" half of the client: pulls requests off `req_rx` and opens
// h2 streams for them (see the `Future` impl at the bottom of this file).
pub(crate) struct ClientTask<B, E, T>
where
    B: Body,
    E: Unpin,
{
    // Records request/response activity for keep-alive/BDP accounting.
    ping: ping::Recorder,
    // Keeps the connection task alive; cloned into body-pipe tasks.
    conn_drop_ref: ConnDropRef,
    // Errors (sender dropped) when the connection task has finished.
    conn_eof: ConnEof,
    executor: E,
    // h2 handle used to open new streams.
    h2_tx: SendRequest<SendBuf<B::Data>>,
    req_rx: ClientRx<B>,
    // A request parked here while its new stream is "pending open".
    fut_ctx: Option<FutCtx<B>>,
    marker: PhantomData<T>,
}
434
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static,
    E: Http2ClientConnExec<B, T> + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Whether the server advertised support for extended CONNECT
    /// (RFC 8441), as reported by the underlying h2 handle.
    pub(crate) fn is_extended_connect_protocol_enabled(&self) -> bool {
        self.h2_tx.is_extended_connect_protocol_enabled()
    }
    /// Current cap on streams this client may initiate (from h2).
    pub(crate) fn current_max_send_streams(&self) -> usize {
        self.h2_tx.current_max_send_streams()
    }
    /// Current cap on streams the peer may initiate (from h2).
    pub(crate) fn current_max_recv_streams(&self) -> usize {
        self.h2_tx.current_max_recv_streams()
    }
}
452
pin_project! {
    // Wraps `PipeToSendStream` so that finishing — or cancelling — the
    // body also releases the connection keep-alive handle
    // (`conn_drop_ref`) and this stream's ping recorder.
    pub struct PipeMap<S>
    where
        S: Body,
    {
        #[pin]
        pipe: PipeToSendStream<S>,
        #[pin]
        conn_drop_ref: Option<Sender<Infallible>>,
        #[pin]
        ping: Option<Recorder>,
        // Fired by `ResponseFutMap::cancel` when the caller abandons the
        // request; triggers a RST_STREAM(CANCEL) instead of more body data.
        cancel_rx: Option<oneshot::Receiver<()>>,
    }
}
467
impl<B> Future for PipeMap<B>
where
    B: http_body::Body,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    type Output = ();

    /// Resolves once the request body has been fully piped, failed, or
    /// been cancelled; in all cases the drop-ref and ping recorder are
    /// released so connection shutdown and keep-alive accounting proceed.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> std::task::Poll<Self::Output> {
        let mut this = self.project();

        // Check if the client cancelled the request (e.g. dropped the
        // response future due to a timeout). If so, reset the h2 stream
        // so that a RST_STREAM is sent and flow-control capacity is freed.
        let cancel_result = this.cancel_rx.as_mut().map(|rx| Pin::new(rx).poll(cx));
        match cancel_result {
            Some(Poll::Ready(Ok(()))) => {
                debug!("client request body send cancelled, resetting stream");
                // NOTE(review): `send_reset` here is a method on
                // `PipeToSendStream` (defined elsewhere); presumed to reset
                // the underlying h2 SendStream — confirm in super module.
                this.pipe.as_mut().send_reset(h2::Reason::CANCEL);
                drop(this.conn_drop_ref.take().expect("Future polled twice"));
                drop(this.ping.take().expect("Future polled twice"));
                return Poll::Ready(());
            }
            Some(Poll::Ready(Err(_))) => {
                // Sender dropped without cancelling (normal response or error).
                // Stop polling the receiver.
                *this.cancel_rx = None;
            }
            Some(Poll::Pending) | None => {}
        }

        match Pin::new(&mut this.pipe).poll(cx) {
            Poll::Ready(result) => {
                if let Err(_e) = result {
                    debug!("client request body error: {}", _e);
                }
                // Body done (ok or err): release keep-alive references.
                drop(this.conn_drop_ref.take().expect("Future polled twice"));
                drop(this.ping.take().expect("Future polled twice"));
                return Poll::Ready(());
            }
            Poll::Pending => (),
        };
        Poll::Pending
    }
}
512
impl<B, E, T> ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    E: Http2ClientConnExec<B, T> + Clone + Unpin,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    T: Read + Write + Unpin,
{
    /// Spawns the background work for one dispatched request: a body
    /// "pipe" task (unless the body is already at end-of-stream, finishes
    /// eagerly, or this is CONNECT) plus a task that awaits the response
    /// headers and completes the caller's callback.
    fn poll_pipe(&mut self, f: FutCtx<B>, cx: &mut Context<'_>) {
        let ping = self.ping.clone();

        // A one-shot channel so that send_task can tell pipe_task to
        // reset the stream when the client cancels the request.
        let (cancel_tx, cancel_rx) = oneshot::channel::<()>();

        let send_stream = if !f.is_connect {
            if !f.eos {
                let mut pipe = PipeToSendStream::new(f.body, f.body_tx);

                // eagerly see if the body pipe is ready and
                // can thus skip allocating in the executor
                match Pin::new(&mut pipe).poll(cx) {
                    Poll::Ready(_) => (),
                    Poll::Pending => {
                        let conn_drop_ref = self.conn_drop_ref.clone();
                        // keep the ping recorder's knowledge of an
                        // "open stream" alive while this body is
                        // still sending...
                        let ping = ping.clone();

                        let pipe = PipeMap {
                            pipe,
                            conn_drop_ref: Some(conn_drop_ref),
                            ping: Some(ping),
                            cancel_rx: Some(cancel_rx),
                        };
                        // Clear send task
                        self.executor
                            .execute_h2_future(H2ClientFuture::Pipe { pipe });
                    }
                }
            }

            // Non-CONNECT: the response task never needs the send stream.
            // If no pipe task was spawned above, `cancel_rx` is dropped
            // here and a later `cancel()` becomes a harmless no-op.
            None
        } else {
            // CONNECT: hand the send stream to the response task so a 200
            // response can be turned into an upgraded tunnel.
            Some(f.body_tx)
        };

        self.executor.execute_h2_future(H2ClientFuture::Send {
            send_when: SendWhen {
                when: ResponseFutMap {
                    fut: f.fut,
                    ping: Some(ping),
                    send_stream: Some(send_stream),
                    exec: self.executor.clone(),
                    cancel_tx: Some(cancel_tx),
                },
                call_back: Some(f.cb),
            },
        });
    }
}
575
pin_project! {
    // Wraps h2's `ResponseFuture`, carrying the per-stream state needed to
    // finish the exchange: the ping recorder, the (CONNECT-only) send
    // stream for upgrades, an executor for the upgrade task, and the
    // cancellation signal back to the body pipe task.
    pub(crate) struct ResponseFutMap<B, E>
    where
        B: Body,
        B: 'static,
    {
        #[pin]
        fut: ResponseFuture,
        ping: Option<Recorder>,
        #[pin]
        send_stream: Option<Option<SendStream<SendBuf<<B as Body>::Data>>>>,
        exec: E,
        // `Some` until `cancel()` consumes it; dropping it unsent resolves
        // the pipe task's receiver with an error, meaning "no cancel".
        cancel_tx: Option<oneshot::Sender<()>>,
    }
}
591
592impl<B: Body + 'static, E> ResponseFutMap<B, E> {
593    /// Signal the pipe_task to reset the stream (e.g. on client cancellation).
594    pub(crate) fn cancel(self: Pin<&mut Self>) {
595        if let Some(cancel_tx) = self.project().cancel_tx.take() {
596            let _ = cancel_tx.send(());
597        }
598    }
599}
600
impl<B, E> Future for ResponseFutMap<B, E>
where
    B: Body + 'static,
    E: Http2UpgradedExec<B::Data>,
{
    type Output = Result<Response<crate::body::Incoming>, (crate::Error, Option<Request<B>>)>;

    /// Waits for the response HEADERS; plain responses are wrapped with an
    /// h2-backed `Incoming` body, while a `200` to CONNECT is turned into
    /// an upgrade (tunnel) whose driver task is spawned on `exec`.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut this = self.as_mut().project();

        let result = ready!(this.fut.poll(cx));

        let ping = this.ping.take().expect("Future polled twice");
        let send_stream = this.send_stream.take().expect("Future polled twice");

        match result {
            Ok(res) => {
                // record that we got the response headers
                ping.record_non_data();

                let content_length = headers::content_length_parse_all(res.headers());
                // `send_stream` is `Some` only for CONNECT (see
                // `poll_pipe`); a 200 means the tunnel is established.
                if let (Some(mut send_stream), StatusCode::OK) = (send_stream, res.status()) {
                    if content_length.map_or(false, |len| len != 0) {
                        warn!("h2 connect response with non-zero body not supported");

                        send_stream.send_reset(h2::Reason::INTERNAL_ERROR);
                        return Poll::Ready(Err((
                            crate::Error::new_h2(h2::Reason::INTERNAL_ERROR.into()),
                            None::<Request<B>>,
                        )));
                    }
                    let (parts, recv_stream) = res.into_parts();
                    let mut res = Response::from_parts(parts, IncomingBody::empty());

                    let (pending, on_upgrade) = crate::upgrade::pending();

                    // Pair the h2 send/recv halves into upgraded I/O and
                    // spawn the task that drives the tunnel.
                    let (h2_up, up_task) = super::upgrade::pair(send_stream, recv_stream, ping);
                    self.exec.execute_upgrade(up_task);
                    let upgraded = Upgraded::new(h2_up, Bytes::new());

                    pending.fulfill(upgraded);
                    res.extensions_mut().insert(on_upgrade);

                    Poll::Ready(Ok(res))
                } else {
                    // Regular response: wrap the h2 body stream, handing it
                    // a per-stream ping recorder for keep-alive accounting.
                    let res = res.map(|stream| {
                        let ping = ping.for_stream(&stream);
                        IncomingBody::h2(stream, content_length.into(), ping)
                    });
                    Poll::Ready(Ok(res))
                }
            }
            Err(err) => {
                // Prefer reporting a keep-alive timeout over the raw error.
                ping.ensure_not_timed_out().map_err(|e| (e, None))?;

                debug!("client response error: {}", err);
                Poll::Ready(Err((crate::Error::new_h2(err), None::<Request<B>>)))
            }
        }
    }
}
662
impl<B, E, T> Future for ClientTask<B, E, T>
where
    B: Body + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
    E: Http2ClientConnExec<B, T> + Clone + Unpin,
    T: Read + Write + Unpin,
{
    type Output = crate::Result<Dispatched>;

    /// The dispatch loop: waits for h2 stream capacity, then pulls the
    /// next request off `req_rx` and hands it to `poll_pipe`. Resolves
    /// when the client handles are dropped, the connection closes, or an
    /// error occurs.
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        loop {
            // Wait until h2 can accept a new stream.
            match ready!(self.h2_tx.poll_ready(cx)) {
                Ok(()) => (),
                Err(err) => {
                    // A keep-alive timeout takes precedence over the h2 error.
                    self.ping.ensure_not_timed_out()?;
                    return if err.reason() == Some(::h2::Reason::NO_ERROR) {
                        trace!("connection gracefully shutdown");
                        Poll::Ready(Ok(Dispatched::Shutdown))
                    } else {
                        Poll::Ready(Err(crate::Error::new_h2(err)))
                    };
                }
            };

            // If we were waiting on pending open
            // continue where we left off.
            if let Some(f) = self.fut_ctx.take() {
                self.poll_pipe(f, cx);
                continue;
            }

            match self.req_rx.poll_recv(cx) {
                Poll::Ready(Some((req, cb))) => {
                    // check that future hasn't been canceled already
                    if cb.is_canceled() {
                        trace!("request callback is canceled");
                        continue;
                    }
                    let (head, body) = req.into_parts();
                    let mut req = ::http::Request::from_parts(head, ());
                    super::strip_connection_headers(req.headers_mut(), true);
                    // Fill in content-length from the body's exact size
                    // hint where the method has payload semantics.
                    if let Some(len) = body.size_hint().exact() {
                        if len != 0 || headers::method_has_defined_payload_semantics(req.method()) {
                            headers::set_content_length_if_missing(req.headers_mut(), len);
                        }
                    }

                    let is_connect = req.method() == Method::CONNECT;
                    let eos = body.is_end_stream();

                    // CONNECT requests must not declare a body.
                    if is_connect
                        && headers::content_length_parse_all(req.headers())
                            .map_or(false, |len| len != 0)
                    {
                        debug!("h2 connect request with non-zero body not supported");
                        cb.send(Err(TrySendError {
                            error: crate::Error::new_user_invalid_connect(),
                            message: None,
                        }));
                        continue;
                    }

                    // Translate hyper's Protocol extension into h2's form
                    // for extended CONNECT (`:protocol`).
                    if let Some(protocol) = req.extensions_mut().remove::<Protocol>() {
                        req.extensions_mut().insert(protocol.into_inner());
                    }

                    // CONNECT streams are kept open even with an empty body,
                    // hence `!is_connect && eos` for the end_of_stream flag.
                    let (fut, body_tx) = match self.h2_tx.send_request(req, !is_connect && eos) {
                        Ok(ok) => ok,
                        Err(err) => {
                            debug!("client send request error: {}", err);
                            cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    };

                    let f = FutCtx {
                        is_connect,
                        eos,
                        fut,
                        body_tx,
                        body,
                        cb,
                    };

                    // Check poll_ready() again.
                    // If the call to send_request() resulted in the new stream being pending open
                    // we have to wait for the open to complete before accepting new requests.
                    match self.h2_tx.poll_ready(cx) {
                        Poll::Pending => {
                            // Save Context
                            self.fut_ctx = Some(f);
                            return Poll::Pending;
                        }
                        Poll::Ready(Ok(())) => (),
                        Poll::Ready(Err(err)) => {
                            f.cb.send(Err(TrySendError {
                                error: crate::Error::new_h2(err),
                                message: None,
                            }));
                            continue;
                        }
                    }
                    self.poll_pipe(f, cx);
                    continue;
                }

                Poll::Ready(None) => {
                    // All request-sending handles are gone; shut down.
                    trace!("client::dispatch::Sender dropped");
                    return Poll::Ready(Ok(Dispatched::Shutdown));
                }

                // No request pending: park until either a request arrives
                // or `conn_eof` errors because the connection task ended.
                Poll::Pending => match ready!(Pin::new(&mut self.conn_eof).poll(cx)) {
                    // As of Rust 1.82, this pattern is no longer needed, and emits a warning.
                    // But we cannot remove it as long as MSRV is less than that.
                    #[allow(unused)]
                    Ok(never) => match never {},
                    Err(_conn_is_eof) => {
                        trace!("connection task is closed, closing dispatch task");
                        return Poll::Ready(Ok(Dispatched::Shutdown));
                    }
                },
            }
        }
    }
}