1use std::{
2 collections::VecDeque,
3 fmt,
4 future::Future,
5 io, mem, net,
6 pin::Pin,
7 rc::Rc,
8 task::{Context, Poll},
9};
10
11use actix_codec::{Framed, FramedParts};
12use actix_rt::time::sleep_until;
13use actix_service::Service;
14use bitflags::bitflags;
15use bytes::{Buf, BytesMut};
16use futures_core::ready;
17use pin_project_lite::pin_project;
18use tokio::io::{AsyncRead, AsyncWrite};
19use tokio_util::codec::{Decoder as _, Encoder as _};
20use tracing::{error, trace};
21
22use super::{
23 codec::Codec,
24 decoder::MAX_BUFFER_SIZE,
25 payload::{Payload, PayloadSender, PayloadStatus},
26 timer::TimerState,
27 Message, MessageType,
28};
29use crate::{
30 body::{BodySize, BoxBody, MessageBody},
31 config::ServiceConfig,
32 error::{DispatchError, ParseError, PayloadError},
33 service::HttpFlow,
34 Error, Extensions, HttpMessage, OnConnectData, Request, Response, StatusCode,
35};
36
37const LW_BUFFER_SIZE: usize = 1024;
38const HW_BUFFER_SIZE: usize = 1024 * 8;
39const MAX_PIPELINED_MESSAGES: usize = 16;
40
41bitflags! {
42 #[derive(Debug, Clone, Copy)]
43 pub struct Flags: u8 {
44 const STARTED = 0b0000_0001;
46
47 const FINISHED = 0b0000_0010;
49
50 const KEEP_ALIVE = 0b0000_0100;
52
53 const SHUTDOWN = 0b0000_1000;
55
56 const READ_DISCONNECT = 0b0001_0000;
58
59 const WRITE_DISCONNECT = 0b0010_0000;
61 }
62}
63
64#[cfg(not(test))]
70pin_project! {
71 pub struct Dispatcher<T, S, B, X, U>
73 where
74 S: Service<Request>,
75 S::Error: Into<Response<BoxBody>>,
76
77 B: MessageBody,
78
79 X: Service<Request, Response = Request>,
80 X::Error: Into<Response<BoxBody>>,
81
82 U: Service<(Request, Framed<T, Codec>), Response = ()>,
83 U::Error: fmt::Display,
84 {
85 #[pin]
86 inner: DispatcherState<T, S, B, X, U>,
87 }
88}
89
90#[cfg(test)]
91pin_project! {
92 pub struct Dispatcher<T, S, B, X, U>
94 where
95 S: Service<Request>,
96 S::Error: Into<Response<BoxBody>>,
97
98 B: MessageBody,
99
100 X: Service<Request, Response = Request>,
101 X::Error: Into<Response<BoxBody>>,
102
103 U: Service<(Request, Framed<T, Codec>), Response = ()>,
104 U::Error: fmt::Display,
105 {
106 #[pin]
107 pub(super) inner: DispatcherState<T, S, B, X, U>,
108
109 pub(super) poll_count: u64,
111 }
112}
113
114pin_project! {
115 #[project = DispatcherStateProj]
116 pub(super) enum DispatcherState<T, S, B, X, U>
117 where
118 S: Service<Request>,
119 S::Error: Into<Response<BoxBody>>,
120
121 B: MessageBody,
122
123 X: Service<Request, Response = Request>,
124 X::Error: Into<Response<BoxBody>>,
125
126 U: Service<(Request, Framed<T, Codec>), Response = ()>,
127 U::Error: fmt::Display,
128 {
129 Normal { #[pin] inner: InnerDispatcher<T, S, B, X, U> },
130 Upgrade { #[pin] fut: U::Future },
131 }
132}
133
134pin_project! {
135 #[project = InnerDispatcherProj]
136 pub(super) struct InnerDispatcher<T, S, B, X, U>
137 where
138 S: Service<Request>,
139 S::Error: Into<Response<BoxBody>>,
140
141 B: MessageBody,
142
143 X: Service<Request, Response = Request>,
144 X::Error: Into<Response<BoxBody>>,
145
146 U: Service<(Request, Framed<T, Codec>), Response = ()>,
147 U::Error: fmt::Display,
148 {
149 flow: Rc<HttpFlow<S, X, U>>,
150 pub(super) flags: Flags,
151 peer_addr: Option<net::SocketAddr>,
152 conn_data: Option<Rc<Extensions>>,
153 config: ServiceConfig,
154 error: Option<DispatchError>,
155
156 #[pin]
157 pub(super) state: State<S, B, X>,
158 payload: Option<PayloadSender>,
160 payload_drainable: bool,
162 messages: VecDeque<DispatcherMessage>,
163
164 head_timer: TimerState,
165 ka_timer: TimerState,
166 shutdown_timer: TimerState,
167
168 pub(super) io: Option<T>,
169 read_buf: BytesMut,
170 write_buf: BytesMut,
171 codec: Codec,
172 }
173}
174
175enum DispatcherMessage {
176 Item(Request),
177 Upgrade(Request),
178 Error(Response<()>),
179}
180
181pin_project! {
182 #[project = StateProj]
183 pub(super) enum State<S, B, X>
184 where
185 S: Service<Request>,
186 X: Service<Request, Response = Request>,
187 B: MessageBody,
188 {
189 None,
190 ExpectCall { #[pin] fut: X::Future },
191 ServiceCall { #[pin] fut: S::Future },
192 SendPayload { #[pin] body: B },
193 SendErrorPayload { #[pin] body: BoxBody },
194 }
195}
196
197impl<S, B, X> State<S, B, X>
198where
199 S: Service<Request>,
200 X: Service<Request, Response = Request>,
201 B: MessageBody,
202{
203 pub(super) fn is_none(&self) -> bool {
204 matches!(self, State::None)
205 }
206}
207
208impl<S, B, X> fmt::Debug for State<S, B, X>
209where
210 S: Service<Request>,
211 X: Service<Request, Response = Request>,
212 B: MessageBody,
213{
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 match self {
216 Self::None => write!(f, "State::None"),
217 Self::ExpectCall { .. } => f.debug_struct("State::ExpectCall").finish_non_exhaustive(),
218 Self::ServiceCall { .. } => {
219 f.debug_struct("State::ServiceCall").finish_non_exhaustive()
220 }
221 Self::SendPayload { .. } => {
222 f.debug_struct("State::SendPayload").finish_non_exhaustive()
223 }
224 Self::SendErrorPayload { .. } => f
225 .debug_struct("State::SendErrorPayload")
226 .finish_non_exhaustive(),
227 }
228 }
229}
230
231#[derive(Debug)]
232enum PollResponse {
233 Upgrade(Request),
234 DoNothing,
235 DrainWriteBuf,
236}
237
238impl<T, S, B, X, U> Dispatcher<T, S, B, X, U>
239where
240 T: AsyncRead + AsyncWrite + Unpin,
241
242 S: Service<Request>,
243 S::Error: Into<Response<BoxBody>>,
244 S::Response: Into<Response<B>>,
245
246 B: MessageBody,
247
248 X: Service<Request, Response = Request>,
249 X::Error: Into<Response<BoxBody>>,
250
251 U: Service<(Request, Framed<T, Codec>), Response = ()>,
252 U::Error: fmt::Display,
253{
254 pub(crate) fn new(
256 io: T,
257 flow: Rc<HttpFlow<S, X, U>>,
258 config: ServiceConfig,
259 peer_addr: Option<net::SocketAddr>,
260 conn_data: OnConnectData,
261 ) -> Self {
262 Dispatcher {
263 inner: DispatcherState::Normal {
264 inner: InnerDispatcher {
265 flow,
266 flags: Flags::empty(),
267 peer_addr,
268 conn_data: conn_data.0.map(Rc::new),
269 config: config.clone(),
270 error: None,
271
272 state: State::None,
273 payload: None,
274 payload_drainable: false,
275 messages: VecDeque::new(),
276
277 head_timer: TimerState::new(config.client_request_deadline().is_some()),
278 ka_timer: TimerState::new(config.keep_alive().enabled()),
279 shutdown_timer: TimerState::new(config.client_disconnect_deadline().is_some()),
280
281 io: Some(io),
282 read_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
283 write_buf: BytesMut::with_capacity(HW_BUFFER_SIZE),
284 codec: Codec::new(config),
285 },
286 },
287
288 #[cfg(test)]
289 poll_count: 0,
290 }
291 }
292}
293
294impl<T, S, B, X, U> InnerDispatcher<T, S, B, X, U>
295where
296 T: AsyncRead + AsyncWrite + Unpin,
297
298 S: Service<Request>,
299 S::Error: Into<Response<BoxBody>>,
300 S::Response: Into<Response<B>>,
301
302 B: MessageBody,
303
304 X: Service<Request, Response = Request>,
305 X::Error: Into<Response<BoxBody>>,
306
307 U: Service<(Request, Framed<T, Codec>), Response = ()>,
308 U::Error: fmt::Display,
309{
310 fn can_read(&self, cx: &mut Context<'_>) -> bool {
311 if self.flags.contains(Flags::READ_DISCONNECT) {
312 false
313 } else if let Some(ref info) = self.payload {
314 matches!(
315 info.need_read(cx),
316 PayloadStatus::Read | PayloadStatus::Dropped
317 )
318 } else {
319 true
320 }
321 }
322
323 fn client_disconnected(self: Pin<&mut Self>) {
324 let this = self.project();
325
326 this.flags
327 .insert(Flags::READ_DISCONNECT | Flags::WRITE_DISCONNECT);
328
329 if let Some(mut payload) = this.payload.take() {
330 payload.set_error(PayloadError::Incomplete(None));
331 }
332 }
333
334 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
335 let InnerDispatcherProj { io, write_buf, .. } = self.project();
336 let mut io = Pin::new(io.as_mut().unwrap());
337
338 let len = write_buf.len();
339 let mut written = 0;
340
341 while written < len {
342 match io.as_mut().poll_write(cx, &write_buf[written..])? {
343 Poll::Ready(0) => {
344 error!("write zero; closing");
345 return Poll::Ready(Err(io::Error::new(io::ErrorKind::WriteZero, "")));
346 }
347
348 Poll::Ready(n) => written += n,
349
350 Poll::Pending => {
351 write_buf.advance(written);
352 return Poll::Pending;
353 }
354 }
355 }
356
357 write_buf.clear();
359
360 io.poll_flush(cx)
362 }
363
364 fn send_response_inner(
365 self: Pin<&mut Self>,
366 res: Response<()>,
367 body: &impl MessageBody,
368 ) -> Result<BodySize, DispatchError> {
369 let this = self.project();
370
371 let size = body.size();
372
373 this.codec
374 .encode(Message::Item((res, size)), this.write_buf)
375 .map_err(|err| {
376 if let Some(mut payload) = this.payload.take() {
377 payload.set_error(PayloadError::Incomplete(None));
378 }
379
380 DispatchError::Io(err)
381 })?;
382
383 Ok(size)
384 }
385
386 fn send_response(
387 mut self: Pin<&mut Self>,
388 res: Response<()>,
389 body: B,
390 ) -> Result<(), DispatchError> {
391 let size = self.as_mut().send_response_inner(res, &body)?;
392 let mut this = self.project();
393 this.state.set(match size {
394 BodySize::None | BodySize::Sized(0) => {
395 let payload_unfinished = this.payload.is_some();
396 let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
397 && *this.payload_drainable;
398
399 if payload_unfinished && !drain_payload {
400 this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
401 } else {
402 this.flags.insert(Flags::FINISHED);
403 }
404
405 State::None
406 }
407 _ => State::SendPayload { body },
408 });
409
410 Ok(())
411 }
412
413 fn send_error_response(
414 mut self: Pin<&mut Self>,
415 res: Response<()>,
416 body: BoxBody,
417 ) -> Result<(), DispatchError> {
418 let size = self.as_mut().send_response_inner(res, &body)?;
419 let mut this = self.project();
420 this.state.set(match size {
421 BodySize::None | BodySize::Sized(0) => {
422 let payload_unfinished = this.payload.is_some();
423 let drain_payload = this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
424 && *this.payload_drainable;
425
426 if payload_unfinished && !drain_payload {
427 this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
428 } else {
429 this.flags.insert(Flags::FINISHED);
430 }
431
432 State::None
433 }
434 _ => State::SendErrorPayload { body },
435 });
436
437 Ok(())
438 }
439
440 fn send_continue(self: Pin<&mut Self>) {
441 self.project()
442 .write_buf
443 .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
444 }
445
446 fn poll_response(
447 mut self: Pin<&mut Self>,
448 cx: &mut Context<'_>,
449 ) -> Result<PollResponse, DispatchError> {
450 'res: loop {
451 let mut this = self.as_mut().project();
452 match this.state.as_mut().project() {
453 StateProj::None => match this.messages.pop_front() {
455 Some(DispatcherMessage::Item(req)) => {
457 if req.head().expect() {
459 let fut = this.flow.expect.call(req);
461 this.state.set(State::ExpectCall { fut });
462 } else {
463 let fut = this.flow.service.call(req);
465 this.state.set(State::ServiceCall { fut });
466 };
467 }
468
469 Some(DispatcherMessage::Error(res)) => {
471 self.as_mut().send_error_response(res, BoxBody::new(()))?;
475 }
476
477 Some(DispatcherMessage::Upgrade(req)) => return Ok(PollResponse::Upgrade(req)),
479
480 None => {
482 this.flags.set(
484 Flags::KEEP_ALIVE,
485 this.payload.is_none() && this.codec.keep_alive(),
486 );
487
488 return Ok(PollResponse::DoNothing);
489 }
490 },
491
492 StateProj::ServiceCall { fut } => {
493 match fut.poll(cx) {
494 Poll::Ready(Ok(res)) => {
496 let (res, body) = res.into().replace_body(());
497 self.as_mut().send_response(res, body)?;
498 }
499
500 Poll::Ready(Err(err)) => {
502 let res: Response<BoxBody> = err.into();
503 let (res, body) = res.replace_body(());
504 self.as_mut().send_error_response(res, body)?;
505 }
506
507 Poll::Pending => {
510 if !self.as_mut().poll_request(cx)? {
513 return Ok(PollResponse::DoNothing);
514 }
515 }
517 }
518 }
519
520 StateProj::SendPayload { mut body } => {
521 while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
524 match body.as_mut().poll_next(cx) {
525 Poll::Ready(Some(Ok(item))) => {
526 this.codec
527 .encode(Message::Chunk(Some(item)), this.write_buf)?;
528 }
529
530 Poll::Ready(None) => {
531 this.codec.encode(Message::Chunk(None), this.write_buf)?;
532
533 let payload_unfinished = this.payload.is_some();
538 let drain_payload =
539 this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
540 && *this.payload_drainable;
541 let not_pipelined = this.messages.is_empty();
542
543 this.state.set(State::None);
546
547 if not_pipelined && payload_unfinished && !drain_payload {
548 this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
549 } else {
550 this.flags.insert(Flags::FINISHED);
551 }
552
553 continue 'res;
554 }
555
556 Poll::Ready(Some(Err(err))) => {
557 let err = err.into();
558 tracing::error!("Response payload stream error: {err:?}");
559 this.flags.insert(Flags::FINISHED);
560 return Err(DispatchError::Body(err));
561 }
562
563 Poll::Pending => return Ok(PollResponse::DoNothing),
564 }
565 }
566
567 return Ok(PollResponse::DrainWriteBuf);
570 }
571
572 StateProj::SendErrorPayload { mut body } => {
573 while this.write_buf.len() < super::payload::MAX_BUFFER_SIZE {
578 match body.as_mut().poll_next(cx) {
579 Poll::Ready(Some(Ok(item))) => {
580 this.codec
581 .encode(Message::Chunk(Some(item)), this.write_buf)?;
582 }
583
584 Poll::Ready(None) => {
585 this.codec.encode(Message::Chunk(None), this.write_buf)?;
586
587 let payload_unfinished = this.payload.is_some();
592 let drain_payload =
593 this.payload.as_ref().is_some_and(|pl| pl.is_dropped())
594 && *this.payload_drainable;
595 let not_pipelined = this.messages.is_empty();
596
597 this.state.set(State::None);
600
601 if not_pipelined && payload_unfinished && !drain_payload {
602 this.flags.insert(Flags::SHUTDOWN | Flags::FINISHED);
603 } else {
604 this.flags.insert(Flags::FINISHED);
605 }
606
607 continue 'res;
608 }
609
610 Poll::Ready(Some(Err(err))) => {
611 tracing::error!("Response payload stream error: {err:?}");
612 this.flags.insert(Flags::FINISHED);
613 return Err(DispatchError::Body(
614 Error::new_body().with_cause(err).into(),
615 ));
616 }
617
618 Poll::Pending => return Ok(PollResponse::DoNothing),
619 }
620 }
621
622 return Ok(PollResponse::DrainWriteBuf);
625 }
626
627 StateProj::ExpectCall { fut } => {
628 trace!(" calling expect service");
629
630 match fut.poll(cx) {
631 Poll::Ready(Ok(req)) => {
634 this.write_buf
635 .extend_from_slice(b"HTTP/1.1 100 Continue\r\n\r\n");
636 let fut = this.flow.service.call(req);
637 this.state.set(State::ServiceCall { fut });
638 }
639
640 Poll::Ready(Err(err)) => {
642 let res: Response<BoxBody> = err.into();
643 let (res, body) = res.replace_body(());
644 self.as_mut().send_error_response(res, body)?;
645 }
646
647 Poll::Pending => return Ok(PollResponse::DoNothing),
649 }
650 }
651 }
652 }
653 }
654
655 fn handle_request(
656 mut self: Pin<&mut Self>,
657 req: Request,
658 cx: &mut Context<'_>,
659 ) -> Result<(), DispatchError> {
660 {
662 let mut this = self.as_mut().project();
663
664 if req.head().expect() {
666 let fut = this.flow.expect.call(req);
668 this.state.set(State::ExpectCall { fut });
669 } else {
670 let fut = this.flow.service.call(req);
672 this.state.set(State::ServiceCall { fut });
673 };
674 };
675
676 loop {
678 match self.as_mut().project().state.project() {
679 StateProj::ExpectCall { fut } => {
680 match fut.poll(cx) {
681 Poll::Ready(Ok(req)) => {
683 self.as_mut().send_continue();
684
685 let mut this = self.as_mut().project();
686 let fut = this.flow.service.call(req);
687 this.state.set(State::ServiceCall { fut });
688
689 continue;
690 }
691
692 Poll::Ready(Err(err)) => {
696 let res: Response<BoxBody> = err.into();
697 let (res, body) = res.replace_body(());
698 return self.send_error_response(res, body);
699 }
700
701 Poll::Pending => return Ok(()),
704 }
705 }
706
707 StateProj::ServiceCall { fut } => {
708 return match fut.poll(cx) {
710 Poll::Ready(Ok(res)) => {
714 let (res, body) = res.into().replace_body(());
715 self.as_mut().send_response(res, body)
716 }
717
718 Poll::Pending => Ok(()),
720
721 Poll::Ready(Err(err)) => {
723 let res: Response<BoxBody> = err.into();
724 let (res, body) = res.replace_body(());
725 self.as_mut().send_error_response(res, body)
726 }
727 };
728 }
729
730 _ => {
731 unreachable!("State must be set to ServiceCall or ExceptCall in handle_request")
732 }
733 }
734 }
735 }
736
737 fn poll_request(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
741 let pipeline_queue_full = self.messages.len() >= MAX_PIPELINED_MESSAGES;
742 let can_not_read = !self.can_read(cx);
743
744 if pipeline_queue_full || can_not_read {
746 return Ok(false);
747 }
748
749 let mut this = self.as_mut().project();
750
751 let mut updated = false;
752
753 loop {
755 match this.codec.decode(this.read_buf) {
756 Ok(Some(msg)) => {
757 updated = true;
758
759 match msg {
760 Message::Item(mut req) => {
761 this.head_timer.clear(line!());
763
764 req.head_mut().peer_addr = *this.peer_addr;
765
766 req.conn_data.clone_from(this.conn_data);
767
768 match this.codec.message_type() {
769 MessageType::None => *this.payload_drainable = false,
771
772 MessageType::Stream if this.flow.upgrade.is_some() => {
776 *this.payload_drainable = false;
777 this.messages.push_back(DispatcherMessage::Upgrade(req));
778 break;
779 }
780
781 MessageType::Payload | MessageType::Stream => {
783 let (sender, payload) = Payload::create(false);
789 *req.payload() = crate::Payload::H1 { payload };
790 *this.payload = Some(sender);
791 *this.payload_drainable = req.chunked().unwrap_or(false);
792 }
793 }
794
795 if this.state.is_none() {
797 self.as_mut().handle_request(req, cx)?;
798 this = self.as_mut().project();
799 } else {
800 this.messages.push_back(DispatcherMessage::Item(req));
801 }
802 }
803
804 Message::Chunk(Some(chunk)) => {
805 if let Some(ref mut payload) = this.payload {
806 payload.feed_data(chunk);
807 } else {
808 error!("Internal server error: unexpected payload chunk");
809 this.flags.insert(Flags::READ_DISCONNECT);
810 this.messages.push_back(DispatcherMessage::Error(
811 Response::internal_server_error().drop_body(),
812 ));
813 *this.error = Some(DispatchError::InternalError);
814 break;
815 }
816 }
817
818 Message::Chunk(None) => {
819 if let Some(mut payload) = this.payload.take() {
820 payload.feed_eof();
821 *this.payload_drainable = false;
822 } else {
823 error!("Internal server error: unexpected eof");
824 this.flags.insert(Flags::READ_DISCONNECT);
825 this.messages.push_back(DispatcherMessage::Error(
826 Response::internal_server_error().drop_body(),
827 ));
828 *this.error = Some(DispatchError::InternalError);
829 break;
830 }
831 }
832 }
833 }
834
835 Ok(None) => break,
838
839 Err(ParseError::Io(err)) => {
840 trace!("I/O error: {}", &err);
841 self.as_mut().client_disconnected();
842 this = self.as_mut().project();
843 *this.error = Some(DispatchError::Io(err));
844 break;
845 }
846
847 Err(ParseError::TooLarge) => {
848 trace!("request head was too big; returning 431 response");
849
850 if let Some(mut payload) = this.payload.take() {
851 payload.set_error(PayloadError::Overflow);
852 }
853
854 this.messages
856 .push_back(DispatcherMessage::Error(Response::with_body(
857 StatusCode::REQUEST_HEADER_FIELDS_TOO_LARGE,
858 (),
859 )));
860
861 this.flags.insert(Flags::READ_DISCONNECT);
862 *this.error = Some(ParseError::TooLarge.into());
863
864 break;
865 }
866
867 Err(err) => {
868 trace!("parse error {}", &err);
869
870 if let Some(mut payload) = this.payload.take() {
871 payload.set_error(PayloadError::EncodingCorrupted);
872 }
873
874 this.messages.push_back(DispatcherMessage::Error(
876 Response::bad_request().drop_body(),
877 ));
878
879 this.flags.insert(Flags::READ_DISCONNECT);
880 *this.error = Some(err.into());
881 break;
882 }
883 }
884 }
885
886 Ok(updated)
887 }
888
889 fn poll_head_timer(
890 mut self: Pin<&mut Self>,
891 cx: &mut Context<'_>,
892 ) -> Result<(), DispatchError> {
893 let this = self.as_mut().project();
894
895 if let TimerState::Active { timer } = this.head_timer {
896 if timer.as_mut().poll(cx).is_ready() {
897 trace!("timed out on slow request; replying with 408 and closing connection");
900
901 let _ = self.as_mut().send_error_response(
902 Response::with_body(StatusCode::REQUEST_TIMEOUT, ()),
903 BoxBody::new(()),
904 );
905
906 self.project().flags.insert(Flags::SHUTDOWN);
907 }
908 };
909
910 Ok(())
911 }
912
913 fn poll_ka_timer(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
914 let this = self.as_mut().project();
915 if let TimerState::Active { timer } = this.ka_timer {
916 debug_assert!(
917 this.flags.contains(Flags::KEEP_ALIVE),
918 "keep-alive flag should be set when timer is active",
919 );
920 debug_assert!(
921 this.state.is_none(),
922 "dispatcher should not be in keep-alive phase if state is not none: {:?}",
923 this.state,
924 );
925
926 if timer.as_mut().poll(cx).is_ready() {
938 trace!("timer timed out; closing connection");
940 this.flags.insert(Flags::SHUTDOWN);
941
942 if let Some(deadline) = this.config.client_disconnect_deadline() {
943 this.shutdown_timer
945 .set_and_init(cx, sleep_until(deadline.into()), line!());
946 } else {
947 this.flags.insert(Flags::WRITE_DISCONNECT);
949 }
950 }
951 }
952
953 Ok(())
954 }
955
956 fn poll_shutdown_timer(
957 mut self: Pin<&mut Self>,
958 cx: &mut Context<'_>,
959 ) -> Result<(), DispatchError> {
960 let this = self.as_mut().project();
961 if let TimerState::Active { timer } = this.shutdown_timer {
962 debug_assert!(
963 this.flags.contains(Flags::SHUTDOWN),
964 "shutdown flag should be set when timer is active",
965 );
966
967 if timer.as_mut().poll(cx).is_ready() {
969 trace!("timed-out during shutdown");
970 return Err(DispatchError::DisconnectTimeout);
971 }
972 }
973
974 Ok(())
975 }
976
977 fn poll_timers(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<(), DispatchError> {
979 self.as_mut().poll_head_timer(cx)?;
980 self.as_mut().poll_ka_timer(cx)?;
981 self.as_mut().poll_shutdown_timer(cx)?;
982
983 Ok(())
984 }
985
986 #[inline(always)] fn read_available(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Result<bool, DispatchError> {
993 let this = self.project();
994
995 if this.flags.contains(Flags::READ_DISCONNECT) {
996 return Ok(false);
997 };
998
999 let mut io = Pin::new(this.io.as_mut().unwrap());
1000
1001 let mut read_some = false;
1002
1003 loop {
1004 if this.read_buf.len() >= MAX_BUFFER_SIZE {
1006 match this.payload.as_ref().map(|p| p.need_read(cx)) {
1025 Some(PayloadStatus::Pause) => {}
1027
1028 Some(PayloadStatus::Dropped) | Some(PayloadStatus::Read) | None => {
1030 cx.waker().wake_by_ref()
1031 }
1032 }
1033
1034 return Ok(false);
1035 }
1036
1037 let remaining = this.read_buf.capacity() - this.read_buf.len();
1039 if remaining < LW_BUFFER_SIZE {
1040 this.read_buf.reserve(HW_BUFFER_SIZE - remaining);
1041 }
1042
1043 match tokio_util::io::poll_read_buf(io.as_mut(), cx, this.read_buf) {
1044 Poll::Ready(Ok(n)) => {
1045 if !this.payload.as_ref().is_some_and(|pl| pl.is_dropped()) {
1048 this.flags.remove(Flags::FINISHED);
1049 }
1050
1051 if n == 0 {
1052 return Ok(true);
1053 }
1054
1055 read_some = true;
1056 }
1057
1058 Poll::Pending => {
1059 return Ok(false);
1060 }
1061
1062 Poll::Ready(Err(err)) => {
1063 return match err.kind() {
1064 io::ErrorKind::WouldBlock => Ok(false),
1066
1067 io::ErrorKind::ConnectionReset if read_some => Ok(true),
1069
1070 _ => Err(DispatchError::Io(err)),
1071 };
1072 }
1073 }
1074 }
1075 }
1076
1077 fn upgrade(self: Pin<&mut Self>, req: Request) -> U::Future {
1079 let this = self.project();
1080 let mut parts = FramedParts::with_read_buf(
1081 this.io.take().unwrap(),
1082 mem::take(this.codec),
1083 mem::take(this.read_buf),
1084 );
1085 parts.write_buf = mem::take(this.write_buf);
1086 let framed = Framed::from_parts(parts);
1087 this.flow.upgrade.as_ref().unwrap().call((req, framed))
1088 }
1089}
1090
1091impl<T, S, B, X, U> Future for Dispatcher<T, S, B, X, U>
1092where
1093 T: AsyncRead + AsyncWrite + Unpin,
1094
1095 S: Service<Request>,
1096 S::Error: Into<Response<BoxBody>>,
1097 S::Response: Into<Response<B>>,
1098
1099 B: MessageBody,
1100
1101 X: Service<Request, Response = Request>,
1102 X::Error: Into<Response<BoxBody>>,
1103
1104 U: Service<(Request, Framed<T, Codec>), Response = ()>,
1105 U::Error: fmt::Display,
1106{
1107 type Output = Result<(), DispatchError>;
1108
1109 #[inline]
1110 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
1111 let this = self.as_mut().project();
1112
1113 #[cfg(test)]
1114 {
1115 *this.poll_count += 1;
1116 }
1117
1118 match this.inner.project() {
1119 DispatcherStateProj::Upgrade { fut: upgrade } => upgrade.poll(cx).map_err(|err| {
1120 error!("Upgrade handler error: {}", err);
1121 DispatchError::Upgrade
1122 }),
1123
1124 DispatcherStateProj::Normal { mut inner } => {
1125 trace!("start flags: {:?}", &inner.flags);
1126
1127 trace_timer_states(
1128 "start",
1129 &inner.head_timer,
1130 &inner.ka_timer,
1131 &inner.shutdown_timer,
1132 );
1133
1134 inner.as_mut().poll_timers(cx)?;
1135
1136 let poll = if inner.flags.contains(Flags::SHUTDOWN) {
1137 if inner.flags.contains(Flags::WRITE_DISCONNECT) {
1138 Poll::Ready(Ok(()))
1139 } else {
1140 ready!(inner.as_mut().poll_flush(cx))?;
1142 Pin::new(inner.as_mut().project().io.as_mut().unwrap())
1143 .poll_shutdown(cx)
1144 .map_err(DispatchError::from)
1145 }
1146 } else {
1147 let should_disconnect = inner.as_mut().read_available(cx)?;
1149
1150 if !inner.read_buf.is_empty() && inner.flags.contains(Flags::KEEP_ALIVE) {
1152 let inner = inner.as_mut().project();
1153 inner.flags.remove(Flags::KEEP_ALIVE);
1154 inner.ka_timer.clear(line!());
1155 }
1156
1157 if !inner.flags.contains(Flags::STARTED) {
1158 inner.as_mut().project().flags.insert(Flags::STARTED);
1159
1160 if let Some(deadline) = inner.config.client_request_deadline() {
1161 inner.as_mut().project().head_timer.set_and_init(
1162 cx,
1163 sleep_until(deadline.into()),
1164 line!(),
1165 );
1166 }
1167 }
1168
1169 inner.as_mut().poll_request(cx)?;
1170
1171 if should_disconnect {
1172 let inner = inner.as_mut().project();
1174 inner.flags.insert(Flags::READ_DISCONNECT);
1175 if let Some(mut payload) = inner.payload.take() {
1176 payload.set_error(PayloadError::Incomplete(None));
1177 payload.feed_eof();
1178 }
1179 };
1180
1181 loop {
1182 let drain = match inner.as_mut().poll_response(cx)? {
1185 PollResponse::DrainWriteBuf => true,
1186
1187 PollResponse::DoNothing => {
1188 if inner.flags.contains(Flags::KEEP_ALIVE | Flags::FINISHED) {
1191 if let Some(timer) = inner.config.keep_alive_deadline() {
1192 inner.as_mut().project().ka_timer.set_and_init(
1193 cx,
1194 sleep_until(timer.into()),
1195 line!(),
1196 );
1197 }
1198 }
1199
1200 false
1201 }
1202
1203 PollResponse::Upgrade(req) => {
1205 let upgrade = inner.upgrade(req);
1206 self.as_mut()
1207 .project()
1208 .inner
1209 .set(DispatcherState::Upgrade { fut: upgrade });
1210 return self.poll(cx);
1211 }
1212 };
1213
1214 let flush_was_ready = inner.as_mut().poll_flush(cx)?.is_ready();
1221
1222 if !flush_was_ready || !drain {
1227 break;
1228 }
1229 }
1230
1231 if inner.flags.contains(Flags::WRITE_DISCONNECT) {
1233 trace!("client is gone; disconnecting");
1234 return Poll::Ready(Ok(()));
1235 }
1236
1237 let inner_p = inner.as_mut().project();
1238 let state_is_none = inner_p.state.is_none();
1239
1240 if inner_p.flags.contains(Flags::READ_DISCONNECT)
1248 && (!inner_p.config.h1_allow_half_closed() || state_is_none)
1249 {
1250 trace!("read half closed; start shutdown");
1251 inner_p.flags.insert(Flags::SHUTDOWN);
1252 }
1253
1254 if state_is_none && inner_p.write_buf.is_empty() {
1256 if let Some(err) = inner_p.error.take() {
1257 error!("stream error: {}", &err);
1258 return Poll::Ready(Err(err));
1259 }
1260
1261 if inner_p.flags.contains(Flags::FINISHED)
1263 && !inner_p.flags.contains(Flags::KEEP_ALIVE)
1264 && inner_p.payload.is_none()
1265 {
1266 inner_p.flags.remove(Flags::FINISHED);
1267 inner_p.flags.insert(Flags::SHUTDOWN);
1268 return self.poll(cx);
1269 }
1270
1271 if inner_p.flags.contains(Flags::SHUTDOWN) {
1273 return self.poll(cx);
1274 }
1275 }
1276
1277 trace_timer_states(
1278 "end",
1279 inner_p.head_timer,
1280 inner_p.ka_timer,
1281 inner_p.shutdown_timer,
1282 );
1283
1284 if inner_p.flags.contains(Flags::SHUTDOWN) {
1285 cx.waker().wake_by_ref();
1286 }
1287 Poll::Pending
1288 };
1289
1290 trace!("end flags: {:?}", &inner.flags);
1291
1292 poll
1293 }
1294 }
1295 }
1296}
1297
1298#[allow(dead_code)]
1299fn trace_timer_states(
1300 label: &str,
1301 head_timer: &TimerState,
1302 ka_timer: &TimerState,
1303 shutdown_timer: &TimerState,
1304) {
1305 trace!("{} timers:", label);
1306
1307 if head_timer.is_enabled() {
1308 trace!(" head {}", &head_timer);
1309 }
1310
1311 if ka_timer.is_enabled() {
1312 trace!(" keep-alive {}", &ka_timer);
1313 }
1314
1315 if shutdown_timer.is_enabled() {
1316 trace!(" shutdown {}", &shutdown_timer);
1317 }
1318}