1pub mod answers;
10pub mod diagnostics;
11pub mod editor;
12pub mod parser;
13
14use std::{
15 cell::RefCell,
16 io::ErrorKind,
17 net::{Shutdown, SocketAddr},
18 rc::{Rc, Weak},
19 time::{Duration, Instant},
20};
21
22use mio::{Interest, Token, net::TcpStream};
23use rusty_ulid::Ulid;
24use sozu_command::{
25 config::MAX_LOOP_ITERATIONS,
26 logging::EndpointRecord,
27 proto::command::{Event, EventKind, ListenerType},
28};
29
30use crate::metrics::names;
32use crate::{
33 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
34 FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession,
35 Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult,
36 StateResult,
37 backends::{Backend, BackendError},
38 pool::{Checkout, Pool},
39 protocol::{
40 SessionState,
41 http::{
42 answers::DefaultAnswerStream,
43 diagnostics::{diagnostic_400_502, diagnostic_413_507},
44 editor::HttpContext,
45 parser::Method,
46 },
47 pipe::WebSocketContext,
48 },
49 retry::RetryPolicy,
50 server::{CONN_RETRIES, push_event},
51 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
52 sozu_command::{
53 logging::{LogContext, ansi_palette},
54 ready::Ready,
55 },
56 timer::TimeoutContainer,
57};
58
59macro_rules! log_context {
74 ($self:expr) => {
75 log_context!($self, $self.response_parsing_phase())
76 };
77 ($self:expr, $response_phase:expr) => {{
78 let (open, reset, grey, gray, white) = ansi_palette();
79 format!(
80 "{gray}{ctx}{reset}\t{open}KAWA-H1{reset}\t{grey}Session{reset}({gray}public{reset}={white}{public}{reset}, {gray}session{reset}={white}{session}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}request_parsing_phase{reset}={white}{request_parsing_phase:?}{reset}, {gray}response_parsing_phase{reset}={white}{response_parsing_phase:?}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
81 open = open,
82 reset = reset,
83 grey = grey,
84 gray = gray,
85 white = white,
86 ctx = $self.context.log_context(),
87 public = $self.context.public_address.to_string(),
88 session = $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
89 frontend = $self.frontend_token.0,
90 request_parsing_phase = $self.request_stream.parsing_phase,
91 response_parsing_phase = $response_phase,
92 frontend_readiness = $self.frontend_readiness,
93 backend = $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
94 backend_readiness = $self.backend_readiness,
95 )
96 }};
97}
98
99type GenericHttpStream = kawa::Kawa<Checkout>;
101
102impl kawa::AsBuffer for Checkout {
103 fn as_buffer(&self) -> &[u8] {
104 self.inner.extra()
105 }
106 fn as_mut_buffer(&mut self) -> &mut [u8] {
107 self.inner.extra_mut()
108 }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum DefaultAnswer {
113 Answer301 {
114 location: String,
115 },
116 Answer302 {
119 location: String,
120 },
121 Answer308 {
125 location: String,
126 },
127 Answer400 {
128 message: String,
129 phase: kawa::ParsingPhaseMarker,
130 successfully_parsed: String,
131 partially_parsed: String,
132 invalid: String,
133 },
134 Answer401 {
135 www_authenticate: Option<String>,
139 },
140 Answer404 {},
141 Answer408 {
142 duration: String,
143 },
144 Answer413 {
145 message: String,
146 phase: kawa::ParsingPhaseMarker,
147 capacity: usize,
148 },
149 Answer421 {},
154 Answer429 {
160 retry_after: Option<u32>,
161 },
162 Answer502 {
163 message: String,
164 phase: kawa::ParsingPhaseMarker,
165 successfully_parsed: String,
166 partially_parsed: String,
167 invalid: String,
168 },
169 Answer503 {
170 message: String,
171 },
172 Answer504 {
173 duration: String,
174 },
175 Answer507 {
176 phase: kawa::ParsingPhaseMarker,
177 message: String,
178 capacity: usize,
179 },
180}
181
182impl From<&DefaultAnswer> for u16 {
183 fn from(answer: &DefaultAnswer) -> u16 {
184 match answer {
185 DefaultAnswer::Answer301 { .. } => 301,
186 DefaultAnswer::Answer302 { .. } => 302,
187 DefaultAnswer::Answer308 { .. } => 308,
188 DefaultAnswer::Answer400 { .. } => 400,
189 DefaultAnswer::Answer401 { .. } => 401,
190 DefaultAnswer::Answer404 { .. } => 404,
191 DefaultAnswer::Answer408 { .. } => 408,
192 DefaultAnswer::Answer413 { .. } => 413,
193 DefaultAnswer::Answer421 { .. } => 421,
194 DefaultAnswer::Answer429 { .. } => 429,
195 DefaultAnswer::Answer502 { .. } => 502,
196 DefaultAnswer::Answer503 { .. } => 503,
197 DefaultAnswer::Answer504 { .. } => 504,
198 DefaultAnswer::Answer507 { .. } => 507,
199 }
200 }
201}
202
203#[derive(Debug, Clone, Copy, PartialEq, Eq)]
204pub enum TimeoutStatus {
205 Request,
206 Response,
207 WaitingForNewRequest,
208 WaitingForResponse,
209}
210
211pub enum ResponseStream {
212 BackendAnswer(GenericHttpStream),
213 DefaultAnswer(u16, DefaultAnswerStream),
214}
215
216pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
218 answers: Rc<RefCell<answers::HttpAnswers>>,
219 pub backend: Option<Rc<RefCell<Backend>>>,
220 backend_connection_status: BackendConnectionStatus,
221 pub backend_readiness: Readiness,
222 pub backend_socket: Option<TcpStream>,
223 backend_stop: Option<Instant>,
224 pub backend_token: Option<Token>,
225 pub container_backend_timeout: TimeoutContainer,
226 pub container_frontend_timeout: TimeoutContainer,
227 configured_backend_timeout: Duration,
228 configured_connect_timeout: Duration,
229 configured_frontend_timeout: Duration,
230 connection_attempts: u8,
232 pub frontend_readiness: Readiness,
233 pub frontend_socket: Front,
234 frontend_token: Token,
235 keepalive_count: usize,
236 listener: Rc<RefCell<L>>,
237 pub request_stream: GenericHttpStream,
238 pub response_stream: ResponseStream,
239 pub context: HttpContext,
244}
245
246impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
247 #[allow(clippy::too_many_arguments)]
256 pub fn new(
257 answers: Rc<RefCell<answers::HttpAnswers>>,
258 configured_backend_timeout: Duration,
259 configured_connect_timeout: Duration,
260 configured_frontend_timeout: Duration,
261 container_frontend_timeout: TimeoutContainer,
262 frontend_socket: Front,
263 frontend_token: Token,
264 listener: Rc<RefCell<L>>,
265 pool: Weak<RefCell<Pool>>,
266 protocol: Protocol,
267 public_address: SocketAddr,
268 session_id: Ulid,
269 request_id: Ulid,
270 session_address: Option<SocketAddr>,
271 sticky_name: String,
272 ) -> Result<Http<Front, L>, AcceptError> {
273 let (front_buffer, back_buffer) = match pool.upgrade() {
274 Some(pool) => {
275 let mut pool = pool.borrow_mut();
276 match (pool.checkout(), pool.checkout()) {
277 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
278 _ => return Err(AcceptError::BufferCapacityReached),
279 }
280 }
281 None => return Err(AcceptError::BufferCapacityReached),
282 };
283 let sozu_id_header = listener.borrow().get_sozu_id_header().to_string();
284 let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
285 let send_x_real_ip = listener.borrow().get_send_x_real_ip();
286 Ok(Http {
287 answers,
288 backend_connection_status: BackendConnectionStatus::NotConnected,
289 backend_readiness: Readiness::new(),
290 backend_socket: None,
291 backend_stop: None,
292 backend_token: None,
293 backend: None,
294 configured_backend_timeout,
295 configured_connect_timeout,
296 configured_frontend_timeout,
297 connection_attempts: 0,
298 container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
299 container_frontend_timeout,
300 frontend_readiness: Readiness {
301 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
302 event: Ready::EMPTY,
303 },
304 frontend_socket,
305 frontend_token,
306 keepalive_count: 0,
307 listener,
308 request_stream: GenericHttpStream::new(
309 kawa::Kind::Request,
310 kawa::Buffer::new(front_buffer),
311 ),
312 response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
313 kawa::Kind::Response,
314 kawa::Buffer::new(back_buffer),
315 )),
316 context: HttpContext::new(
317 session_id,
318 request_id,
319 protocol,
320 public_address,
321 session_address,
322 sticky_name,
323 sozu_id_header,
324 elide_x_real_ip,
325 send_x_real_ip,
326 ),
327 })
328 }
329
330 pub fn reset(&mut self) {
332 trace!("{} ============== reset", log_context!(self));
333 let response_stream = match &mut self.response_stream {
334 ResponseStream::BackendAnswer(response_stream) => response_stream,
335 _ => return,
336 };
337
338 self.context.id = Ulid::generate();
339 self.context.reset();
340
341 self.request_stream.clear();
342 response_stream.clear();
343 self.keepalive_count += 1;
344 gauge_add!(names::http::ACTIVE_REQUESTS, -1);
345
346 if let Some(backend) = &mut self.backend {
347 let mut backend = backend.borrow_mut();
348 backend.active_requests = backend.active_requests.saturating_sub(1);
349 }
350
351 self.container_backend_timeout.cancel();
354 self.container_frontend_timeout
355 .set_duration(self.configured_frontend_timeout);
356 self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
357 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
358
359 let response_phase = response_stream.parsing_phase;
366 let response_storage = &mut response_stream.storage;
367 if !response_storage.is_empty() {
368 warn!(
369 "{} Leftover fragment from response: {}",
370 log_context!(self, Some(response_phase)),
371 parser::view(
372 response_storage.used(),
373 16,
374 &[response_storage.start, response_storage.end,],
375 )
376 );
377 }
378
379 response_storage.clear();
380 if !self.request_stream.storage.is_empty() {
381 self.frontend_readiness.event.insert(Ready::READABLE);
382 } else {
383 self.request_stream.storage.clear();
384 }
385 }
386
387 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
388 trace!("{} ============== readable", log_context!(self));
389 if !self.container_frontend_timeout.reset() {
390 error!(
391 "could not reset front timeout {:?}",
392 self.configured_frontend_timeout
393 );
394 self.print_state(self.protocol_string());
395 }
396
397 let response_stream = match &mut self.response_stream {
398 ResponseStream::BackendAnswer(response_stream) => response_stream,
399 ResponseStream::DefaultAnswer(..) => {
400 error!(
401 "{} Sending default answer, should not read from frontend socket",
402 log_context!(self)
403 );
404
405 self.frontend_readiness.interest.remove(Ready::READABLE);
406 self.frontend_readiness.interest.insert(Ready::WRITABLE);
407 return StateResult::Continue;
408 }
409 };
410
411 if self.request_stream.storage.is_full() {
412 self.frontend_readiness.interest.remove(Ready::READABLE);
413 if self.request_stream.is_main_phase() {
414 self.backend_readiness.interest.insert(Ready::WRITABLE);
415 } else {
416 self.set_answer(DefaultAnswer::Answer413 {
418 capacity: self.request_stream.storage.capacity(),
419 phase: self.request_stream.parsing_phase.marker(),
420 message: diagnostic_413_507(self.request_stream.parsing_phase),
421 });
422 }
423 return StateResult::Continue;
424 }
425
426 let (size, socket_state) = self
427 .frontend_socket
428 .socket_read(self.request_stream.storage.space());
429
430 debug!(
431 "{} Read {} bytes",
432 log_context!(self, Some(response_stream.parsing_phase)),
433 size
434 );
435
436 if size > 0 {
437 self.request_stream.storage.fill(size);
438 count!(names::backend::BYTES_IN, size as i64);
439 metrics.bin += size;
440 } else {
444 self.frontend_readiness.event.remove(Ready::READABLE);
445 }
446
447 match socket_state {
448 SocketResult::Error | SocketResult::Closed => {
449 if self.request_stream.is_initial() {
450 if self.keepalive_count == 0 {
455 self.frontend_socket.read_error();
456 }
457 } else {
458 self.frontend_socket.read_error();
459 self.log_request_error(
460 metrics,
461 &format!(
462 "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
463 self.frontend_readiness,
464 self.backend_readiness,
465 )
466 );
467 }
468 return StateResult::CloseSession;
469 }
470 SocketResult::WouldBlock => {
471 self.frontend_readiness.event.remove(Ready::READABLE);
472 }
473 SocketResult::Continue => {}
474 };
475
476 trace!(
477 "{} ============== readable_parse",
478 log_context!(self, Some(response_stream.parsing_phase))
479 );
480 let was_initial = self.request_stream.is_initial();
481 let was_not_proxying = !self.request_stream.is_main_phase();
482
483 kawa::h1::parse(&mut self.request_stream, &mut self.context);
484 if was_initial && !self.request_stream.is_initial() {
487 self.container_frontend_timeout
491 .set_duration(self.configured_frontend_timeout);
492 gauge_add!(names::http::ACTIVE_REQUESTS, 1);
493 incr!(names::http::REQUESTS);
494 }
495
496 if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
497 incr!(names::http::FRONTEND_PARSE_ERRORS);
498 warn!(
499 "{} Parsing request error in {:?}: {}",
500 log_context!(self, Some(response_stream.parsing_phase)),
501 marker,
502 match kind {
503 kawa::ParsingErrorKind::Consuming { index } => {
504 let kawa = &self.request_stream;
505 parser::view(
506 kawa.storage.used(),
507 16,
508 &[
509 kawa.storage.start,
510 kawa.storage.head,
511 index as usize,
512 kawa.storage.end,
513 ],
514 )
515 }
516 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
517 }
518 );
519 if response_stream.consumed {
520 self.log_request_error(metrics, "Parsing error on the request");
521 return StateResult::CloseSession;
522 } else {
523 let (message, successfully_parsed, partially_parsed, invalid) =
524 diagnostic_400_502(marker, kind, &self.request_stream);
525 self.set_answer(DefaultAnswer::Answer400 {
526 message,
527 phase: marker,
528 successfully_parsed,
529 partially_parsed,
530 invalid,
531 });
532 return StateResult::Continue;
533 }
534 }
535
536 if self.request_stream.is_main_phase() {
537 self.backend_readiness.interest.insert(Ready::WRITABLE);
538 if was_not_proxying {
539 trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
542 return StateResult::ConnectBackend;
543 }
544 }
545 if self.request_stream.is_terminated() {
546 self.frontend_readiness.interest.remove(Ready::READABLE);
547 }
548
549 StateResult::Continue
550 }
551
552 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
553 trace!("{} ============== writable", log_context!(self));
554 let response_stream = match &mut self.response_stream {
555 ResponseStream::BackendAnswer(response_stream) => response_stream,
556 _ => return self.writable_default_answer(metrics),
557 };
558
559 response_stream.prepare(&mut kawa::h1::BlockConverter);
560
561 let bufs = response_stream.as_io_slice();
562 if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
563 self.frontend_readiness.interest.remove(Ready::WRITABLE);
564 }
566
567 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
568
569 debug!(
570 "{} Wrote {} bytes",
571 log_context!(self, Some(response_stream.parsing_phase)),
572 size
573 );
574
575 if size > 0 {
576 response_stream.consume(size);
577 count!(names::backend::BYTES_OUT, size as i64);
578 metrics.bout += size;
579 self.backend_readiness.interest.insert(Ready::READABLE);
580 }
581
582 match socket_state {
583 SocketResult::Error | SocketResult::Closed => {
584 self.frontend_socket.write_error();
585 self.log_request_error(
586 metrics,
587 &format!(
588 "front socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
589 self.frontend_readiness,
590 self.backend_readiness,
591 ),
592 );
593 return StateResult::CloseSession;
594 }
595 SocketResult::WouldBlock => {
596 self.frontend_readiness.event.remove(Ready::WRITABLE);
597 }
598 SocketResult::Continue => {}
599 }
600
601 if self.frontend_socket.socket_wants_write() {
602 return StateResult::Continue;
603 }
604
605 if response_stream.is_terminated() && response_stream.is_completed() {
606 if self.context.closing {
607 debug!("{} closing proxy, no keep alive", log_context!(self));
608 self.log_request_success(metrics);
609 return StateResult::CloseSession;
610 }
611
612 match response_stream.detached.status_line {
613 kawa::StatusLine::Response { code: 101, .. } => {
614 trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
615 self.log_request_success(metrics);
616 return StateResult::Upgrade;
617 }
618 kawa::StatusLine::Response { code: 100, .. } => {
619 trace!(
620 "{} ============== HANDLE CONTINUE!",
621 log_context!(self, Some(response_stream.parsing_phase))
622 );
623 response_stream.clear();
624 self.log_request_success(metrics);
625 return StateResult::Continue;
626 }
627 kawa::StatusLine::Response { code: 103, .. } => {
628 self.backend_readiness.event.insert(Ready::READABLE);
629 trace!(
630 "{} ============== HANDLE EARLY HINT!",
631 log_context!(self, Some(response_stream.parsing_phase))
632 );
633 response_stream.clear();
634 self.log_request_success(metrics);
635 return StateResult::Continue;
636 }
637 _ => (),
638 }
639
640 let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
641 let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
642 if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
643 && request_length_known
644 {
645 error!(
646 "{} Response terminated before request, this case is not handled properly yet",
647 log_context!(self)
648 );
649 incr!(names::http::EARLY_RESPONSE_CLOSE);
650 }
653
654 trace!(
659 "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
660 log_context!(self),
661 self.context.keep_alive_frontend,
662 self.context.keep_alive_backend,
663 response_length_known
664 );
665
666 self.log_request_success(metrics);
667 return match (
668 self.context.keep_alive_frontend,
669 self.context.keep_alive_backend,
670 response_length_known,
671 ) {
672 (true, true, true) => {
673 debug!("{} Keep alive frontend/backend", log_context!(self));
674 metrics.reset();
675 self.reset();
676 StateResult::Continue
677 }
678 (true, false, true) => {
679 debug!("{} Keep alive frontend", log_context!(self));
680 metrics.reset();
681 self.reset();
682 StateResult::CloseBackend
683 }
684 _ => {
685 debug!("{} No keep alive", log_context!(self));
686 StateResult::CloseSession
687 }
688 };
689 }
690 StateResult::Continue
691 }
692
693 fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
694 trace!(
695 "{} ============== writable_default_answer",
696 log_context!(self)
697 );
698 let response_stream = match &mut self.response_stream {
699 ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
700 _ => return StateResult::CloseSession,
701 };
702 let bufs = response_stream.as_io_slice();
703 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
704
705 count!(names::backend::BYTES_OUT, size as i64);
706 metrics.bout += size;
707 response_stream.consume(size);
708
709 if size == 0 || socket_state != SocketResult::Continue {
710 self.frontend_readiness.event.remove(Ready::WRITABLE);
711 }
712
713 if response_stream.is_completed() {
714 save_http_status_metric(self.context.status, self.context.log_context());
715 self.log_default_answer_success(metrics);
716 self.frontend_readiness.reset();
717 self.backend_readiness.reset();
718 return StateResult::CloseSession;
719 }
720
721 if socket_state == SocketResult::Error {
722 self.frontend_socket.write_error();
723 self.log_request_error(
724 metrics,
725 "error writing default answer to front socket, closing",
726 );
727 StateResult::CloseSession
728 } else {
729 StateResult::Continue
730 }
731 }
732
733 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
734 trace!("{} ============== backend_writable", log_context!(self));
735 if let ResponseStream::DefaultAnswer(..) = self.response_stream {
736 error!(
737 "{}\tsending default answer, should not write to back",
738 log_context!(self)
739 );
740 self.backend_readiness.interest.remove(Ready::WRITABLE);
741 self.frontend_readiness.interest.insert(Ready::WRITABLE);
742 return SessionResult::Continue;
743 }
744
745 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
746 backend_socket
747 } else {
748 self.log_request_error(metrics, "back socket not found, closing session");
749 return SessionResult::Close;
750 };
751
752 self.request_stream.prepare(&mut kawa::h1::BlockConverter);
753
754 let bufs = self.request_stream.as_io_slice();
755 if bufs.is_empty() {
756 self.backend_readiness.interest.remove(Ready::WRITABLE);
757 return SessionResult::Continue;
758 }
759
760 let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
761 debug!("{} Wrote {} bytes", log_context!(self), size);
762
763 if size > 0 {
764 self.request_stream.consume(size);
765 count!(names::backend::BACK_BYTES_OUT, size as i64);
766 metrics.backend_bout += size;
767 self.frontend_readiness.interest.insert(Ready::READABLE);
768 self.backend_readiness.interest.insert(Ready::READABLE);
769 } else {
770 self.backend_readiness.event.remove(Ready::WRITABLE);
771 }
772
773 match socket_state {
774 SocketResult::Error | SocketResult::Closed => {
784 self.frontend_readiness.interest.remove(Ready::READABLE);
785 self.backend_readiness.interest.remove(Ready::WRITABLE);
786 return SessionResult::Continue;
787 }
788 SocketResult::WouldBlock => {
789 self.backend_readiness.event.remove(Ready::WRITABLE);
790 }
791 SocketResult::Continue => {}
792 }
793
794 if self.request_stream.is_terminated() && self.request_stream.is_completed() {
795 self.backend_readiness.interest.remove(Ready::WRITABLE);
796
797 self.container_frontend_timeout.cancel();
799 self.container_backend_timeout.reset();
800 }
801 SessionResult::Continue
802 }
803
804 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
806 trace!("{} ============== backend_readable", log_context!(self));
807 if !self.container_backend_timeout.reset() {
808 error!(
809 "{} Could not reset back timeout {:?}",
810 log_context!(self),
811 self.configured_backend_timeout
812 );
813 self.print_state(self.protocol_string());
814 }
815
816 let response_stream = match &mut self.response_stream {
817 ResponseStream::BackendAnswer(response_stream) => response_stream,
818 _ => {
819 error!(
820 "{} Sending default answer, should not read from backend socket",
821 log_context!(self),
822 );
823
824 self.backend_readiness.interest.remove(Ready::READABLE);
825 self.frontend_readiness.interest.insert(Ready::WRITABLE);
826 return SessionResult::Continue;
827 }
828 };
829
830 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
831 backend_socket
832 } else {
833 self.log_request_error(metrics, "back socket not found, closing session");
834 return SessionResult::Close;
835 };
836
837 if response_stream.storage.is_full() {
838 self.backend_readiness.interest.remove(Ready::READABLE);
839 if response_stream.is_main_phase() {
840 self.frontend_readiness.interest.insert(Ready::WRITABLE);
841 } else {
842 let capacity = response_stream.storage.capacity();
844 let phase = response_stream.parsing_phase.marker();
845 let message = diagnostic_413_507(response_stream.parsing_phase);
846 self.set_answer(DefaultAnswer::Answer507 {
847 capacity,
848 phase,
849 message,
850 });
851 }
852 return SessionResult::Continue;
853 }
854
855 let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
856 debug!(
857 "{} Read {} bytes",
858 log_context!(self, Some(response_stream.parsing_phase)),
859 size
860 );
861
862 if size > 0 {
863 response_stream.storage.fill(size);
864 count!(names::backend::BACK_BYTES_IN, size as i64);
865 metrics.backend_bin += size;
866 self.container_frontend_timeout.cancel();
877 } else {
878 self.backend_readiness.event.remove(Ready::READABLE);
879 }
880
881 match socket_state {
883 SocketResult::Error => {
884 backend_socket.read_error();
885 self.log_request_error(
886 metrics,
887 &format!(
888 "back socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
889 self.frontend_readiness,
890 self.backend_readiness,
891 ),
892 );
893 return SessionResult::Close;
894 }
895 SocketResult::WouldBlock | SocketResult::Closed => {
896 self.backend_readiness.event.remove(Ready::READABLE);
897 }
898 SocketResult::Continue => {}
899 }
900
901 trace!(
902 "{} ============== backend_readable_parse",
903 log_context!(self, Some(response_stream.parsing_phase))
904 );
905 kawa::h1::parse(response_stream, &mut self.context);
906 if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
909 incr!(names::http::BACKEND_PARSE_ERRORS);
910 warn!(
911 "{} Parsing response error in {:?}: {}",
912 log_context!(self, Some(response_stream.parsing_phase)),
913 marker,
914 match kind {
915 kawa::ParsingErrorKind::Consuming { index } => {
916 parser::view(
917 response_stream.storage.used(),
918 16,
919 &[
920 response_stream.storage.start,
921 response_stream.storage.head,
922 index as usize,
923 response_stream.storage.end,
924 ],
925 )
926 }
927 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
928 }
929 );
930 if response_stream.consumed {
931 return SessionResult::Close;
932 } else {
933 let (message, successfully_parsed, partially_parsed, invalid) =
934 diagnostic_400_502(marker, kind, response_stream);
935 self.set_answer(DefaultAnswer::Answer502 {
936 message,
937 phase: marker,
938 successfully_parsed,
939 partially_parsed,
940 invalid,
941 });
942 return SessionResult::Continue;
943 }
944 }
945
946 if response_stream.is_main_phase() {
947 self.frontend_readiness.interest.insert(Ready::WRITABLE);
948 }
949 if response_stream.is_terminated() {
950 metrics.backend_stop();
951 self.backend_stop = Some(Instant::now());
952 self.backend_readiness.interest.remove(Ready::READABLE);
953 }
954 SessionResult::Continue
955 }
956}
957
958impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
959 fn log_endpoint(&self) -> EndpointRecord<'_> {
960 EndpointRecord::Http {
961 method: self.context.method.as_deref(),
962 authority: self.context.authority.as_deref(),
963 path: self.context.path.as_deref(),
964 reason: self.context.reason.as_deref(),
965 status: self.context.status,
966 }
967 }
968
969 fn response_parsing_phase(&self) -> Option<kawa::ParsingPhase> {
973 match &self.response_stream {
974 ResponseStream::BackendAnswer(inner) => Some(inner.parsing_phase),
975 _ => None,
976 }
977 }
978
979 pub fn get_session_address(&self) -> Option<SocketAddr> {
980 self.context
981 .session_address
982 .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
983 }
984
985 pub fn get_backend_address(&self) -> Option<SocketAddr> {
986 self.backend
987 .as_ref()
988 .map(|backend| backend.borrow().address)
989 .or_else(|| {
990 self.backend_socket
991 .as_ref()
992 .and_then(|backend| backend.peer_addr().ok())
993 })
994 }
995
996 fn protocol_string(&self) -> &'static str {
998 match self.context.protocol {
999 Protocol::HTTP => "HTTP",
1000 Protocol::HTTPS => match self.frontend_socket.protocol() {
1001 TransportProtocol::Ssl2 => "HTTPS-SSL2",
1002 TransportProtocol::Ssl3 => "HTTPS-SSL3",
1003 TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
1004 TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
1005 TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
1006 TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
1007 _ => unreachable!(),
1008 },
1009 _ => unreachable!(),
1010 }
1011 }
1012
1013 pub fn websocket_context(&self) -> WebSocketContext {
1015 WebSocketContext::Http {
1016 method: self.context.method.clone(),
1017 authority: self.context.authority.clone(),
1018 path: self.context.path.clone(),
1019 reason: self.context.reason.clone(),
1020 status: self.context.status,
1021 }
1022 }
1023
1024 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
1025 let listener = self.listener.borrow();
1026 let tags = self.context.authority.as_ref().and_then(|host| {
1027 let hostname = match host.split_once(':') {
1028 None => host,
1029 Some((hostname, _)) => hostname,
1030 };
1031 listener.get_tags(hostname)
1032 });
1033
1034 let context = self.context.log_context();
1035 metrics.register_end_of_session(&context);
1036
1037 log_access! {
1038 error,
1039 on_failure: { incr!(names::access_logs::UNSENT) },
1040 message,
1041 context,
1042 session_address: self.get_session_address(),
1043 backend_address: self.get_backend_address(),
1044 protocol: self.protocol_string(),
1045 endpoint: self.log_endpoint(),
1046 tags,
1047 client_rtt: socket_rtt(self.front_socket()),
1048 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
1049 service_time: metrics.service_time(),
1050 response_time: metrics.backend_response_time(),
1051 request_time: metrics.request_time(),
1052 bytes_in: metrics.bin,
1053 bytes_out: metrics.bout,
1054 user_agent: self.context.user_agent.as_deref(),
1055 x_request_id: self.context.x_request_id.as_deref(),
1056 tls_version: self.context.tls_version,
1057 tls_cipher: self.context.tls_cipher,
1058 tls_sni: self.context.tls_server_name.as_deref(),
1059 tls_alpn: self.context.tls_alpn,
1060 xff_chain: self.context.xff_chain.as_deref(),
1061 #[cfg(feature = "opentelemetry")]
1062 otel: self.context.otel.as_ref(),
1063 #[cfg(not(feature = "opentelemetry"))]
1064 otel: None,
1065 };
1066 }
1067
1068 pub fn log_request_success(&self, metrics: &SessionMetrics) {
1069 save_http_status_metric(self.context.status, self.context.log_context());
1070 self.log_request(metrics, false, None);
1071 }
1072
1073 pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
1074 self.log_request(metrics, false, self.context.access_log_message);
1079 }
1080 pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
1081 incr!(
1088 "http.errors",
1089 self.context.cluster_id.as_deref(),
1090 self.context.backend_id.as_deref()
1091 );
1092 error!(
1093 "{} Could not process request properly got: {}",
1094 log_context!(self),
1095 message
1096 );
1097 self.print_state(self.protocol_string());
1098 self.log_request(metrics, true, Some(message));
1099 }
1100
1101 pub fn set_answer(&mut self, answer: DefaultAnswer) {
1102 let status = u16::from(&answer);
1103 if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
1104 error!(
1105 "already set the default answer to {}, trying to set to {}",
1106 old_status, status
1107 );
1108 } else {
1109 match answer {
1110 DefaultAnswer::Answer301 { .. } => incr!(
1111 "http.301.redirection",
1112 self.context.cluster_id.as_deref(),
1113 self.context.backend_id.as_deref()
1114 ),
1115 DefaultAnswer::Answer302 { .. } => incr!(
1116 "http.302.redirection",
1117 self.context.cluster_id.as_deref(),
1118 self.context.backend_id.as_deref()
1119 ),
1120 DefaultAnswer::Answer308 { .. } => incr!(
1121 "http.308.redirection",
1122 self.context.cluster_id.as_deref(),
1123 self.context.backend_id.as_deref()
1124 ),
1125 DefaultAnswer::Answer400 { .. } => incr!(names::http::ERR_400),
1126 DefaultAnswer::Answer401 { .. } => incr!(
1127 "http.401.errors",
1128 self.context.cluster_id.as_deref(),
1129 self.context.backend_id.as_deref()
1130 ),
1131 DefaultAnswer::Answer404 { .. } => incr!(names::http::ERR_404),
1132 DefaultAnswer::Answer408 { .. } => incr!(
1133 "http.408.errors",
1134 self.context.cluster_id.as_deref(),
1135 self.context.backend_id.as_deref()
1136 ),
1137 DefaultAnswer::Answer413 { .. } => incr!(
1138 "http.413.errors",
1139 self.context.cluster_id.as_deref(),
1140 self.context.backend_id.as_deref()
1141 ),
1142 DefaultAnswer::Answer421 { .. } => incr!(
1143 "http.421.errors",
1144 self.context.cluster_id.as_deref(),
1145 self.context.backend_id.as_deref()
1146 ),
1147 DefaultAnswer::Answer429 { .. } => incr!(
1148 "connections.rejected_per_cluster_ip",
1149 self.context.cluster_id.as_deref(),
1150 self.context.backend_id.as_deref()
1151 ),
1152 DefaultAnswer::Answer502 { .. } => incr!(
1153 "http.502.errors",
1154 self.context.cluster_id.as_deref(),
1155 self.context.backend_id.as_deref()
1156 ),
1157 DefaultAnswer::Answer503 { .. } => incr!(
1158 "http.503.errors",
1159 self.context.cluster_id.as_deref(),
1160 self.context.backend_id.as_deref()
1161 ),
1162 DefaultAnswer::Answer504 { .. } => incr!(
1163 "http.504.errors",
1164 self.context.cluster_id.as_deref(),
1165 self.context.backend_id.as_deref()
1166 ),
1167 DefaultAnswer::Answer507 { .. } => incr!(
1168 "http.507.errors",
1169 self.context.cluster_id.as_deref(),
1170 self.context.backend_id.as_deref()
1171 ),
1172 };
1173 }
1174
1175 let (resolved_status, keep_alive, mut kawa) = self.answers.borrow().get(
1176 answer,
1177 self.context.id.to_string(),
1178 self.context.cluster_id.as_deref(),
1179 self.context.backend_id.as_deref(),
1180 self.get_route(),
1181 );
1182 kawa.prepare(&mut kawa::h1::BlockConverter);
1183 let status = resolved_status;
1186 self.context.status = Some(status);
1187 self.context.reason = None;
1188 if !keep_alive {
1192 self.context.keep_alive_frontend = false;
1193 }
1194 self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
1195 self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1196 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
1197 }
1198
1199 pub fn test_backend_socket(&self) -> bool {
1200 match self.backend_socket {
1201 Some(ref s) => {
1202 let mut tmp = [0u8; 1];
1203 let res = s.peek(&mut tmp[..]);
1204
1205 match res {
1206 Ok(0) => false,
1208 Ok(_) => true,
1209 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1210 }
1211 }
1212 None => false,
1213 }
1214 }
1215
1216 pub fn is_valid_backend_socket(&self) -> bool {
1217 match self.backend_stop.as_ref() {
1219 Some(stop_instant) => {
1220 let now = Instant::now();
1221 let dur = now - *stop_instant;
1222 if dur > Duration::from_secs(1) {
1223 return self.test_backend_socket();
1224 }
1225 }
1226 None => return self.test_backend_socket(),
1227 }
1228
1229 true
1230 }
1231
1232 pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
1233 self.backend_socket = Some(socket);
1234 self.backend = backend;
1235 }
1236
1237 pub fn set_cluster_id(&mut self, cluster_id: String) {
1238 self.context.cluster_id = Some(cluster_id);
1239 }
1240
1241 pub fn set_backend_id(&mut self, backend_id: String) {
1242 self.context.backend_id = Some(backend_id);
1243 }
1244
1245 pub fn set_backend_token(&mut self, token: Token) {
1246 self.backend_token = Some(token);
1247 }
1248
1249 pub fn clear_backend_token(&mut self) {
1250 self.backend_token = None;
1251 }
1252
1253 pub fn set_backend_timeout(&mut self, dur: Duration) {
1254 if let Some(token) = self.backend_token.as_ref() {
1255 self.container_backend_timeout.set_duration(dur);
1256 self.container_backend_timeout.set(*token);
1257 }
1258 }
1259
1260 pub fn front_socket(&self) -> &TcpStream {
1261 self.frontend_socket.socket_ref()
1262 }
1263
1264 fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1268 self.container_backend_timeout.cancel();
1269 debug!(
1270 "{}\tPROXY [{}->{}] CLOSED BACKEND",
1271 log_context!(self),
1272 self.frontend_token.0,
1273 self.backend_token
1274 .map(|t| format!("{}", t.0))
1275 .unwrap_or_else(|| "-".to_string())
1276 );
1277
1278 let proxy = proxy.borrow();
1279 if let Some(socket) = &mut self.backend_socket.take() {
1280 if let Err(e) = proxy.deregister_socket(socket) {
1281 error!(
1282 "{} Error deregistering back socket({:?}): {:?}",
1283 log_context!(self),
1284 socket,
1285 e
1286 );
1287 }
1288 if let Err(e) = socket.shutdown(Shutdown::Both) {
1296 if e.kind() != ErrorKind::NotConnected {
1297 error!(
1298 "{} Error shutting down back socket({:?}): {:?}",
1299 log_context!(self),
1300 socket,
1301 e
1302 );
1303 }
1304 }
1305 }
1306
1307 if let Some(token) = self.backend_token.take() {
1308 proxy.remove_session(token);
1309
1310 if self.backend_connection_status != BackendConnectionStatus::NotConnected {
1311 self.backend_readiness.event = Ready::EMPTY;
1312 }
1313
1314 if self.backend_connection_status == BackendConnectionStatus::Connected {
1315 gauge_add!(names::backend::CONNECTIONS, -1);
1316 gauge_add!(
1317 names::backend::CONNECTIONS_PER_BACKEND,
1318 -1,
1319 self.context.cluster_id.as_deref(),
1320 metrics.backend_id.as_deref()
1321 );
1322 }
1323
1324 self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);
1325
1326 if let Some(backend) = self.backend.take() {
1327 backend.borrow_mut().dec_connections();
1328 }
1329 }
1330 }
1331
1332 fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1334 if self.connection_attempts >= CONN_RETRIES {
1335 incr!(
1336 "backend.connect.retries_exhausted",
1337 self.context.cluster_id.as_deref(),
1338 self.context.backend_id.as_deref()
1339 );
1340 warn!(
1341 "{} Max connection attempt reached ({})",
1342 log_context!(self),
1343 self.connection_attempts,
1344 );
1345
1346 self.set_answer(DefaultAnswer::Answer503 {
1347 message: format!(
1348 "Max connection attempt reached: {}",
1349 self.connection_attempts
1350 ),
1351 });
1352 return Err(BackendConnectionError::MaxConnectionRetries(None));
1353 }
1354 Ok(())
1355 }
1356
1357 fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1358 let is_valid_backend_socket = self.is_valid_backend_socket();
1359
1360 if !is_valid_backend_socket {
1361 return false;
1362 }
1363
1364 metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1366
1367 metrics.backend_start();
1368 metrics.backend_connected();
1369 if let Some(b) = self.backend.as_mut() {
1370 b.borrow_mut().active_requests += 1;
1371 }
1372 true
1373 }
1374
1375 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1377 let given_method = self
1378 .context
1379 .method
1380 .as_ref()
1381 .ok_or(RetrieveClusterError::NoMethod)?;
1382 let given_authority = self
1383 .context
1384 .authority
1385 .as_deref()
1386 .ok_or(RetrieveClusterError::NoHost)?;
1387 let given_path = self
1388 .context
1389 .path
1390 .as_deref()
1391 .ok_or(RetrieveClusterError::NoPath)?;
1392
1393 Ok((given_authority, given_path, given_method))
1394 }
1395
1396 pub fn get_route(&self) -> String {
1397 if let Some(method) = &self.context.method {
1398 if let Some(authority) = &self.context.authority {
1399 if let Some(path) = &self.context.path {
1400 return format!("{method} {authority}{path}");
1401 }
1402 return format!("{method} {authority}");
1403 }
1404 return format!("{method}");
1405 }
1406 String::new()
1407 }
1408
1409 fn cluster_id_from_request(
1410 &mut self,
1411 proxy: Rc<RefCell<dyn L7Proxy>>,
1412 ) -> Result<String, RetrieveClusterError> {
1413 let (host, uri, method) = match self.extract_route() {
1414 Ok(tuple) => tuple,
1415 Err(cluster_error) => {
1416 self.set_answer(DefaultAnswer::Answer400 {
1417 message: "Could not extract the route after connection started, this should not happen.".into(),
1418 phase: self.request_stream.parsing_phase.marker(),
1419 successfully_parsed: "null".into(),
1420 partially_parsed: "null".into(),
1421 invalid: "null".into(),
1422 });
1423 return Err(cluster_error);
1424 }
1425 };
1426
1427 let route_result = self
1428 .listener
1429 .borrow()
1430 .frontend_from_request(host, uri, method);
1431
1432 let route = match route_result {
1433 Ok(route) => route,
1434 Err(frontend_error) => {
1435 match &frontend_error {
1439 FrontendFromRequestError::HostParse { .. }
1440 | FrontendFromRequestError::InvalidCharsAfterHost(_) => {
1441 self.set_answer(DefaultAnswer::Answer400 {
1442 message: frontend_error.to_string(),
1443 phase: self.request_stream.parsing_phase.marker(),
1444 successfully_parsed: "null".into(),
1445 partially_parsed: "null".into(),
1446 invalid: "null".into(),
1447 });
1448 }
1449 FrontendFromRequestError::NoClusterFound(_) => {
1450 self.set_answer(DefaultAnswer::Answer404 {});
1451 }
1452 }
1453 return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
1454 }
1455 };
1456
1457 let cluster_id = match route.cluster_id {
1463 Some(cluster_id) => cluster_id,
1464 None => {
1465 self.set_answer(DefaultAnswer::Answer401 {
1466 www_authenticate: None,
1467 });
1468 return Err(RetrieveClusterError::UnauthorizedRoute);
1469 }
1470 };
1471
1472 let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
1473 && proxy
1474 .borrow()
1475 .clusters()
1476 .get(&cluster_id)
1477 .map(|cluster| cluster.https_redirect)
1478 .unwrap_or(false);
1479
1480 if frontend_should_redirect_https {
1481 self.set_answer(DefaultAnswer::Answer301 {
1482 location: format!("https://{host}{uri}"),
1483 });
1484 return Err(RetrieveClusterError::UnauthorizedRoute);
1485 }
1486
1487 Ok(cluster_id)
1488 }
1489
1490 pub fn backend_from_request(
1491 &mut self,
1492 cluster_id: &str,
1493 frontend_should_stick: bool,
1494 proxy: Rc<RefCell<dyn L7Proxy>>,
1495 metrics: &mut SessionMetrics,
1496 ) -> Result<TcpStream, BackendConnectionError> {
1497 let (backend, conn) = self
1498 .get_backend_for_sticky_session(
1499 frontend_should_stick,
1500 self.context.sticky_session_found.as_deref(),
1501 cluster_id,
1502 proxy,
1503 )
1504 .map_err(|backend_error| {
1505 self.set_answer(DefaultAnswer::Answer503 {
1508 message: backend_error.to_string(),
1509 });
1510 BackendConnectionError::Backend(backend_error)
1511 })?;
1512
1513 if frontend_should_stick {
1514 self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1516
1517 self.context.sticky_session = Some(
1518 backend
1519 .borrow()
1520 .sticky_id
1521 .clone()
1522 .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1523 );
1524 }
1525
1526 metrics.backend_id = Some(backend.borrow().backend_id.clone());
1527 metrics.backend_start();
1528 self.set_backend_id(backend.borrow().backend_id.clone());
1529
1530 self.backend = Some(backend);
1531 Ok(conn)
1532 }
1533
1534 fn get_backend_for_sticky_session(
1535 &self,
1536 frontend_should_stick: bool,
1537 sticky_session: Option<&str>,
1538 cluster_id: &str,
1539 proxy: Rc<RefCell<dyn L7Proxy>>,
1540 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1541 match (frontend_should_stick, sticky_session) {
1542 (true, Some(sticky_session)) => proxy
1543 .borrow()
1544 .backends()
1545 .borrow_mut()
1546 .backend_from_sticky_session(cluster_id, sticky_session),
1547 _ => proxy
1548 .borrow()
1549 .backends()
1550 .borrow_mut()
1551 .backend_from_cluster_id(cluster_id),
1552 }
1553 }
1554
1555 fn connect_to_backend(
1556 &mut self,
1557 session_rc: Rc<RefCell<dyn ProxySession>>,
1558 proxy: Rc<RefCell<dyn L7Proxy>>,
1559 metrics: &mut SessionMetrics,
1560 ) -> Result<BackendConnectAction, BackendConnectionError> {
1561 let old_cluster_id = self.context.cluster_id.clone();
1562 let old_backend_token = self.backend_token;
1563
1564 self.check_circuit_breaker()?;
1565
1566 let cluster_id = self
1567 .cluster_id_from_request(proxy.clone())
1568 .map_err(BackendConnectionError::RetrieveClusterError)?;
1569
1570 trace!(
1571 "{} Connect_to_backend: {:?} {:?} {:?}",
1572 log_context!(self),
1573 self.context.cluster_id,
1574 cluster_id,
1575 self.backend_connection_status
1576 );
1577 if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
1579 && self.backend_connection_status == BackendConnectionStatus::Connected
1580 {
1581 let has_backend = self
1582 .backend
1583 .as_ref()
1584 .map(|backend| {
1585 let backend = backend.borrow();
1586 proxy
1587 .borrow()
1588 .backends()
1589 .borrow()
1590 .has_backend(&cluster_id, &backend)
1591 })
1592 .unwrap_or(false);
1593
1594 if has_backend && self.check_backend_connection(metrics) {
1595 return Ok(BackendConnectAction::Reuse);
1596 } else if self.backend_token.take().is_some() {
1597 self.close_backend(proxy.clone(), metrics);
1598 }
1599 }
1600
1601 if old_cluster_id.is_some()
1603 && old_cluster_id.as_ref() != Some(&cluster_id)
1604 && self.backend_token.take().is_some()
1605 {
1606 self.close_backend(proxy.clone(), metrics);
1607 }
1608
1609 self.context.cluster_id = Some(cluster_id.clone());
1610
1611 let frontend_should_stick = proxy
1612 .borrow()
1613 .clusters()
1614 .get(&cluster_id)
1615 .map(|cluster| cluster.sticky_session)
1616 .unwrap_or(false);
1617
1618 let mut socket =
1619 self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
1620 if let Err(e) = socket.set_nodelay(true) {
1621 error!(
1622 "{} Error setting nodelay on backend socket({:?}): {:?}",
1623 log_context!(self),
1624 socket,
1625 e
1626 );
1627 }
1628
1629 self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1630 self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());
1631
1632 match old_backend_token {
1633 Some(backend_token) => {
1634 self.set_backend_token(backend_token);
1635 if let Err(e) = proxy.borrow().register_socket(
1636 &mut socket,
1637 backend_token,
1638 Interest::READABLE | Interest::WRITABLE,
1639 ) {
1640 error!(
1641 "{} Error registering back socket({:?}): {:?}",
1642 log_context!(self),
1643 socket,
1644 e
1645 );
1646 }
1647
1648 self.set_backend_socket(socket, self.backend.clone());
1649 self.set_backend_timeout(self.configured_connect_timeout);
1650
1651 Ok(BackendConnectAction::Replace)
1652 }
1653 None => {
1654 let backend_token = proxy.borrow().add_session(session_rc);
1655
1656 if let Err(e) = proxy.borrow().register_socket(
1657 &mut socket,
1658 backend_token,
1659 Interest::READABLE | Interest::WRITABLE,
1660 ) {
1661 error!(
1662 "{} Error registering back socket({:?}): {:?}",
1663 log_context!(self),
1664 socket,
1665 e
1666 );
1667 }
1668
1669 self.set_backend_socket(socket, self.backend.clone());
1670 self.set_backend_token(backend_token);
1671 self.set_backend_timeout(self.configured_connect_timeout);
1672
1673 Ok(BackendConnectAction::New)
1674 }
1675 }
1676 }
1677
1678 fn set_backend_connected(
1679 &mut self,
1680 connected: BackendConnectionStatus,
1681 metrics: &mut SessionMetrics,
1682 ) {
1683 let last = self.backend_connection_status;
1684 self.backend_connection_status = connected;
1685
1686 if connected == BackendConnectionStatus::Connected {
1687 gauge_add!(names::backend::CONNECTIONS, 1);
1688 gauge_add!(
1689 names::backend::CONNECTIONS_PER_BACKEND,
1690 1,
1691 self.context.cluster_id.as_deref(),
1692 metrics.backend_id.as_deref()
1693 );
1694
1695 self.set_backend_timeout(self.configured_backend_timeout);
1698 if !self.backend_readiness.interest.is_readable() {
1701 self.container_backend_timeout.cancel();
1702 }
1703
1704 if let Some(backend) = &self.backend {
1705 let mut backend = backend.borrow_mut();
1706
1707 if backend.retry_policy.is_down() {
1708 incr!(
1709 "backend.up",
1710 self.context.cluster_id.as_deref(),
1711 metrics.backend_id.as_deref()
1712 );
1713 gauge!(
1714 names::backend::AVAILABLE,
1715 1,
1716 self.context.cluster_id.as_deref(),
1717 metrics.backend_id.as_deref()
1718 );
1719
1720 info!(
1721 "{} backend server {} at {} is up",
1722 log_context!(self),
1723 backend.backend_id,
1724 backend.address
1725 );
1726
1727 push_event(Event {
1728 kind: EventKind::BackendUp as i32,
1729 backend_id: Some(backend.backend_id.to_owned()),
1730 address: Some(backend.address.into()),
1731 cluster_id: None,
1732 metric_detail: None,
1733 });
1734 }
1735
1736 if let BackendConnectionStatus::Connecting(start) = last {
1737 backend.set_connection_time(Instant::now() - start);
1738 }
1739
1740 backend.failures = 0;
1742 backend.active_requests += 1;
1743 backend.retry_policy.succeed();
1744 }
1745 }
1746 }
1747
1748 fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1749 if let Some(backend) = &self.backend {
1750 let mut backend = backend.borrow_mut();
1751 backend.failures += 1;
1752
1753 let already_unavailable = backend.retry_policy.is_down();
1754 backend.retry_policy.fail();
1755 incr!(
1756 "backend.connections.error",
1757 self.context.cluster_id.as_deref(),
1758 metrics.backend_id.as_deref()
1759 );
1760
1761 if !already_unavailable && backend.retry_policy.is_down() {
1762 error!(
1763 "{} backend server {} at {} is down",
1764 log_context!(self),
1765 backend.backend_id,
1766 backend.address
1767 );
1768
1769 incr!(
1770 "backend.down",
1771 self.context.cluster_id.as_deref(),
1772 metrics.backend_id.as_deref()
1773 );
1774 gauge!(
1775 names::backend::AVAILABLE,
1776 0,
1777 self.context.cluster_id.as_deref(),
1778 metrics.backend_id.as_deref()
1779 );
1780
1781 push_event(Event {
1782 kind: EventKind::BackendDown as i32,
1783 backend_id: Some(backend.backend_id.to_owned()),
1784 address: Some(backend.address.into()),
1785 cluster_id: None,
1786 metric_detail: None,
1787 });
1788 }
1789 }
1790 }
1791
1792 pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
1793 let response_stream = match &mut self.response_stream {
1794 ResponseStream::BackendAnswer(response_stream) => response_stream,
1795 _ => return StateResult::CloseBackend,
1796 };
1797
1798 if self.backend_readiness.event.is_readable()
1800 && self.backend_readiness.interest.is_readable()
1801 {
1802 return StateResult::Continue;
1803 }
1804
1805 if response_stream.is_terminated() {
1807 return StateResult::CloseBackend;
1808 }
1809 match (
1810 self.request_stream.is_initial(),
1811 response_stream.is_initial(),
1812 ) {
1813 (_, false) => {
1816 error!(
1817 "{} Backend closed before session is over",
1818 log_context!(self, Some(response_stream.parsing_phase)),
1819 );
1820
1821 trace!(
1822 "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
1823 log_context!(self, Some(response_stream.parsing_phase))
1824 );
1825
1826 response_stream.parsing_phase = kawa::ParsingPhase::Terminated;
1827
1828 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1831 StateResult::Continue
1832 }
1833 (true, true) => {
1835 trace!(
1836 "{} Backend hanged up in between requests",
1837 log_context!(self)
1838 );
1839 StateResult::CloseBackend
1840 }
1841 (false, true) => {
1843 error!(
1844 "{} Frontend transmitted data but the back closed",
1845 log_context!(self)
1846 );
1847
1848 self.set_answer(DefaultAnswer::Answer503 {
1849 message: "Backend closed after consuming part of the request".into(),
1850 });
1851
1852 self.backend_readiness.interest = Ready::EMPTY;
1853 StateResult::Continue
1854 }
1855 }
1856 }
1857
1858 fn ready_inner(
1870 &mut self,
1871 session: Rc<RefCell<dyn crate::ProxySession>>,
1872 proxy: Rc<RefCell<dyn L7Proxy>>,
1873 metrics: &mut SessionMetrics,
1874 ) -> SessionResult {
1875 let mut counter = 0;
1876
1877 if self.backend_connection_status.is_connecting()
1878 && !self.backend_readiness.event.is_empty()
1879 {
1880 if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
1881 warn!(
1883 "{} Error connecting to backend, trying again, attempt {}",
1884 log_context!(self),
1885 self.connection_attempts
1886 );
1887
1888 self.connection_attempts += 1;
1889 self.fail_backend_connection(metrics);
1890
1891 self.backend_connection_status =
1892 BackendConnectionStatus::Connecting(Instant::now());
1893
1894 self.close_backend(proxy.clone(), metrics);
1896
1897 let connection_result =
1898 self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1899 if let Err(err) = &connection_result {
1900 match err {
1901 BackendConnectionError::MaxConnectionRetries(_) => trace!(
1904 "{} Error connecting to backend: {}",
1905 log_context!(self),
1906 err
1907 ),
1908 _ => warn!(
1909 "{} Error connecting to backend: {}",
1910 log_context!(self),
1911 err
1912 ),
1913 }
1914 }
1915
1916 if let Some(session_result) = handle_connection_result(connection_result) {
1917 return session_result;
1918 }
1919 } else {
1920 metrics.backend_connected();
1921 self.connection_attempts = 0;
1922 self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
1923 self.backend_readiness.interest.insert(Ready::READABLE);
1926 }
1927 }
1928
1929 if self.frontend_readiness.event.is_hup() {
1930 if !self.request_stream.is_initial() {
1931 self.log_request_error(metrics, "Client disconnected abruptly");
1932 }
1933 return SessionResult::Close;
1934 }
1935
1936 while counter < MAX_LOOP_ITERATIONS {
1937 let frontend_interest = self.frontend_readiness.filter_interest();
1938 let backend_interest = self.backend_readiness.filter_interest();
1939
1940 trace!(
1941 "{} Frontend interest({:?}) and backend interest({:?})",
1942 log_context!(self),
1943 frontend_interest,
1944 backend_interest,
1945 );
1946
1947 if frontend_interest.is_empty() && backend_interest.is_empty() {
1948 break;
1949 }
1950
1951 if self.backend_readiness.event.is_hup()
1952 && self.frontend_readiness.interest.is_writable()
1953 && !self.frontend_readiness.event.is_writable()
1954 {
1955 break;
1956 }
1957
1958 if frontend_interest.is_readable() {
1959 let state_result = self.readable(metrics);
1960 trace!(
1961 "{} frontend_readable: {:?}",
1962 log_context!(self),
1963 state_result
1964 );
1965
1966 match state_result {
1967 StateResult::Continue => {}
1968 StateResult::ConnectBackend => {
1969 let connection_result =
1970 self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1971 if let Err(err) = &connection_result {
1972 match err {
1973 BackendConnectionError::MaxConnectionRetries(_) => trace!(
1976 "{} Error connecting to backend: {}",
1977 log_context!(self),
1978 err
1979 ),
1980 _ => warn!(
1981 "{} Error connecting to backend: {}",
1982 log_context!(self),
1983 err
1984 ),
1985 }
1986 }
1987
1988 if let Some(session_result) = handle_connection_result(connection_result) {
1989 return session_result;
1990 }
1991 }
1992 StateResult::CloseBackend => unreachable!(),
1993 StateResult::CloseSession => return SessionResult::Close,
1994 StateResult::Upgrade => return SessionResult::Upgrade,
1995 }
1996 }
1997
1998 if backend_interest.is_writable() {
1999 let session_result = self.backend_writable(metrics);
2000 trace!(
2001 "{} backend_writable: {:?}",
2002 log_context!(self),
2003 session_result
2004 );
2005 if session_result != SessionResult::Continue {
2006 return session_result;
2007 }
2008 }
2009
2010 if backend_interest.is_readable() {
2011 let session_result = self.backend_readable(metrics);
2012 trace!(
2013 "{} backend_readable: {:?}",
2014 log_context!(self),
2015 session_result
2016 );
2017 if session_result != SessionResult::Continue {
2018 return session_result;
2019 }
2020 }
2021
2022 if frontend_interest.is_writable() {
2023 let state_result = self.writable(metrics);
2024 trace!(
2025 "{} frontend_writable: {:?}",
2026 log_context!(self),
2027 state_result
2028 );
2029 match state_result {
2030 StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
2031 StateResult::CloseSession => return SessionResult::Close,
2032 StateResult::Upgrade => return SessionResult::Upgrade,
2033 StateResult::Continue => {}
2034 StateResult::ConnectBackend => unreachable!(),
2035 }
2036 }
2037
2038 if frontend_interest.is_error() {
2039 error!(
2040 "{} frontend socket error, disconnecting",
2041 log_context!(self)
2042 );
2043
2044 return SessionResult::Close;
2045 }
2046
2047 if backend_interest.is_hup() || backend_interest.is_error() {
2048 let state_result = self.backend_hup(metrics);
2049
2050 trace!("{} backend_hup: {:?}", log_context!(self), state_result);
2051 match state_result {
2052 StateResult::Continue => {}
2053 StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
2054 StateResult::CloseSession => return SessionResult::Close,
2055 StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
2056 }
2057 }
2058
2059 counter += 1;
2060 }
2061
2062 if counter >= MAX_LOOP_ITERATIONS {
2063 error!(
2064 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
2065 log_context!(self),
2066 MAX_LOOP_ITERATIONS
2067 );
2068
2069 incr!(names::http::INFINITE_LOOP_ERROR);
2070 self.print_state(self.protocol_string());
2071
2072 return SessionResult::Close;
2073 }
2074
2075 SessionResult::Continue
2076 }
2077
2078 pub fn timeout_status(&self) -> TimeoutStatus {
2079 if self.request_stream.is_main_phase() {
2080 match &self.response_stream {
2081 ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
2082 TimeoutStatus::WaitingForResponse
2083 }
2084 _ => TimeoutStatus::Response,
2085 }
2086 } else if self.keepalive_count > 0 {
2087 TimeoutStatus::WaitingForNewRequest
2088 } else {
2089 TimeoutStatus::Request
2090 }
2091 }
2092}
2093
2094impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
2095 fn ready(
2096 &mut self,
2097 session: Rc<RefCell<dyn crate::ProxySession>>,
2098 proxy: Rc<RefCell<dyn L7Proxy>>,
2099 metrics: &mut SessionMetrics,
2100 ) -> SessionResult {
2101 let session_result = self.ready_inner(session, proxy, metrics);
2102 if session_result == SessionResult::Upgrade {
2103 let response_storage = match &mut self.response_stream {
2104 ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
2105 _ => return SessionResult::Close,
2106 };
2107
2108 self.request_stream.storage.buffer.sync(
2111 self.request_stream.storage.end,
2112 self.request_stream.storage.head,
2113 );
2114 response_storage
2115 .buffer
2116 .sync(response_storage.end, response_storage.head);
2117 }
2118 session_result
2119 }
2120
2121 fn update_readiness(&mut self, token: Token, events: Ready) {
2122 if self.frontend_token == token {
2123 self.frontend_readiness.event |= events;
2124 } else if self.backend_token == Some(token) {
2125 self.backend_readiness.event |= events;
2126 }
2127 }
2128
2129 fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
2130 self.close_backend(proxy, metrics);
2131 self.frontend_socket.socket_close();
2132 let _ = self.frontend_socket.socket_write_vectored(&[]);
2133
2134 if !self.request_stream.is_initial() {
2136 gauge_add!(names::http::ACTIVE_REQUESTS, -1);
2137
2138 if let Some(b) = self.backend.as_mut() {
2139 let mut backend = b.borrow_mut();
2140 backend.active_requests = backend.active_requests.saturating_sub(1);
2141 }
2142 }
2143 }
2144
2145 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
2146 if self.frontend_token == token {
2148 self.container_frontend_timeout.triggered();
2149 return match self.timeout_status() {
2150 TimeoutStatus::Request => {
2152 self.context.access_log_message = Some("client_timeout");
2153 self.set_answer(DefaultAnswer::Answer408 {
2154 duration: self.container_frontend_timeout.to_string(),
2155 });
2156 self.writable(metrics)
2157 }
2158 TimeoutStatus::WaitingForResponse => {
2160 self.context.access_log_message = Some("client_timeout_during_response");
2163 self.set_answer(DefaultAnswer::Answer504 {
2164 duration: self.container_backend_timeout.to_string(),
2165 });
2166 self.writable(metrics)
2167 }
2168 TimeoutStatus::Response => StateResult::Continue,
2171 TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
2173 };
2174 }
2175
2176 if self.backend_token == Some(token) {
2177 self.container_backend_timeout.triggered();
2179 return match self.timeout_status() {
2180 TimeoutStatus::Request => {
2181 error!(
2182 "{} got backend timeout while waiting for a request, this should not happen",
2183 log_context!(self)
2184 );
2185 self.context.access_log_message = Some("backend_timeout");
2190 self.set_answer(DefaultAnswer::Answer504 {
2191 duration: self.container_backend_timeout.to_string(),
2192 });
2193 self.writable(metrics)
2194 }
2195 TimeoutStatus::WaitingForResponse => {
2196 self.context.access_log_message = Some("backend_timeout");
2197 self.set_answer(DefaultAnswer::Answer504 {
2198 duration: self.container_backend_timeout.to_string(),
2199 });
2200 self.writable(metrics)
2201 }
2202 TimeoutStatus::Response => {
2203 error!(
2204 "backend {:?} timeout while receiving response (cluster {:?})",
2205 self.context.backend_id, self.context.cluster_id
2206 );
2207 self.context.access_log_message = Some("backend_response_timeout");
2208 StateResult::CloseSession
2209 }
2210 TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
2212 };
2213 }
2214
2215 error!("{} Got timeout for an invalid token", log_context!(self));
2216 StateResult::CloseSession
2217 }
2218
2219 fn cancel_timeouts(&mut self) {
2220 self.container_backend_timeout.cancel();
2221 self.container_frontend_timeout.cancel();
2222 }
2223
2224 fn print_state(&self, context: &str) {
2225 error!(
2226 "\
2227{} {} Session(Kawa)
2228\tFrontend:
2229\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2230\tBackend:
2231\t\ttoken: {:?}\treadiness: {:?}",
2232 log_context!(self),
2233 context,
2234 self.frontend_token,
2235 self.frontend_readiness,
2236 self.request_stream.parsing_phase,
2237 self.backend_token,
2238 self.backend_readiness,
2239 );
2241 }
2242
2243 fn shutting_down(&mut self) -> SessionIsToBeClosed {
2244 if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2245 {
2247 true
2248 } else {
2249 self.context.closing = true;
2250 false
2251 }
2252 }
2253}
2254
2255fn handle_connection_result(
2256 connection_result: Result<BackendConnectAction, BackendConnectionError>,
2257) -> Option<SessionResult> {
2258 match connection_result {
2259 Ok(BackendConnectAction::Reuse) => None,
2261 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2262 Some(SessionResult::Continue)
2264 }
2265 Err(_) => {
2266 None
2274 }
2275 }
2276}
2277
2278fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2290 if let Some(status) = status {
2291 match status {
2292 100..=199 => {
2293 incr!(
2294 names::http::STATUS_1XX,
2295 context.cluster_id,
2296 context.backend_id
2297 );
2298 }
2299 200..=299 => {
2300 incr!(
2301 names::http::STATUS_2XX,
2302 context.cluster_id,
2303 context.backend_id
2304 );
2305 }
2306 300..=399 => {
2307 incr!(
2308 names::http::STATUS_3XX,
2309 context.cluster_id,
2310 context.backend_id
2311 );
2312 }
2313 400..=499 => {
2314 incr!(
2315 names::http::STATUS_4XX,
2316 context.cluster_id,
2317 context.backend_id
2318 );
2319 }
2320 500..=599 => {
2321 incr!(
2322 names::http::STATUS_5XX,
2323 context.cluster_id,
2324 context.backend_id
2325 );
2326 }
2327 _ => {
2328 incr!(names::http::STATUS_OTHER);
2330 }
2331 }
2332
2333 if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
2334 incr!(per_code, context.cluster_id, context.backend_id);
2335 }
2336 }
2337}