//! Dispatcher for the HTTP/1 protocol (actix_http/h1/dispatcher.rs).
1use std::{
2    collections::VecDeque,
3    fmt,
4    future::Future,
5    io, mem, net,
6    pin::Pin,
7    rc::Rc,
8    task::{Context, Poll},
9};
10
11use actix_codec::{Framed, FramedParts};
12use actix_rt::time::sleep_until;
13use actix_service::Service;
14use bitflags::bitflags;
15use bytes::{Buf, BytesMut};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tokio::io::{AsyncRead, AsyncWrite};
19use tokio_util::codec::{Decoder as _, Encoder as _};
20use tracing::{error, trace};
21
22use super::{
23    codec::Codec,
24    decoder::MAX_BUFFER_SIZE,
25    payload::{Payload, PayloadSender, PayloadStatus},
26    timer::TimerState,
27    Message, MessageType,
28};
29use crate::{
30    body::{BodySize, BoxBody, MessageBody},
31    config::ServiceConfig,
32    error::{DispatchError, ParseError, PayloadError},
33    service::HttpFlow,
34    Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
35};
36
// NOTE(review): LW_BUFFER_SIZE's use is outside this chunk — presumably the
// low-watermark counterpart to HW_BUFFER_SIZE; confirm against the full file.
const LW_BUFFER_SIZE: usize = 1024;

/// High-watermark buffer size; used as the initial capacity of the
/// dispatcher's read and write buffers.
const HW_BUFFER_SIZE: usize = 1024 * 8;

/// Maximum number of decoded-but-unprocessed (pipelined) requests buffered
/// before the dispatcher stops reading new requests.
const MAX_PIPELINED_MESSAGES: usize = 16;
40
bitflags! {
    /// Connection-level state flags for the HTTP/1 dispatcher.
    #[derive(Debug, Clone, Copy)]
    pub struct Flags: u8 {
        /// Set when stream is read for first time.
        const STARTED          = 0b0000_0001;

        /// Set when full request-response cycle has occurred.
        const FINISHED         = 0b0000_0010;

        /// Set if connection is in keep-alive (inactive) state.
        const KEEP_ALIVE       = 0b0000_0100;

        /// Set if in shutdown procedure.
        const SHUTDOWN         = 0b0000_1000;

        /// Set if read-half is disconnected.
        const READ_DISCONNECT  = 0b0001_0000;

        /// Set if write-half is disconnected.
        const WRITE_DISCONNECT = 0b0010_0000;
    }
}
63
64// there's 2 versions of Dispatcher state because of:
65// https://github.com/taiki-e/pin-project-lite/issues/3
66//
67// tl;dr: pin-project-lite doesn't play well with other attribute macros
68
#[cfg(not(test))]
pin_project! {
    /// Dispatcher for HTTP/1.1 protocol
    pub struct Dispatcher<T, S, B, X, U>
    where
        S: Service<Request>,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // state machine driving the request/response lifecycle for this connection
        #[pin]
        inner: DispatcherState<T, S, B, X, U>,
    }
}
89
#[cfg(test)]
pin_project! {
    /// Dispatcher for HTTP/1.1 protocol
    pub struct Dispatcher<T, S, B, X, U>
    where
        S: Service<Request>,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // state machine driving the request/response lifecycle for this connection
        #[pin]
        pub(super) inner: DispatcherState<T, S, B, X, U>,

        // used in tests; counts how many times the dispatcher future was polled
        pub(super) poll_count: u64,
    }
}
113
pin_project! {
    /// Top-level mode of the dispatcher: regular HTTP/1 handling or a
    /// connection that has been handed off to the upgrade service.
    #[project = DispatcherStateProj]
    pub(super) enum DispatcherState<T, S, B, X, U>
    where
        S: Service<Request>,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // regular request/response dispatching
        Normal { #[pin] inner: InnerDispatcher<T, S, B, X, U> },
        // connection upgraded; the upgrade service future owns the stream
        Upgrade { #[pin] fut: U::Future },
    }
}
133
pin_project! {
    /// State for dispatching requests on a single HTTP/1 connection.
    #[project = InnerDispatcherProj]
    pub(super) struct InnerDispatcher<T, S, B, X, U>
    where
        S: Service<Request>,
        S::Error: Into<Response<BoxBody>>,

        B: MessageBody,

        X: Service<Request, Response = Request>,
        X::Error: Into<Response<BoxBody>>,

        U: Service<(Request, Framed<T, Codec>), Response = ()>,
        U::Error: fmt::Display,
    {
        // expect/service/upgrade services shared by all requests on this connection
        flow: Rc<HttpFlow<S, X, U>>,
        pub(super) flags: Flags,
        peer_addr: Option<net::SocketAddr>,
        // per-connection data attached to each decoded request
        conn_data: Option<Rc<Extensions>>,
        config: ServiceConfig,
        // stored dispatch error (set e.g. on an unexpected payload chunk/EOF)
        error: Option<DispatchError>,

        #[pin]
        pub(super) state: State<S, B, X>,
        // when Some(_) dispatcher is in state of receiving request payload
        payload: Option<PayloadSender>,
        // true when current request uses chunked transfer encoding (drainable when payload is dropped)
        payload_drainable: bool,
        // pipelined messages waiting while an earlier request is still in flight
        messages: VecDeque<DispatcherMessage>,

        // deadline for receiving the request head (cleared on first request)
        head_timer: TimerState,
        // keep-alive timer between requests
        ka_timer: TimerState,
        // client-disconnect deadline used during shutdown
        shutdown_timer: TimerState,

        // `Some` while this dispatcher owns the I/O stream
        pub(super) io: Option<T>,
        read_buf: BytesMut,
        write_buf: BytesMut,
        codec: Codec,
    }
}
174
/// A queued (possibly pipelined) unit of work for the dispatcher.
enum DispatcherMessage {
    /// Fully decoded request awaiting service handling.
    Item(Request),
    /// Request that should be handed to the upgrade service.
    Upgrade(Request),
    /// Error response to be written out (e.g. after a decode failure).
    Error(Response<()>),
}
180
pin_project! {
    /// Current phase of the request/response cycle being driven by the dispatcher.
    #[project = StateProj]
    pub(super) enum State<S, B, X>
    where
        S: Service<Request>,
        X: Service<Request, Response = Request>,
        B: MessageBody,
    {
        // idle; no in-flight future or response body
        None,
        // waiting on the expect (100-continue) service
        ExpectCall { #[pin] fut: X::Future },
        // waiting on the main service call
        ServiceCall { #[pin] fut: S::Future },
        // streaming a successful response body into the write buffer
        SendPayload { #[pin] body: B },
        // streaming an error response body into the write buffer
        SendErrorPayload { #[pin] body: BoxBody },
    }
}
196
197impl<S, B, X> State<S, B, X>
198where
199    S: Service<Request>,
200    X: Service<Request, Response = Request>,
201    B: MessageBody,
202{
203    pub(super) fn is_none(&self) -> bool {
204        matches!(self, State::None)
205    }
206}
207
208impl<S, B, X> fmt::Debug for State<S, B, X>
209where
210    S: Service<Request>,
211    X: Service<Request, Response = Request>,
212    B: MessageBody,
213{
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        match self {
216            Self::None => write!(f, "State::None"),
217            Self::ExpectCall { .. } => f.debug_struct("State::ExpectCall").finish_non_exhaustive(),
218            Self::ServiceCall { .. } => {
219                f.debug_struct("State::ServiceCall").finish_non_exhaustive()
220            }
221            Self::SendPayload { .. } => {
222                f.debug_struct("State::SendPayload").finish_non_exhaustive()
223            }
224            Self::SendErrorPayload { .. } => f
225                .debug_struct("State::SendErrorPayload")
226                .finish_non_exhaustive(),
227        }
228    }
229}
230
/// Outcome of one `poll_response` pass, directing the caller's main loop.
#[derive(Debug)]
enum PollResponse {
    /// Connection must be handed off to the upgrade service with this request.
    Upgrade(Request),
    /// No further progress can be made right now.
    DoNothing,
    /// Write buffer reached its size limit; flush it to the I/O stream.
    DrainWriteBuf,
}
237
impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
where
    T: AsyncRead + AsyncWrite + Unpin,

    S: Service<Request>,
    S::Error: Into<Response<BoxBody>>,
    S::Response: Into<Response<B>>,

    B: MessageBody,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<BoxBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    /// Create HTTP/1 dispatcher.
    ///
    /// Takes ownership of the I/O stream `io` and dispatches requests decoded
    /// from it through the services in `flow`, per `config`.
    pub(crate) fn new(
        io: T,
        flow: Rc<HttpFlow<S, X, U>>,
        config: ServiceConfig,
        peer_addr: Option<net::SocketAddr>,
        conn_data: OnConnectData,
    ) -> Self {
        Dispatcher {
            inner: DispatcherState::Normal {
                inner: InnerDispatcher {
                    flow,
                    flags: Flags::empty(),
                    peer_addr,
                    conn_data: conn_data.0.map(Rc::new),
                    // cloned because `config` is moved into the codec below
                    config: config.clone(),
                    error: None,

                    state: State::None,
                    payload: None,
                    payload_drainable: false,
                    messages: VecDeque::new(),

                    // each timer is armed only when its deadline/keep-alive is configured
                    head_timer: TimerState::new(config.client_request_deadline().is_some()),
                    ka_timer: TimerState::new(config.keep_alive().enabled()),
                    shutdown_timer: TimerState::new(config.client_disconnect_deadline().is_some()),

                    io: Some(io),
                    read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
                    write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
                    codec: Codec::new(config),
                },
            },

            #[cfg(test)]
            poll_count: 0,
        }
    }
}
293
294impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
295where
296    T: AsyncRead + AsyncWrite + Unpin,
297
298    S: Service<Request>,
299    S::Error: Into<Response<BoxBody>>,
300    S::Response: Into<Response<B>>,
301
302    B: MessageBody,
303
304    X: Service<Request, Response = Request>,
305    X::Error: Into<Response<BoxBody>>,
306
307    U: Service<(Request, Framed<T, Codec>), Response = ()>,
308    U::Error: fmt::Display,
309{
310    fn can_read(&self, cx: &mut Context<'_>) -> bool {
311        if self.flags.contains(Flags::READ_DISCONNECT) {
312            false
313        } else if let Some(ref info) = self.payload {
314            matches!(
315                info.need_read(cx),
316                PayloadStatus::Read | PayloadStatus::Dropped
317            )
318        } else {
319            true
320        }
321    }
322
323    fn client_disconnected(self: Pin<&mut Self>) {
324        let this = self.project();
325
326        this.flags
327            .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
328
329        if let Some(mut payload) = this.payload.take() {
330            payload.set_error(PayloadError::Incomplete(None));
331        }
332    }
333
    /// Write as much of `write_buf` as possible to the I/O stream, then flush it.
    ///
    /// On `Poll::Pending` the bytes already written are consumed from the front
    /// of the buffer so they are not re-sent on the next poll; on full success
    /// the buffer is cleared and the stream's own `poll_flush` result returned.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let InnerDispatcherProj { io, write_buf, .. } = self.project();
        // NOTE(review): assumes `io` is still `Some` whenever flushing — confirm
        // callers never reach here after the stream has been taken
        let mut io = Pin::new(io.as_mut().unwrap());

        let len = write_buf.len();
        let mut written = 0;

        while written < len {
            match io.as_mut().poll_write(cx, &write_buf[written..])? {
                Poll::Ready(0) => {
                    // zero-length write signals the peer can accept no more data
                    error!("write zero; closing");
                    return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, "")));
                }

                Poll::Ready(n) => written += n,

                Poll::Pending => {
                    // drop the already-written prefix before yielding
                    write_buf.advance(written);
                    return Poll::Pending;
                }
            }
        }

        // everything has written to I/O; clear buffer
        write_buf.clear();

        // flush the I/O and check if get blocked
        io.poll_flush(cx)
    }
363
    /// Encode the response head into the write buffer.
    ///
    /// Returns the body size so callers can decide how to stream the body. On
    /// encode failure any pending request payload sender is errored with
    /// `PayloadError::Incomplete` before returning `DispatchError::Io`.
    fn send_response_inner(
        self: Pin<&mut Self>,
        res: Response<()>,
        body: &impl MessageBody,
    ) -> Result<BodySize, DispatchError> {
        let this = self.project();

        let size = body.size();

        this.codec
            .encode(Message::Item((res, size)), this.write_buf)
            .map_err(|err| {
                if let Some(mut payload) = this.payload.take() {
                    payload.set_error(PayloadError::Incomplete(None));
                }

                DispatchError::Io(err)
            })?;

        Ok(size)
    }
385
386    fn send_response(
387        mut self: Pin<&mut Self>,
388        res: Response<()>,
389        body: B,
390    ) -> Result<(), DispatchError> {
391        let size = self.as_mut().send_response_inner(res, &body)?;
392        let mut this = self.project();
393        this.state.set(match size {
394            BodySize::None | BodySize::Sized(0) => {
395                let payload_unfinished = this.payload.is_some();
396                let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
397                    && *this.payload_drainable;
398
399                if payload_unfinished && !drain_payload {
400                    this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
401                } else {
402                    this.flags.insert(Flags::FINISHED);
403                }
404
405                State::None
406            }
407            _ => State::SendPayload { body },
408        });
409
410        Ok(())
411    }
412
413    fn send_error_response(
414        mut self: Pin<&mut Self>,
415        res: Response<()>,
416        body: BoxBody,
417    ) -> Result<(), DispatchError> {
418        let size = self.as_mut().send_response_inner(res, &body)?;
419        let mut this = self.project();
420        this.state.set(match size {
421            BodySize::None | BodySize::Sized(0) => {
422                let payload_unfinished = this.payload.is_some();
423                let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
424                    && *this.payload_drainable;
425
426                if payload_unfinished && !drain_payload {
427                    this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
428                } else {
429                    this.flags.insert(Flags::FINISHED);
430                }
431
432                State::None
433            }
434            _ => State::SendErrorPayload { body },
435        });
436
437        Ok(())
438    }
439
440    fn send_continue(self: Pin<&mut Self>) {
441        self.project()
442            .write_buf
443            .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
444    }
445
    /// Drive the response state machine, encoding response data into the write buffer.
    ///
    /// Loops until progress stalls, returning:
    /// - `PollResponse::Upgrade(req)` when an upgrade request must take over the connection;
    /// - `PollResponse::DrainWriteBuf` when the write buffer hit its size limit and must
    ///   be flushed to the I/O stream before encoding more;
    /// - `PollResponse::DoNothing` when all progress is blocked on pending futures or
    ///   there is no work left.
    fn poll_response(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Result<PollResponse, DispatchError> {
        'res: loop {
            let mut this = self.as_mut().project();
            match this.state.as_mut().project() {
                // no future is in InnerDispatcher state; pop next message
                StateProj::None => match this.messages.pop_front() {
                    // handle request message
                    Some(DispatcherMessage::Item(req)) => {
                        // Handle `EXPECT: 100-Continue` header
                        if req.head().expect() {
                            // set InnerDispatcher state and continue loop to poll it
                            let fut = this.flow.expect.call(req);
                            this.state.set(State::ExpectCall { fut });
                        } else {
                            // set InnerDispatcher state and continue loop to poll it
                            let fut = this.flow.service.call(req);
                            this.state.set(State::ServiceCall { fut });
                        };
                    }

                    // handle error message
                    Some(DispatcherMessage::Error(res)) => {
                        // send_response would update InnerDispatcher state to SendPayload or None
                        // (If response body is empty)
                        // continue loop to poll it
                        self.as_mut().send_error_response(res, BoxBody::new(()))?;
                    }

                    // return with upgrade request and poll it exclusively
                    Some(DispatcherMessage::Upgrade(req)) => return Ok(PollResponse::Upgrade(req)),

                    // all messages are dealt with
                    None => {
                        // start keep-alive only if request payload is fully read/drained
                        this.flags.set(
                            Flags::KEEP_ALIVE,
                            this.payload.is_none() && this.codec.keep_alive(),
                        );

                        return Ok(PollResponse::DoNothing);
                    }
                },

                StateProj::ServiceCall { fut } => {
                    match fut.poll(cx) {
                        // service call resolved. send response.
                        Poll::Ready(Ok(res)) => {
                            let (res, body) = res.into().replace_body(());
                            self.as_mut().send_response(res, body)?;
                        }

                        // send service call error as response
                        Poll::Ready(Err(err)) => {
                            let res: Response<BoxBody> = err.into();
                            let (res, body) = res.replace_body(());
                            self.as_mut().send_error_response(res, body)?;
                        }

                        // service call pending and could be waiting for more chunk messages
                        // (pipeline message limit and/or payload can_read limit)
                        Poll::Pending => {
                            // no new message is decoded and no new payload is fed
                            // nothing to do except waiting for new incoming data from client
                            if !self.as_mut().poll_request(cx)? {
                                return Ok(PollResponse::DoNothing);
                            }
                            // else loop
                        }
                    }
                }

                StateProj::SendPayload { mut body } => {
                    // keep populate writer buffer until buffer size limit hit,
                    // get blocked or finished.
                    while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
                        match body.as_mut().poll_next(cx) {
                            Poll::Ready(Some(Ok(item))) => {
                                this.codec
                                    .encode(Message::Chunk(Some(item)), this.write_buf)?;
                            }

                            Poll::Ready(None) => {
                                // end-of-stream marker (e.g. final chunk for chunked encoding)
                                this.codec.encode(Message::Chunk(None), this.write_buf)?;

                                // if we have not yet pipelined to the next request, then
                                // this.payload was the payload for the request we just finished
                                // responding to. We can check to see if we finished reading it
                                // yet, and if not, shutdown the connection.
                                let payload_unfinished = this.payload.is_some();
                                let drain_payload =
                                    this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
                                        && *this.payload_drainable;
                                let not_pipelined = this.messages.is_empty();

                                // payload stream finished.
                                // set state to None and handle next message
                                this.state.set(State::None);

                                if not_pipelined && payload_unfinished && !drain_payload {
                                    this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
                                } else {
                                    this.flags.insert(Flags::FINISHED);
                                }

                                continue 'res;
                            }

                            Poll::Ready(Some(Err(err))) => {
                                let err = err.into();
                                tracing::error!("Response payload stream error: {err:?}");
                                this.flags.insert(Flags::FINISHED);
                                return Err(DispatchError::Body(err));
                            }

                            Poll::Pending => return Ok(PollResponse::DoNothing),
                        }
                    }

                    // buffer is beyond max size
                    // return and try to write the whole buffer to I/O stream.
                    return Ok(PollResponse::DrainWriteBuf);
                }

                StateProj::SendErrorPayload { mut body } => {
                    // TODO: de-dupe impl with SendPayload

                    // keep populate writer buffer until buffer size limit hit,
                    // get blocked or finished.
                    while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
                        match body.as_mut().poll_next(cx) {
                            Poll::Ready(Some(Ok(item))) => {
                                this.codec
                                    .encode(Message::Chunk(Some(item)), this.write_buf)?;
                            }

                            Poll::Ready(None) => {
                                // end-of-stream marker (e.g. final chunk for chunked encoding)
                                this.codec.encode(Message::Chunk(None), this.write_buf)?;

                                // if we have not yet pipelined to the next request, then
                                // this.payload was the payload for the request we just finished
                                // responding to. We can check to see if we finished reading it
                                // yet, and if not, shutdown the connection.
                                let payload_unfinished = this.payload.is_some();
                                let drain_payload =
                                    this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
                                        && *this.payload_drainable;
                                let not_pipelined = this.messages.is_empty();

                                // payload stream finished.
                                // set state to None and handle next message
                                this.state.set(State::None);

                                if not_pipelined && payload_unfinished && !drain_payload {
                                    this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
                                } else {
                                    this.flags.insert(Flags::FINISHED);
                                }

                                continue 'res;
                            }

                            Poll::Ready(Some(Err(err))) => {
                                tracing::error!("Response payload stream error: {err:?}");
                                this.flags.insert(Flags::FINISHED);
                                return Err(DispatchError::Body(
                                    Error::new_body().with_cause(err).into(),
                                ));
                            }

                            Poll::Pending => return Ok(PollResponse::DoNothing),
                        }
                    }

                    // buffer is beyond max size
                    // return and try to write the whole buffer to stream
                    return Ok(PollResponse::DrainWriteBuf);
                }

                StateProj::ExpectCall { fut } => {
                    trace!("  calling expect service");

                    match fut.poll(cx) {
                        // expect resolved. write continue to buffer and set InnerDispatcher state
                        // to service call.
                        Poll::Ready(Ok(req)) => {
                            this.write_buf
                                .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
                            let fut = this.flow.service.call(req);
                            this.state.set(State::ServiceCall { fut });
                        }

                        // send expect error as response
                        Poll::Ready(Err(err)) => {
                            let res: Response<BoxBody> = err.into();
                            let (res, body) = res.replace_body(());
                            self.as_mut().send_error_response(res, body)?;
                        }

                        // expect must be solved before progress can be made.
                        Poll::Pending => return Ok(PollResponse::DoNothing),
                    }
                }
            }
        }
    }
654
655    fn handle_request(
656        mut self: Pin<&mut Self>,
657        req: Request,
658        cx: &mut Context<'_>,
659    ) -> Result<(), DispatchError> {
660        // initialize dispatcher state
661        {
662            let mut this = self.as_mut().project();
663
664            // Handle `EXPECT: 100-Continue` header
665            if req.head().expect() {
666                // set dispatcher state to call expect handler
667                let fut = this.flow.expect.call(req);
668                this.state.set(State::ExpectCall { fut });
669            } else {
670                // set dispatcher state to call service handler
671                let fut = this.flow.service.call(req);
672                this.state.set(State::ServiceCall { fut });
673            };
674        };
675
676        // eagerly poll the future once (or twice if expect is resolved immediately).
677        loop {
678            match self.as_mut().project().state.project() {
679                StateProj::ExpectCall { fut } => {
680                    match fut.poll(cx) {
681                        // expect is resolved; continue loop and poll the service call branch.
682                        Poll::Ready(Ok(req)) => {
683                            self.as_mut().send_continue();
684
685                            let mut this = self.as_mut().project();
686                            let fut = this.flow.service.call(req);
687                            this.state.set(State::ServiceCall { fut });
688
689                            continue;
690                        }
691
692                        // future is error; send response and return a result
693                        // on success to notify the dispatcher a new state is set and the outer loop
694                        // should be continued
695                        Poll::Ready(Err(err)) => {
696                            let res: Response<BoxBody> = err.into();
697                            let (res, body) = res.replace_body(());
698                            return self.send_error_response(res, body);
699                        }
700
701                        // future is pending; return Ok(()) to notify that a new state is
702                        // set and the outer loop should be continue.
703                        Poll::Pending => return Ok(()),
704                    }
705                }
706
707                StateProj::ServiceCall { fut } => {
708                    // return no matter the service call future's result.
709                    return match fut.poll(cx) {
710                        // Future is resolved. Send response and return a result. On success
711                        // to notify the dispatcher a new state is set and the outer loop
712                        // should be continue.
713                        Poll::Ready(Ok(res)) => {
714                            let (res, body) = res.into().replace_body(());
715                            self.as_mut().send_response(res, body)
716                        }
717
718                        // see the comment on ExpectCall state branch's Pending
719                        Poll::Pending => Ok(()),
720
721                        // see the comment on ExpectCall state branch's Ready(Err(_))
722                        Poll::Ready(Err(err)) => {
723                            let res: Response<BoxBody> = err.into();
724                            let (res, body) = res.replace_body(());
725                            self.as_mut().send_error_response(res, body)
726                        }
727                    };
728                }
729
730                _ => {
731                    unreachable!("State must be set to ServiceCall or ExceptCall in handle_request")
732                }
733            }
734        }
735    }
736
737    /// Process one incoming request.
738    ///
739    /// Returns true if any meaningful work was done.
740    fn poll_request(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
741        let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
742        let can_not_read = !self.can_read(cx);
743
744        // limit amount of non-processed requests
745        if pipeline_queue_full || can_not_read {
746            return Ok(false);
747        }
748
749        let mut this = self.as_mut().project();
750
751        let mut updated = false;
752
753        // decode from read buf as many full requests as possible
754        loop {
755            match this.codec.decode(this.read_buf) {
756                Ok(Some(msg)) => {
757                    updated = true;
758
759                    match msg {
760                        Message::Item(mut req) => {
761                            // head timer only applies to first request on connection
762                            this.head_timer.clear(line!());
763
764                            req.head_mut().peer_addr = *this.peer_addr;
765
766                            req.conn_data.clone_from(this.conn_data);
767
768                            match this.codec.message_type() {
769                                // request has no payload
770                                MessageType::None => *this.payload_drainable = false,
771
772                                // Request is upgradable. Add upgrade message and break.
773                                // Everything remaining in read buffer will be handed to
774                                // upgraded Request.
775                                MessageType::Stream if this.flow.upgrade.is_some() => {
776                                    *this.payload_drainable = false;
777                                    this.messages.push_back(DispatcherMessage::Upgrade(req));
778                                    break;
779                                }
780
781                                // request is not upgradable
782                                MessageType::Payload | MessageType::Stream => {
783                                    // PayloadSender and Payload are smart pointers share the
784                                    // same state. PayloadSender is attached to dispatcher and used
785                                    // to sink new chunked request data to state. Payload is
786                                    // attached to Request and passed to Service::call where the
787                                    // state can be collected and consumed.
788                                    let (sender, payload) = Payload::create(false);
789                                    *req.payload() = crate::Payload::H1 { payload };
790                                    *this.payload = Some(sender);
791                                    *this.payload_drainable = req.chunked().unwrap_or(false);
792                                }
793                            }
794
795                            // handle request early when no future in InnerDispatcher state.
796                            if this.state.is_none() {
797                                self.as_mut().handle_request(req, cx)?;
798                                this = self.as_mut().project();
799                            } else {
800                                this.messages.push_back(DispatcherMessage::Item(req));
801                            }
802                        }
803
804                        Message::Chunk(Some(chunk)) => {
805                            if let Some(ref mut payload) = this.payload {
806                                payload.feed_data(chunk);
807                            } else {
808                                error!("Internal server error: unexpected payload chunk");
809                                this.flags.insert(Flags::READ_DISCONNECT);
810                                this.messages.push_back(DispatcherMessage::Error(
811                                    Response::internal_server_error().drop_body(),
812                                ));
813                                *this.error = Some(DispatchError::InternalError);
814                                break;
815                            }
816                        }
817
818                        Message::Chunk(None) => {
819                            if let Some(mut payload) = this.payload.take() {
820                                payload.feed_eof();
821                                *this.payload_drainable = false;
822                            } else {
823                                error!("Internal server error: unexpected eof");
824                                this.flags.insert(Flags::READ_DISCONNECT);
825                                this.messages.push_back(DispatcherMessage::Error(
826                                    Response::internal_server_error().drop_body(),
827                                ));
828                                *this.error = Some(DispatchError::InternalError);
829                                break;
830                            }
831                        }
832                    }
833                }
834
835                // decode is partial and buffer is not full yet
836                // break and wait for more read
837                Ok(None) => break,
838
839                Err(ParseError::Io(err)) => {
840                    trace!("I/O error: {}", &err);
841                    self.as_mut().client_disconnected();
842                    this = self.as_mut().project();
843                    *this.error = Some(DispatchError::Io(err));
844                    break;
845                }
846
847                Err(ParseError::TooLarge) => {
848                    trace!("request head was too big; returning 431 response");
849
850                    if let Some(mut payload) = this.payload.take() {
851                        payload.set_error(PayloadError::Overflow);
852                    }
853
854                    // request heads that overflow buffer size return a 431 error
855                    this.messages
856                        .push_back(DispatcherMessage::Error(Response::with_body(
857                            StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
858                            (),
859                        )));
860
861                    this.flags.insert(Flags::READ_DISCONNECT);
862                    *this.error = Some(ParseError::TooLarge.into());
863
864                    break;
865                }
866
867                Err(err) => {
868                    trace!("parse error {}", &err);
869
870                    if let Some(mut payload) = this.payload.take() {
871                        payload.set_error(PayloadError::EncodingCorrupted);
872                    }
873
874                    // malformed requests should be responded with 400
875                    this.messages.push_back(DispatcherMessage::Error(
876                        Response::bad_request().drop_body(),
877                    ));
878
879                    this.flags.insert(Flags::READ_DISCONNECT);
880                    *this.error = Some(err.into());
881                    break;
882                }
883            }
884        }
885
886        Ok(updated)
887    }
888
889    fn poll_head_timer(
890        mut self: Pin<&mut Self>,
891        cx: &mut Context<'_>,
892    ) -> Result<(), DispatchError> {
893        let this = self.as_mut().project();
894
895        if let TimerState::Active { timer } = this.head_timer {
896            if timer.as_mut().poll(cx).is_ready() {
897                // timeout on first request (slow request) return 408
898
899                trace!("timed out on slow request; replying with 408 and closing connection");
900
901                let _ = self.as_mut().send_error_response(
902                    Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
903                    BoxBody::new(()),
904                );
905
906                self.project().flags.insert(Flags::SHUTDOWN);
907            }
908        };
909
910        Ok(())
911    }
912
    /// Polls the keep-alive timer.
    ///
    /// Only relevant while the connection is idle between requests (`KEEP_ALIVE` flag
    /// set, no in-flight state). On expiry the connection enters the shutdown phase;
    /// if a client-disconnect deadline is configured a shutdown timer is armed,
    /// otherwise the socket is dropped immediately via `WRITE_DISCONNECT`.
    fn poll_ka_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
        let this = self.as_mut().project();
        if let TimerState::Active { timer } = this.ka_timer {
            debug_assert!(
                this.flags.contains(Flags::KEEP_ALIVE),
                "keep-alive flag should be set when timer is active",
            );
            debug_assert!(
                this.state.is_none(),
                "dispatcher should not be in keep-alive phase if state is not none: {:?}",
                this.state,
            );

            // Assert removed by @robjtede on account of issue #2655. There are cases where an I/O
            // flush can be pending after entering the keep-alive state causing the subsequent flush
            // wake up to panic here. This appears to be a Linux-only problem. Leaving original code
            // below for posterity because a simple and reliable test could not be found to trigger
            // the behavior.
            // debug_assert!(
            //     this.write_buf.is_empty(),
            //     "dispatcher should not be in keep-alive phase if write_buf is not empty",
            // );

            // keep-alive timer has timed out
            if timer.as_mut().poll(cx).is_ready() {
                // no tasks at hand
                trace!("timer timed out; closing connection");
                this.flags.insert(Flags::SHUTDOWN);

                if let Some(deadline) = this.config.client_disconnect_deadline() {
                    // start shutdown timeout if enabled
                    this.shutdown_timer
                        .set_and_init(cx, sleep_until(deadline.into()), line!());
                } else {
                    // no shutdown timeout, drop socket
                    this.flags.insert(Flags::WRITE_DISCONNECT);
                }
            }
        }

        Ok(())
    }
955
956    fn poll_shutdown_timer(
957        mut self: Pin<&mut Self>,
958        cx: &mut Context<'_>,
959    ) -> Result<(), DispatchError> {
960        let this = self.as_mut().project();
961        if let TimerState::Active { timer } = this.shutdown_timer {
962            debug_assert!(
963                this.flags.contains(Flags::SHUTDOWN),
964                "shutdown flag should be set when timer is active",
965            );
966
967            // timed-out during shutdown; drop connection
968            if timer.as_mut().poll(cx).is_ready() {
969                trace!("timed-out during shutdown");
970                return Err(DispatchError::DisconnectTimeout);
971            }
972        }
973
974        Ok(())
975    }
976
977    /// Poll head, keep-alive, and disconnect timer.
978    fn poll_timers(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
979        self.as_mut().poll_head_timer(cx)?;
980        self.as_mut().poll_ka_timer(cx)?;
981        self.as_mut().poll_shutdown_timer(cx)?;
982
983        Ok(())
984    }
985
    /// Returns true when I/O stream can be disconnected after write to it.
    ///
    /// It covers these conditions:
    /// - `std::io::ErrorKind::ConnectionReset` after partial read;
    /// - all data read done.
    #[inline(always)] // TODO: bench this inline
    fn read_available(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
        let this = self.project();

        // nothing to read from a disconnected read-half
        if this.flags.contains(Flags::READ_DISCONNECT) {
            return Ok(false);
        };

        let mut io = Pin::new(this.io.as_mut().unwrap());

        // set once any bytes are read during this call; used below to classify a
        // later `ConnectionReset` as "reset after partial read"
        let mut read_some = false;

        loop {
            // Return early when read buf exceed decoder's max buffer size.
            if this.read_buf.len() >= MAX_BUFFER_SIZE {
                // At this point it's not known IO stream is still scheduled to be waked up so
                // force wake up dispatcher just in case.
                //
                // Reason:
                // AsyncRead mostly would only have guarantee wake up when the poll_read
                // return Poll::Pending.
                //
                // Case:
                // When read_buf is beyond max buffer size the early return could be successfully
                // be parsed as a new Request. This case would not generate ParseError::TooLarge and
                // at this point IO stream is not fully read to Pending and would result in
                // dispatcher stuck until timeout (keep-alive).
                //
                // Note:
                // This is a perf choice to reduce branch on <Request as MessageType>::decode.
                //
                // A Request head too large to parse is only checked on `httparse::Status::Partial`.

                match this.payload.as_ref().map(|p| p.need_read(cx)) {
                    // Payload consumer is alive but applying backpressure. Wait for its waker.
                    Some(PayloadStatus::Pause) => {}

                    // Consumer dropped means drain/discard mode; keep polling to make progress.
                    Some(PayloadStatus::Dropped) | Some(PayloadStatus::Read) | None => {
                        cx.waker().wake_by_ref()
                    }
                }

                return Ok(false);
            }

            // grow buffer if necessary.
            let remaining = this.read_buf.capacity() - this.read_buf.len();
            if remaining < LW_BUFFER_SIZE {
                this.read_buf.reserve(HW_BUFFER_SIZE - remaining);
            }

            match tokio_util::io::poll_read_buf(io.as_mut(), cx, this.read_buf) {
                Poll::Ready(Ok(n)) => {
                    // When draining a dropped request payload, keep FINISHED set so the
                    // disconnect/keep-alive decision can be made once the payload is fully drained.
                    if !this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) {
                        this.flags.remove(Flags::FINISHED);
                    }

                    // zero-byte read is EOF: peer closed its write half; signal disconnect
                    if n == 0 {
                        return Ok(true);
                    }

                    read_some = true;
                }

                Poll::Pending => {
                    // no more data right now; poll_read_buf registered the waker
                    return Ok(false);
                }

                Poll::Ready(Err(err)) => {
                    return match err.kind() {
                        // convert WouldBlock error to the same as Pending return
                        io::ErrorKind::WouldBlock => Ok(false),

                        // connection reset after partial read
                        io::ErrorKind::ConnectionReset if read_some => Ok(true),

                        _ => Err(DispatchError::Io(err)),
                    };
                }
            }
        }
    }
1076
1077    /// call upgrade service with request.
1078    fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future {
1079        let this = self.project();
1080        let mut parts = FramedParts::with_read_buf(
1081            this.io.take().unwrap(),
1082            mem::take(this.codec),
1083            mem::take(this.read_buf),
1084        );
1085        parts.write_buf = mem::take(this.write_buf);
1086        let framed = Framed::from_parts(parts);
1087        this.flow.upgrade.as_ref().unwrap().call((req, framed))
1088    }
1089}
1090
impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
where
    T: AsyncRead + AsyncWrite + Unpin,

    S: Service<Request>,
    S::Error: Into<Response<BoxBody>>,
    S::Response: Into<Response<B>>,

    B: MessageBody,

    X: Service<Request, Response = Request>,
    X::Error: Into<Response<BoxBody>>,

    U: Service<(Request, Framed<T, Codec>), Response = ()>,
    U::Error: fmt::Display,
{
    type Output = Result<(), DispatchError>;

    /// Drives the whole HTTP/1 connection: timers, socket reads, request parsing,
    /// response writing, flushing, and the keep-alive/shutdown/upgrade transitions.
    #[inline]
    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.as_mut().project();

        // test-only instrumentation: count how often the dispatcher is polled
        #[cfg(test)]
        {
            *this.poll_count += 1;
        }

        match this.inner.project() {
            // once upgraded, delegate all polls to the upgrade handler future
            DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
                error!("Upgrade handler error: {}", err);
                DispatchError::Upgrade
            }),

            DispatcherStateProj::Normal { mut inner } => {
                trace!("start flags: {:?}", &inner.flags);

                trace_timer_states(
                    "start",
                    &inner.head_timer,
                    &inner.ka_timer,
                    &inner.shutdown_timer,
                );

                inner.as_mut().poll_timers(cx)?;

                let poll = if inner.flags.contains(Flags::SHUTDOWN) {
                    if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                        // write half already gone; nothing left to flush
                        Poll::Ready(Ok(()))
                    } else {
                        // flush buffer and wait on blocked
                        ready!(inner.as_mut().poll_flush(cx))?;
                        Pin::new(inner.as_mut().project().io.as_mut().unwrap())
                            .poll_shutdown(cx)
                            .map_err(DispatchError::from)
                    }
                } else {
                    // read from I/O stream and fill read buffer
                    let should_disconnect = inner.as_mut().read_available(cx)?;

                    // after reading something from stream, clear keep-alive timer
                    if !inner.read_buf.is_empty() && inner.flags.contains(Flags::KEEP_ALIVE) {
                        let inner = inner.as_mut().project();
                        inner.flags.remove(Flags::KEEP_ALIVE);
                        inner.ka_timer.clear(line!());
                    }

                    // first poll that sees data: mark started and arm the slow-request timer
                    if !inner.flags.contains(Flags::STARTED) {
                        inner.as_mut().project().flags.insert(Flags::STARTED);

                        if let Some(deadline) = inner.config.client_request_deadline() {
                            inner.as_mut().project().head_timer.set_and_init(
                                cx,
                                sleep_until(deadline.into()),
                                line!(),
                            );
                        }
                    }

                    inner.as_mut().poll_request(cx)?;

                    if should_disconnect {
                        // I/O stream should be closed
                        let inner = inner.as_mut().project();
                        inner.flags.insert(Flags::READ_DISCONNECT);
                        // any in-flight payload can never complete; error + EOF it
                        if let Some(mut payload) = inner.payload.take() {
                            payload.set_error(PayloadError::Incomplete(None));
                            payload.feed_eof();
                        }
                    };

                    loop {
                        // poll response to populate write buffer
                        // drain indicates whether write buffer should be emptied before next run
                        let drain = match inner.as_mut().poll_response(cx)? {
                            PollResponse::DrainWriteBuf => true,

                            PollResponse::DoNothing => {
                                // KEEP_ALIVE is set in send_response_inner if client allows it
                                // FINISHED is set after writing last chunk of response
                                if inner.flags.contains(Flags::KEEP_ALIVE | Flags::FINISHED) {
                                    if let Some(timer) = inner.config.keep_alive_deadline() {
                                        inner.as_mut().project().ka_timer.set_and_init(
                                            cx,
                                            sleep_until(timer.into()),
                                            line!(),
                                        );
                                    }
                                }

                                false
                            }

                            // upgrade request and goes Upgrade variant of DispatcherState.
                            PollResponse::Upgrade(req) => {
                                let upgrade = inner.upgrade(req);
                                self.as_mut()
                                    .project()
                                    .inner
                                    .set(DispatcherState::Upgrade { fut: upgrade });
                                // re-poll immediately so the Upgrade arm runs
                                return self.poll(cx);
                            }
                        };

                        // we didn't get WouldBlock from write operation, so data get written to
                        // kernel completely (macOS) and we have to write again otherwise response
                        // can get stuck
                        //
                        // TODO: want to find a reference for this behavior
                        // see introduced commit: 3872d3ba
                        let flush_was_ready = inner.as_mut().poll_flush(cx)?.is_ready();

                        // this assert seems to always be true but not willing to commit to it until
                        // we understand what Nikolay meant when writing the above comment
                        // debug_assert!(flush_was_ready);

                        if !flush_was_ready || !drain {
                            break;
                        }
                    }

                    // client is gone
                    if inner.flags.contains(Flags::WRITE_DISCONNECT) {
                        trace!("client is gone; disconnecting");
                        return Poll::Ready(Ok(()));
                    }

                    let inner_p = inner.as_mut().project();
                    let state_is_none = inner_p.state.is_none();

                    // If the read-half is closed, we start the shutdown procedure if either is
                    // true:
                    //
                    // - state is [`State::None`], which means that we're done with request
                    //   processing, so if the client closed its writer-side it means that it won't
                    //   send more requests.
                    // - The user requested to not allow half-closures
                    if inner_p.flags.contains(Flags::READ_DISCONNECT)
                        && (!inner_p.config.h1_allow_half_closed() || state_is_none)
                    {
                        trace!("read half closed; start shutdown");
                        inner_p.flags.insert(Flags::SHUTDOWN);
                    }

                    // keep-alive and stream errors
                    if state_is_none && inner_p.write_buf.is_empty() {
                        // surface any deferred parse/dispatch error once output has drained
                        if let Some(err) = inner_p.error.take() {
                            error!("stream error: {}", &err);
                            return Poll::Ready(Err(err));
                        }

                        // disconnect if keep-alive is not enabled
                        if inner_p.flags.contains(Flags::FINISHED)
                            && !inner_p.flags.contains(Flags::KEEP_ALIVE)
                            && inner_p.payload.is_none()
                        {
                            inner_p.flags.remove(Flags::FINISHED);
                            inner_p.flags.insert(Flags::SHUTDOWN);
                            // re-poll to run the shutdown branch above
                            return self.poll(cx);
                        }

                        // disconnect if shutdown
                        if inner_p.flags.contains(Flags::SHUTDOWN) {
                            return self.poll(cx);
                        }
                    }

                    trace_timer_states(
                        "end",
                        inner_p.head_timer,
                        inner_p.ka_timer,
                        inner_p.shutdown_timer,
                    );

                    // in shutdown with pending work: self-wake so shutdown progresses
                    if inner_p.flags.contains(Flags::SHUTDOWN) {
                        cx.waker().wake_by_ref();
                    }
                    Poll::Pending
                };

                trace!("end flags: {:?}", &inner.flags);

                poll
            }
        }
    }
}
1297
1298#[allow(dead_code)]
1299fn trace_timer_states(
1300    label: &str,
1301    head_timer: &TimerState,
1302    ka_timer: &TimerState,
1303    shutdown_timer: &TimerState,
1304) {
1305    trace!("{} timers:", label);
1306
1307    if head_timer.is_enabled() {
1308        trace!("  head {}", &head_timer);
1309    }
1310
1311    if ka_timer.is_enabled() {
1312        trace!("  keep-alive {}", &ka_timer);
1313    }
1314
1315    if shutdown_timer.is_enabled() {
1316        trace!("  shutdown {}", &shutdown_timer);
1317    }
1318}