1pub mod answers;
2pub mod diagnostics;
3pub mod editor;
4pub mod parser;
5
6use std::{
7 cell::RefCell,
8 io::ErrorKind,
9 net::{Shutdown, SocketAddr},
10 rc::{Rc, Weak},
11 time::{Duration, Instant},
12};
13
14use mio::{net::TcpStream, Interest, Token};
15use rusty_ulid::Ulid;
16use sozu_command::{
17 config::MAX_LOOP_ITERATIONS,
18 logging::EndpointRecord,
19 proto::command::{Event, EventKind, ListenerType},
20};
21use crate::{
24 backends::{Backend, BackendError},
25 pool::{Checkout, Pool},
26 protocol::{
27 http::{
28 answers::DefaultAnswerStream,
29 diagnostics::{diagnostic_400_502, diagnostic_413_507},
30 editor::HttpContext,
31 parser::Method,
32 },
33 pipe::WebSocketContext,
34 SessionState,
35 },
36 retry::RetryPolicy,
37 router::Route,
38 server::{push_event, CONN_RETRIES},
39 socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
40 sozu_command::{logging::LogContext, ready::Ready},
41 timer::TimeoutContainer,
42 AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
43 L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession, Readiness,
44 RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult, StateResult,
45};
46
47macro_rules! log_context {
50 ($self:expr) => {
51 format!(
52 "KAWA-H1\t{}\tSession(public={}, session={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
53 $self.context.log_context(),
54 $self.context.public_address.to_string(),
55 $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
56 $self.frontend_token.0,
57 $self.frontend_readiness,
58 $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
59 $self.backend_readiness,
60 )
61 };
62}
63
64type GenericHttpStream = kawa::Kawa<Checkout>;
66
67impl kawa::AsBuffer for Checkout {
68 fn as_buffer(&self) -> &[u8] {
69 self.inner.extra()
70 }
71 fn as_mut_buffer(&mut self) -> &mut [u8] {
72 self.inner.extra_mut()
73 }
74}
75
76#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum DefaultAnswer {
78 Answer301 {
79 location: String,
80 },
81 Answer400 {
82 message: String,
83 phase: kawa::ParsingPhaseMarker,
84 successfully_parsed: String,
85 partially_parsed: String,
86 invalid: String,
87 },
88 Answer401 {},
89 Answer404 {},
90 Answer408 {
91 duration: String,
92 },
93 Answer413 {
94 message: String,
95 phase: kawa::ParsingPhaseMarker,
96 capacity: usize,
97 },
98 Answer502 {
99 message: String,
100 phase: kawa::ParsingPhaseMarker,
101 successfully_parsed: String,
102 partially_parsed: String,
103 invalid: String,
104 },
105 Answer503 {
106 message: String,
107 },
108 Answer504 {
109 duration: String,
110 },
111 Answer507 {
112 phase: kawa::ParsingPhaseMarker,
113 message: String,
114 capacity: usize,
115 },
116}
117
118impl From<&DefaultAnswer> for u16 {
119 fn from(answer: &DefaultAnswer) -> u16 {
120 match answer {
121 DefaultAnswer::Answer301 { .. } => 301,
122 DefaultAnswer::Answer400 { .. } => 400,
123 DefaultAnswer::Answer401 { .. } => 401,
124 DefaultAnswer::Answer404 { .. } => 404,
125 DefaultAnswer::Answer408 { .. } => 408,
126 DefaultAnswer::Answer413 { .. } => 413,
127 DefaultAnswer::Answer502 { .. } => 502,
128 DefaultAnswer::Answer503 { .. } => 503,
129 DefaultAnswer::Answer504 { .. } => 504,
130 DefaultAnswer::Answer507 { .. } => 507,
131 }
132 }
133}
134
135#[derive(Debug, Clone, Copy, PartialEq, Eq)]
136pub enum TimeoutStatus {
137 Request,
138 Response,
139 WaitingForNewRequest,
140 WaitingForResponse,
141}
142
143pub enum ResponseStream {
144 BackendAnswer(GenericHttpStream),
145 DefaultAnswer(u16, DefaultAnswerStream),
146}
147
148pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
150 answers: Rc<RefCell<answers::HttpAnswers>>,
151 pub backend: Option<Rc<RefCell<Backend>>>,
152 backend_connection_status: BackendConnectionStatus,
153 pub backend_readiness: Readiness,
154 pub backend_socket: Option<TcpStream>,
155 backend_stop: Option<Instant>,
156 pub backend_token: Option<Token>,
157 pub container_backend_timeout: TimeoutContainer,
158 pub container_frontend_timeout: TimeoutContainer,
159 configured_backend_timeout: Duration,
160 configured_connect_timeout: Duration,
161 configured_frontend_timeout: Duration,
162 connection_attempts: u8,
164 pub frontend_readiness: Readiness,
165 pub frontend_socket: Front,
166 frontend_token: Token,
167 keepalive_count: usize,
168 listener: Rc<RefCell<L>>,
169 pub request_stream: GenericHttpStream,
170 pub response_stream: ResponseStream,
171 pub context: HttpContext,
176}
177
178impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
179 #[allow(clippy::too_many_arguments)]
188 pub fn new(
189 answers: Rc<RefCell<answers::HttpAnswers>>,
190 configured_backend_timeout: Duration,
191 configured_connect_timeout: Duration,
192 configured_frontend_timeout: Duration,
193 container_frontend_timeout: TimeoutContainer,
194 frontend_socket: Front,
195 frontend_token: Token,
196 listener: Rc<RefCell<L>>,
197 pool: Weak<RefCell<Pool>>,
198 protocol: Protocol,
199 public_address: SocketAddr,
200 request_id: Ulid,
201 session_address: Option<SocketAddr>,
202 sticky_name: String,
203 ) -> Result<Http<Front, L>, AcceptError> {
204 let (front_buffer, back_buffer) = match pool.upgrade() {
205 Some(pool) => {
206 let mut pool = pool.borrow_mut();
207 match (pool.checkout(), pool.checkout()) {
208 (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
209 _ => return Err(AcceptError::BufferCapacityReached),
210 }
211 }
212 None => return Err(AcceptError::BufferCapacityReached),
213 };
214 Ok(Http {
215 answers,
216 backend_connection_status: BackendConnectionStatus::NotConnected,
217 backend_readiness: Readiness::new(),
218 backend_socket: None,
219 backend_stop: None,
220 backend_token: None,
221 backend: None,
222 configured_backend_timeout,
223 configured_connect_timeout,
224 configured_frontend_timeout,
225 connection_attempts: 0,
226 container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
227 container_frontend_timeout,
228 frontend_readiness: Readiness {
229 interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
230 event: Ready::EMPTY,
231 },
232 frontend_socket,
233 frontend_token,
234 keepalive_count: 0,
235 listener,
236 request_stream: GenericHttpStream::new(
237 kawa::Kind::Request,
238 kawa::Buffer::new(front_buffer),
239 ),
240 response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
241 kawa::Kind::Response,
242 kawa::Buffer::new(back_buffer),
243 )),
244 context: HttpContext {
245 id: request_id,
246 backend_id: None,
247 cluster_id: None,
248
249 closing: false,
250 keep_alive_backend: true,
251 keep_alive_frontend: true,
252 protocol,
253 public_address,
254 session_address,
255 sticky_name,
256 sticky_session: None,
257 sticky_session_found: None,
258
259 method: None,
260 authority: None,
261 path: None,
262 status: None,
263 reason: None,
264 user_agent: None,
265 },
266 })
267 }
268
269 pub fn reset(&mut self) {
271 trace!("{} ============== reset", log_context!(self));
272 let response_stream = match &mut self.response_stream {
273 ResponseStream::BackendAnswer(response_stream) => response_stream,
274 _ => return,
275 };
276
277 self.context.id = Ulid::generate();
278 self.context.reset();
279
280 self.request_stream.clear();
281 response_stream.clear();
282 self.keepalive_count += 1;
283 gauge_add!("http.active_requests", -1);
284
285 if let Some(backend) = &mut self.backend {
286 let mut backend = backend.borrow_mut();
287 backend.active_requests = backend.active_requests.saturating_sub(1);
288 }
289
290 self.container_backend_timeout.cancel();
293 self.container_frontend_timeout
294 .set_duration(self.configured_frontend_timeout);
295 self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
296 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
297
298 let response_storage = &mut response_stream.storage;
305 if !response_storage.is_empty() {
306 warn!(
307 "{} Leftover fragment from response: {}",
308 log_context!(self),
309 parser::view(
310 response_storage.used(),
311 16,
312 &[response_storage.start, response_storage.end,],
313 )
314 );
315 }
316
317 response_storage.clear();
318 if !self.request_stream.storage.is_empty() {
319 self.frontend_readiness.event.insert(Ready::READABLE);
320 } else {
321 self.request_stream.storage.clear();
322 }
323 }
324
325 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
326 trace!("{} ============== readable", log_context!(self));
327 if !self.container_frontend_timeout.reset() {
328 error!(
329 "could not reset front timeout {:?}",
330 self.configured_frontend_timeout
331 );
332 self.print_state(self.protocol_string());
333 }
334
335 let response_stream = match &mut self.response_stream {
336 ResponseStream::BackendAnswer(response_stream) => response_stream,
337 ResponseStream::DefaultAnswer(..) => {
338 error!(
339 "{} Sending default answer, should not read from frontend socket",
340 log_context!(self)
341 );
342
343 self.frontend_readiness.interest.remove(Ready::READABLE);
344 self.frontend_readiness.interest.insert(Ready::WRITABLE);
345 return StateResult::Continue;
346 }
347 };
348
349 if self.request_stream.storage.is_full() {
350 self.frontend_readiness.interest.remove(Ready::READABLE);
351 if self.request_stream.is_main_phase() {
352 self.backend_readiness.interest.insert(Ready::WRITABLE);
353 } else {
354 self.set_answer(DefaultAnswer::Answer413 {
356 capacity: self.request_stream.storage.capacity(),
357 phase: self.request_stream.parsing_phase.marker(),
358 message: diagnostic_413_507(self.request_stream.parsing_phase),
359 });
360 }
361 return StateResult::Continue;
362 }
363
364 let (size, socket_state) = self
365 .frontend_socket
366 .socket_read(self.request_stream.storage.space());
367
368 debug!("{} Read {} bytes", log_context!(self), size);
369
370 if size > 0 {
371 self.request_stream.storage.fill(size);
372 count!("bytes_in", size as i64);
373 metrics.bin += size;
374 } else {
378 self.frontend_readiness.event.remove(Ready::READABLE);
379 }
380
381 match socket_state {
382 SocketResult::Error | SocketResult::Closed => {
383 if self.request_stream.is_initial() {
384 if self.keepalive_count == 0 {
389 self.frontend_socket.read_error();
390 }
391 } else {
392 self.frontend_socket.read_error();
393 self.log_request_error(
394 metrics,
395 &format!(
396 "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
397 self.frontend_readiness,
398 self.backend_readiness,
399 )
400 );
401 }
402 return StateResult::CloseSession;
403 }
404 SocketResult::WouldBlock => {
405 self.frontend_readiness.event.remove(Ready::READABLE);
406 }
407 SocketResult::Continue => {}
408 };
409
410 trace!("{} ============== readable_parse", log_context!(self));
411 let was_initial = self.request_stream.is_initial();
412 let was_not_proxying = !self.request_stream.is_main_phase();
413
414 kawa::h1::parse(&mut self.request_stream, &mut self.context);
415 if was_initial && !self.request_stream.is_initial() {
418 self.container_frontend_timeout
422 .set_duration(self.configured_frontend_timeout);
423 gauge_add!("http.active_requests", 1);
424 incr!("http.requests");
425 }
426
427 if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
428 incr!("http.frontend_parse_errors");
429 warn!(
430 "{} Parsing request error in {:?}: {}",
431 log_context!(self),
432 marker,
433 match kind {
434 kawa::ParsingErrorKind::Consuming { index } => {
435 let kawa = &self.request_stream;
436 parser::view(
437 kawa.storage.used(),
438 16,
439 &[
440 kawa.storage.start,
441 kawa.storage.head,
442 index as usize,
443 kawa.storage.end,
444 ],
445 )
446 }
447 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
448 }
449 );
450 if response_stream.consumed {
451 self.log_request_error(metrics, "Parsing error on the request");
452 return StateResult::CloseSession;
453 } else {
454 let (message, successfully_parsed, partially_parsed, invalid) =
455 diagnostic_400_502(marker, kind, &self.request_stream);
456 self.set_answer(DefaultAnswer::Answer400 {
457 message,
458 phase: marker,
459 successfully_parsed,
460 partially_parsed,
461 invalid,
462 });
463 return StateResult::Continue;
464 }
465 }
466
467 if self.request_stream.is_main_phase() {
468 self.backend_readiness.interest.insert(Ready::WRITABLE);
469 if was_not_proxying {
470 trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
473 return StateResult::ConnectBackend;
474 }
475 }
476 if self.request_stream.is_terminated() {
477 self.frontend_readiness.interest.remove(Ready::READABLE);
478 }
479
480 StateResult::Continue
481 }
482
483 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
484 trace!("{} ============== writable", log_context!(self));
485 let response_stream = match &mut self.response_stream {
486 ResponseStream::BackendAnswer(response_stream) => response_stream,
487 _ => return self.writable_default_answer(metrics),
488 };
489
490 response_stream.prepare(&mut kawa::h1::BlockConverter);
491
492 let bufs = response_stream.as_io_slice();
493 if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
494 self.frontend_readiness.interest.remove(Ready::WRITABLE);
495 }
497
498 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
499
500 debug!("{} Wrote {} bytes", log_context!(self), size);
501
502 if size > 0 {
503 response_stream.consume(size);
504 count!("bytes_out", size as i64);
505 metrics.bout += size;
506 self.backend_readiness.interest.insert(Ready::READABLE);
507 }
508
509 match socket_state {
510 SocketResult::Error | SocketResult::Closed => {
511 self.frontend_socket.write_error();
512 self.log_request_error(
513 metrics,
514 &format!(
515 "front socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
516 self.frontend_readiness,
517 self.backend_readiness,
518 ),
519 );
520 return StateResult::CloseSession;
521 }
522 SocketResult::WouldBlock => {
523 self.frontend_readiness.event.remove(Ready::WRITABLE);
524 }
525 SocketResult::Continue => {}
526 }
527
528 if self.frontend_socket.socket_wants_write() {
529 return StateResult::Continue;
530 }
531
532 if response_stream.is_terminated() && response_stream.is_completed() {
533 if self.context.closing {
534 debug!("{} closing proxy, no keep alive", log_context!(self));
535 self.log_request_success(metrics);
536 return StateResult::CloseSession;
537 }
538
539 match response_stream.detached.status_line {
540 kawa::StatusLine::Response { code: 101, .. } => {
541 trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
542 self.log_request_success(metrics);
543 return StateResult::Upgrade;
544 }
545 kawa::StatusLine::Response { code: 100, .. } => {
546 trace!("{} ============== HANDLE CONTINUE!", log_context!(self));
547 response_stream.clear();
548 self.log_request_success(metrics);
549 return StateResult::Continue;
550 }
551 kawa::StatusLine::Response { code: 103, .. } => {
552 self.backend_readiness.event.insert(Ready::READABLE);
553 trace!("{} ============== HANDLE EARLY HINT!", log_context!(self));
554 response_stream.clear();
555 self.log_request_success(metrics);
556 return StateResult::Continue;
557 }
558 _ => (),
559 }
560
561 let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
562 let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
563 if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
564 && request_length_known
565 {
566 error!(
567 "{} Response terminated before request, this case is not handled properly yet",
568 log_context!(self)
569 );
570 incr!("http.early_response_close");
571 }
574
575 trace!(
580 "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
581 log_context!(self),
582 self.context.keep_alive_frontend,
583 self.context.keep_alive_backend,
584 response_length_known
585 );
586
587 self.log_request_success(metrics);
588 return match (
589 self.context.keep_alive_frontend,
590 self.context.keep_alive_backend,
591 response_length_known,
592 ) {
593 (true, true, true) => {
594 debug!("{} Keep alive frontend/backend", log_context!(self));
595 metrics.reset();
596 self.reset();
597 StateResult::Continue
598 }
599 (true, false, true) => {
600 debug!("{} Keep alive frontend", log_context!(self));
601 metrics.reset();
602 self.reset();
603 StateResult::CloseBackend
604 }
605 _ => {
606 debug!("{} No keep alive", log_context!(self));
607 StateResult::CloseSession
608 }
609 };
610 }
611 StateResult::Continue
612 }
613
614 fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
615 trace!(
616 "{} ============== writable_default_answer",
617 log_context!(self)
618 );
619 let response_stream = match &mut self.response_stream {
620 ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
621 _ => return StateResult::CloseSession,
622 };
623 let bufs = response_stream.as_io_slice();
624 let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
625
626 count!("bytes_out", size as i64);
627 metrics.bout += size;
628 response_stream.consume(size);
629
630 if size == 0 || socket_state != SocketResult::Continue {
631 self.frontend_readiness.event.remove(Ready::WRITABLE);
632 }
633
634 if response_stream.is_completed() {
635 save_http_status_metric(self.context.status, self.context.log_context());
636 self.log_default_answer_success(metrics);
637 self.frontend_readiness.reset();
638 self.backend_readiness.reset();
639 return StateResult::CloseSession;
640 }
641
642 if socket_state == SocketResult::Error {
643 self.frontend_socket.write_error();
644 self.log_request_error(
645 metrics,
646 "error writing default answer to front socket, closing",
647 );
648 StateResult::CloseSession
649 } else {
650 StateResult::Continue
651 }
652 }
653
654 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
655 trace!("{} ============== backend_writable", log_context!(self));
656 if let ResponseStream::DefaultAnswer(..) = self.response_stream {
657 error!(
658 "{}\tsending default answer, should not write to back",
659 log_context!(self)
660 );
661 self.backend_readiness.interest.remove(Ready::WRITABLE);
662 self.frontend_readiness.interest.insert(Ready::WRITABLE);
663 return SessionResult::Continue;
664 }
665
666 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
667 backend_socket
668 } else {
669 self.log_request_error(metrics, "back socket not found, closing session");
670 return SessionResult::Close;
671 };
672
673 self.request_stream.prepare(&mut kawa::h1::BlockConverter);
674
675 let bufs = self.request_stream.as_io_slice();
676 if bufs.is_empty() {
677 self.backend_readiness.interest.remove(Ready::WRITABLE);
678 return SessionResult::Continue;
679 }
680
681 let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
682 debug!("{} Wrote {} bytes", log_context!(self), size);
683
684 if size > 0 {
685 self.request_stream.consume(size);
686 count!("back_bytes_out", size as i64);
687 metrics.backend_bout += size;
688 self.frontend_readiness.interest.insert(Ready::READABLE);
689 self.backend_readiness.interest.insert(Ready::READABLE);
690 } else {
691 self.backend_readiness.event.remove(Ready::WRITABLE);
692 }
693
694 match socket_state {
695 SocketResult::Error | SocketResult::Closed => {
705 self.frontend_readiness.interest.remove(Ready::READABLE);
706 self.backend_readiness.interest.remove(Ready::WRITABLE);
707 return SessionResult::Continue;
708 }
709 SocketResult::WouldBlock => {
710 self.backend_readiness.event.remove(Ready::WRITABLE);
711 }
712 SocketResult::Continue => {}
713 }
714
715 if self.request_stream.is_terminated() && self.request_stream.is_completed() {
716 self.backend_readiness.interest.remove(Ready::WRITABLE);
717
718 self.container_frontend_timeout.cancel();
720 self.container_backend_timeout.reset();
721 }
722 SessionResult::Continue
723 }
724
725 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
727 trace!("{} ============== backend_readable", log_context!(self));
728 if !self.container_backend_timeout.reset() {
729 error!(
730 "{} Could not reset back timeout {:?}",
731 log_context!(self),
732 self.configured_backend_timeout
733 );
734 self.print_state(self.protocol_string());
735 }
736
737 let response_stream = match &mut self.response_stream {
738 ResponseStream::BackendAnswer(response_stream) => response_stream,
739 _ => {
740 error!(
741 "{} Sending default answer, should not read from backend socket",
742 log_context!(self),
743 );
744
745 self.backend_readiness.interest.remove(Ready::READABLE);
746 self.frontend_readiness.interest.insert(Ready::WRITABLE);
747 return SessionResult::Continue;
748 }
749 };
750
751 let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
752 backend_socket
753 } else {
754 self.log_request_error(metrics, "back socket not found, closing session");
755 return SessionResult::Close;
756 };
757
758 if response_stream.storage.is_full() {
759 self.backend_readiness.interest.remove(Ready::READABLE);
760 if response_stream.is_main_phase() {
761 self.frontend_readiness.interest.insert(Ready::WRITABLE);
762 } else {
763 let capacity = response_stream.storage.capacity();
765 let phase = response_stream.parsing_phase.marker();
766 let message = diagnostic_413_507(response_stream.parsing_phase);
767 self.set_answer(DefaultAnswer::Answer507 {
768 capacity,
769 phase,
770 message,
771 });
772 }
773 return SessionResult::Continue;
774 }
775
776 let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
777 debug!("{} Read {} bytes", log_context!(self), size);
778
779 if size > 0 {
780 response_stream.storage.fill(size);
781 count!("back_bytes_in", size as i64);
782 metrics.backend_bin += size;
783 self.container_frontend_timeout.cancel();
794 } else {
795 self.backend_readiness.event.remove(Ready::READABLE);
796 }
797
798 match socket_state {
800 SocketResult::Error => {
801 backend_socket.read_error();
802 self.log_request_error(
803 metrics,
804 &format!(
805 "back socket {socket_state:?}, closing session. Readiness: {:?} -> {:?}, read {size} bytes",
806 self.frontend_readiness,
807 self.backend_readiness,
808 ),
809 );
810 return SessionResult::Close;
811 }
812 SocketResult::WouldBlock | SocketResult::Closed => {
813 self.backend_readiness.event.remove(Ready::READABLE);
814 }
815 SocketResult::Continue => {}
816 }
817
818 trace!(
819 "{} ============== backend_readable_parse",
820 log_context!(self)
821 );
822 kawa::h1::parse(response_stream, &mut self.context);
823 if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
826 incr!("http.backend_parse_errors");
827 warn!(
828 "{} Parsing response error in {:?}: {}",
829 log_context!(self),
830 marker,
831 match kind {
832 kawa::ParsingErrorKind::Consuming { index } => {
833 parser::view(
834 response_stream.storage.used(),
835 16,
836 &[
837 response_stream.storage.start,
838 response_stream.storage.head,
839 index as usize,
840 response_stream.storage.end,
841 ],
842 )
843 }
844 kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
845 }
846 );
847 if response_stream.consumed {
848 return SessionResult::Close;
849 } else {
850 let (message, successfully_parsed, partially_parsed, invalid) =
851 diagnostic_400_502(marker, kind, response_stream);
852 self.set_answer(DefaultAnswer::Answer502 {
853 message,
854 phase: marker,
855 successfully_parsed,
856 partially_parsed,
857 invalid,
858 });
859 return SessionResult::Continue;
860 }
861 }
862
863 if response_stream.is_main_phase() {
864 self.frontend_readiness.interest.insert(Ready::WRITABLE);
865 }
866 if response_stream.is_terminated() {
867 metrics.backend_stop();
868 self.backend_stop = Some(Instant::now());
869 self.backend_readiness.interest.remove(Ready::READABLE);
870 }
871 SessionResult::Continue
872 }
873}
874
875impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
876 fn log_endpoint(&self) -> EndpointRecord {
877 EndpointRecord::Http {
878 method: self.context.method.as_deref(),
879 authority: self.context.authority.as_deref(),
880 path: self.context.path.as_deref(),
881 reason: self.context.reason.as_deref(),
882 status: self.context.status,
883 }
884 }
885
886 pub fn get_session_address(&self) -> Option<SocketAddr> {
887 self.context
888 .session_address
889 .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
890 }
891
892 pub fn get_backend_address(&self) -> Option<SocketAddr> {
893 self.backend
894 .as_ref()
895 .map(|backend| backend.borrow().address)
896 .or_else(|| {
897 self.backend_socket
898 .as_ref()
899 .and_then(|backend| backend.peer_addr().ok())
900 })
901 }
902
903 fn protocol_string(&self) -> &'static str {
905 match self.context.protocol {
906 Protocol::HTTP => "HTTP",
907 Protocol::HTTPS => match self.frontend_socket.protocol() {
908 TransportProtocol::Ssl2 => "HTTPS-SSL2",
909 TransportProtocol::Ssl3 => "HTTPS-SSL3",
910 TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
911 TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
912 TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
913 TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
914 _ => unreachable!(),
915 },
916 _ => unreachable!(),
917 }
918 }
919
920 pub fn websocket_context(&self) -> WebSocketContext {
922 WebSocketContext::Http {
923 method: self.context.method.clone(),
924 authority: self.context.authority.clone(),
925 path: self.context.path.clone(),
926 reason: self.context.reason.clone(),
927 status: self.context.status,
928 }
929 }
930
931 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
932 let listener = self.listener.borrow();
933 let tags = self.context.authority.as_ref().and_then(|host| {
934 let hostname = match host.split_once(':') {
935 None => host,
936 Some((hostname, _)) => hostname,
937 };
938 listener.get_tags(hostname)
939 });
940
941 let context = self.context.log_context();
942 metrics.register_end_of_session(&context);
943
944 log_access! {
945 error,
946 on_failure: { incr!("unsent-access-logs") },
947 message: message,
948 context,
949 session_address: self.get_session_address(),
950 backend_address: self.get_backend_address(),
951 protocol: self.protocol_string(),
952 endpoint: self.log_endpoint(),
953 tags,
954 client_rtt: socket_rtt(self.front_socket()),
955 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
956 service_time: metrics.service_time(),
957 response_time: metrics.backend_response_time(),
958 request_time: metrics.request_time(),
959 bytes_in: metrics.bin,
960 bytes_out: metrics.bout,
961 user_agent: self.context.user_agent.as_deref(),
962 };
963 }
964
965 pub fn log_request_success(&self, metrics: &SessionMetrics) {
966 save_http_status_metric(self.context.status, self.context.log_context());
967 self.log_request(metrics, false, None);
968 }
969
970 pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
971 self.log_request(metrics, false, None);
972 }
973 pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
974 incr!("http.errors");
975 error!(
976 "{} Could not process request properly got: {}",
977 log_context!(self),
978 message
979 );
980 self.print_state(self.protocol_string());
981 self.log_request(metrics, true, Some(message));
982 }
983
984 pub fn set_answer(&mut self, answer: DefaultAnswer) {
985 let status = u16::from(&answer);
986 if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
987 error!(
988 "already set the default answer to {}, trying to set to {}",
989 old_status, status
990 );
991 } else {
992 match answer {
993 DefaultAnswer::Answer301 { .. } => incr!(
994 "http.301.redirection",
995 self.context.cluster_id.as_deref(),
996 self.context.backend_id.as_deref()
997 ),
998 DefaultAnswer::Answer400 { .. } => incr!("http.400.errors"),
999 DefaultAnswer::Answer401 { .. } => incr!(
1000 "http.401.errors",
1001 self.context.cluster_id.as_deref(),
1002 self.context.backend_id.as_deref()
1003 ),
1004 DefaultAnswer::Answer404 { .. } => incr!("http.404.errors"),
1005 DefaultAnswer::Answer408 { .. } => incr!(
1006 "http.408.errors",
1007 self.context.cluster_id.as_deref(),
1008 self.context.backend_id.as_deref()
1009 ),
1010 DefaultAnswer::Answer413 { .. } => incr!(
1011 "http.413.errors",
1012 self.context.cluster_id.as_deref(),
1013 self.context.backend_id.as_deref()
1014 ),
1015 DefaultAnswer::Answer502 { .. } => incr!(
1016 "http.502.errors",
1017 self.context.cluster_id.as_deref(),
1018 self.context.backend_id.as_deref()
1019 ),
1020 DefaultAnswer::Answer503 { .. } => incr!(
1021 "http.503.errors",
1022 self.context.cluster_id.as_deref(),
1023 self.context.backend_id.as_deref()
1024 ),
1025 DefaultAnswer::Answer504 { .. } => incr!(
1026 "http.504.errors",
1027 self.context.cluster_id.as_deref(),
1028 self.context.backend_id.as_deref()
1029 ),
1030 DefaultAnswer::Answer507 { .. } => incr!(
1031 "http.507.errors",
1032 self.context.cluster_id.as_deref(),
1033 self.context.backend_id.as_deref()
1034 ),
1035 };
1036 }
1037
1038 let mut kawa = self.answers.borrow().get(
1039 answer,
1040 self.context.id.to_string(),
1041 self.context.cluster_id.as_deref(),
1042 self.context.backend_id.as_deref(),
1043 self.get_route(),
1044 );
1045 kawa.prepare(&mut kawa::h1::BlockConverter);
1046 self.context.status = Some(status);
1047 self.context.reason = None;
1048 self.context.keep_alive_frontend = false;
1049 self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
1050 self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1051 self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
1052 }
1053
1054 pub fn test_backend_socket(&self) -> bool {
1055 match self.backend_socket {
1056 Some(ref s) => {
1057 let mut tmp = [0u8; 1];
1058 let res = s.peek(&mut tmp[..]);
1059
1060 match res {
1061 Ok(0) => false,
1063 Ok(_) => true,
1064 Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1065 }
1066 }
1067 None => false,
1068 }
1069 }
1070
1071 pub fn is_valid_backend_socket(&self) -> bool {
1072 match self.backend_stop.as_ref() {
1074 Some(stop_instant) => {
1075 let now = Instant::now();
1076 let dur = now - *stop_instant;
1077 if dur > Duration::from_secs(1) {
1078 return self.test_backend_socket();
1079 }
1080 }
1081 None => return self.test_backend_socket(),
1082 }
1083
1084 true
1085 }
1086
1087 pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
1088 self.backend_socket = Some(socket);
1089 self.backend = backend;
1090 }
1091
1092 pub fn set_cluster_id(&mut self, cluster_id: String) {
1093 self.context.cluster_id = Some(cluster_id);
1094 }
1095
1096 pub fn set_backend_id(&mut self, backend_id: String) {
1097 self.context.backend_id = Some(backend_id);
1098 }
1099
1100 pub fn set_backend_token(&mut self, token: Token) {
1101 self.backend_token = Some(token);
1102 }
1103
1104 pub fn clear_backend_token(&mut self) {
1105 self.backend_token = None;
1106 }
1107
1108 pub fn set_backend_timeout(&mut self, dur: Duration) {
1109 if let Some(token) = self.backend_token.as_ref() {
1110 self.container_backend_timeout.set_duration(dur);
1111 self.container_backend_timeout.set(*token);
1112 }
1113 }
1114
1115 pub fn front_socket(&self) -> &TcpStream {
1116 self.frontend_socket.socket_ref()
1117 }
1118
1119 fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1123 self.container_backend_timeout.cancel();
1124 debug!(
1125 "{}\tPROXY [{}->{}] CLOSED BACKEND",
1126 log_context!(self),
1127 self.frontend_token.0,
1128 self.backend_token
1129 .map(|t| format!("{}", t.0))
1130 .unwrap_or_else(|| "-".to_string())
1131 );
1132
1133 let proxy = proxy.borrow();
1134 if let Some(socket) = &mut self.backend_socket.take() {
1135 if let Err(e) = proxy.deregister_socket(socket) {
1136 error!(
1137 "{} Error deregistering back socket({:?}): {:?}",
1138 log_context!(self),
1139 socket,
1140 e
1141 );
1142 }
1143 if let Err(e) = socket.shutdown(Shutdown::Both) {
1144 if e.kind() != ErrorKind::NotConnected {
1145 error!(
1146 "{} Error shutting down back socket({:?}): {:?}",
1147 log_context!(self),
1148 socket,
1149 e
1150 );
1151 }
1152 }
1153 }
1154
1155 if let Some(token) = self.backend_token.take() {
1156 proxy.remove_session(token);
1157
1158 if self.backend_connection_status != BackendConnectionStatus::NotConnected {
1159 self.backend_readiness.event = Ready::EMPTY;
1160 }
1161
1162 if self.backend_connection_status == BackendConnectionStatus::Connected {
1163 gauge_add!("backend.connections", -1);
1164 gauge_add!(
1165 "connections_per_backend",
1166 -1,
1167 self.context.cluster_id.as_deref(),
1168 metrics.backend_id.as_deref()
1169 );
1170 }
1171
1172 self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);
1173
1174 if let Some(backend) = self.backend.take() {
1175 backend.borrow_mut().dec_connections();
1176 }
1177 }
1178 }
1179
1180 fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1182 if self.connection_attempts >= CONN_RETRIES {
1183 error!(
1184 "{} Max connection attempt reached ({})",
1185 log_context!(self),
1186 self.connection_attempts,
1187 );
1188
1189 self.set_answer(DefaultAnswer::Answer503 {
1190 message: format!(
1191 "Max connection attempt reached: {}",
1192 self.connection_attempts
1193 ),
1194 });
1195 return Err(BackendConnectionError::MaxConnectionRetries(None));
1196 }
1197 Ok(())
1198 }
1199
1200 fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1201 let is_valid_backend_socket = self.is_valid_backend_socket();
1202
1203 if !is_valid_backend_socket {
1204 return false;
1205 }
1206
1207 metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1209
1210 metrics.backend_start();
1211 if let Some(b) = self.backend.as_mut() {
1212 b.borrow_mut().active_requests += 1;
1213 }
1214 true
1215 }
1216
1217 pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1219 let given_method = self
1220 .context
1221 .method
1222 .as_ref()
1223 .ok_or(RetrieveClusterError::NoMethod)?;
1224 let given_authority = self
1225 .context
1226 .authority
1227 .as_deref()
1228 .ok_or(RetrieveClusterError::NoHost)?;
1229 let given_path = self
1230 .context
1231 .path
1232 .as_deref()
1233 .ok_or(RetrieveClusterError::NoPath)?;
1234
1235 Ok((given_authority, given_path, given_method))
1236 }
1237
1238 pub fn get_route(&self) -> String {
1239 if let Some(method) = &self.context.method {
1240 if let Some(authority) = &self.context.authority {
1241 if let Some(path) = &self.context.path {
1242 return format!("{method} {authority}{path}");
1243 }
1244 return format!("{method} {authority}");
1245 }
1246 return format!("{method}");
1247 }
1248 String::new()
1249 }
1250
1251 fn cluster_id_from_request(
1252 &mut self,
1253 proxy: Rc<RefCell<dyn L7Proxy>>,
1254 ) -> Result<String, RetrieveClusterError> {
1255 let (host, uri, method) = match self.extract_route() {
1256 Ok(tuple) => tuple,
1257 Err(cluster_error) => {
1258 self.set_answer(DefaultAnswer::Answer400 {
1259 message: "Could not extract the route after connection started, this should not happen.".into(),
1260 phase: self.request_stream.parsing_phase.marker(),
1261 successfully_parsed: "null".into(),
1262 partially_parsed: "null".into(),
1263 invalid: "null".into(),
1264 });
1265 return Err(cluster_error);
1266 }
1267 };
1268
1269 let route_result = self
1270 .listener
1271 .borrow()
1272 .frontend_from_request(host, uri, method);
1273
1274 let route = match route_result {
1275 Ok(route) => route,
1276 Err(frontend_error) => {
1277 self.set_answer(DefaultAnswer::Answer404 {});
1278 return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
1279 }
1280 };
1281
1282 let cluster_id = match route {
1283 Route::ClusterId(cluster_id) => cluster_id,
1284 Route::Deny => {
1285 self.set_answer(DefaultAnswer::Answer401 {});
1286 return Err(RetrieveClusterError::UnauthorizedRoute);
1287 }
1288 };
1289
1290 let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
1291 && proxy
1292 .borrow()
1293 .clusters()
1294 .get(&cluster_id)
1295 .map(|cluster| cluster.https_redirect)
1296 .unwrap_or(false);
1297
1298 if frontend_should_redirect_https {
1299 self.set_answer(DefaultAnswer::Answer301 {
1300 location: format!("https://{host}{uri}"),
1301 });
1302 return Err(RetrieveClusterError::UnauthorizedRoute);
1303 }
1304
1305 Ok(cluster_id)
1306 }
1307
1308 pub fn backend_from_request(
1309 &mut self,
1310 cluster_id: &str,
1311 frontend_should_stick: bool,
1312 proxy: Rc<RefCell<dyn L7Proxy>>,
1313 metrics: &mut SessionMetrics,
1314 ) -> Result<TcpStream, BackendConnectionError> {
1315 let (backend, conn) = self
1316 .get_backend_for_sticky_session(
1317 frontend_should_stick,
1318 self.context.sticky_session_found.as_deref(),
1319 cluster_id,
1320 proxy,
1321 )
1322 .map_err(|backend_error| {
1323 self.set_answer(DefaultAnswer::Answer503 {
1326 message: backend_error.to_string(),
1327 });
1328 BackendConnectionError::Backend(backend_error)
1329 })?;
1330
1331 if frontend_should_stick {
1332 self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1334
1335 self.context.sticky_session = Some(
1336 backend
1337 .borrow()
1338 .sticky_id
1339 .clone()
1340 .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1341 );
1342 }
1343
1344 metrics.backend_id = Some(backend.borrow().backend_id.clone());
1345 metrics.backend_start();
1346 self.set_backend_id(backend.borrow().backend_id.clone());
1347
1348 self.backend = Some(backend);
1349 Ok(conn)
1350 }
1351
1352 fn get_backend_for_sticky_session(
1353 &self,
1354 frontend_should_stick: bool,
1355 sticky_session: Option<&str>,
1356 cluster_id: &str,
1357 proxy: Rc<RefCell<dyn L7Proxy>>,
1358 ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1359 match (frontend_should_stick, sticky_session) {
1360 (true, Some(sticky_session)) => proxy
1361 .borrow()
1362 .backends()
1363 .borrow_mut()
1364 .backend_from_sticky_session(cluster_id, sticky_session),
1365 _ => proxy
1366 .borrow()
1367 .backends()
1368 .borrow_mut()
1369 .backend_from_cluster_id(cluster_id),
1370 }
1371 }
1372
1373 fn connect_to_backend(
1374 &mut self,
1375 session_rc: Rc<RefCell<dyn ProxySession>>,
1376 proxy: Rc<RefCell<dyn L7Proxy>>,
1377 metrics: &mut SessionMetrics,
1378 ) -> Result<BackendConnectAction, BackendConnectionError> {
1379 let old_cluster_id = self.context.cluster_id.clone();
1380 let old_backend_token = self.backend_token;
1381
1382 self.check_circuit_breaker()?;
1383
1384 let cluster_id = self
1385 .cluster_id_from_request(proxy.clone())
1386 .map_err(BackendConnectionError::RetrieveClusterError)?;
1387
1388 trace!(
1389 "{} Connect_to_backend: {:?} {:?} {:?}",
1390 log_context!(self),
1391 self.context.cluster_id,
1392 cluster_id,
1393 self.backend_connection_status
1394 );
1395 if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
1397 && self.backend_connection_status == BackendConnectionStatus::Connected
1398 {
1399 let has_backend = self
1400 .backend
1401 .as_ref()
1402 .map(|backend| {
1403 let backend = backend.borrow();
1404 proxy
1405 .borrow()
1406 .backends()
1407 .borrow()
1408 .has_backend(&cluster_id, &backend)
1409 })
1410 .unwrap_or(false);
1411
1412 if has_backend && self.check_backend_connection(metrics) {
1413 return Ok(BackendConnectAction::Reuse);
1414 } else if self.backend_token.take().is_some() {
1415 self.close_backend(proxy.clone(), metrics);
1416 }
1417 }
1418
1419 if old_cluster_id.is_some()
1421 && old_cluster_id.as_ref() != Some(&cluster_id)
1422 && self.backend_token.take().is_some()
1423 {
1424 self.close_backend(proxy.clone(), metrics);
1425 }
1426
1427 self.context.cluster_id = Some(cluster_id.clone());
1428
1429 let frontend_should_stick = proxy
1430 .borrow()
1431 .clusters()
1432 .get(&cluster_id)
1433 .map(|cluster| cluster.sticky_session)
1434 .unwrap_or(false);
1435
1436 let mut socket =
1437 self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
1438 if let Err(e) = socket.set_nodelay(true) {
1439 error!(
1440 "{} Error setting nodelay on backend socket({:?}): {:?}",
1441 log_context!(self),
1442 socket,
1443 e
1444 );
1445 }
1446
1447 self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1448 self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());
1449
1450 match old_backend_token {
1451 Some(backend_token) => {
1452 self.set_backend_token(backend_token);
1453 if let Err(e) = proxy.borrow().register_socket(
1454 &mut socket,
1455 backend_token,
1456 Interest::READABLE | Interest::WRITABLE,
1457 ) {
1458 error!(
1459 "{} Error registering back socket({:?}): {:?}",
1460 log_context!(self),
1461 socket,
1462 e
1463 );
1464 }
1465
1466 self.set_backend_socket(socket, self.backend.clone());
1467 self.set_backend_timeout(self.configured_connect_timeout);
1468
1469 Ok(BackendConnectAction::Replace)
1470 }
1471 None => {
1472 let backend_token = proxy.borrow().add_session(session_rc);
1473
1474 if let Err(e) = proxy.borrow().register_socket(
1475 &mut socket,
1476 backend_token,
1477 Interest::READABLE | Interest::WRITABLE,
1478 ) {
1479 error!(
1480 "{} Error registering back socket({:?}): {:?}",
1481 log_context!(self),
1482 socket,
1483 e
1484 );
1485 }
1486
1487 self.set_backend_socket(socket, self.backend.clone());
1488 self.set_backend_token(backend_token);
1489 self.set_backend_timeout(self.configured_connect_timeout);
1490
1491 Ok(BackendConnectAction::New)
1492 }
1493 }
1494 }
1495
1496 fn set_backend_connected(
1497 &mut self,
1498 connected: BackendConnectionStatus,
1499 metrics: &mut SessionMetrics,
1500 ) {
1501 let last = self.backend_connection_status;
1502 self.backend_connection_status = connected;
1503
1504 if connected == BackendConnectionStatus::Connected {
1505 gauge_add!("backend.connections", 1);
1506 gauge_add!(
1507 "connections_per_backend",
1508 1,
1509 self.context.cluster_id.as_deref(),
1510 metrics.backend_id.as_deref()
1511 );
1512
1513 self.set_backend_timeout(self.configured_backend_timeout);
1516 if !self.backend_readiness.interest.is_readable() {
1519 self.container_backend_timeout.cancel();
1520 }
1521
1522 if let Some(backend) = &self.backend {
1523 let mut backend = backend.borrow_mut();
1524
1525 if backend.retry_policy.is_down() {
1526 incr!(
1527 "backend.up",
1528 self.context.cluster_id.as_deref(),
1529 metrics.backend_id.as_deref()
1530 );
1531
1532 info!(
1533 "backend server {} at {} is up",
1534 backend.backend_id, backend.address
1535 );
1536
1537 push_event(Event {
1538 kind: EventKind::BackendUp as i32,
1539 backend_id: Some(backend.backend_id.to_owned()),
1540 address: Some(backend.address.into()),
1541 cluster_id: None,
1542 });
1543 }
1544
1545 if let BackendConnectionStatus::Connecting(start) = last {
1546 backend.set_connection_time(Instant::now() - start);
1547 }
1548
1549 backend.failures = 0;
1551 backend.active_requests += 1;
1552 backend.retry_policy.succeed();
1553 }
1554 }
1555 }
1556
1557 fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1558 if let Some(backend) = &self.backend {
1559 let mut backend = backend.borrow_mut();
1560 backend.failures += 1;
1561
1562 let already_unavailable = backend.retry_policy.is_down();
1563 backend.retry_policy.fail();
1564 incr!(
1565 "backend.connections.error",
1566 self.context.cluster_id.as_deref(),
1567 metrics.backend_id.as_deref()
1568 );
1569
1570 if !already_unavailable && backend.retry_policy.is_down() {
1571 error!(
1572 "{} backend server {} at {} is down",
1573 log_context!(self),
1574 backend.backend_id,
1575 backend.address
1576 );
1577
1578 incr!(
1579 "backend.down",
1580 self.context.cluster_id.as_deref(),
1581 metrics.backend_id.as_deref()
1582 );
1583
1584 push_event(Event {
1585 kind: EventKind::BackendDown as i32,
1586 backend_id: Some(backend.backend_id.to_owned()),
1587 address: Some(backend.address.into()),
1588 cluster_id: None,
1589 });
1590 }
1591 }
1592 }
1593
1594 pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> StateResult {
1595 let response_stream = match &mut self.response_stream {
1596 ResponseStream::BackendAnswer(response_stream) => response_stream,
1597 _ => return StateResult::CloseBackend,
1598 };
1599
1600 if self.backend_readiness.event.is_readable()
1602 && self.backend_readiness.interest.is_readable()
1603 {
1604 return StateResult::Continue;
1605 }
1606
1607 if response_stream.is_terminated() {
1609 return StateResult::CloseBackend;
1610 }
1611 match (
1612 self.request_stream.is_initial(),
1613 response_stream.is_initial(),
1614 ) {
1615 (_, false) => {
1618 error!(
1619 "{} Backend closed before session is over",
1620 log_context!(self),
1621 );
1622
1623 trace!("{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.", log_context!(self));
1624
1625 response_stream.parsing_phase = kawa::ParsingPhase::Terminated;
1626
1627 self.frontend_readiness.interest.insert(Ready::WRITABLE);
1630 StateResult::Continue
1631 }
1632 (true, true) => {
1634 trace!(
1635 "{} Backend hanged up in between requests",
1636 log_context!(self)
1637 );
1638 StateResult::CloseBackend
1639 }
1640 (false, true) => {
1642 error!(
1643 "{} Frontend transmitted data but the back closed",
1644 log_context!(self)
1645 );
1646
1647 self.set_answer(DefaultAnswer::Answer503 {
1648 message: "Backend closed after consuming part of the request".into(),
1649 });
1650
1651 self.backend_readiness.interest = Ready::EMPTY;
1652 StateResult::Continue
1653 }
1654 }
1655 }
1656
1657 fn ready_inner(
1669 &mut self,
1670 session: Rc<RefCell<dyn crate::ProxySession>>,
1671 proxy: Rc<RefCell<dyn L7Proxy>>,
1672 metrics: &mut SessionMetrics,
1673 ) -> SessionResult {
1674 let mut counter = 0;
1675
1676 if self.backend_connection_status.is_connecting()
1677 && !self.backend_readiness.event.is_empty()
1678 {
1679 if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
1680 error!(
1682 "{} Error connecting to backend, trying again, attempt {}",
1683 log_context!(self),
1684 self.connection_attempts
1685 );
1686
1687 self.connection_attempts += 1;
1688 self.fail_backend_connection(metrics);
1689
1690 self.backend_connection_status =
1691 BackendConnectionStatus::Connecting(Instant::now());
1692
1693 self.close_backend(proxy.clone(), metrics);
1695
1696 let connection_result =
1697 self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1698 if let Err(err) = &connection_result {
1699 error!(
1700 "{} Error connecting to backend: {}",
1701 log_context!(self),
1702 err
1703 );
1704 }
1705
1706 if let Some(session_result) = handle_connection_result(connection_result) {
1707 return session_result;
1708 }
1709 } else {
1710 metrics.backend_connected();
1711 self.connection_attempts = 0;
1712 self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
1713 self.backend_readiness.interest.insert(Ready::READABLE);
1716 }
1717 }
1718
1719 if self.frontend_readiness.event.is_hup() {
1720 if !self.request_stream.is_initial() {
1721 self.log_request_error(metrics, "Client disconnected abruptly");
1722 }
1723 return SessionResult::Close;
1724 }
1725
1726 while counter < MAX_LOOP_ITERATIONS {
1727 let frontend_interest = self.frontend_readiness.filter_interest();
1728 let backend_interest = self.backend_readiness.filter_interest();
1729
1730 trace!(
1731 "{} Frontend interest({:?}) and backend interest({:?})",
1732 log_context!(self),
1733 frontend_interest,
1734 backend_interest,
1735 );
1736
1737 if frontend_interest.is_empty() && backend_interest.is_empty() {
1738 break;
1739 }
1740
1741 if self.backend_readiness.event.is_hup()
1742 && self.frontend_readiness.interest.is_writable()
1743 && !self.frontend_readiness.event.is_writable()
1744 {
1745 break;
1746 }
1747
1748 if frontend_interest.is_readable() {
1749 let state_result = self.readable(metrics);
1750 trace!(
1751 "{} frontend_readable: {:?}",
1752 log_context!(self),
1753 state_result
1754 );
1755
1756 match state_result {
1757 StateResult::Continue => {}
1758 StateResult::ConnectBackend => {
1759 let connection_result =
1760 self.connect_to_backend(session.clone(), proxy.clone(), metrics);
1761 if let Err(err) = &connection_result {
1762 error!(
1763 "{} Error connecting to backend: {}",
1764 log_context!(self),
1765 err
1766 );
1767 }
1768
1769 if let Some(session_result) = handle_connection_result(connection_result) {
1770 return session_result;
1771 }
1772 }
1773 StateResult::CloseBackend => unreachable!(),
1774 StateResult::CloseSession => return SessionResult::Close,
1775 StateResult::Upgrade => return SessionResult::Upgrade,
1776 }
1777 }
1778
1779 if backend_interest.is_writable() {
1780 let session_result = self.backend_writable(metrics);
1781 trace!(
1782 "{} backend_writable: {:?}",
1783 log_context!(self),
1784 session_result
1785 );
1786 if session_result != SessionResult::Continue {
1787 return session_result;
1788 }
1789 }
1790
1791 if backend_interest.is_readable() {
1792 let session_result = self.backend_readable(metrics);
1793 trace!(
1794 "{} backend_readable: {:?}",
1795 log_context!(self),
1796 session_result
1797 );
1798 if session_result != SessionResult::Continue {
1799 return session_result;
1800 }
1801 }
1802
1803 if frontend_interest.is_writable() {
1804 let state_result = self.writable(metrics);
1805 trace!(
1806 "{} frontend_writable: {:?}",
1807 log_context!(self),
1808 state_result
1809 );
1810 match state_result {
1811 StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1812 StateResult::CloseSession => return SessionResult::Close,
1813 StateResult::Upgrade => return SessionResult::Upgrade,
1814 StateResult::Continue => {}
1815 StateResult::ConnectBackend => unreachable!(),
1816 }
1817 }
1818
1819 if frontend_interest.is_error() {
1820 error!(
1821 "{} frontend socket error, disconnecting",
1822 log_context!(self)
1823 );
1824
1825 return SessionResult::Close;
1826 }
1827
1828 if backend_interest.is_hup() || backend_interest.is_error() {
1829 let state_result = self.backend_hup(metrics);
1830
1831 trace!("{} backend_hup: {:?}", log_context!(self), state_result);
1832 match state_result {
1833 StateResult::Continue => {}
1834 StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
1835 StateResult::CloseSession => return SessionResult::Close,
1836 StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
1837 }
1838 }
1839
1840 counter += 1;
1841 }
1842
1843 if counter >= MAX_LOOP_ITERATIONS {
1844 error!(
1845 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1846 log_context!(self), MAX_LOOP_ITERATIONS
1847 );
1848
1849 incr!("http.infinite_loop.error");
1850 self.print_state(self.protocol_string());
1851
1852 return SessionResult::Close;
1853 }
1854
1855 SessionResult::Continue
1856 }
1857
1858 pub fn timeout_status(&self) -> TimeoutStatus {
1859 if self.request_stream.is_main_phase() {
1860 match &self.response_stream {
1861 ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
1862 TimeoutStatus::WaitingForResponse
1863 }
1864 _ => TimeoutStatus::Response,
1865 }
1866 } else if self.keepalive_count > 0 {
1867 TimeoutStatus::WaitingForNewRequest
1868 } else {
1869 TimeoutStatus::Request
1870 }
1871 }
1872}
1873
1874impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
1875 fn ready(
1876 &mut self,
1877 session: Rc<RefCell<dyn crate::ProxySession>>,
1878 proxy: Rc<RefCell<dyn L7Proxy>>,
1879 metrics: &mut SessionMetrics,
1880 ) -> SessionResult {
1881 let session_result = self.ready_inner(session, proxy, metrics);
1882 if session_result == SessionResult::Upgrade {
1883 let response_storage = match &mut self.response_stream {
1884 ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
1885 _ => return SessionResult::Close,
1886 };
1887
1888 self.request_stream.storage.buffer.sync(
1891 self.request_stream.storage.end,
1892 self.request_stream.storage.head,
1893 );
1894 response_storage
1895 .buffer
1896 .sync(response_storage.end, response_storage.head);
1897 }
1898 session_result
1899 }
1900
1901 fn update_readiness(&mut self, token: Token, events: Ready) {
1902 if self.frontend_token == token {
1903 self.frontend_readiness.event |= events;
1904 } else if self.backend_token == Some(token) {
1905 self.backend_readiness.event |= events;
1906 }
1907 }
1908
1909 fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1910 self.close_backend(proxy, metrics);
1911 self.frontend_socket.socket_close();
1912 let _ = self.frontend_socket.socket_write_vectored(&[]);
1913
1914 if !self.request_stream.is_initial() {
1916 gauge_add!("http.active_requests", -1);
1917
1918 if let Some(b) = self.backend.as_mut() {
1919 let mut backend = b.borrow_mut();
1920 backend.active_requests = backend.active_requests.saturating_sub(1);
1921 }
1922 }
1923 }
1924
1925 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1926 if self.frontend_token == token {
1928 self.container_frontend_timeout.triggered();
1929 return match self.timeout_status() {
1930 TimeoutStatus::Request => {
1932 self.set_answer(DefaultAnswer::Answer408 {
1933 duration: self.container_frontend_timeout.to_string(),
1934 });
1935 self.writable(metrics)
1936 }
1937 TimeoutStatus::WaitingForResponse => {
1939 self.set_answer(DefaultAnswer::Answer504 {
1942 duration: self.container_backend_timeout.to_string(),
1943 });
1944 self.writable(metrics)
1945 }
1946 TimeoutStatus::Response => StateResult::Continue,
1949 TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
1951 };
1952 }
1953
1954 if self.backend_token == Some(token) {
1955 self.container_backend_timeout.triggered();
1957 return match self.timeout_status() {
1958 TimeoutStatus::Request => {
1959 error!(
1960 "got backend timeout while waiting for a request, this should not happen"
1961 );
1962 self.set_answer(DefaultAnswer::Answer504 {
1963 duration: self.container_backend_timeout.to_string(),
1964 });
1965 self.writable(metrics)
1966 }
1967 TimeoutStatus::WaitingForResponse => {
1968 self.set_answer(DefaultAnswer::Answer504 {
1969 duration: self.container_backend_timeout.to_string(),
1970 });
1971 self.writable(metrics)
1972 }
1973 TimeoutStatus::Response => {
1974 error!(
1975 "backend {:?} timeout while receiving response (cluster {:?})",
1976 self.context.backend_id, self.context.cluster_id
1977 );
1978 StateResult::CloseSession
1979 }
1980 TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
1982 };
1983 }
1984
1985 error!("{} Got timeout for an invalid token", log_context!(self));
1986 StateResult::CloseSession
1987 }
1988
1989 fn cancel_timeouts(&mut self) {
1990 self.container_backend_timeout.cancel();
1991 self.container_frontend_timeout.cancel();
1992 }
1993
1994 fn print_state(&self, context: &str) {
1995 error!(
1996 "\
1997{} {} Session(Kawa)
1998\tFrontend:
1999\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2000\tBackend:
2001\t\ttoken: {:?}\treadiness: {:?}",
2002 log_context!(self),
2003 context,
2004 self.frontend_token,
2005 self.frontend_readiness,
2006 self.request_stream.parsing_phase,
2007 self.backend_token,
2008 self.backend_readiness,
2009 );
2011 }
2012
2013 fn shutting_down(&mut self) -> SessionIsToBeClosed {
2014 if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2015 {
2017 true
2018 } else {
2019 self.context.closing = true;
2020 false
2021 }
2022 }
2023}
2024
2025fn handle_connection_result(
2026 connection_result: Result<BackendConnectAction, BackendConnectionError>,
2027) -> Option<SessionResult> {
2028 match connection_result {
2029 Ok(BackendConnectAction::Reuse) => None,
2031 Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2032 Some(SessionResult::Continue)
2034 }
2035 Err(_) => {
2036 None
2044 }
2045 }
2046}
2047
2048fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2050 if let Some(status) = status {
2051 match status {
2052 100..=199 => {
2053 incr!("http.status.1xx", context.cluster_id, context.backend_id);
2054 }
2055 200..=299 => {
2056 incr!("http.status.2xx", context.cluster_id, context.backend_id);
2057 }
2058 300..=399 => {
2059 incr!("http.status.3xx", context.cluster_id, context.backend_id);
2060 }
2061 400..=499 => {
2062 incr!("http.status.4xx", context.cluster_id, context.backend_id);
2063 }
2064 500..=599 => {
2065 incr!("http.status.5xx", context.cluster_id, context.backend_id);
2066 }
2067 _ => {
2068 incr!("http.status.other");
2070 }
2071 }
2072 }
2073}