1pub mod answers;
2pub mod diagnostics;
3pub mod editor;
4pub mod parser;
5
6use std::{
7 cell::RefCell,
8 io::ErrorKind,
9 net::{Shutdown, SocketAddr},
10 rc::{Rc, Weak},
11 time::{Duration, Instant},
12};
13
14use mio::{Interest, Token, net::TcpStream};
15use rusty_ulid::Ulid;
16use sozu_command::{
17 config::MAX_LOOP_ITERATIONS,
18 logging::EndpointRecord,
19 proto::command::{Event, EventKind, ListenerType},
20};
21
22use crate::{
24 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
25 L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness,
26 RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
27 backends::{Backend, BackendError},
28 pool::{Checkout, Pool},
29 protocol::{
30 SessionState,
31 http::{
32 answers::DefaultAnswerStream,
33 diagnostics::{diagnostic_400_502, diagnostic_413_507},
34 editor::HttpContext,
35 parser::Method,
36 },
37 pipe::WebSocketContext,
38 },
39 retry::RetryPolicy,
40 router::Route,
41 server::{CONN_RETRIES, push_event},
42 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
43 sozu_command::{logging::LogContext, ready::Ready},
44 timer::TimeoutContainer,
45};
46
/// Builds the prefix prepended to the log lines of the kawa HTTP/1 state.
///
/// `$self` must expose `context` (with `log_context()`, `public_address` and
/// `session_address`), `frontend_token`, `backend_token` and both readiness
/// fields. A missing session address or backend token is rendered as `<none>`.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "KAWA-H1\t{}\tSession(public={}, session={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
            $self.context.log_context(),
            $self.context.public_address.to_string(),
            $self
                .context
                .session_address
                .map_or_else(|| "<none>".to_string(), |addr| addr.to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness,
            $self
                .backend_token
                .map_or_else(|| "<none>".to_string(), |token| token.0.to_string()),
            $self.backend_readiness,
        )
    };
}
63
/// HTTP message representation used by this state: a `kawa::Kawa` stream
/// whose storage is a buffer checked out of the pool ([`Checkout`]).
type GenericHttpStream = kawa::Kawa<Checkout>;
66
/// Lets kawa use a pool [`Checkout`] as its backing byte buffer.
impl kawa::AsBuffer for Checkout {
    /// Read-only view of the buffer, delegating to `self.inner.extra()`.
    fn as_buffer(&self) -> &[u8] {
        self.inner.extra()
    }
    /// Mutable view of the buffer, delegating to `self.inner.extra_mut()`.
    fn as_mut_buffer(&mut self) -> &mut [u8] {
        self.inner.extra_mut()
    }
}
75
/// Answers the proxy can produce by itself, without relaying a backend response.
///
/// Each variant is named after the HTTP status code it converts to
/// (see `From<&DefaultAnswer> for u16`) and carries the values handed to the
/// answer templates when the answer is built in `Http::set_answer`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DefaultAnswer {
    /// Status 301. `location` is presumably the redirection target — TODO confirm.
    Answer301 {
        location: String,
    },
    /// Status 400, set when parsing the frontend request fails.
    /// The fields are the diagnostic produced by `diagnostic_400_502`.
    Answer400 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// Status 401.
    Answer401 {},
    /// Status 404.
    Answer404 {},
    /// Status 408. `duration` is presumably the elapsed timeout, already formatted — TODO confirm.
    Answer408 {
        duration: String,
    },
    /// Status 413, set when the request buffer is full before the request
    /// reached its main phase. `capacity` is the capacity of that buffer.
    Answer413 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        capacity: usize,
    },
    /// Status 502, set when parsing the backend response fails.
    /// The fields are the diagnostic produced by `diagnostic_400_502`.
    Answer502 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// Status 503, set for instance when the maximum number of backend
    /// connection attempts is reached.
    Answer503 {
        message: String,
    },
    /// Status 504. `duration` is presumably the elapsed timeout, already formatted — TODO confirm.
    Answer504 {
        duration: String,
    },
    /// Status 507, set when the response buffer is full before the response
    /// reached its main phase. `capacity` is the capacity of that buffer.
    Answer507 {
        phase: kawa::ParsingPhaseMarker,
        message: String,
        capacity: usize,
    },
}
117
118impl From<&DefaultAnswer> for u16 {
119 fn from(answer: &DefaultAnswer) -> u16 {
120 match answer {
121 DefaultAnswer::Answer301 { .. } => 301,
122 DefaultAnswer::Answer400 { .. } => 400,
123 DefaultAnswer::Answer401 { .. } => 401,
124 DefaultAnswer::Answer404 { .. } => 404,
125 DefaultAnswer::Answer408 { .. } => 408,
126 DefaultAnswer::Answer413 { .. } => 413,
127 DefaultAnswer::Answer502 { .. } => 502,
128 DefaultAnswer::Answer503 { .. } => 503,
129 DefaultAnswer::Answer504 { .. } => 504,
130 DefaultAnswer::Answer507 { .. } => 507,
131 }
132 }
133}
134
/// Phase a session is in when a timeout is evaluated.
///
/// NOTE(review): the variants are not used in the visible part of this file;
/// their names suggest they tell apart a timeout hit while receiving a request,
/// while sending a response, while idle between two requests, or while waiting
/// for the backend to answer — confirm against the timeout handling code.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    Request,
    Response,
    WaitingForNewRequest,
    WaitingForResponse,
}
142
/// What is currently being written to the frontend.
pub enum ResponseStream {
    /// The response read from the backend and relayed to the frontend.
    BackendAnswer(GenericHttpStream),
    /// An answer generated by the proxy itself (see `Http::set_answer`):
    /// its status code and the prepared stream.
    DefaultAnswer(u16, DefaultAnswerStream),
}
147
/// State of an HTTP/1 exchange between a frontend socket and, once connected,
/// a backend socket.
pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// Templates used by `set_answer` to build default answers.
    answers: Rc<RefCell<answers::HttpAnswers>>,
    /// Backend currently attached to this session, if any.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Connection status of the backend socket.
    backend_connection_status: BackendConnectionStatus,
    /// Readiness (interest and received events) of the backend socket.
    pub backend_readiness: Readiness,
    /// Socket to the backend, once one has been set.
    pub backend_socket: Option<TcpStream>,
    /// Instant at which the last backend response was fully parsed;
    /// used to decide whether the backend socket must be probed before reuse.
    backend_stop: Option<Instant>,
    /// Event-loop token of the backend socket.
    pub backend_token: Option<Token>,
    /// Timeout guarding the backend side.
    pub container_backend_timeout: TimeoutContainer,
    /// Timeout guarding the frontend side.
    pub container_frontend_timeout: TimeoutContainer,
    /// Configured backend timeout (reported when the backend timeout cannot be reset).
    configured_backend_timeout: Duration,
    /// Configured connect timeout; initial duration of the backend timeout container.
    configured_connect_timeout: Duration,
    /// Configured frontend timeout, applied once a request has started and on keep-alive reset.
    configured_frontend_timeout: Duration,
    /// Number of backend connection attempts, compared to `CONN_RETRIES`.
    connection_attempts: u8,
    /// Readiness (interest and received events) of the frontend socket.
    pub frontend_readiness: Readiness,
    /// Socket to the client.
    pub frontend_socket: Front,
    /// Event-loop token of the frontend socket.
    frontend_token: Token,
    /// Number of times the state was reset for a new request on the same connection.
    keepalive_count: usize,
    /// Listener that accepted the session; queried for tags when logging.
    listener: Rc<RefCell<L>>,
    /// Request being received from the frontend and forwarded to the backend.
    pub request_stream: GenericHttpStream,
    /// Response being sent to the frontend (backend response or default answer).
    pub response_stream: ResponseStream,
    /// Per-request HTTP context (request id, method, authority, status, keep-alive flags, ...).
    pub context: HttpContext,
}
177
178impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
179 #[allow(clippy::too_many_arguments)]
188 pub fn new(
189 answers: Rc<RefCell<answers::HttpAnswers>>,
190 configured_backend_timeout: Duration,
191 configured_connect_timeout: Duration,
192 configured_frontend_timeout: Duration,
193 container_frontend_timeout: TimeoutContainer,
194 frontend_socket: Front,
195 frontend_token: Token,
196 listener: Rc<RefCell<L>>,
197 pool: Weak<RefCell<Pool>>,
198 protocol: Protocol,
199 public_address: SocketAddr,
200 request_id: Ulid,
201 session_address: Option<SocketAddr>,
202 sticky_name: String,
203 ) -> Result<Http<Front, L>, AcceptError> {
204 let (front_buffer, back_buffer) = match pool.upgrade() {
205 Some(pool) => {
206 let mut pool = pool.borrow_mut();
207 match (pool.checkout(), pool.checkout()) {
208 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
209 _ => return Err(AcceptError::BufferCapacityReached),
210 }
211 }
212 None => return Err(AcceptError::BufferCapacityReached),
213 };
214 Ok(Http {
215 answers,
216 backend_connection_status: BackendConnectionStatus::NotConnected,
217 backend_readiness: Readiness::new(),
218 backend_socket: None,
219 backend_stop: None,
220 backend_token: None,
221 backend: None,
222 configured_backend_timeout,
223 configured_connect_timeout,
224 configured_frontend_timeout,
225 connection_attempts: 0,
226 container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
227 container_frontend_timeout,
228 frontend_readiness: Readiness {
229 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
230 event: Ready::EMPTY,
231 },
232 frontend_socket,
233 frontend_token,
234 keepalive_count: 0,
235 listener,
236 request_stream: GenericHttpStream::new(
237 kawa::Kind::Request,
238 kawa::Buffer::new(front_buffer),
239 ),
240 response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
241 kawa::Kind::Response,
242 kawa::Buffer::new(back_buffer),
243 )),
244 context: HttpContext::new(
245 request_id,
246 protocol,
247 public_address,
248 session_address,
249 sticky_name,
250 ),
251 })
252 }
253
254 pub fn reset(&mut self) {
256 trace!("{} ============== reset", log_context!(self));
257 let response_stream = match &mut self.response_stream {
258 ResponseStream::BackendAnswer(response_stream) => response_stream,
259 _ => return,
260 };
261
262 self.context.id = Ulid::generate();
263 self.context.reset();
264
265 self.request_stream.clear();
266 response_stream.clear();
267 self.keepalive_count += 1;
268 gauge_add!("http.active_requests", -1);
269
270 if let Some(backend) = &mut self.backend {
271 let mut backend = backend.borrow_mut();
272 backend.active_requests = backend.active_requests.saturating_sub(1);
273 }
274
275 self.container_backend_timeout.cancel();
278 self.container_frontend_timeout
279 .set_duration(self.configured_frontend_timeout);
280 self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
281 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
282
283 let response_storage = &mut response_stream.storage;
290 if !response_storage.is_empty() {
291 warn!(
292 "{} Leftover fragment from response: {}",
293 log_context!(self),
294 parser::view(
295 response_storage.used(),
296 16,
297 &[response_storage.start, response_storage.end,],
298 )
299 );
300 }
301
302 response_storage.clear();
303 if !self.request_stream.storage.is_empty() {
304 self.frontend_readiness.event.insert(Ready::READABLE);
305 } else {
306 self.request_stream.storage.clear();
307 }
308 }
309
310 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
311 trace!("{} ============== readable", log_context!(self));
312 if !self.container_frontend_timeout.reset() {
313 error!(
314 "could not reset front timeout {:?}",
315 self.configured_frontend_timeout
316 );
317 self.print_state(self.protocol_string());
318 }
319
320 let response_stream = match &mut self.response_stream {
321 ResponseStream::BackendAnswer(response_stream) => response_stream,
322 ResponseStream::DefaultAnswer(..) => {
323 error!(
324 "{} Sending default answer, should not read from frontend socket",
325 log_context!(self)
326 );
327
328 self.frontend_readiness.interest.remove(Ready::READABLE);
329 self.frontend_readiness.interest.insert(Ready::WRITABLE);
330 return StateResult::Continue;
331 }
332 };
333
334 if self.request_stream.storage.is_full() {
335 self.frontend_readiness.interest.remove(Ready::READABLE);
336 if self.request_stream.is_main_phase() {
337 self.backend_readiness.interest.insert(Ready::WRITABLE);
338 } else {
339 self.set_answer(DefaultAnswer::Answer413 {
341 capacity: self.request_stream.storage.capacity(),
342 phase: self.request_stream.parsing_phase.marker(),
343 message: diagnostic_413_507(self.request_stream.parsing_phase),
344 });
345 }
346 return StateResult::Continue;
347 }
348
349 let (size, socket_state) = self
350 .frontend_socket
351 .socket_read(self.request_stream.storage.space());
352
353 debug!("{} Read {} bytes", log_context!(self), size);
354
355 if size > 0 {
356 self.request_stream.storage.fill(size);
357 count!("bytes_in", size as i64);
358 metrics.bin += size;
359 } else {
363 self.frontend_readiness.event.remove(Ready::READABLE);
364 }
365
366 match socket_state {
367 SocketResult::Error | SocketResult::Closed => {
368 if self.request_stream.is_initial() {
369 if self.keepalive_count == 0 {
374 self.frontend_socket.read_error();
375 }
376 } else {
377 self.frontend_socket.read_error();
378 self.log_request_error(
379 metrics,
380 &format!(
381 "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
382 self.frontend_readiness,
383 self.backend_readiness,
384 )
385 );
386 }
387 return StateResult::CloseSession;
388 }
389 SocketResult::WouldBlock => {
390 self.frontend_readiness.event.remove(Ready::READABLE);
391 }
392 SocketResult::Continue => {}
393 };
394
395 trace!("{} ============== readable_parse", log_context!(self));
396 let was_initial = self.request_stream.is_initial();
397 let was_not_proxying = !self.request_stream.is_main_phase();
398
399 kawa::h1::parse(&mut self.request_stream, &mut self.context);
400 if was_initial && !self.request_stream.is_initial() {
403 self.container_frontend_timeout
407 .set_duration(self.configured_frontend_timeout);
408 gauge_add!("http.active_requests", 1);
409 incr!("http.requests");
410 }
411
412 if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
413 incr!("http.frontend_parse_errors");
414 warn!(
415 "{} Parsing request error in {:?}: {}",
416 log_context!(self),
417 marker,
418 match kind {
419 kawa::ParsingErrorKind::Consuming { index } => {
420 let kawa = &self.request_stream;
421 parser::view(
422 kawa.storage.used(),
423 16,
424 &[
425 kawa.storage.start,
426 kawa.storage.head,
427 index as usize,
428 kawa.storage.end,
429 ],
430 )
431 }
432 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
433 }
434 );
435 if response_stream.consumed {
436 self.log_request_error(metrics, "Parsing error on the request");
437 return StateResult::CloseSession;
438 } else {
439 let (message, successfully_parsed, partially_parsed, invalid) =
440 diagnostic_400_502(marker, kind, &self.request_stream);
441 self.set_answer(DefaultAnswer::Answer400 {
442 message,
443 phase: marker,
444 successfully_parsed,
445 partially_parsed,
446 invalid,
447 });
448 return StateResult::Continue;
449 }
450 }
451
452 if self.request_stream.is_main_phase() {
453 self.backend_readiness.interest.insert(Ready::WRITABLE);
454 if was_not_proxying {
455 trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
458 return StateResult::ConnectBackend;
459 }
460 }
461 if self.request_stream.is_terminated() {
462 self.frontend_readiness.interest.remove(Ready::READABLE);
463 }
464
465 StateResult::Continue
466 }
467
468 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
469 trace!("{} ============== writable", log_context!(self));
470 let response_stream = match &mut self.response_stream {
471 ResponseStream::BackendAnswer(response_stream) => response_stream,
472 _ => return self.writable_default_answer(metrics),
473 };
474
475 response_stream.prepare(&mut kawa::h1::BlockConverter);
476
477 let bufs = response_stream.as_io_slice();
478 if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
479 self.frontend_readiness.interest.remove(Ready::WRITABLE);
480 }
482
483 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
484
485 debug!("{} Wrote {} bytes", log_context!(self), size);
486
487 if size > 0 {
488 response_stream.consume(size);
489 count!("bytes_out", size as i64);
490 metrics.bout += size;
491 self.backend_readiness.interest.insert(Ready::READABLE);
492 }
493
494 match socket_state {
495 SocketResult::Error | SocketResult::Closed => {
496 self.frontend_socket.write_error();
497 self.log_request_error(
498 metrics,
499 &format!(
500 "front socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
501 self.frontend_readiness,
502 self.backend_readiness,
503 ),
504 );
505 return StateResult::CloseSession;
506 }
507 SocketResult::WouldBlock => {
508 self.frontend_readiness.event.remove(Ready::WRITABLE);
509 }
510 SocketResult::Continue => {}
511 }
512
513 if self.frontend_socket.socket_wants_write() {
514 return StateResult::Continue;
515 }
516
517 if response_stream.is_terminated() && response_stream.is_completed() {
518 if self.context.closing {
519 debug!("{} closing proxy, no keep alive", log_context!(self));
520 self.log_request_success(metrics);
521 return StateResult::CloseSession;
522 }
523
524 match response_stream.detached.status_line {
525 kawa::StatusLine::Response { code: 101, .. } => {
526 trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
527 self.log_request_success(metrics);
528 return StateResult::Upgrade;
529 }
530 kawa::StatusLine::Response { code: 100, .. } => {
531 trace!("{} ============== HANDLE CONTINUE!", log_context!(self));
532 response_stream.clear();
533 self.log_request_success(metrics);
534 return StateResult::Continue;
535 }
536 kawa::StatusLine::Response { code: 103, .. } => {
537 self.backend_readiness.event.insert(Ready::READABLE);
538 trace!("{} ============== HANDLE EARLY HINT!", log_context!(self));
539 response_stream.clear();
540 self.log_request_success(metrics);
541 return StateResult::Continue;
542 }
543 _ => (),
544 }
545
546 let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
547 let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
548 if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
549 && request_length_known
550 {
551 error!(
552 "{} Response terminated before request, this case is not handled properly yet",
553 log_context!(self)
554 );
555 incr!("http.early_response_close");
556 }
559
560 trace!(
565 "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
566 log_context!(self),
567 self.context.keep_alive_frontend,
568 self.context.keep_alive_backend,
569 response_length_known
570 );
571
572 self.log_request_success(metrics);
573 return match (
574 self.context.keep_alive_frontend,
575 self.context.keep_alive_backend,
576 response_length_known,
577 ) {
578 (true, true, true) => {
579 debug!("{} Keep alive frontend/backend", log_context!(self));
580 metrics.reset();
581 self.reset();
582 StateResult::Continue
583 }
584 (true, false, true) => {
585 debug!("{} Keep alive frontend", log_context!(self));
586 metrics.reset();
587 self.reset();
588 StateResult::CloseBackend
589 }
590 _ => {
591 debug!("{} No keep alive", log_context!(self));
592 StateResult::CloseSession
593 }
594 };
595 }
596 StateResult::Continue
597 }
598
599 fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
600 trace!(
601 "{} ============== writable_default_answer",
602 log_context!(self)
603 );
604 let response_stream = match &mut self.response_stream {
605 ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
606 _ => return StateResult::CloseSession,
607 };
608 let bufs = response_stream.as_io_slice();
609 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
610
611 count!("bytes_out", size as i64);
612 metrics.bout += size;
613 response_stream.consume(size);
614
615 if size == 0 || socket_state != SocketResult::Continue {
616 self.frontend_readiness.event.remove(Ready::WRITABLE);
617 }
618
619 if response_stream.is_completed() {
620 save_http_status_metric(self.context.status, self.context.log_context());
621 self.log_default_answer_success(metrics);
622 self.frontend_readiness.reset();
623 self.backend_readiness.reset();
624 return StateResult::CloseSession;
625 }
626
627 if socket_state == SocketResult::Error {
628 self.frontend_socket.write_error();
629 self.log_request_error(
630 metrics,
631 "error writing default answer to front socket, closing",
632 );
633 StateResult::CloseSession
634 } else {
635 StateResult::Continue
636 }
637 }
638
639 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
640 trace!("{} ============== backend_writable", log_context!(self));
641 if let ResponseStream::DefaultAnswer(..) = self.response_stream {
642 error!(
643 "{}\tsending default answer, should not write to back",
644 log_context!(self)
645 );
646 self.backend_readiness.interest.remove(Ready::WRITABLE);
647 self.frontend_readiness.interest.insert(Ready::WRITABLE);
648 return SessionResult::Continue;
649 }
650
651 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
652 backend_socket
653 } else {
654 self.log_request_error(metrics, "back socket not found, closing session");
655 return SessionResult::Close;
656 };
657
658 self.request_stream.prepare(&mut kawa::h1::BlockConverter);
659
660 let bufs = self.request_stream.as_io_slice();
661 if bufs.is_empty() {
662 self.backend_readiness.interest.remove(Ready::WRITABLE);
663 return SessionResult::Continue;
664 }
665
666 let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
667 debug!("{} Wrote {} bytes", log_context!(self), size);
668
669 if size > 0 {
670 self.request_stream.consume(size);
671 count!("back_bytes_out", size as i64);
672 metrics.backend_bout += size;
673 self.frontend_readiness.interest.insert(Ready::READABLE);
674 self.backend_readiness.interest.insert(Ready::READABLE);
675 } else {
676 self.backend_readiness.event.remove(Ready::WRITABLE);
677 }
678
679 match socket_state {
680 SocketResult::Error | SocketResult::Closed => {
690 self.frontend_readiness.interest.remove(Ready::READABLE);
691 self.backend_readiness.interest.remove(Ready::WRITABLE);
692 return SessionResult::Continue;
693 }
694 SocketResult::WouldBlock => {
695 self.backend_readiness.event.remove(Ready::WRITABLE);
696 }
697 SocketResult::Continue => {}
698 }
699
700 if self.request_stream.is_terminated() && self.request_stream.is_completed() {
701 self.backend_readiness.interest.remove(Ready::WRITABLE);
702
703 self.container_frontend_timeout.cancel();
705 self.container_backend_timeout.reset();
706 }
707 SessionResult::Continue
708 }
709
710 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
712 trace!("{} ============== backend_readable", log_context!(self));
713 if !self.container_backend_timeout.reset() {
714 error!(
715 "{} Could not reset back timeout {:?}",
716 log_context!(self),
717 self.configured_backend_timeout
718 );
719 self.print_state(self.protocol_string());
720 }
721
722 let response_stream = match &mut self.response_stream {
723 ResponseStream::BackendAnswer(response_stream) => response_stream,
724 _ => {
725 error!(
726 "{} Sending default answer, should not read from backend socket",
727 log_context!(self),
728 );
729
730 self.backend_readiness.interest.remove(Ready::READABLE);
731 self.frontend_readiness.interest.insert(Ready::WRITABLE);
732 return SessionResult::Continue;
733 }
734 };
735
736 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
737 backend_socket
738 } else {
739 self.log_request_error(metrics, "back socket not found, closing session");
740 return SessionResult::Close;
741 };
742
743 if response_stream.storage.is_full() {
744 self.backend_readiness.interest.remove(Ready::READABLE);
745 if response_stream.is_main_phase() {
746 self.frontend_readiness.interest.insert(Ready::WRITABLE);
747 } else {
748 let capacity = response_stream.storage.capacity();
750 let phase = response_stream.parsing_phase.marker();
751 let message = diagnostic_413_507(response_stream.parsing_phase);
752 self.set_answer(DefaultAnswer::Answer507 {
753 capacity,
754 phase,
755 message,
756 });
757 }
758 return SessionResult::Continue;
759 }
760
761 let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
762 debug!("{} Read {} bytes", log_context!(self), size);
763
764 if size > 0 {
765 response_stream.storage.fill(size);
766 count!("back_bytes_in", size as i64);
767 metrics.backend_bin += size;
768 self.container_frontend_timeout.cancel();
779 } else {
780 self.backend_readiness.event.remove(Ready::READABLE);
781 }
782
783 match socket_state {
785 SocketResult::Error => {
786 backend_socket.read_error();
787 self.log_request_error(
788 metrics,
789 &format!(
790 "back socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
791 self.frontend_readiness,
792 self.backend_readiness,
793 ),
794 );
795 return SessionResult::Close;
796 }
797 SocketResult::WouldBlock | SocketResult::Closed => {
798 self.backend_readiness.event.remove(Ready::READABLE);
799 }
800 SocketResult::Continue => {}
801 }
802
803 trace!(
804 "{} ============== backend_readable_parse",
805 log_context!(self)
806 );
807 kawa::h1::parse(response_stream, &mut self.context);
808 if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
811 incr!("http.backend_parse_errors");
812 warn!(
813 "{} Parsing response error in {:?}: {}",
814 log_context!(self),
815 marker,
816 match kind {
817 kawa::ParsingErrorKind::Consuming { index } => {
818 parser::view(
819 response_stream.storage.used(),
820 16,
821 &[
822 response_stream.storage.start,
823 response_stream.storage.head,
824 index as usize,
825 response_stream.storage.end,
826 ],
827 )
828 }
829 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
830 }
831 );
832 if response_stream.consumed {
833 return SessionResult::Close;
834 } else {
835 let (message, successfully_parsed, partially_parsed, invalid) =
836 diagnostic_400_502(marker, kind, response_stream);
837 self.set_answer(DefaultAnswer::Answer502 {
838 message,
839 phase: marker,
840 successfully_parsed,
841 partially_parsed,
842 invalid,
843 });
844 return SessionResult::Continue;
845 }
846 }
847
848 if response_stream.is_main_phase() {
849 self.frontend_readiness.interest.insert(Ready::WRITABLE);
850 }
851 if response_stream.is_terminated() {
852 metrics.backend_stop();
853 self.backend_stop = Some(Instant::now());
854 self.backend_readiness.interest.remove(Ready::READABLE);
855 }
856 SessionResult::Continue
857 }
858}
859
860impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
861 fn log_endpoint(&self) -> EndpointRecord<'_> {
862 EndpointRecord::Http {
863 method: self.context.method.as_deref(),
864 authority: self.context.authority.as_deref(),
865 path: self.context.path.as_deref(),
866 reason: self.context.reason.as_deref(),
867 status: self.context.status,
868 }
869 }
870
871 pub fn get_session_address(&self) -> Option<SocketAddr> {
872 self.context
873 .session_address
874 .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
875 }
876
877 pub fn get_backend_address(&self) -> Option<SocketAddr> {
878 self.backend
879 .as_ref()
880 .map(|backend| backend.borrow().address)
881 .or_else(|| {
882 self.backend_socket
883 .as_ref()
884 .and_then(|backend| backend.peer_addr().ok())
885 })
886 }
887
888 fn protocol_string(&self) -> &'static str {
890 match self.context.protocol {
891 Protocol::HTTP => "HTTP",
892 Protocol::HTTPS => match self.frontend_socket.protocol() {
893 TransportProtocol::Ssl2 => "HTTPS-SSL2",
894 TransportProtocol::Ssl3 => "HTTPS-SSL3",
895 TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
896 TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
897 TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
898 TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
899 _ => unreachable!(),
900 },
901 _ => unreachable!(),
902 }
903 }
904
905 pub fn websocket_context(&self) -> WebSocketContext {
907 WebSocketContext::Http {
908 method: self.context.method.clone(),
909 authority: self.context.authority.clone(),
910 path: self.context.path.clone(),
911 reason: self.context.reason.clone(),
912 status: self.context.status,
913 }
914 }
915
916 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
917 let listener = self.listener.borrow();
918 let tags = self.context.authority.as_ref().and_then(|host| {
919 let hostname = match host.split_once(':') {
920 None => host,
921 Some((hostname, _)) => hostname,
922 };
923 listener.get_tags(hostname)
924 });
925
926 let context = self.context.log_context();
927 metrics.register_end_of_session(&context);
928
929 log_access! {
930 error,
931 on_failure: { incr!("unsent-access-logs") },
932 message,
933 context,
934 session_address: self.get_session_address(),
935 backend_address: self.get_backend_address(),
936 protocol: self.protocol_string(),
937 endpoint: self.log_endpoint(),
938 tags,
939 client_rtt: socket_rtt(self.front_socket()),
940 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
941 service_time: metrics.service_time(),
942 response_time: metrics.backend_response_time(),
943 request_time: metrics.request_time(),
944 bytes_in: metrics.bin,
945 bytes_out: metrics.bout,
946 user_agent: self.context.user_agent.as_deref(),
947 #[cfg(feature = "opentelemetry")]
948 otel: self.context.otel.as_ref(),
949 #[cfg(not(feature = "opentelemetry"))]
950 otel: None,
951 };
952 }
953
954 pub fn log_request_success(&self, metrics: &SessionMetrics) {
955 save_http_status_metric(self.context.status, self.context.log_context());
956 self.log_request(metrics, false, None);
957 }
958
    /// Emits the access log of a default answer as a success.
    /// Unlike `log_request_success`, this does not record the status metric
    /// (the caller in `writable_default_answer` records it itself).
    pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, false, None);
    }
962 pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
963 incr!("http.errors");
964 error!(
965 "{} Could not process request properly got: {}",
966 log_context!(self),
967 message
968 );
969 self.print_state(self.protocol_string());
970 self.log_request(metrics, true, Some(message));
971 }
972
973 pub fn set_answer(&mut self, answer: DefaultAnswer) {
974 let status = u16::from(&answer);
975 if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
976 error!(
977 "already set the default answer to {}, trying to set to {}",
978 old_status, status
979 );
980 } else {
981 match answer {
982 DefaultAnswer::Answer301 { .. } => incr!(
983 "http.301.redirection",
984 self.context.cluster_id.as_deref(),
985 self.context.backend_id.as_deref()
986 ),
987 DefaultAnswer::Answer400 { .. } => incr!("http.400.errors"),
988 DefaultAnswer::Answer401 { .. } => incr!(
989 "http.401.errors",
990 self.context.cluster_id.as_deref(),
991 self.context.backend_id.as_deref()
992 ),
993 DefaultAnswer::Answer404 { .. } => incr!("http.404.errors"),
994 DefaultAnswer::Answer408 { .. } => incr!(
995 "http.408.errors",
996 self.context.cluster_id.as_deref(),
997 self.context.backend_id.as_deref()
998 ),
999 DefaultAnswer::Answer413 { .. } => incr!(
1000 "http.413.errors",
1001 self.context.cluster_id.as_deref(),
1002 self.context.backend_id.as_deref()
1003 ),
1004 DefaultAnswer::Answer502 { .. } => incr!(
1005 "http.502.errors",
1006 self.context.cluster_id.as_deref(),
1007 self.context.backend_id.as_deref()
1008 ),
1009 DefaultAnswer::Answer503 { .. } => incr!(
1010 "http.503.errors",
1011 self.context.cluster_id.as_deref(),
1012 self.context.backend_id.as_deref()
1013 ),
1014 DefaultAnswer::Answer504 { .. } => incr!(
1015 "http.504.errors",
1016 self.context.cluster_id.as_deref(),
1017 self.context.backend_id.as_deref()
1018 ),
1019 DefaultAnswer::Answer507 { .. } => incr!(
1020 "http.507.errors",
1021 self.context.cluster_id.as_deref(),
1022 self.context.backend_id.as_deref()
1023 ),
1024 };
1025 }
1026
1027 let mut kawa = self.answers.borrow().get(
1028 answer,
1029 self.context.id.to_string(),
1030 self.context.cluster_id.as_deref(),
1031 self.context.backend_id.as_deref(),
1032 self.get_route(),
1033 );
1034 kawa.prepare(&mut kawa::h1::BlockConverter);
1035 self.context.status = Some(status);
1036 self.context.reason = None;
1037 self.context.keep_alive_frontend = false;
1038 self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
1039 self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1040 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
1041 }
1042
1043 pub fn test_backend_socket(&self) -> bool {
1044 match self.backend_socket {
1045 Some(ref s) => {
1046 let mut tmp = [0u8; 1];
1047 let res = s.peek(&mut tmp[..]);
1048
1049 match res {
1050 Ok(0) => false,
1052 Ok(_) => true,
1053 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1054 }
1055 }
1056 None => false,
1057 }
1058 }
1059
1060 pub fn is_valid_backend_socket(&self) -> bool {
1061 match self.backend_stop.as_ref() {
1063 Some(stop_instant) => {
1064 let now = Instant::now();
1065 let dur = now - *stop_instant;
1066 if dur > Duration::from_secs(1) {
1067 return self.test_backend_socket();
1068 }
1069 }
1070 None => return self.test_backend_socket(),
1071 }
1072
1073 true
1074 }
1075
    /// Stores the backend socket and the backend it belongs to, replacing any
    /// previous ones.
    pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
        self.backend_socket = Some(socket);
        self.backend = backend;
    }
1080
    /// Records the cluster id in the request context.
    pub fn set_cluster_id(&mut self, cluster_id: String) {
        self.context.cluster_id = Some(cluster_id);
    }
1084
    /// Records the backend id in the request context.
    pub fn set_backend_id(&mut self, backend_id: String) {
        self.context.backend_id = Some(backend_id);
    }
1088
    /// Stores the event-loop token of the backend socket.
    pub fn set_backend_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
1092
    /// Forgets the event-loop token of the backend socket.
    pub fn clear_backend_token(&mut self) {
        self.backend_token = None;
    }
1096
1097 pub fn set_backend_timeout(&mut self, dur: Duration) {
1098 if let Some(token) = self.backend_token.as_ref() {
1099 self.container_backend_timeout.set_duration(dur);
1100 self.container_backend_timeout.set(*token);
1101 }
1102 }
1103
    /// Returns the TCP stream underlying the frontend socket.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend_socket.socket_ref()
    }
1107
1108 fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1112 self.container_backend_timeout.cancel();
1113 debug!(
1114 "{}\tPROXY [{}->{}] CLOSED BACKEND",
1115 log_context!(self),
1116 self.frontend_token.0,
1117 self.backend_token
1118 .map(|t| format!("{}", t.0))
1119 .unwrap_or_else(|| "-".to_string())
1120 );
1121
1122 let proxy = proxy.borrow();
1123 if let Some(socket) = &mut self.backend_socket.take() {
1124 if let Err(e) = proxy.deregister_socket(socket) {
1125 error!(
1126 "{} Error deregistering back socket({:?}): {:?}",
1127 log_context!(self),
1128 socket,
1129 e
1130 );
1131 }
1132 if let Err(e) = socket.shutdown(Shutdown::Both) {
1133 if e.kind() != ErrorKind::NotConnected {
1134 error!(
1135 "{} Error shutting down back socket({:?}): {:?}",
1136 log_context!(self),
1137 socket,
1138 e
1139 );
1140 }
1141 }
1142 }
1143
1144 if let Some(token) = self.backend_token.take() {
1145 proxy.remove_session(token);
1146
1147 if self.backend_connection_status != BackendConnectionStatus::NotConnected {
1148 self.backend_readiness.event = Ready::EMPTY;
1149 }
1150
1151 if self.backend_connection_status == BackendConnectionStatus::Connected {
1152 gauge_add!("backend.connections", -1);
1153 gauge_add!(
1154 "connections_per_backend",
1155 -1,
1156 self.context.cluster_id.as_deref(),
1157 metrics.backend_id.as_deref()
1158 );
1159 }
1160
1161 self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);
1162
1163 if let Some(backend) = self.backend.take() {
1164 backend.borrow_mut().dec_connections();
1165 }
1166 }
1167 }
1168
1169 fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1171 if self.connection_attempts >= CONN_RETRIES {
1172 error!(
1173 "{} Max connection attempt reached ({})",
1174 log_context!(self),
1175 self.connection_attempts,
1176 );
1177
1178 self.set_answer(DefaultAnswer::Answer503 {
1179 message: format!(
1180 "Max connection attempt reached: {}",
1181 self.connection_attempts
1182 ),
1183 });
1184 return Err(BackendConnectionError::MaxConnectionRetries(None));
1185 }
1186 Ok(())
1187 }
1188
1189 fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1190 let is_valid_backend_socket = self.is_valid_backend_socket();
1191
1192 if !is_valid_backend_socket {
1193 return false;
1194 }
1195
1196 metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1198
1199 metrics.backend_start();
1200 metrics.backend_connected();
1201 if let Some(b) = self.backend.as_mut() {
1202 b.borrow_mut().active_requests += 1;
1203 }
1204 true
1205 }
1206
1207 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1209 let given_method = self
1210 .context
1211 .method
1212 .as_ref()
1213 .ok_or(RetrieveClusterError::NoMethod)?;
1214 let given_authority = self
1215 .context
1216 .authority
1217 .as_deref()
1218 .ok_or(RetrieveClusterError::NoHost)?;
1219 let given_path = self
1220 .context
1221 .path
1222 .as_deref()
1223 .ok_or(RetrieveClusterError::NoPath)?;
1224
1225 Ok((given_authority, given_path, given_method))
1226 }
1227
1228 pub fn get_route(&self) -> String {
1229 if let Some(method) = &self.context.method {
1230 if let Some(authority) = &self.context.authority {
1231 if let Some(path) = &self.context.path {
1232 return format!("{method} {authority}{path}");
1233 }
1234 return format!("{method} {authority}");
1235 }
1236 return format!("{method}");
1237 }
1238 String::new()
1239 }
1240
1241 fn cluster_id_from_request(
1242 &mut self,
1243 proxy: Rc<RefCell<dyn L7Proxy>>,
1244 ) -> Result<String, RetrieveClusterError> {
1245 let (host, uri, method) = match self.extract_route() {
1246 Ok(tuple) => tuple,
1247 Err(cluster_error) => {
1248 self.set_answer(DefaultAnswer::Answer400 {
1249 message: "Could not extract the route after connection started, this should not happen.".into(),
1250 phase: self.request_stream.parsing_phase.marker(),
1251 successfully_parsed: "null".into(),
1252 partially_parsed: "null".into(),
1253 invalid: "null".into(),
1254 });
1255 return Err(cluster_error);
1256 }
1257 };
1258
1259 let route_result = self
1260 .listener
1261 .borrow()
1262 .frontend_from_request(host, uri, method);
1263
1264 let route = match route_result {
1265 Ok(route) => route,
1266 Err(frontend_error) => {
1267 self.set_answer(DefaultAnswer::Answer404 {});
1268 return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
1269 }
1270 };
1271
1272 let cluster_id = match route {
1273 Route::ClusterId(cluster_id) => cluster_id,
1274 Route::Deny => {
1275 self.set_answer(DefaultAnswer::Answer401 {});
1276 return Err(RetrieveClusterError::UnauthorizedRoute);
1277 }
1278 };
1279
1280 let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
1281 && proxy
1282 .borrow()
1283 .clusters()
1284 .get(&cluster_id)
1285 .map(|cluster| cluster.https_redirect)
1286 .unwrap_or(false);
1287
1288 if frontend_should_redirect_https {
1289 self.set_answer(DefaultAnswer::Answer301 {
1290 location: format!("https://{host}{uri}"),
1291 });
1292 return Err(RetrieveClusterError::UnauthorizedRoute);
1293 }
1294
1295 Ok(cluster_id)
1296 }
1297
1298 pub fn backend_from_request(
1299 &mut self,
1300 cluster_id: &str,
1301 frontend_should_stick: bool,
1302 proxy: Rc<RefCell<dyn L7Proxy>>,
1303 metrics: &mut SessionMetrics,
1304 ) -> Result<TcpStream, BackendConnectionError> {
1305 let (backend, conn) = self
1306 .get_backend_for_sticky_session(
1307 frontend_should_stick,
1308 self.context.sticky_session_found.as_deref(),
1309 cluster_id,
1310 proxy,
1311 )
1312 .map_err(|backend_error| {
1313 self.set_answer(DefaultAnswer::Answer503 {
1316 message: backend_error.to_string(),
1317 });
1318 BackendConnectionError::Backend(backend_error)
1319 })?;
1320
1321 if frontend_should_stick {
1322 self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1324
1325 self.context.sticky_session = Some(
1326 backend
1327 .borrow()
1328 .sticky_id
1329 .clone()
1330 .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1331 );
1332 }
1333
1334 metrics.backend_id = Some(backend.borrow().backend_id.clone());
1335 metrics.backend_start();
1336 self.set_backend_id(backend.borrow().backend_id.clone());
1337
1338 self.backend = Some(backend);
1339 Ok(conn)
1340 }
1341
1342 fn get_backend_for_sticky_session(
1343 &self,
1344 frontend_should_stick: bool,
1345 sticky_session: Option<&str>,
1346 cluster_id: &str,
1347 proxy: Rc<RefCell<dyn L7Proxy>>,
1348 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1349 match (frontend_should_stick, sticky_session) {
1350 (true, Some(sticky_session)) => proxy
1351 .borrow()
1352 .backends()
1353 .borrow_mut()
1354 .backend_from_sticky_session(cluster_id, sticky_session),
1355 _ => proxy
1356 .borrow()
1357 .backends()
1358 .borrow_mut()
1359 .backend_from_cluster_id(cluster_id),
1360 }
1361 }
1362
    /// Picks the backend for the current request and makes sure the session has
    /// a connection towards it.
    ///
    /// Steps, in order:
    /// 1. give up if `check_circuit_breaker` reports too many attempts;
    /// 2. resolve the cluster with `cluster_id_from_request`;
    /// 3. if the cluster is unchanged and the backend status is `Connected`,
    ///    reuse the connection when the backend is still known to the proxy and
    ///    `check_backend_connection` accepts it;
    /// 4. otherwise close the previous backend (if a token was set) and open a
    ///    new connection with `backend_from_request`, registering the socket
    ///    with the proxy.
    ///
    /// Returns `Reuse` when the existing connection is kept, `Replace` when the
    /// token held at entry is registered again with the new socket, and `New`
    /// when a fresh token is obtained through `add_session`.
    ///
    /// # Errors
    ///
    /// Every error comes from one of the three helpers above; each of them has
    /// already stored a default answer on the session before returning.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        // snapshots taken before the code below clears or overwrites these fields
        let old_cluster_id = self.context.cluster_id.clone();
        let old_backend_token = self.backend_token;

        self.check_circuit_breaker()?;

        let cluster_id = self
            .cluster_id_from_request(proxy.clone())
            .map_err(BackendConnectionError::RetrieveClusterError)?;

        trace!(
            "{} Connect_to_backend: {:?} {:?} {:?}",
            log_context!(self),
            self.context.cluster_id,
            cluster_id,
            self.backend_connection_status
        );
        // same cluster as before and still connected: try to keep the connection
        if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
            && self.backend_connection_status == BackendConnectionStatus::Connected
        {
            // the backend may have been removed from the proxy since it was chosen
            let has_backend = self
                .backend
                .as_ref()
                .map(|backend| {
                    let backend = backend.borrow();
                    proxy
                        .borrow()
                        .backends()
                        .borrow()
                        .has_backend(&cluster_id, &backend)
                })
                .unwrap_or(false);

            if has_backend && self.check_backend_connection(metrics) {
                return Ok(BackendConnectAction::Reuse);
            } else if self.backend_token.take().is_some() {
                // note: the token is cleared *before* close_backend runs
                self.close_backend(proxy.clone(), metrics);
            }
        }

        // the request targets another cluster than the previous one:
        // the former backend connection cannot be kept
        if old_cluster_id.is_some()
            && old_cluster_id.as_ref() != Some(&cluster_id)
            && self.backend_token.take().is_some()
        {
            self.close_backend(proxy.clone(), metrics);
        }

        self.context.cluster_id = Some(cluster_id.clone());

        let frontend_should_stick = proxy
            .borrow()
            .clusters()
            .get(&cluster_id)
            .map(|cluster| cluster.sticky_session)
            .unwrap_or(false);

        let mut socket =
            self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
        // a failure to set TCP_NODELAY is logged but does not abort the connection
        if let Err(e) = socket.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on backend socket({:?}): {:?}",
                log_context!(self),
                socket,
                e
            );
        }

        // while connecting, only writable / hang-up / error events are of interest
        self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());

        match old_backend_token {
            // a token existed at entry: register the new socket under it
            Some(backend_token) => {
                self.set_backend_token(backend_token);
                // registration errors are logged, not propagated
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::Replace)
            }
            // no token at entry: ask the proxy for one by adding the session
            None => {
                let backend_token = proxy.borrow().add_session(session_rc);

                // registration errors are logged, not propagated
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                // the token must be set before the timeout: set_backend_timeout
                // does nothing without a backend token
                self.set_backend_token(backend_token);
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::New)
            }
        }
    }
1485
1486 fn set_backend_connected(
1487 &mut self,
1488 connected: BackendConnectionStatus,
1489 metrics: &mut SessionMetrics,
1490 ) {
1491 let last = self.backend_connection_status;
1492 self.backend_connection_status = connected;
1493
1494 if connected == BackendConnectionStatus::Connected {
1495 gauge_add!("backend.connections", 1);
1496 gauge_add!(
1497 "connections_per_backend",
1498 1,
1499 self.context.cluster_id.as_deref(),
1500 metrics.backend_id.as_deref()
1501 );
1502
1503 self.set_backend_timeout(self.configured_backend_timeout);
1506 if !self.backend_readiness.interest.is_readable() {
1509 self.container_backend_timeout.cancel();
1510 }
1511
1512 if let Some(backend) = &self.backend {
1513 let mut backend = backend.borrow_mut();
1514
1515 if backend.retry_policy.is_down() {
1516 incr!(
1517 "backend.up",
1518 self.context.cluster_id.as_deref(),
1519 metrics.backend_id.as_deref()
1520 );
1521
1522 info!(
1523 "backend server {} at {} is up",
1524 backend.backend_id, backend.address
1525 );
1526
1527 push_event(Event {
1528 kind: EventKind::BackendUp as i32,
1529 backend_id: Some(backend.backend_id.to_owned()),
1530 address: Some(backend.address.into()),
1531 cluster_id: None,
1532 });
1533 }
1534
1535 if let BackendConnectionStatus::Connecting(start) = last {
1536 backend.set_connection_time(Instant::now() - start);
1537 }
1538
1539 backend.failures = 0;
1541 backend.active_requests += 1;
1542 backend.retry_policy.succeed();
1543 }
1544 }
1545 }
1546
1547 fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1548 if let Some(backend) = &self.backend {
1549 let mut backend = backend.borrow_mut();
1550 backend.failures += 1;
1551
1552 let already_unavailable = backend.retry_policy.is_down();
1553 backend.retry_policy.fail();
1554 incr!(
1555 "backend.connections.error",
1556 self.context.cluster_id.as_deref(),
1557 metrics.backend_id.as_deref()
1558 );
1559
1560 if !already_unavailable && backend.retry_policy.is_down() {
1561 error!(
1562 "{} backend server {} at {} is down",
1563 log_context!(self),
1564 backend.backend_id,
1565 backend.address
1566 );
1567
1568 incr!(
1569 "backend.down",
1570 self.context.cluster_id.as_deref(),
1571 metrics.backend_id.as_deref()
1572 );
1573
1574 push_event(Event {
1575 kind: EventKind::BackendDown as i32,
1576 backend_id: Some(backend.backend_id.to_owned()),
1577 address: Some(backend.address.into()),
1578 cluster_id: None,
1579 });
1580 }
1581 }
1582 }
1583
    /// Reacts to a hang-up (or error) reported on the backend socket.
    ///
    /// Outcome, in order of evaluation:
    /// - the response stream is not a backend answer: `CloseBackend`;
    /// - readable data is both signalled and wanted on the backend: `Continue`,
    ///   so that it gets read first;
    /// - the response is terminated: `CloseBackend`;
    /// - the response has started but is not finished: it is forced to
    ///   `Terminated`, the frontend is made writable and `Continue` is returned;
    /// - neither request nor response has started: `CloseBackend`;
    /// - the request has started but the response has not: a 503 default answer
    ///   is set, backend interest is cleared and `Continue` is returned.
    pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return StateResult::CloseBackend,
        };

        // data may still be waiting on the socket: let it be read before acting on the hang-up
        if self.backend_readiness.event.is_readable()
            && self.backend_readiness.interest.is_readable()
        {
            return StateResult::Continue;
        }

        // the backend has finished answering, closing is the expected outcome
        if response_stream.is_terminated() {
            return StateResult::CloseBackend;
        }
        match (
            self.request_stream.is_initial(),
            response_stream.is_initial(),
        ) {
            // the response was started but the backend left before it was complete
            (_, false) => {
                error!(
                    "{} Backend closed before session is over",
                    log_context!(self),
                );

                trace!(
                    "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
                    log_context!(self)
                );

                response_stream.parsing_phase = kawa::ParsingPhase::Terminated;

                // ask for a frontend write so what was received gets handled by writable()
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                StateResult::Continue
            }
            // nothing in flight in either direction
            (true, true) => {
                trace!(
                    "{} Backend hanged up in between requests",
                    log_context!(self)
                );
                StateResult::CloseBackend
            }
            // a request was started but no response came back: answer with a 503
            (false, true) => {
                error!(
                    "{} Frontend transmitted data but the back closed",
                    log_context!(self)
                );

                self.set_answer(DefaultAnswer::Answer503 {
                    message: "Backend closed after consuming part of the request".into(),
                });

                // nothing more is expected from this backend
                self.backend_readiness.interest = Ready::EMPTY;
                StateResult::Continue
            }
        }
    }
1649
    /// Core readiness handler: reacts to the events accumulated on the frontend
    /// and backend sockets until no actionable interest remains.
    ///
    /// 1. If a backend connection is in progress and a backend event arrived,
    ///    either retry the connection (hang-up and failed socket test) or mark
    ///    the backend as connected.
    /// 2. A frontend hang-up closes the session.
    /// 3. Loop, at most `MAX_LOOP_ITERATIONS` times, dispatching to `readable`,
    ///    `backend_writable`, `backend_readable`, `writable` and `backend_hup`
    ///    according to the filtered interests of both sides.
    ///
    /// Returns `SessionResult::Continue` when there is nothing left to do for
    /// now, or the first non-`Continue` result produced along the way. Hitting
    /// the iteration limit is reported as a probable infinite loop and closes
    /// the session.
    fn ready_inner(
        &mut self,
        session: Rc<RefCell<dyn crate::ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> SessionResult {
        // number of completed passes of the dispatch loop below
        let mut counter = 0;

        if self.backend_connection_status.is_connecting()
            && !self.backend_readiness.event.is_empty()
        {
            if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
                // the connection attempt failed: record it and try again
                error!(
                    "{} Error connecting to backend, trying again, attempt {}",
                    log_context!(self),
                    self.connection_attempts
                );

                self.connection_attempts += 1;
                self.fail_backend_connection(metrics);

                self.backend_connection_status =
                    BackendConnectionStatus::Connecting(Instant::now());

                // drop the failed socket before connecting again
                self.close_backend(proxy.clone(), metrics);

                let connection_result =
                    self.connect_to_backend(session.clone(), proxy.clone(), metrics);
                if let Err(err) = &connection_result {
                    error!(
                        "{} Error connecting to backend: {}",
                        log_context!(self),
                        err
                    );
                }

                // `None` means processing goes on below (connection reused, or an
                // error for which a default answer has been prepared)
                if let Some(session_result) = handle_connection_result(connection_result) {
                    return session_result;
                }
            } else {
                // the connection is established
                metrics.backend_connected();
                self.connection_attempts = 0;
                self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
                // start listening for data coming from the backend
                self.backend_readiness.interest.insert(Ready::READABLE);
            }
        }

        if self.frontend_readiness.event.is_hup() {
            // only report the disconnection when a request was in progress
            if !self.request_stream.is_initial() {
                self.log_request_error(metrics, "Client disconnected abruptly");
            }
            return SessionResult::Close;
        }

        while counter < MAX_LOOP_ITERATIONS {
            let frontend_interest = self.frontend_readiness.filter_interest();
            let backend_interest = self.backend_readiness.filter_interest();

            trace!(
                "{} Frontend interest({:?}) and backend interest({:?})",
                log_context!(self),
                frontend_interest,
                backend_interest,
            );

            // nothing actionable on either side: wait for the next event
            if frontend_interest.is_empty() && backend_interest.is_empty() {
                break;
            }

            // the backend hung up while the frontend still wants to write but
            // is not writable yet: stop here until the frontend becomes writable
            if self.backend_readiness.event.is_hup()
                && self.frontend_readiness.interest.is_writable()
                && !self.frontend_readiness.event.is_writable()
            {
                break;
            }

            if frontend_interest.is_readable() {
                let state_result = self.readable(metrics);
                trace!(
                    "{} frontend_readable: {:?}",
                    log_context!(self),
                    state_result
                );

                match state_result {
                    StateResult::Continue => {}
                    StateResult::ConnectBackend => {
                        let connection_result =
                            self.connect_to_backend(session.clone(), proxy.clone(), metrics);
                        if let Err(err) = &connection_result {
                            error!(
                                "{} Error connecting to backend: {}",
                                log_context!(self),
                                err
                            );
                        }

                        if let Some(session_result) = handle_connection_result(connection_result) {
                            return session_result;
                        }
                    }
                    // readable() is not expected to ask for a backend close
                    StateResult::CloseBackend => unreachable!(),
                    StateResult::CloseSession => return SessionResult::Close,
                    StateResult::Upgrade => return SessionResult::Upgrade,
                }
            }

            if backend_interest.is_writable() {
                let session_result = self.backend_writable(metrics);
                trace!(
                    "{} backend_writable: {:?}",
                    log_context!(self),
                    session_result
                );
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if backend_interest.is_readable() {
                let session_result = self.backend_readable(metrics);
                trace!(
                    "{} backend_readable: {:?}",
                    log_context!(self),
                    session_result
                );
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            if frontend_interest.is_writable() {
                let state_result = self.writable(metrics);
                trace!(
                    "{} frontend_writable: {:?}",
                    log_context!(self),
                    state_result
                );
                match state_result {
                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                    StateResult::CloseSession => return SessionResult::Close,
                    StateResult::Upgrade => return SessionResult::Upgrade,
                    StateResult::Continue => {}
                    // writable() is not expected to ask for a backend connection
                    StateResult::ConnectBackend => unreachable!(),
                }
            }

            if frontend_interest.is_error() {
                error!(
                    "{} frontend socket error, disconnecting",
                    log_context!(self)
                );

                return SessionResult::Close;
            }

            if backend_interest.is_hup() || backend_interest.is_error() {
                let state_result = self.backend_hup(metrics);

                trace!("{} backend_hup: {:?}", log_context!(self), state_result);
                match state_result {
                    StateResult::Continue => {}
                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                    StateResult::CloseSession => return SessionResult::Close,
                    // backend_hup() only returns Continue, CloseBackend or CloseSession
                    StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
                }
            }

            counter += 1;
        }

        // the loop ran out of iterations instead of running out of work
        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self),
                MAX_LOOP_ITERATIONS
            );

            incr!("http.infinite_loop.error");
            self.print_state(self.protocol_string());

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
1851
1852 pub fn timeout_status(&self) -> TimeoutStatus {
1853 if self.request_stream.is_main_phase() {
1854 match &self.response_stream {
1855 ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
1856 TimeoutStatus::WaitingForResponse
1857 }
1858 _ => TimeoutStatus::Response,
1859 }
1860 } else if self.keepalive_count > 0 {
1861 TimeoutStatus::WaitingForNewRequest
1862 } else {
1863 TimeoutStatus::Request
1864 }
1865 }
1866}
1867
1868impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
1869 fn ready(
1870 &mut self,
1871 session: Rc<RefCell<dyn crate::ProxySession>>,
1872 proxy: Rc<RefCell<dyn L7Proxy>>,
1873 metrics: &mut SessionMetrics,
1874 ) -> SessionResult {
1875 let session_result = self.ready_inner(session, proxy, metrics);
1876 if session_result == SessionResult::Upgrade {
1877 let response_storage = match &mut self.response_stream {
1878 ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
1879 _ => return SessionResult::Close,
1880 };
1881
1882 self.request_stream.storage.buffer.sync(
1885 self.request_stream.storage.end,
1886 self.request_stream.storage.head,
1887 );
1888 response_storage
1889 .buffer
1890 .sync(response_storage.end, response_storage.head);
1891 }
1892 session_result
1893 }
1894
1895 fn update_readiness(&mut self, token: Token, events: Ready) {
1896 if self.frontend_token == token {
1897 self.frontend_readiness.event |= events;
1898 } else if self.backend_token == Some(token) {
1899 self.backend_readiness.event |= events;
1900 }
1901 }
1902
1903 fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1904 self.close_backend(proxy, metrics);
1905 self.frontend_socket.socket_close();
1906 let _ = self.frontend_socket.socket_write_vectored(&[]);
1907
1908 if !self.request_stream.is_initial() {
1910 gauge_add!("http.active_requests", -1);
1911
1912 if let Some(b) = self.backend.as_mut() {
1913 let mut backend = b.borrow_mut();
1914 backend.active_requests = backend.active_requests.saturating_sub(1);
1915 }
1916 }
1917 }
1918
1919 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1920 if self.frontend_token == token {
1922 self.container_frontend_timeout.triggered();
1923 return match self.timeout_status() {
1924 TimeoutStatus::Request => {
1926 self.set_answer(DefaultAnswer::Answer408 {
1927 duration: self.container_frontend_timeout.to_string(),
1928 });
1929 self.writable(metrics)
1930 }
1931 TimeoutStatus::WaitingForResponse => {
1933 self.set_answer(DefaultAnswer::Answer504 {
1936 duration: self.container_backend_timeout.to_string(),
1937 });
1938 self.writable(metrics)
1939 }
1940 TimeoutStatus::Response => StateResult::Continue,
1943 TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
1945 };
1946 }
1947
1948 if self.backend_token == Some(token) {
1949 self.container_backend_timeout.triggered();
1951 return match self.timeout_status() {
1952 TimeoutStatus::Request => {
1953 error!(
1954 "got backend timeout while waiting for a request, this should not happen"
1955 );
1956 self.set_answer(DefaultAnswer::Answer504 {
1957 duration: self.container_backend_timeout.to_string(),
1958 });
1959 self.writable(metrics)
1960 }
1961 TimeoutStatus::WaitingForResponse => {
1962 self.set_answer(DefaultAnswer::Answer504 {
1963 duration: self.container_backend_timeout.to_string(),
1964 });
1965 self.writable(metrics)
1966 }
1967 TimeoutStatus::Response => {
1968 error!(
1969 "backend {:?} timeout while receiving response (cluster {:?})",
1970 self.context.backend_id, self.context.cluster_id
1971 );
1972 StateResult::CloseSession
1973 }
1974 TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
1976 };
1977 }
1978
1979 error!("{} Got timeout for an invalid token", log_context!(self));
1980 StateResult::CloseSession
1981 }
1982
1983 fn cancel_timeouts(&mut self) {
1984 self.container_backend_timeout.cancel();
1985 self.container_frontend_timeout.cancel();
1986 }
1987
1988 fn print_state(&self, context: &str) {
1989 error!(
1990 "\
1991{} {} Session(Kawa)
1992\tFrontend:
1993\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
1994\tBackend:
1995\t\ttoken: {:?}\treadiness: {:?}",
1996 log_context!(self),
1997 context,
1998 self.frontend_token,
1999 self.frontend_readiness,
2000 self.request_stream.parsing_phase,
2001 self.backend_token,
2002 self.backend_readiness,
2003 );
2005 }
2006
2007 fn shutting_down(&mut self) -> SessionIsToBeClosed {
2008 if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2009 {
2011 true
2012 } else {
2013 self.context.closing = true;
2014 false
2015 }
2016 }
2017}
2018
2019fn handle_connection_result(
2020 connection_result: Result<BackendConnectAction, BackendConnectionError>,
2021) -> Option<SessionResult> {
2022 match connection_result {
2023 Ok(BackendConnectAction::Reuse) => None,
2025 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2026 Some(SessionResult::Continue)
2028 }
2029 Err(_) => {
2030 None
2038 }
2039 }
2040}
2041
2042fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2044 if let Some(status) = status {
2045 match status {
2046 100..=199 => {
2047 incr!("http.status.1xx", context.cluster_id, context.backend_id);
2048 }
2049 200..=299 => {
2050 incr!("http.status.2xx", context.cluster_id, context.backend_id);
2051 }
2052 300..=399 => {
2053 incr!("http.status.3xx", context.cluster_id, context.backend_id);
2054 }
2055 400..=499 => {
2056 incr!("http.status.4xx", context.cluster_id, context.backend_id);
2057 }
2058 500..=599 => {
2059 incr!("http.status.5xx", context.cluster_id, context.backend_id);
2060 }
2061 _ => {
2062 incr!("http.status.other");
2064 }
2065 }
2066 }
2067}