1pub mod answers;
10pub mod diagnostics;
11pub mod editor;
12pub mod parser;
13
14use std::{
15 cell::RefCell,
16 io::ErrorKind,
17 net::{Shutdown, SocketAddr},
18 rc::{Rc, Weak},
19 time::{Duration, Instant},
20};
21
22use mio::{Interest, Token, net::TcpStream};
23use rusty_ulid::Ulid;
24use sozu_command::{
25 config::MAX_LOOP_ITERATIONS,
26 logging::EndpointRecord,
27 proto::command::{Event, EventKind, ListenerType},
28};
29
30use crate::metrics::names;
32use crate::{
33 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
34 FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession,
35 Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult,
36 StateResult,
37 backends::{Backend, BackendError},
38 pool::{Checkout, Pool},
39 protocol::{
40 SessionState,
41 http::{
42 answers::DefaultAnswerStream,
43 diagnostics::{diagnostic_400_502, diagnostic_413_507},
44 editor::HttpContext,
45 parser::Method,
46 },
47 pipe::WebSocketContext,
48 },
49 retry::RetryPolicy,
50 server::{CONN_RETRIES, push_event},
51 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
52 sozu_command::{
53 logging::{LogContext, ansi_palette},
54 ready::Ready,
55 },
56 timer::TimeoutContainer,
57};
58
/// Builds the prefix put in front of the log lines emitted by this module.
///
/// The prefix is a `String` made of the context returned by
/// `$self.context.log_context()`, the `KAWA-H1` tag and a `Session(...)`
/// summary holding: the public and session addresses, the frontend token,
/// the request and response parsing phases, the backend token (or `<none>`)
/// and both readiness values. The fragments are wrapped in the escape
/// sequences returned by `ansi_palette()`.
///
/// Two forms are accepted:
/// - `log_context!(self)` asks the session for the response parsing phase
///   through `response_parsing_phase()`;
/// - `log_context!(self, phase)` takes the phase from the caller. Call sites
///   in this file use it while they hold a mutable borrow of the response
///   stream (presumably because calling a `&self` method would then be
///   rejected by the borrow checker).
macro_rules! log_context {
    // Short form: delegate to the explicit form with the phase read from the session.
    ($self:expr) => {
        log_context!($self, $self.response_parsing_phase())
    };
    // Explicit form: only fields of `$self` are accessed, never `$self` as a whole.
    ($self:expr, $response_phase:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "{gray}{ctx}{reset}\t{open}KAWA-H1{reset}\t{grey}Session{reset}({gray}public{reset}={white}{public}{reset}, {gray}session{reset}={white}{session}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}request_parsing_phase{reset}={white}{request_parsing_phase:?}{reset}, {gray}response_parsing_phase{reset}={white}{response_parsing_phase:?}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.context.log_context(),
            public = $self.context.public_address.to_string(),
            // A missing session address is rendered as "<none>".
            session = $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
            frontend = $self.frontend_token.0,
            request_parsing_phase = $self.request_stream.parsing_phase,
            response_parsing_phase = $response_phase,
            frontend_readiness = $self.frontend_readiness,
            // A missing backend token is rendered as "<none>".
            backend = $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
            backend_readiness = $self.backend_readiness,
        )
    }};
}
98
/// HTTP message representation used for both directions of a session: a
/// `kawa::Kawa` whose storage is a buffer checked out of the pool.
type GenericHttpStream = kawa::Kawa<Checkout>;
101
/// Lets a pool `Checkout` be used as the backing storage of a `kawa` buffer.
impl kawa::AsBuffer for Checkout {
    /// Read-only view of the checked-out buffer, as exposed by `inner.extra()`.
    fn as_buffer(&self) -> &[u8] {
        self.inner.extra()
    }
    /// Mutable view of the checked-out buffer, as exposed by `inner.extra_mut()`.
    fn as_mut_buffer(&mut self) -> &mut [u8] {
        self.inner.extra_mut()
    }
}
110
/// Describes an answer produced by the proxy itself instead of a backend.
///
/// Each variant is named after the HTTP status code it maps to (see the
/// `From<&DefaultAnswer> for u16` implementation) and carries the values
/// needed to build that answer.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DefaultAnswer {
    /// 301 answer. `location` is presumably the redirection target — confirm
    /// against the answer templates.
    Answer301 {
        location: String,
    },
    /// 302 answer, carrying a `location` like [`DefaultAnswer::Answer301`].
    Answer302 {
        location: String,
    },
    /// 308 answer, carrying a `location` like [`DefaultAnswer::Answer301`].
    Answer308 {
        location: String,
    },
    /// 400 answer, set when the request fails to parse. The fields are the
    /// parsing phase marker and the diagnostic strings returned by
    /// `diagnostic_400_502`.
    Answer400 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// 401 answer. `www_authenticate` is presumably the value of the
    /// `WWW-Authenticate` header, when there is one — confirm.
    Answer401 {
        www_authenticate: Option<String>,
    },
    /// 404 answer, no parameter.
    Answer404 {},
    /// 408 answer. `duration` is a pre-formatted string (presumably the
    /// timeout that elapsed — confirm).
    Answer408 {
        duration: String,
    },
    /// 413 answer, set when the request buffer is full before the request
    /// reached its main phase. `capacity` is the capacity of that buffer and
    /// `message` comes from `diagnostic_413_507`.
    Answer413 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        capacity: usize,
    },
    /// 421 answer, no parameter.
    Answer421 {},
    /// 429 answer. `retry_after` is presumably the value of a `Retry-After`
    /// header (unit not visible here) — confirm.
    Answer429 {
        retry_after: Option<u32>,
    },
    /// 502 answer, set when the backend response fails to parse. Same fields
    /// as [`DefaultAnswer::Answer400`].
    Answer502 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// 503 answer with a free-form `message`.
    Answer503 {
        message: String,
    },
    /// 504 answer. `duration` is a pre-formatted string, like in
    /// [`DefaultAnswer::Answer408`].
    Answer504 {
        duration: String,
    },
    /// 507 answer, set when the response buffer is full before the response
    /// reached its main phase. Same fields as [`DefaultAnswer::Answer413`].
    Answer507 {
        phase: kawa::ParsingPhaseMarker,
        message: String,
        capacity: usize,
    },
}
181
182impl From<&DefaultAnswer> for u16 {
183 fn from(answer: &DefaultAnswer) -> u16 {
184 match answer {
185 DefaultAnswer::Answer301 { .. } => 301,
186 DefaultAnswer::Answer302 { .. } => 302,
187 DefaultAnswer::Answer308 { .. } => 308,
188 DefaultAnswer::Answer400 { .. } => 400,
189 DefaultAnswer::Answer401 { .. } => 401,
190 DefaultAnswer::Answer404 { .. } => 404,
191 DefaultAnswer::Answer408 { .. } => 408,
192 DefaultAnswer::Answer413 { .. } => 413,
193 DefaultAnswer::Answer421 { .. } => 421,
194 DefaultAnswer::Answer429 { .. } => 429,
195 DefaultAnswer::Answer502 { .. } => 502,
196 DefaultAnswer::Answer503 { .. } => 503,
197 DefaultAnswer::Answer504 { .. } => 504,
198 DefaultAnswer::Answer507 { .. } => 507,
199 }
200 }
201}
202
/// Point of the request/response exchange a session was at when a timeout
/// fired.
///
/// NOTE(review): the code that produces and consumes these values is not
/// visible in this part of the file; the variant descriptions below are
/// inferred from their names only and must be confirmed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    /// Presumably: a request was being received.
    Request,
    /// Presumably: a response was being received.
    Response,
    /// Presumably: the connection was idle, waiting for a new request.
    WaitingForNewRequest,
    /// Presumably: the request was sent and no response had started yet.
    WaitingForResponse,
}
210
/// What the session is sending back to the frontend.
pub enum ResponseStream {
    /// A response read from the backend socket and parsed by `kawa`.
    BackendAnswer(GenericHttpStream),
    /// An answer generated by the proxy: its status code and the stream
    /// holding the bytes to write.
    DefaultAnswer(u16, DefaultAnswerStream),
}
215
/// State of an HTTP/1 session between a frontend socket of type `Front` and
/// an optional backend TCP socket, for a listener of type `L`.
pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// Shared answer templates (not used in the part of the file reviewed
    /// here; presumably used to build default answers — confirm).
    answers: Rc<RefCell<answers::HttpAnswers>>,
    /// Backend currently attached to the session, if any.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Connection state towards the backend; starts as `NotConnected`.
    backend_connection_status: BackendConnectionStatus,
    /// Interest and received events for the backend socket.
    pub backend_readiness: Readiness,
    /// TCP socket to the backend, once there is one.
    pub backend_socket: Option<TcpStream>,
    /// Instant at which the backend response was seen as terminated.
    backend_stop: Option<Instant>,
    /// Event-loop token of the backend socket, if any.
    pub backend_token: Option<Token>,
    /// Timeout armed on the backend side; created empty with the connect
    /// timeout as its duration.
    pub container_backend_timeout: TimeoutContainer,
    /// Timeout armed on the frontend side.
    pub container_frontend_timeout: TimeoutContainer,
    /// Configured backend timeout (only reported in logs in the code shown here).
    configured_backend_timeout: Duration,
    /// Configured connect timeout, used as the initial backend timeout duration.
    configured_connect_timeout: Duration,
    /// Configured frontend timeout, applied once a request has started and
    /// again on keep-alive reset.
    configured_frontend_timeout: Duration,
    /// Starts at 0 (presumably counts backend connection attempts — confirm
    /// against the connection code).
    connection_attempts: u8,
    /// Interest and received events for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Socket of the client connection.
    pub frontend_socket: Front,
    /// Event-loop token of the frontend socket.
    frontend_token: Token,
    /// Number of times the session was reset for a new request on the same
    /// connection.
    keepalive_count: usize,
    /// Listener that accepted the connection.
    listener: Rc<RefCell<L>>,
    /// Request being read from the frontend and written to the backend.
    pub request_stream: GenericHttpStream,
    /// Response being written to the frontend.
    pub response_stream: ResponseStream,
    /// Per-request context filled by the parser callbacks and used for logging.
    pub context: HttpContext,
}
245
246impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
247 #[allow(clippy::too_many_arguments)]
256 pub fn new(
257 answers: Rc<RefCell<answers::HttpAnswers>>,
258 configured_backend_timeout: Duration,
259 configured_connect_timeout: Duration,
260 configured_frontend_timeout: Duration,
261 container_frontend_timeout: TimeoutContainer,
262 frontend_socket: Front,
263 frontend_token: Token,
264 listener: Rc<RefCell<L>>,
265 pool: Weak<RefCell<Pool>>,
266 protocol: Protocol,
267 public_address: SocketAddr,
268 session_id: Ulid,
269 request_id: Ulid,
270 session_address: Option<SocketAddr>,
271 sticky_name: String,
272 ) -> Result<Http<Front, L>, AcceptError> {
273 let (front_buffer, back_buffer) = match pool.upgrade() {
274 Some(pool) => {
275 let mut pool = pool.borrow_mut();
276 match (pool.checkout(), pool.checkout()) {
277 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
278 _ => return Err(AcceptError::BufferCapacityReached),
279 }
280 }
281 None => return Err(AcceptError::BufferCapacityReached),
282 };
283 let sozu_id_header = listener.borrow().get_sozu_id_header().to_string();
284 let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
285 let send_x_real_ip = listener.borrow().get_send_x_real_ip();
286 Ok(Http {
287 answers,
288 backend_connection_status: BackendConnectionStatus::NotConnected,
289 backend_readiness: Readiness::new(),
290 backend_socket: None,
291 backend_stop: None,
292 backend_token: None,
293 backend: None,
294 configured_backend_timeout,
295 configured_connect_timeout,
296 configured_frontend_timeout,
297 connection_attempts: 0,
298 container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
299 container_frontend_timeout,
300 frontend_readiness: Readiness {
301 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
302 event: Ready::EMPTY,
303 },
304 frontend_socket,
305 frontend_token,
306 keepalive_count: 0,
307 listener,
308 request_stream: GenericHttpStream::new(
309 kawa::Kind::Request,
310 kawa::Buffer::new(front_buffer),
311 ),
312 response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
313 kawa::Kind::Response,
314 kawa::Buffer::new(back_buffer),
315 )),
316 context: HttpContext::new(
317 session_id,
318 request_id,
319 protocol,
320 public_address,
321 session_address,
322 sticky_name,
323 sozu_id_header,
324 elide_x_real_ip,
325 send_x_real_ip,
326 ),
327 })
328 }
329
    /// Prepares the session for the next request on the same frontend
    /// connection (keep-alive).
    ///
    /// Does nothing when a default answer is being sent. Otherwise it gives
    /// the context a new id, clears both streams, counts one more keep-alive
    /// round, decrements the active request counters, cancels the backend
    /// timeout, re-arms the frontend timeout with the configured duration and
    /// restores the initial readiness interests.
    pub fn reset(&mut self) {
        trace!("{} ============== reset", log_context!(self));
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            // A default answer is never followed by another request here.
            _ => return,
        };

        self.context.id = Ulid::generate();
        self.context.reset();

        self.request_stream.clear();
        response_stream.clear();
        let keepalive_before = self.keepalive_count;
        self.keepalive_count += 1;
        debug_assert_eq!(
            self.keepalive_count,
            keepalive_before + 1,
            "reset() must advance keepalive_count by exactly one"
        );
        debug_assert!(
            response_stream.is_initial(),
            "response stream must be reset to initial on keep-alive"
        );
        gauge_add!(names::http::ACTIVE_REQUESTS, -1);

        if let Some(backend) = &mut self.backend {
            let mut backend = backend.borrow_mut();
            // Saturating: never underflows if the counter is already 0.
            backend.active_requests = backend.active_requests.saturating_sub(1);
        }

        // While waiting for a new request only the frontend timeout runs.
        self.container_backend_timeout.cancel();
        self.container_frontend_timeout
            .set_duration(self.configured_frontend_timeout);
        self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;

        // Copied out first: the log macro below cannot ask `self` for the
        // phase while the response stream is mutably borrowed.
        let response_phase = response_stream.parsing_phase;
        let response_storage = &mut response_stream.storage;
        // Bytes left in the response buffer at this point are reported, then dropped.
        if !response_storage.is_empty() {
            warn!(
                "{} Leftover fragment from response: {}",
                log_context!(self, Some(response_phase)),
                parser::view(
                    response_storage.used(),
                    16,
                    &[response_storage.start, response_storage.end,],
                )
            );
        }

        response_storage.clear();
        if !self.request_stream.storage.is_empty() {
            // Bytes of a following request are already buffered: flag the
            // frontend as readable so they get parsed without waiting for a
            // new socket event.
            self.frontend_readiness.event.insert(Ready::READABLE);
        } else {
            self.request_stream.storage.clear();
        }
    }
403
    /// Reads bytes from the frontend socket into the request stream and runs
    /// the HTTP/1 parser over them.
    ///
    /// Returns:
    /// - `StateResult::CloseSession` when the frontend socket reports `Error`
    ///   or `Closed`, or when the request fails to parse while the response
    ///   stream is flagged `consumed`;
    /// - `StateResult::ConnectBackend` when this call moved the request into
    ///   its main phase;
    /// - `StateResult::Continue` otherwise, including when a 413 or 400
    ///   default answer has been set.
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== readable", log_context!(self));
        if !self.container_frontend_timeout.reset() {
            error!(
                "could not reset front timeout {:?}",
                self.configured_frontend_timeout
            );
            self.print_state(self.protocol_string());
        }

        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            ResponseStream::DefaultAnswer(..) => {
                error!(
                    "{} Sending default answer, should not read from frontend socket",
                    log_context!(self)
                );

                // Stop reading and make sure the default answer gets written.
                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return StateResult::Continue;
            }
        };

        if self.request_stream.storage.is_full() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            if self.request_stream.is_main_phase() {
                // Room is made by writing the buffered bytes to the backend.
                self.backend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // The buffer is full before the main phase was reached:
                // answer 413.
                self.set_answer(DefaultAnswer::Answer413 {
                    capacity: self.request_stream.storage.capacity(),
                    phase: self.request_stream.parsing_phase.marker(),
                    message: diagnostic_413_507(self.request_stream.parsing_phase),
                });
            }
            return StateResult::Continue;
        }

        let available_space = self.request_stream.storage.available_space();
        let (size, socket_state) = self
            .frontend_socket
            .socket_read(self.request_stream.storage.space());

        debug!(
            "{} Read {} bytes",
            log_context!(self, Some(response_stream.parsing_phase)),
            size
        );

        debug_assert!(
            size <= available_space,
            "socket_read returned {size} bytes for a {available_space}-byte buffer slice"
        );

        if size > 0 {
            let used_before = self.request_stream.storage.used().len();
            self.request_stream.storage.fill(size);
            debug_assert_eq!(
                self.request_stream.storage.used().len(),
                used_before + size,
                "storage.fill(size) must grow the used region by exactly the bytes read"
            );
            count!(names::backend::BYTES_IN, size as i64);
            metrics.bin += size;
        } else {
            // Nothing was read: forget the readable event.
            self.frontend_readiness.event.remove(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                if self.request_stream.is_initial() {
                    // No request was in progress. A read error is only
                    // recorded on the socket when no request was ever
                    // completed on this connection.
                    if self.keepalive_count == 0 {
                        self.frontend_socket.read_error();
                    }
                } else {
                    self.frontend_socket.read_error();
                    self.log_request_error(
                        metrics,
                        &format!(
                            "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
                            self.frontend_readiness,
                            self.backend_readiness,
                        )
                    );
                }
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        trace!(
            "{} ============== readable_parse",
            log_context!(self, Some(response_stream.parsing_phase))
        );
        // Snapshot of the state before parsing, to detect transitions.
        let was_initial = self.request_stream.is_initial();
        let was_not_proxying = !self.request_stream.is_main_phase();

        kawa::h1::parse(&mut self.request_stream, &mut self.context);
        if was_initial && !self.request_stream.is_initial() {
            // A new request has just started: apply the configured frontend
            // timeout and count the request.
            self.container_frontend_timeout
                .set_duration(self.configured_frontend_timeout);
            gauge_add!(names::http::ACTIVE_REQUESTS, 1);
            incr!(names::http::REQUESTS);
        }

        if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
            incr!(names::http::FRONTEND_PARSE_ERRORS);
            warn!(
                "{} Parsing request error in {:?}: {}",
                log_context!(self, Some(response_stream.parsing_phase)),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        let kawa = &self.request_stream;
                        parser::view(
                            kawa.storage.used(),
                            16,
                            &[
                                kawa.storage.start,
                                kawa.storage.head,
                                index as usize,
                                kawa.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            if response_stream.consumed {
                // Part of a response was already consumed, a 400 cannot be
                // sent anymore: log and close.
                self.log_request_error(metrics, "Parsing error on the request");
                return StateResult::CloseSession;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, &self.request_stream);
                self.set_answer(DefaultAnswer::Answer400 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return StateResult::Continue;
            }
        }

        if self.request_stream.is_main_phase() {
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            if was_not_proxying {
                // The main phase was reached during this call: a backend
                // connection is now needed.
                trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
                return StateResult::ConnectBackend;
            }
        }
        if self.request_stream.is_terminated() {
            // The whole request was received: stop reading from the frontend.
            self.frontend_readiness.interest.remove(Ready::READABLE);
        }

        StateResult::Continue
    }
584
585 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
586 trace!("{} ============== writable", log_context!(self));
587 let response_stream = match &mut self.response_stream {
588 ResponseStream::BackendAnswer(response_stream) => response_stream,
589 _ => return self.writable_default_answer(metrics),
590 };
591
592 response_stream.prepare(&mut kawa::h1::BlockConverter);
593
594 let bufs = response_stream.as_io_slice();
595 if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
596 self.frontend_readiness.interest.remove(Ready::WRITABLE);
597 }
599
600 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
601
602 debug!(
603 "{} Wrote {} bytes",
604 log_context!(self, Some(response_stream.parsing_phase)),
605 size
606 );
607
608 if size > 0 {
609 let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
615 debug_assert!(
616 size <= queued,
617 "socket_write_vectored reported {size} bytes for {queued} queued"
618 );
619 response_stream.consume(size);
620 count!(names::backend::BYTES_OUT, size as i64);
621 metrics.bout += size;
622 self.backend_readiness.interest.insert(Ready::READABLE);
623 }
624
625 match socket_state {
626 SocketResult::Error | SocketResult::Closed => {
627 self.frontend_socket.write_error();
628 self.log_request_error(
629 metrics,
630 &format!(
631 "front socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
632 self.frontend_readiness,
633 self.backend_readiness,
634 ),
635 );
636 return StateResult::CloseSession;
637 }
638 SocketResult::WouldBlock => {
639 self.frontend_readiness.event.remove(Ready::WRITABLE);
640 }
641 SocketResult::Continue => {}
642 }
643
644 if self.frontend_socket.socket_wants_write() {
645 return StateResult::Continue;
646 }
647
648 if response_stream.is_terminated() && response_stream.is_completed() {
649 if self.context.closing {
650 debug!("{} closing proxy, no keep alive", log_context!(self));
651 self.log_request_success(metrics);
652 return StateResult::CloseSession;
653 }
654
655 match response_stream.detached.status_line {
656 kawa::StatusLine::Response { code: 101, .. } => {
657 trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
658 self.log_request_success(metrics);
659 return StateResult::Upgrade;
660 }
661 kawa::StatusLine::Response { code: 100, .. } => {
662 trace!(
663 "{} ============== HANDLE CONTINUE!",
664 log_context!(self, Some(response_stream.parsing_phase))
665 );
666 response_stream.clear();
667 self.log_request_success(metrics);
668 return StateResult::Continue;
669 }
670 kawa::StatusLine::Response { code: 103, .. } => {
671 self.backend_readiness.event.insert(Ready::READABLE);
672 trace!(
673 "{} ============== HANDLE EARLY HINT!",
674 log_context!(self, Some(response_stream.parsing_phase))
675 );
676 response_stream.clear();
677 self.log_request_success(metrics);
678 return StateResult::Continue;
679 }
680 _ => (),
681 }
682
683 let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
684 let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
685 if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
686 && request_length_known
687 {
688 error!(
689 "{} Response terminated before request, this case is not handled properly yet",
690 log_context!(self)
691 );
692 incr!(names::http::EARLY_RESPONSE_CLOSE);
693 }
696
697 trace!(
702 "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
703 log_context!(self),
704 self.context.keep_alive_frontend,
705 self.context.keep_alive_backend,
706 response_length_known
707 );
708
709 self.log_request_success(metrics);
710 return match (
711 self.context.keep_alive_frontend,
712 self.context.keep_alive_backend,
713 response_length_known,
714 ) {
715 (true, true, true) => {
716 debug!("{} Keep alive frontend/backend", log_context!(self));
717 metrics.reset();
718 self.reset();
719 StateResult::Continue
720 }
721 (true, false, true) => {
722 debug!("{} Keep alive frontend", log_context!(self));
723 metrics.reset();
724 self.reset();
725 StateResult::CloseBackend
726 }
727 _ => {
728 debug!("{} No keep alive", log_context!(self));
729 StateResult::CloseSession
730 }
731 };
732 }
733 StateResult::Continue
734 }
735
    /// Writes the pending part of the default answer to the frontend socket.
    ///
    /// Returns `StateResult::CloseSession` when no default answer is set,
    /// when the answer has been completely written, or when the socket
    /// reports `Error`; `StateResult::Continue` otherwise.
    fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!(
            "{} ============== writable_default_answer",
            log_context!(self)
        );
        let response_stream = match &mut self.response_stream {
            ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
            _ => return StateResult::CloseSession,
        };
        let bufs = response_stream.as_io_slice();
        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);

        let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
        debug_assert!(
            size <= queued,
            "default-answer write reported {size} bytes for {queued} queued"
        );
        count!(names::backend::BYTES_OUT, size as i64);
        metrics.bout += size;
        response_stream.consume(size);

        // Nothing written, or the socket cannot take more right now: forget
        // the writable event.
        if size == 0 || socket_state != SocketResult::Continue {
            self.frontend_readiness.event.remove(Ready::WRITABLE);
        }

        // Completion is checked before the socket error: an answer that was
        // fully written is logged as a success.
        if response_stream.is_completed() {
            save_http_status_metric(self.context.status, self.context.log_context());
            self.log_default_answer_success(metrics);
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            return StateResult::CloseSession;
        }

        if socket_state == SocketResult::Error {
            self.frontend_socket.write_error();
            self.log_request_error(
                metrics,
                "error writing default answer to front socket, closing",
            );
            StateResult::CloseSession
        } else {
            StateResult::Continue
        }
    }
783
    /// Writes the pending part of the request to the backend socket.
    ///
    /// Returns `SessionResult::Close` only when there is no backend socket;
    /// every other path returns `SessionResult::Continue`, including a
    /// backend socket `Error`/`Closed`.
    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_writable", log_context!(self));
        if let ResponseStream::DefaultAnswer(..) = self.response_stream {
            error!(
                "{}\tsending default answer, should not write to back",
                log_context!(self)
            );
            // Stop writing to the backend and make sure the default answer
            // gets written to the frontend.
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        self.request_stream.prepare(&mut kawa::h1::BlockConverter);

        let bufs = self.request_stream.as_io_slice();
        if bufs.is_empty() {
            // Nothing to send for now.
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
        debug!("{} Wrote {} bytes", log_context!(self), size);

        if size > 0 {
            let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
            debug_assert!(
                size <= queued,
                "backend socket_write_vectored reported {size} bytes for {queued} queued"
            );
            self.request_stream.consume(size);
            count!(names::backend::BACK_BYTES_OUT, size as i64);
            metrics.backend_bout += size;
            // Room was made in the request buffer, and a response may now
            // arrive: read on both sides.
            self.frontend_readiness.interest.insert(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::READABLE);
        } else {
            self.backend_readiness.event.remove(Ready::WRITABLE);
        }

        match socket_state {
            // The session is not closed here: only the request path is
            // stopped (presumably so that a response already sent by the
            // backend can still be read and forwarded — confirm).
            SocketResult::Error | SocketResult::Closed => {
                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.backend_readiness.interest.remove(Ready::WRITABLE);
                return SessionResult::Continue;
            }
            SocketResult::WouldBlock => {
                self.backend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        if self.request_stream.is_terminated() && self.request_stream.is_completed() {
            self.backend_readiness.interest.remove(Ready::WRITABLE);

            // The request is fully sent: from now on the backend timeout is
            // the one running.
            self.container_frontend_timeout.cancel();
            self.container_backend_timeout.reset();
        }
        SessionResult::Continue
    }
862
863 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
865 trace!("{} ============== backend_readable", log_context!(self));
866 if !self.container_backend_timeout.reset() {
867 error!(
868 "{} Could not reset back timeout {:?}",
869 log_context!(self),
870 self.configured_backend_timeout
871 );
872 self.print_state(self.protocol_string());
873 }
874
875 let response_stream = match &mut self.response_stream {
876 ResponseStream::BackendAnswer(response_stream) => response_stream,
877 _ => {
878 error!(
879 "{} Sending default answer, should not read from backend socket",
880 log_context!(self),
881 );
882
883 self.backend_readiness.interest.remove(Ready::READABLE);
884 self.frontend_readiness.interest.insert(Ready::WRITABLE);
885 return SessionResult::Continue;
886 }
887 };
888
889 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
890 backend_socket
891 } else {
892 self.log_request_error(metrics, "back socket not found, closing session");
893 return SessionResult::Close;
894 };
895
896 if response_stream.storage.is_full() {
897 self.backend_readiness.interest.remove(Ready::READABLE);
898 if response_stream.is_main_phase() {
899 self.frontend_readiness.interest.insert(Ready::WRITABLE);
900 } else {
901 let capacity = response_stream.storage.capacity();
903 let phase = response_stream.parsing_phase.marker();
904 let message = diagnostic_413_507(response_stream.parsing_phase);
905 self.set_answer(DefaultAnswer::Answer507 {
906 capacity,
907 phase,
908 message,
909 });
910 }
911 return SessionResult::Continue;
912 }
913
914 let available_space = response_stream.storage.available_space();
915 let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
916 debug!(
917 "{} Read {} bytes",
918 log_context!(self, Some(response_stream.parsing_phase)),
919 size
920 );
921
922 debug_assert!(
926 size <= available_space,
927 "backend socket_read returned {size} bytes for a {available_space}-byte slice"
928 );
929
930 if size > 0 {
931 let used_before = response_stream.storage.used().len();
932 response_stream.storage.fill(size);
933 debug_assert_eq!(
934 response_stream.storage.used().len(),
935 used_before + size,
936 "response storage.fill(size) must grow the used region by exactly the bytes read"
937 );
938 count!(names::backend::BACK_BYTES_IN, size as i64);
939 metrics.backend_bin += size;
940 self.container_frontend_timeout.cancel();
951 } else {
952 self.backend_readiness.event.remove(Ready::READABLE);
953 }
954
955 match socket_state {
957 SocketResult::Error => {
958 backend_socket.read_error();
959 self.log_request_error(
960 metrics,
961 &format!(
962 "back socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
963 self.frontend_readiness,
964 self.backend_readiness,
965 ),
966 );
967 return SessionResult::Close;
968 }
969 SocketResult::WouldBlock | SocketResult::Closed => {
970 self.backend_readiness.event.remove(Ready::READABLE);
971 }
972 SocketResult::Continue => {}
973 }
974
975 trace!(
976 "{} ============== backend_readable_parse",
977 log_context!(self, Some(response_stream.parsing_phase))
978 );
979 kawa::h1::parse(response_stream, &mut self.context);
980 if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
983 incr!(names::http::BACKEND_PARSE_ERRORS);
984 warn!(
985 "{} Parsing response error in {:?}: {}",
986 log_context!(self, Some(response_stream.parsing_phase)),
987 marker,
988 match kind {
989 kawa::ParsingErrorKind::Consuming { index } => {
990 parser::view(
991 response_stream.storage.used(),
992 16,
993 &[
994 response_stream.storage.start,
995 response_stream.storage.head,
996 index as usize,
997 response_stream.storage.end,
998 ],
999 )
1000 }
1001 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
1002 }
1003 );
1004 if response_stream.consumed {
1005 return SessionResult::Close;
1006 } else {
1007 let (message, successfully_parsed, partially_parsed, invalid) =
1008 diagnostic_400_502(marker, kind, response_stream);
1009 self.set_answer(DefaultAnswer::Answer502 {
1010 message,
1011 phase: marker,
1012 successfully_parsed,
1013 partially_parsed,
1014 invalid,
1015 });
1016 return SessionResult::Continue;
1017 }
1018 }
1019
1020 if response_stream.is_main_phase() {
1021 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1022 }
1023 if response_stream.is_terminated() {
1024 metrics.backend_stop();
1025 self.backend_stop = Some(Instant::now());
1026 self.backend_readiness.interest.remove(Ready::READABLE);
1027 }
1028 SessionResult::Continue
1029 }
1030}
1031
1032impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
1033 fn log_endpoint(&self) -> EndpointRecord<'_> {
1034 EndpointRecord::Http {
1035 method: self.context.method.as_deref(),
1036 authority: self.context.authority.as_deref(),
1037 path: self.context.path.as_deref(),
1038 reason: self.context.reason.as_deref(),
1039 status: self.context.status,
1040 }
1041 }
1042
1043 fn response_parsing_phase(&self) -> Option<kawa::ParsingPhase> {
1047 match &self.response_stream {
1048 ResponseStream::BackendAnswer(inner) => Some(inner.parsing_phase),
1049 _ => None,
1050 }
1051 }
1052
1053 pub fn get_session_address(&self) -> Option<SocketAddr> {
1054 self.context
1055 .session_address
1056 .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
1057 }
1058
1059 pub fn get_backend_address(&self) -> Option<SocketAddr> {
1060 self.backend
1061 .as_ref()
1062 .map(|backend| backend.borrow().address)
1063 .or_else(|| {
1064 self.backend_socket
1065 .as_ref()
1066 .and_then(|backend| backend.peer_addr().ok())
1067 })
1068 }
1069
1070 fn protocol_string(&self) -> &'static str {
1072 match self.context.protocol {
1073 Protocol::HTTP => "HTTP",
1074 Protocol::HTTPS => match self.frontend_socket.protocol() {
1075 TransportProtocol::Ssl2 => "HTTPS-SSL2",
1076 TransportProtocol::Ssl3 => "HTTPS-SSL3",
1077 TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
1078 TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
1079 TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
1080 TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
1081 _ => unreachable!(),
1082 },
1083 _ => unreachable!(),
1084 }
1085 }
1086
1087 pub fn websocket_context(&self) -> WebSocketContext {
1089 WebSocketContext::Http {
1090 method: self.context.method.clone(),
1091 authority: self.context.authority.clone(),
1092 path: self.context.path.clone(),
1093 reason: self.context.reason.clone(),
1094 status: self.context.status,
1095 }
1096 }
1097
    /// Writes the access log entry of the current request.
    ///
    /// - `metrics`: timings and byte counters reported in the entry;
    /// - `error`: whether the entry is logged as an error;
    /// - `message`: optional text attached to the entry.
    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        let listener = self.listener.borrow();
        // Tags are looked up with the authority cut at its first ':'.
        // NOTE(review): for an authority holding a bracketed IPv6 literal
        // (e.g. "[::1]:8080") this keeps only "[" — confirm whether such
        // authorities can reach this point.
        let tags = self.context.authority.as_ref().and_then(|host| {
            let hostname = match host.split_once(':') {
                None => host,
                Some((hostname, _)) => hostname,
            };
            listener.get_tags(hostname)
        });

        let context = self.context.log_context();
        metrics.register_end_of_session(&context);

        log_access! {
            error,
            // Count the entries that could not be sent.
            on_failure: { incr!(names::access_logs::UNSENT) },
            message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint: self.log_endpoint(),
            tags,
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            start_time_ns: metrics.start_wall_ns(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            user_agent: self.context.user_agent.as_deref(),
            x_request_id: self.context.x_request_id.as_deref(),
            tls_version: self.context.tls_version,
            tls_cipher: self.context.tls_cipher,
            tls_sni: self.context.tls_server_name.as_deref(),
            tls_alpn: self.context.tls_alpn,
            xff_chain: self.context.xff_chain.as_deref(),
            // The `otel` field is always given; it is `None` when the
            // feature is disabled.
            #[cfg(feature = "opentelemetry")]
            otel: self.context.otel.as_ref(),
            #[cfg(not(feature = "opentelemetry"))]
            otel: None,
        };
    }
1142
1143 pub fn log_request_success(&self, metrics: &SessionMetrics) {
1144 save_http_status_metric(self.context.status, self.context.log_context());
1145 self.log_request(metrics, false, None);
1146 }
1147
1148 pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
1149 self.log_request(metrics, false, self.context.access_log_message);
1154 }
    /// Reports a request that could not be processed: increments the
    /// `http.errors` counter for the current cluster and backend, logs
    /// `message` at error level, prints the session state and writes an
    /// access log entry flagged as an error carrying `message`.
    pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
        incr!(
            "http.errors",
            self.context.cluster_id.as_deref(),
            self.context.backend_id.as_deref()
        );
        error!(
            "{} Could not process request properly got: {}",
            log_context!(self),
            message
        );
        self.print_state(self.protocol_string());
        self.log_request(metrics, true, Some(message));
    }
1175
/// Queues a locally generated default answer as the session's response.
///
/// If no default answer is queued yet, the metric matching the answer variant
/// is incremented; otherwise an error is logged and no metric is counted.
/// In both cases the answer is then rendered through `self.answers`, the
/// resolved status is stored in the context, and the readiness interests are
/// reset so that only the frontend write side (plus HUP/ERROR) is driven.
pub fn set_answer(&mut self, answer: DefaultAnswer) {
    // Status derived from the variant; only used for the "already set" log
    // below, it is shadowed by the resolved status further down.
    let status = u16::from(&answer);
    if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
        error!(
            "already set the default answer to {}, trying to set to {}",
            old_status, status
        );
    } else {
        // One counter per answer variant. 400 and 404 use the shared metric
        // name constants and carry no cluster/backend labels.
        match answer {
            DefaultAnswer::Answer301 { .. } => incr!(
                "http.301.redirection",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer302 { .. } => incr!(
                "http.302.redirection",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer308 { .. } => incr!(
                "http.308.redirection",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer400 { .. } => incr!(names::http::ERR_400),
            DefaultAnswer::Answer401 { .. } => incr!(
                "http.401.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer404 { .. } => incr!(names::http::ERR_404),
            DefaultAnswer::Answer408 { .. } => incr!(
                "http.408.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer413 { .. } => incr!(
                "http.413.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer421 { .. } => incr!(
                "http.421.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            // NOTE(review): unlike its siblings, 429 is counted under a
            // connection-rejection metric rather than "http.429.errors" —
            // presumably intentional, confirm against the metrics catalogue.
            DefaultAnswer::Answer429 { .. } => incr!(
                "connections.rejected_per_cluster_ip",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer502 { .. } => incr!(
                "http.502.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer503 { .. } => incr!(
                "http.503.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer504 { .. } => incr!(
                "http.504.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
            DefaultAnswer::Answer507 { .. } => incr!(
                "http.507.errors",
                self.context.cluster_id.as_deref(),
                self.context.backend_id.as_deref()
            ),
        };
    }

    // Rendering runs unconditionally: a newly requested answer replaces any
    // default answer that was already queued.
    let (resolved_status, keep_alive, mut kawa) = self.answers.borrow().get(
        answer,
        self.context.id.to_string(),
        self.context.cluster_id.as_deref(),
        self.context.backend_id.as_deref(),
        self.get_route(),
    );
    debug_assert!(
        (100..=999).contains(&resolved_status),
        "default answer must resolve to a 3-digit HTTP status, got {resolved_status}"
    );
    kawa.prepare(&mut kawa::h1::BlockConverter);
    // From here on the status reported by the answer store is authoritative.
    let status = resolved_status;
    self.context.status = Some(status);
    self.context.reason = None;
    // Keep-alive is only ever switched off here, never re-enabled.
    if !keep_alive {
        self.context.keep_alive_frontend = false;
    }
    self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
    // Frontend: write the answer only. Backend: no read/write interest left.
    self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
    self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
    debug_assert!(
        matches!(self.response_stream, ResponseStream::DefaultAnswer(s, _) if s == status),
        "set_answer must leave the response stream as the queued DefaultAnswer"
    );
    debug_assert!(
        self.frontend_readiness.interest.is_writable()
            && !self.frontend_readiness.interest.is_readable(),
        "default answer must steer the frontend to WRITABLE only"
    );
}
1294
1295 pub fn test_backend_socket(&self) -> bool {
1296 match self.backend_socket {
1297 Some(ref s) => {
1298 let mut tmp = [0u8; 1];
1299 let res = s.peek(&mut tmp[..]);
1300
1301 match res {
1302 Ok(0) => false,
1304 Ok(_) => true,
1305 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1306 }
1307 }
1308 None => false,
1309 }
1310 }
1311
1312 pub fn is_valid_backend_socket(&self) -> bool {
1313 match self.backend_stop.as_ref() {
1315 Some(stop_instant) => {
1316 let now = Instant::now();
1317 let dur = now - *stop_instant;
1318 if dur > Duration::from_secs(1) {
1319 return self.test_backend_socket();
1320 }
1321 }
1322 None => return self.test_backend_socket(),
1323 }
1324
1325 true
1326 }
1327
1328 pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
1329 self.backend_socket = Some(socket);
1330 self.backend = backend;
1331 }
1332
1333 pub fn set_cluster_id(&mut self, cluster_id: String) {
1334 self.context.cluster_id = Some(cluster_id);
1335 }
1336
1337 pub fn set_backend_id(&mut self, backend_id: String) {
1338 self.context.backend_id = Some(backend_id);
1339 }
1340
1341 pub fn set_backend_token(&mut self, token: Token) {
1342 self.backend_token = Some(token);
1343 }
1344
1345 pub fn clear_backend_token(&mut self) {
1346 self.backend_token = None;
1347 }
1348
1349 pub fn set_backend_timeout(&mut self, dur: Duration) {
1350 if let Some(token) = self.backend_token.as_ref() {
1351 self.container_backend_timeout.set_duration(dur);
1352 self.container_backend_timeout.set(*token);
1353 }
1354 }
1355
1356 pub fn front_socket(&self) -> &TcpStream {
1357 self.frontend_socket.socket_ref()
1358 }
1359
/// Tears down the backend side of the session.
///
/// Cancels the backend timeout, deregisters and shuts down the backend
/// socket if one is held, and — only when a backend token is still held —
/// removes the session entry for that token, clears backend events, updates
/// the connection gauges, switches the status to `NotConnected` and releases
/// the backend handle.
///
/// NOTE(review): when the caller has already `take()`n `backend_token`
/// (as `connect_to_backend` does before calling this), the token-guarded
/// section is skipped entirely, so the status is not reset and the final
/// `debug_assert_ne!` may fire if the status was `Connected` — confirm this
/// path is intended.
fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
    self.container_backend_timeout.cancel();
    debug!(
        "{}\tPROXY [{}->{}] CLOSED BACKEND",
        log_context!(self),
        self.frontend_token.0,
        self.backend_token
            .map(|t| format!("{}", t.0))
            .unwrap_or_else(|| "-".to_string())
    );

    let proxy = proxy.borrow();
    // `take()` leaves `backend_socket` empty; the stream is dropped at the
    // end of this `if let`.
    if let Some(socket) = &mut self.backend_socket.take() {
        if let Err(e) = proxy.deregister_socket(socket) {
            error!(
                "{} Error deregistering back socket({:?}): {:?}",
                log_context!(self),
                socket,
                e
            );
        }
        if let Err(e) = socket.shutdown(Shutdown::Both) {
            // A socket that is not connected is not worth an error log.
            if e.kind() != ErrorKind::NotConnected {
                error!(
                    "{} Error shutting down back socket({:?}): {:?}",
                    log_context!(self),
                    socket,
                    e
                );
            }
        }
    }

    if let Some(token) = self.backend_token.take() {
        proxy.remove_session(token);

        if self.backend_connection_status != BackendConnectionStatus::NotConnected {
            self.backend_readiness.event = Ready::EMPTY;
        }

        // Gauges are only decremented for a connection that was fully
        // established (they are incremented in `set_backend_connected`).
        if self.backend_connection_status == BackendConnectionStatus::Connected {
            gauge_add!(names::backend::CONNECTIONS, -1);
            gauge_add!(
                names::backend::CONNECTIONS_PER_BACKEND,
                -1,
                self.context.cluster_id.as_deref(),
                metrics.backend_id.as_deref()
            );
        }

        self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);

        if let Some(backend) = self.backend.take() {
            backend.borrow_mut().dec_connections();
        }
    }
    debug_assert!(
        self.backend_token.is_none(),
        "close_backend must release the backend token"
    );
    debug_assert_ne!(
        self.backend_connection_status,
        BackendConnectionStatus::Connected,
        "close_backend must leave the backend out of the Connected state"
    );
}
1440
1441 fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1443 if self.connection_attempts >= CONN_RETRIES {
1444 incr!(
1445 "backend.connect.retries_exhausted",
1446 self.context.cluster_id.as_deref(),
1447 self.context.backend_id.as_deref()
1448 );
1449 warn!(
1450 "{} Max connection attempt reached ({})",
1451 log_context!(self),
1452 self.connection_attempts,
1453 );
1454
1455 self.set_answer(DefaultAnswer::Answer503 {
1456 message: format!(
1457 "Max connection attempt reached: {}",
1458 self.connection_attempts
1459 ),
1460 });
1461 return Err(BackendConnectionError::MaxConnectionRetries(None));
1462 }
1463 debug_assert!(
1467 self.connection_attempts < CONN_RETRIES,
1468 "check_circuit_breaker must only return Ok with retry budget remaining"
1469 );
1470 Ok(())
1471 }
1472
1473 fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1474 let is_valid_backend_socket = self.is_valid_backend_socket();
1475
1476 if !is_valid_backend_socket {
1477 return false;
1478 }
1479
1480 metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1482
1483 metrics.backend_start();
1484 metrics.backend_connected();
1485 if let Some(b) = self.backend.as_mut() {
1486 b.borrow_mut().active_requests += 1;
1487 }
1488 true
1489 }
1490
1491 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1493 let given_method = self
1494 .context
1495 .method
1496 .as_ref()
1497 .ok_or(RetrieveClusterError::NoMethod)?;
1498 let given_authority = self
1499 .context
1500 .authority
1501 .as_deref()
1502 .ok_or(RetrieveClusterError::NoHost)?;
1503 let given_path = self
1504 .context
1505 .path
1506 .as_deref()
1507 .ok_or(RetrieveClusterError::NoPath)?;
1508
1509 Ok((given_authority, given_path, given_method))
1510 }
1511
1512 pub fn get_route(&self) -> String {
1513 if let Some(method) = &self.context.method {
1514 if let Some(authority) = &self.context.authority {
1515 if let Some(path) = &self.context.path {
1516 return format!("{method} {authority}{path}");
1517 }
1518 return format!("{method} {authority}");
1519 }
1520 return format!("{method}");
1521 }
1522 String::new()
1523 }
1524
/// Resolves the cluster that should serve the current request.
///
/// On every failure path a default answer is queued via `set_answer` before
/// the error is returned:
/// - route elements missing from the context: 400;
/// - frontend lookup failed on host parsing / trailing characters: 400;
/// - frontend lookup found no cluster: 404;
/// - the matched route carries no cluster id: 401;
/// - the listener is plain HTTP and the cluster asks for an HTTPS redirect:
///   301 to `https://{host}{uri}`.
///
/// # Errors
///
/// Returns the `RetrieveClusterError` matching the failure. Note that the
/// HTTPS redirect case is reported as `UnauthorizedRoute`, same as the 401
/// case.
fn cluster_id_from_request(
    &mut self,
    proxy: Rc<RefCell<dyn L7Proxy>>,
) -> Result<String, RetrieveClusterError> {
    let (host, uri, method) = match self.extract_route() {
        Ok(tuple) => tuple,
        Err(cluster_error) => {
            self.set_answer(DefaultAnswer::Answer400 {
                message: "Could not extract the route after connection started, this should not happen.".into(),
                phase: self.request_stream.parsing_phase.marker(),
                successfully_parsed: "null".into(),
                partially_parsed: "null".into(),
                invalid: "null".into(),
            });
            return Err(cluster_error);
        }
    };

    let route_result = self
        .listener
        .borrow()
        .frontend_from_request(host, uri, method);

    let route = match route_result {
        Ok(route) => route,
        Err(frontend_error) => {
            match &frontend_error {
                // Malformed host information is the client's fault: 400.
                FrontendFromRequestError::HostParse { .. }
                | FrontendFromRequestError::InvalidCharsAfterHost(_) => {
                    self.set_answer(DefaultAnswer::Answer400 {
                        message: frontend_error.to_string(),
                        phase: self.request_stream.parsing_phase.marker(),
                        successfully_parsed: "null".into(),
                        partially_parsed: "null".into(),
                        invalid: "null".into(),
                    });
                }
                FrontendFromRequestError::NoClusterFound(_) => {
                    self.set_answer(DefaultAnswer::Answer404 {});
                }
            }
            return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
        }
    };

    // A route without a cluster id is answered with 401.
    let cluster_id = match route.cluster_id {
        Some(cluster_id) => cluster_id,
        None => {
            self.set_answer(DefaultAnswer::Answer401 {
                www_authenticate: None,
            });
            return Err(RetrieveClusterError::UnauthorizedRoute);
        }
    };

    // Redirect only applies on an HTTP listener; an unknown cluster counts
    // as "no redirect".
    let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
        && proxy
            .borrow()
            .clusters()
            .get(&cluster_id)
            .map(|cluster| cluster.https_redirect)
            .unwrap_or(false);

    if frontend_should_redirect_https {
        self.set_answer(DefaultAnswer::Answer301 {
            location: format!("https://{host}{uri}"),
        });
        return Err(RetrieveClusterError::UnauthorizedRoute);
    }

    Ok(cluster_id)
}
1605
1606 pub fn backend_from_request(
1607 &mut self,
1608 cluster_id: &str,
1609 frontend_should_stick: bool,
1610 proxy: Rc<RefCell<dyn L7Proxy>>,
1611 metrics: &mut SessionMetrics,
1612 ) -> Result<TcpStream, BackendConnectionError> {
1613 let (backend, conn) = self
1614 .get_backend_for_sticky_session(
1615 frontend_should_stick,
1616 self.context.sticky_session_found.as_deref(),
1617 cluster_id,
1618 proxy,
1619 )
1620 .map_err(|backend_error| {
1621 self.set_answer(DefaultAnswer::Answer503 {
1624 message: backend_error.to_string(),
1625 });
1626 BackendConnectionError::Backend(backend_error)
1627 })?;
1628
1629 if frontend_should_stick {
1630 self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1632
1633 self.context.sticky_session = Some(
1634 backend
1635 .borrow()
1636 .sticky_id
1637 .clone()
1638 .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1639 );
1640 }
1641
1642 metrics.backend_id = Some(backend.borrow().backend_id.clone());
1643 metrics.backend_start();
1644 self.set_backend_id(backend.borrow().backend_id.clone());
1645
1646 self.backend = Some(backend);
1647 Ok(conn)
1648 }
1649
1650 fn get_backend_for_sticky_session(
1651 &self,
1652 frontend_should_stick: bool,
1653 sticky_session: Option<&str>,
1654 cluster_id: &str,
1655 proxy: Rc<RefCell<dyn L7Proxy>>,
1656 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1657 match (frontend_should_stick, sticky_session) {
1658 (true, Some(sticky_session)) => proxy
1659 .borrow()
1660 .backends()
1661 .borrow_mut()
1662 .backend_from_sticky_session(cluster_id, sticky_session),
1663 _ => proxy
1664 .borrow()
1665 .backends()
1666 .borrow_mut()
1667 .backend_from_cluster_id(cluster_id),
1668 }
1669 }
1670
/// Ensures the session has a backend connection for the current request.
///
/// Steps, in order:
/// 1. refuse when the retry budget is exhausted (`check_circuit_breaker`);
/// 2. resolve the target cluster from the request;
/// 3. if already connected to that same cluster, try to reuse the current
///    connection (`BackendConnectAction::Reuse`);
/// 4. otherwise pick a backend, open a socket, register it and arm the
///    connect timeout. The token held on entry is reused when there was one
///    (`Replace`), else a new session entry is added (`New`).
///
/// # Errors
///
/// Propagates errors from the circuit breaker, the cluster resolution
/// (wrapped in `RetrieveClusterError`) and the backend selection. Socket
/// option and registration failures are only logged.
fn connect_to_backend(
    &mut self,
    session_rc: Rc<RefCell<dyn ProxySession>>,
    proxy: Rc<RefCell<dyn L7Proxy>>,
    metrics: &mut SessionMetrics,
) -> Result<BackendConnectAction, BackendConnectionError> {
    // Snapshot taken before anything below can clear the token or change
    // the cluster id.
    let old_cluster_id = self.context.cluster_id.clone();
    let old_backend_token = self.backend_token;

    self.check_circuit_breaker()?;

    let cluster_id = self
        .cluster_id_from_request(proxy.clone())
        .map_err(BackendConnectionError::RetrieveClusterError)?;

    trace!(
        "{} Connect_to_backend: {:?} {:?} {:?}",
        log_context!(self),
        self.context.cluster_id,
        cluster_id,
        self.backend_connection_status
    );
    // Same cluster and still connected: candidate for reuse.
    if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
        && self.backend_connection_status == BackendConnectionStatus::Connected
    {
        // The backend must still be known to the backend map for this cluster.
        let has_backend = self
            .backend
            .as_ref()
            .map(|backend| {
                let backend = backend.borrow();
                proxy
                    .borrow()
                    .backends()
                    .borrow()
                    .has_backend(&cluster_id, &backend)
            })
            .unwrap_or(false);

        if has_backend && self.check_backend_connection(metrics) {
            return Ok(BackendConnectAction::Reuse);
        } else if self.backend_token.take().is_some() {
            // NOTE(review): the token is taken *before* `close_backend`
            // runs, so `close_backend` skips its token-guarded section
            // (session removal, gauges, status reset). Presumably this keeps
            // the old token alive for the `Replace` path below — confirm,
            // and check it against the `debug_assert_ne!` at the end of
            // `close_backend`, since the status is `Connected` here.
            self.close_backend(proxy.clone(), metrics);
        }
    }

    // The request targets a different cluster than the previous one: drop
    // the old backend connection (same take-then-close pattern as above).
    if old_cluster_id.is_some()
        && old_cluster_id.as_ref() != Some(&cluster_id)
        && self.backend_token.take().is_some()
    {
        self.close_backend(proxy.clone(), metrics);
    }

    self.context.cluster_id = Some(cluster_id.clone());

    // An unknown cluster counts as "not sticky".
    let frontend_should_stick = proxy
        .borrow()
        .clusters()
        .get(&cluster_id)
        .map(|cluster| cluster.sticky_session)
        .unwrap_or(false);

    let mut socket =
        self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
    if let Err(e) = socket.set_nodelay(true) {
        error!(
            "{} Error setting nodelay on backend socket({:?}): {:?}",
            log_context!(self),
            socket,
            e
        );
    }

    self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
    self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());

    match old_backend_token {
        // A token was held on entry: register the new socket under it.
        Some(backend_token) => {
            self.set_backend_token(backend_token);
            if let Err(e) = proxy.borrow().register_socket(
                &mut socket,
                backend_token,
                Interest::READABLE | Interest::WRITABLE,
            ) {
                error!(
                    "{} Error registering back socket({:?}): {:?}",
                    log_context!(self),
                    socket,
                    e
                );
            }

            self.set_backend_socket(socket, self.backend.clone());
            self.set_backend_timeout(self.configured_connect_timeout);

            Ok(BackendConnectAction::Replace)
        }
        // No token on entry: add a session entry to obtain a fresh one.
        None => {
            let backend_token = proxy.borrow().add_session(session_rc);

            if let Err(e) = proxy.borrow().register_socket(
                &mut socket,
                backend_token,
                Interest::READABLE | Interest::WRITABLE,
            ) {
                error!(
                    "{} Error registering back socket({:?}): {:?}",
                    log_context!(self),
                    socket,
                    e
                );
            }

            self.set_backend_socket(socket, self.backend.clone());
            self.set_backend_token(backend_token);
            self.set_backend_timeout(self.configured_connect_timeout);

            Ok(BackendConnectAction::New)
        }
    }
}
1793
/// Updates the backend connection status and, on a transition to
/// `Connected`, performs the associated bookkeeping.
///
/// For any other target status only the field is updated. For `Connected`:
/// the connection gauges are incremented, the backend timeout is switched to
/// `configured_backend_timeout` (and cancelled again if the backend is not
/// currently being read), and the backend record is updated — "up"
/// metric/event if its retry policy was down, connection time if the
/// previous status was `Connecting`, failure counter reset,
/// `active_requests` incremented and the retry policy marked successful.
fn set_backend_connected(
    &mut self,
    connected: BackendConnectionStatus,
    metrics: &mut SessionMetrics,
) {
    // Keep the previous status: it carries the connect start instant.
    let last = self.backend_connection_status;
    self.backend_connection_status = connected;

    if connected == BackendConnectionStatus::Connected {
        debug_assert_ne!(
            last,
            BackendConnectionStatus::Connected,
            "set_backend_connected(Connected) must not run twice without a close in between"
        );
        gauge_add!(names::backend::CONNECTIONS, 1);
        gauge_add!(
            names::backend::CONNECTIONS_PER_BACKEND,
            1,
            self.context.cluster_id.as_deref(),
            metrics.backend_id.as_deref()
        );

        self.set_backend_timeout(self.configured_backend_timeout);
        // No read interest on the backend: the timeout armed just above is
        // cancelled right away.
        if !self.backend_readiness.interest.is_readable() {
            self.container_backend_timeout.cancel();
        }

        if let Some(backend) = &self.backend {
            let mut backend = backend.borrow_mut();

            // The backend was marked down and just accepted a connection:
            // publish that it is back up.
            if backend.retry_policy.is_down() {
                incr!(
                    "backend.up",
                    self.context.cluster_id.as_deref(),
                    metrics.backend_id.as_deref()
                );
                gauge!(
                    names::backend::AVAILABLE,
                    1,
                    self.context.cluster_id.as_deref(),
                    metrics.backend_id.as_deref()
                );

                info!(
                    "{} backend server {} at {} is up",
                    log_context!(self),
                    backend.backend_id,
                    backend.address
                );

                push_event(Event {
                    kind: EventKind::BackendUp as i32,
                    backend_id: Some(backend.backend_id.to_owned()),
                    address: Some(backend.address.into()),
                    cluster_id: None,
                    metric_detail: None,
                });
            }

            // Time elapsed since the connection attempt started.
            if let BackendConnectionStatus::Connecting(start) = last {
                backend.set_connection_time(Instant::now() - start);
            }

            backend.failures = 0;
            backend.active_requests += 1;
            backend.retry_policy.succeed();
        }
    }
}
1874
1875 fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1876 if let Some(backend) = &self.backend {
1877 let mut backend = backend.borrow_mut();
1878 backend.failures += 1;
1879
1880 let already_unavailable = backend.retry_policy.is_down();
1881 backend.retry_policy.fail();
1882 incr!(
1883 "backend.connections.error",
1884 self.context.cluster_id.as_deref(),
1885 metrics.backend_id.as_deref()
1886 );
1887
1888 if !already_unavailable && backend.retry_policy.is_down() {
1889 error!(
1890 "{} backend server {} at {} is down",
1891 log_context!(self),
1892 backend.backend_id,
1893 backend.address
1894 );
1895
1896 incr!(
1897 "backend.down",
1898 self.context.cluster_id.as_deref(),
1899 metrics.backend_id.as_deref()
1900 );
1901 gauge!(
1902 names::backend::AVAILABLE,
1903 0,
1904 self.context.cluster_id.as_deref(),
1905 metrics.backend_id.as_deref()
1906 );
1907
1908 push_event(Event {
1909 kind: EventKind::BackendDown as i32,
1910 backend_id: Some(backend.backend_id.to_owned()),
1911 address: Some(backend.address.into()),
1912 cluster_id: None,
1913 metric_detail: None,
1914 });
1915 }
1916 }
1917 }
1918
/// Decides what to do when the backend hung up (or errored).
///
/// Returns `CloseBackend` when the response is not a backend answer, when it
/// is already terminated, or when both streams are still in their initial
/// state. Returns `Continue` when readable backend data is still pending,
/// when a started response is forcibly marked terminated so the frontend can
/// flush it, or when a 503 default answer is queued because request data had
/// already been sent.
pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
    let response_stream = match &mut self.response_stream {
        ResponseStream::BackendAnswer(response_stream) => response_stream,
        _ => return StateResult::CloseBackend,
    };

    // A readable event we are interested in is still pending: let the
    // caller handle it before acting on the hang-up.
    if self.backend_readiness.event.is_readable()
        && self.backend_readiness.interest.is_readable()
    {
        return StateResult::Continue;
    }

    // The response is complete: the backend can simply be closed.
    if response_stream.is_terminated() {
        return StateResult::CloseBackend;
    }
    debug_assert!(
        !response_stream.is_terminated(),
        "backend_hup reached its decision match with an already-terminated response"
    );
    match (
        self.request_stream.is_initial(),
        response_stream.is_initial(),
    ) {
        // The response had started but is not terminated: force it to
        // `Terminated` and make sure the frontend gets a write pass.
        (_, false) => {
            error!(
                "{} Backend closed before session is over",
                log_context!(self, Some(response_stream.parsing_phase)),
            );

            trace!(
                "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
                log_context!(self, Some(response_stream.parsing_phase))
            );

            response_stream.parsing_phase = kawa::ParsingPhase::Terminated;

            debug_assert!(
                response_stream.is_terminated(),
                "forced backend-hup termination must mark the response terminated"
            );

            self.frontend_readiness.interest.insert(Ready::WRITABLE);
            debug_assert!(
                self.frontend_readiness.interest.is_writable(),
                "backend-hup termination must arm the frontend for the final flush"
            );
            StateResult::Continue
        }
        // Neither stream has started: nothing is in flight.
        (true, true) => {
            trace!(
                "{} Backend hanged up in between requests",
                log_context!(self)
            );
            StateResult::CloseBackend
        }
        // The request had started but no response did: answer 503 and stop
        // listening to the backend altogether.
        (false, true) => {
            error!(
                "{} Frontend transmitted data but the back closed",
                log_context!(self)
            );

            self.set_answer(DefaultAnswer::Answer503 {
                message: "Backend closed after consuming part of the request".into(),
            });

            self.backend_readiness.interest = Ready::EMPTY;
            StateResult::Continue
        }
    }
}
2006
/// Drives the session after readiness events were recorded.
///
/// Phases, in order:
/// 1. if a backend connection is pending and backend events arrived, either
///    retry the connection (hang-up with a failing socket probe) or promote
///    the connection to `Connected`;
/// 2. close the session if the frontend hung up (logging a request error
///    when a request was in progress);
/// 3. loop — at most `MAX_LOOP_ITERATIONS` times — dispatching frontend
///    read, backend write, backend read, frontend write, frontend error and
///    backend hup/error handlers according to the filtered interests;
/// 4. close the session if the iteration budget was used up.
///
/// Returns `SessionResult::Continue` when the loop settled on its own, or
/// whichever terminal result a handler produced.
fn ready_inner(
    &mut self,
    session: Rc<RefCell<dyn crate::ProxySession>>,
    proxy: Rc<RefCell<dyn L7Proxy>>,
    metrics: &mut SessionMetrics,
) -> SessionResult {
    let mut counter = 0;

    if self.backend_connection_status.is_connecting()
        && !self.backend_readiness.event.is_empty()
    {
        // Hang-up plus a failing probe: the connection attempt failed.
        if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
            warn!(
                "{} Error connecting to backend, trying again, attempt {}",
                log_context!(self),
                self.connection_attempts
            );

            let attempts_before = self.connection_attempts;
            self.connection_attempts += 1;
            debug_assert_eq!(
                self.connection_attempts,
                attempts_before + 1,
                "a backend retry must bump connection_attempts by exactly one"
            );
            debug_assert!(
                self.connection_attempts <= CONN_RETRIES,
                "connection_attempts ({}) must not exceed the breaker budget on retry",
                self.connection_attempts
            );
            self.fail_backend_connection(metrics);

            self.backend_connection_status =
                BackendConnectionStatus::Connecting(Instant::now());

            // Drop the failed socket before attempting a new connection.
            self.close_backend(proxy.clone(), metrics);

            let connection_result =
                self.connect_to_backend(session.clone(), proxy.clone(), metrics);
            if let Err(err) = &connection_result {
                match err {
                    // Exhausted retries are logged at trace level only.
                    BackendConnectionError::MaxConnectionRetries(_) => trace!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    ),
                    _ => warn!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    ),
                }
            }

            if let Some(session_result) = handle_connection_result(connection_result) {
                return session_result;
            }
        } else {
            // The connection succeeded: reset the retry counter and start
            // listening for backend data.
            metrics.backend_connected();
            self.connection_attempts = 0;
            self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
            self.backend_readiness.interest.insert(Ready::READABLE);
        }
    }

    if self.frontend_readiness.event.is_hup() {
        // Only report an error if a request had actually started.
        if !self.request_stream.is_initial() {
            self.log_request_error(metrics, "Client disconnected abruptly");
        }
        return SessionResult::Close;
    }

    while counter < MAX_LOOP_ITERATIONS {
        let frontend_interest = self.frontend_readiness.filter_interest();
        let backend_interest = self.backend_readiness.filter_interest();

        trace!(
            "{} Frontend interest({:?}) and backend interest({:?})",
            log_context!(self),
            frontend_interest,
            backend_interest,
        );

        // Nothing left to do on either side.
        if frontend_interest.is_empty() && backend_interest.is_empty() {
            break;
        }

        // Backend hung up while we still want to write to a frontend that
        // is not writable right now: stop looping until it becomes writable.
        if self.backend_readiness.event.is_hup()
            && self.frontend_readiness.interest.is_writable()
            && !self.frontend_readiness.event.is_writable()
        {
            break;
        }

        if frontend_interest.is_readable() {
            let state_result = self.readable(metrics);
            trace!(
                "{} frontend_readable: {:?}",
                log_context!(self),
                state_result
            );

            match state_result {
                StateResult::Continue => {}
                StateResult::ConnectBackend => {
                    let connection_result =
                        self.connect_to_backend(session.clone(), proxy.clone(), metrics);
                    if let Err(err) = &connection_result {
                        match err {
                            BackendConnectionError::MaxConnectionRetries(_) => trace!(
                                "{} Error connecting to backend: {}",
                                log_context!(self),
                                err
                            ),
                            _ => warn!(
                                "{} Error connecting to backend: {}",
                                log_context!(self),
                                err
                            ),
                        }
                    }

                    if let Some(session_result) = handle_connection_result(connection_result) {
                        return session_result;
                    }
                }
                StateResult::CloseBackend => unreachable!(),
                StateResult::CloseSession => return SessionResult::Close,
                StateResult::Upgrade => return SessionResult::Upgrade,
            }
        }

        if backend_interest.is_writable() {
            let session_result = self.backend_writable(metrics);
            trace!(
                "{} backend_writable: {:?}",
                log_context!(self),
                session_result
            );
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if backend_interest.is_readable() {
            let session_result = self.backend_readable(metrics);
            trace!(
                "{} backend_readable: {:?}",
                log_context!(self),
                session_result
            );
            if session_result != SessionResult::Continue {
                return session_result;
            }
        }

        if frontend_interest.is_writable() {
            let state_result = self.writable(metrics);
            trace!(
                "{} frontend_writable: {:?}",
                log_context!(self),
                state_result
            );
            match state_result {
                StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                StateResult::CloseSession => return SessionResult::Close,
                StateResult::Upgrade => return SessionResult::Upgrade,
                StateResult::Continue => {}
                StateResult::ConnectBackend => unreachable!(),
            }
        }

        if frontend_interest.is_error() {
            error!(
                "{} frontend socket error, disconnecting",
                log_context!(self)
            );

            return SessionResult::Close;
        }

        if backend_interest.is_hup() || backend_interest.is_error() {
            let state_result = self.backend_hup(metrics);

            trace!("{} backend_hup: {:?}", log_context!(self), state_result);
            match state_result {
                StateResult::Continue => {}
                StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                StateResult::CloseSession => return SessionResult::Close,
                StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
            }
        }

        counter += 1;
    }

    // The loop ran out of iterations instead of settling.
    if counter >= MAX_LOOP_ITERATIONS {
        error!(
            "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
            log_context!(self),
            MAX_LOOP_ITERATIONS
        );

        incr!(names::http::INFINITE_LOOP_ERROR);
        self.print_state(self.protocol_string());

        return SessionResult::Close;
    }

    debug_assert!(
        counter < MAX_LOOP_ITERATIONS,
        "ready_inner must only Continue when the loop settled within the iteration budget"
    );
    SessionResult::Continue
}
2250
2251 pub fn timeout_status(&self) -> TimeoutStatus {
2252 if self.request_stream.is_main_phase() {
2253 match &self.response_stream {
2254 ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
2255 TimeoutStatus::WaitingForResponse
2256 }
2257 _ => TimeoutStatus::Response,
2258 }
2259 } else if self.keepalive_count > 0 {
2260 TimeoutStatus::WaitingForNewRequest
2261 } else {
2262 TimeoutStatus::Request
2263 }
2264 }
2265
2266 #[cfg(debug_assertions)]
2294 fn check_invariants(&self) {
2295 debug_assert!(
2296 self.connection_attempts <= CONN_RETRIES,
2297 "connection_attempts ({}) must stay within the circuit-breaker budget ({CONN_RETRIES})",
2298 self.connection_attempts
2299 );
2300 if self.backend_connection_status == BackendConnectionStatus::Connected {
2301 debug_assert!(
2302 self.backend_socket.is_some(),
2303 "a Connected backend must own a live socket (teardown-ordering bug)"
2304 );
2305 }
2306 if let ResponseStream::DefaultAnswer(status, _) = &self.response_stream {
2307 debug_assert!(
2308 !self.frontend_readiness.interest.is_readable(),
2309 "frontend must not be asked to read more request bytes while a default answer is queued"
2310 );
2311 debug_assert!(
2312 !self.backend_readiness.interest.is_readable()
2313 && !self.backend_readiness.interest.is_writable(),
2314 "backend must be idle (no read/write interest) while a default answer is queued"
2315 );
2316 debug_assert_eq!(
2317 self.context.status,
2318 Some(*status),
2319 "context.status must match the queued default-answer status"
2320 );
2321 }
2322 }
2323}
2324
2325impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
2326 fn ready(
2327 &mut self,
2328 session: Rc<RefCell<dyn crate::ProxySession>>,
2329 proxy: Rc<RefCell<dyn L7Proxy>>,
2330 metrics: &mut SessionMetrics,
2331 ) -> SessionResult {
2332 let session_result = self.ready_inner(session, proxy, metrics);
2333 if session_result == SessionResult::Upgrade {
2334 let response_storage = match &mut self.response_stream {
2335 ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
2336 _ => return SessionResult::Close,
2337 };
2338
2339 self.request_stream.storage.buffer.sync(
2342 self.request_stream.storage.end,
2343 self.request_stream.storage.head,
2344 );
2345 response_storage
2346 .buffer
2347 .sync(response_storage.end, response_storage.head);
2348 }
2349 #[cfg(debug_assertions)]
2352 self.check_invariants();
2353 session_result
2354 }
2355
2356 fn update_readiness(&mut self, token: Token, events: Ready) {
2357 if self.frontend_token == token {
2358 self.frontend_readiness.event |= events;
2359 } else if self.backend_token == Some(token) {
2360 self.backend_readiness.event |= events;
2361 }
2362 }
2363
/// Closes the whole session: backend first, then the frontend socket.
///
/// If a request was in progress (request stream not in its initial state),
/// the active-requests gauge is decremented and, when a backend handle is
/// still attached, so is that backend's `active_requests` counter.
fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
    self.close_backend(proxy, metrics);
    self.frontend_socket.socket_close();
    // Result deliberately ignored. NOTE(review): presumably an empty write
    // to let the socket handler flush pending close data — confirm.
    let _ = self.frontend_socket.socket_write_vectored(&[]);

    if !self.request_stream.is_initial() {
        gauge_add!(names::http::ACTIVE_REQUESTS, -1);

        // NOTE(review): `close_backend` above `take()`s `self.backend`
        // whenever a backend token was still held, so this branch only sees
        // a backend in the remaining cases — confirm the `active_requests`
        // decrement is not being skipped unintentionally.
        if let Some(b) = self.backend.as_mut() {
            let mut backend = b.borrow_mut();
            backend.active_requests = backend.active_requests.saturating_sub(1);
        }
    }
}
2379
2380 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
2381 if self.frontend_token == token {
2383 self.container_frontend_timeout.triggered();
2384 return match self.timeout_status() {
2385 TimeoutStatus::Request => {
2387 self.context.access_log_message = Some("client_timeout");
2388 self.set_answer(DefaultAnswer::Answer408 {
2389 duration: self.container_frontend_timeout.to_string(),
2390 });
2391 self.writable(metrics)
2392 }
2393 TimeoutStatus::WaitingForResponse => {
2395 self.context.access_log_message = Some("client_timeout_during_response");
2398 self.set_answer(DefaultAnswer::Answer504 {
2399 duration: self.container_backend_timeout.to_string(),
2400 });
2401 self.writable(metrics)
2402 }
2403 TimeoutStatus::Response => StateResult::Continue,
2406 TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
2408 };
2409 }
2410
2411 if self.backend_token == Some(token) {
2412 self.container_backend_timeout.triggered();
2414 return match self.timeout_status() {
2415 TimeoutStatus::Request => {
2416 error!(
2417 "{} got backend timeout while waiting for a request, this should not happen",
2418 log_context!(self)
2419 );
2420 self.context.access_log_message = Some("backend_timeout");
2425 self.set_answer(DefaultAnswer::Answer504 {
2426 duration: self.container_backend_timeout.to_string(),
2427 });
2428 self.writable(metrics)
2429 }
2430 TimeoutStatus::WaitingForResponse => {
2431 self.context.access_log_message = Some("backend_timeout");
2432 self.set_answer(DefaultAnswer::Answer504 {
2433 duration: self.container_backend_timeout.to_string(),
2434 });
2435 self.writable(metrics)
2436 }
2437 TimeoutStatus::Response => {
2438 error!(
2439 "backend {:?} timeout while receiving response (cluster {:?})",
2440 self.context.backend_id, self.context.cluster_id
2441 );
2442 self.context.access_log_message = Some("backend_response_timeout");
2443 StateResult::CloseSession
2444 }
2445 TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
2447 };
2448 }
2449
2450 error!("{} Got timeout for an invalid token", log_context!(self));
2451 StateResult::CloseSession
2452 }
2453
2454 fn cancel_timeouts(&mut self) {
2455 self.container_backend_timeout.cancel();
2456 self.container_frontend_timeout.cancel();
2457 }
2458
2459 fn print_state(&self, context: &str) {
2460 error!(
2461 "\
2462{} {} Session(Kawa)
2463\tFrontend:
2464\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2465\tBackend:
2466\t\ttoken: {:?}\treadiness: {:?}",
2467 log_context!(self),
2468 context,
2469 self.frontend_token,
2470 self.frontend_readiness,
2471 self.request_stream.parsing_phase,
2472 self.backend_token,
2473 self.backend_readiness,
2474 );
2476 }
2477
2478 fn shutting_down(&mut self) -> SessionIsToBeClosed {
2479 if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2480 {
2482 true
2483 } else {
2484 self.context.closing = true;
2485 false
2486 }
2487 }
2488}
2489
2490fn handle_connection_result(
2491 connection_result: Result<BackendConnectAction, BackendConnectionError>,
2492) -> Option<SessionResult> {
2493 match connection_result {
2494 Ok(BackendConnectAction::Reuse) => None,
2496 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2497 Some(SessionResult::Continue)
2499 }
2500 Err(_) => {
2501 None
2509 }
2510 }
2511}
2512
2513fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2525 if let Some(status) = status {
2526 debug_assert!(
2532 (100..=999).contains(&status),
2533 "save_http_status_metric got a non-3-digit status: {status}"
2534 );
2535 match status {
2536 100..=199 => {
2537 incr!(
2538 names::http::STATUS_1XX,
2539 context.cluster_id,
2540 context.backend_id
2541 );
2542 }
2543 200..=299 => {
2544 incr!(
2545 names::http::STATUS_2XX,
2546 context.cluster_id,
2547 context.backend_id
2548 );
2549 }
2550 300..=399 => {
2551 incr!(
2552 names::http::STATUS_3XX,
2553 context.cluster_id,
2554 context.backend_id
2555 );
2556 }
2557 400..=499 => {
2558 incr!(
2559 names::http::STATUS_4XX,
2560 context.cluster_id,
2561 context.backend_id
2562 );
2563 }
2564 500..=599 => {
2565 incr!(
2566 names::http::STATUS_5XX,
2567 context.cluster_id,
2568 context.backend_id
2569 );
2570 }
2571 _ => {
2572 incr!(names::http::STATUS_OTHER);
2574 }
2575 }
2576
2577 if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
2578 incr!(per_code, context.cluster_id, context.backend_id);
2579 }
2580 }
2581}