1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
2
3use mio::{Token, net::TcpStream};
4use rusty_ulid::Ulid;
5use sozu_command::{
6 config::MAX_LOOP_ITERATIONS,
7 logging::{EndpointRecord, LogContext},
8};
9
10use crate::{
11 L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
12 backends::Backend,
13 pool::Checkout,
14 protocol::{SessionState, http::parser::Method},
15 socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
16 sozu_command::ready::Ready,
17 timer::TimeoutContainer,
18};
19
20macro_rules! log_context {
23 ($self:expr) => {
24 format!(
25 "PIPE\t{}\tSession(address={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
26 $self.log_context(),
27 $self.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
28 $self.frontend_token.0,
29 $self.frontend_readiness,
30 $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
31 $self.backend_readiness,
32 )
33 };
34}
35
36#[derive(PartialEq, Eq)]
37pub enum SessionStatus {
38 Normal,
39 DefaultAnswer,
40}
41
42#[derive(Copy, Clone, Debug)]
43enum ConnectionStatus {
44 Normal,
45 ReadOpen,
46 WriteOpen,
47 Closed,
48}
49
50pub enum WebSocketContext {
52 Http {
53 method: Option<Method>,
54 authority: Option<String>,
55 path: Option<String>,
56 status: Option<u16>,
57 reason: Option<String>,
58 },
59 Tcp,
60}
61
62pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
63 backend_buffer: Checkout,
64 backend_id: Option<String>,
65 pub backend_readiness: Readiness,
66 backend_socket: Option<TcpStream>,
67 backend_status: ConnectionStatus,
68 backend_token: Option<Token>,
69 pub backend: Option<Rc<RefCell<Backend>>>,
70 cluster_id: Option<String>,
71 pub container_backend_timeout: Option<TimeoutContainer>,
72 pub container_frontend_timeout: Option<TimeoutContainer>,
73 frontend_buffer: Checkout,
74 pub frontend_readiness: Readiness,
75 frontend_status: ConnectionStatus,
76 frontend_token: Token,
77 frontend: Front,
78 listener: Rc<RefCell<L>>,
79 protocol: Protocol,
80 request_id: Ulid,
81 session_address: Option<SocketAddr>,
82 websocket_context: WebSocketContext,
83}
84
85impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
86 #[allow(clippy::too_many_arguments)]
95 pub fn new(
96 backend_buffer: Checkout,
97 backend_id: Option<String>,
98 backend_socket: Option<TcpStream>,
99 backend: Option<Rc<RefCell<Backend>>>,
100 container_backend_timeout: Option<TimeoutContainer>,
101 container_frontend_timeout: Option<TimeoutContainer>,
102 cluster_id: Option<String>,
103 frontend_buffer: Checkout,
104 frontend_token: Token,
105 frontend: Front,
106 listener: Rc<RefCell<L>>,
107 protocol: Protocol,
108 request_id: Ulid,
109 session_address: Option<SocketAddr>,
110 websocket_context: WebSocketContext,
111 ) -> Pipe<Front, L> {
112 let frontend_status = ConnectionStatus::Normal;
113 let backend_status = if backend_socket.is_none() {
114 ConnectionStatus::Closed
115 } else {
116 ConnectionStatus::Normal
117 };
118
119 let session = Pipe {
120 backend_buffer,
121 backend_id,
122 backend_readiness: Readiness {
123 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
124 event: Ready::EMPTY,
125 },
126 backend_socket,
127 backend_status,
128 backend_token: None,
129 backend,
130 cluster_id,
131 container_backend_timeout,
132 container_frontend_timeout,
133 frontend_buffer,
134 frontend_readiness: Readiness {
135 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
136 event: Ready::EMPTY,
137 },
138 frontend_status,
139 frontend_token,
140 frontend,
141 listener,
142 protocol,
143 request_id,
144 session_address,
145 websocket_context,
146 };
147
148 trace!("created pipe");
149 session
150 }
151
152 pub fn front_socket(&self) -> &TcpStream {
153 self.frontend.socket_ref()
154 }
155
156 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
157 self.frontend.socket_mut()
158 }
159
160 pub fn back_socket(&self) -> Option<&TcpStream> {
161 self.backend_socket.as_ref()
162 }
163
164 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
165 self.backend_socket.as_mut()
166 }
167
168 pub fn set_back_socket(&mut self, socket: TcpStream) {
169 self.backend_socket = Some(socket);
170 self.backend_status = ConnectionStatus::Normal;
171 }
172
173 pub fn back_token(&self) -> Vec<Token> {
174 self.backend_token.iter().cloned().collect()
175 }
176
177 fn reset_timeouts(&mut self) {
178 if let Some(t) = self.container_frontend_timeout.as_mut() {
179 if !t.reset() {
180 error!(
181 "{} Could not reset front timeout (pipe)",
182 log_context!(self)
183 );
184 }
185 }
186
187 if let Some(t) = self.container_backend_timeout.as_mut() {
188 if !t.reset() {
189 error!("{} Could not reset back timeout (pipe)", log_context!(self));
190 }
191 }
192 }
193
194 pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
195 self.cluster_id = cluster_id;
196 }
197
198 pub fn set_backend_id(&mut self, backend_id: Option<String>) {
199 self.backend_id = backend_id;
200 }
201
202 pub fn set_back_token(&mut self, token: Token) {
203 self.backend_token = Some(token);
204 }
205
206 pub fn get_session_address(&self) -> Option<SocketAddr> {
207 self.session_address
208 .or_else(|| self.frontend.socket_ref().peer_addr().ok())
209 }
210
211 pub fn get_backend_address(&self) -> Option<SocketAddr> {
212 self.backend_socket
213 .as_ref()
214 .and_then(|backend| backend.peer_addr().ok())
215 }
216
217 fn protocol_string(&self) -> &'static str {
218 match self.protocol {
219 Protocol::TCP => "TCP",
220 Protocol::HTTP => "WS",
221 Protocol::HTTPS => match self.frontend.protocol() {
222 TransportProtocol::Ssl2 => "WSS-SSL2",
223 TransportProtocol::Ssl3 => "WSS-SSL3",
224 TransportProtocol::Tls1_0 => "WSS-TLS1.0",
225 TransportProtocol::Tls1_1 => "WSS-TLS1.1",
226 TransportProtocol::Tls1_2 => "WSS-TLS1.2",
227 TransportProtocol::Tls1_3 => "WSS-TLS1.3",
228 _ => unreachable!(),
229 },
230 _ => unreachable!(),
231 }
232 }
233
234 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
235 let listener = self.listener.borrow();
236 let context = self.log_context();
237 let endpoint = self.log_endpoint();
238 metrics.register_end_of_session(&context);
239 log_access!(
240 error,
241 on_failure: { incr!("unsent-access-logs") },
242 message,
243 context,
244 session_address: self.get_session_address(),
245 backend_address: self.get_backend_address(),
246 protocol: self.protocol_string(),
247 endpoint,
248 tags: listener.get_tags(&listener.get_addr().to_string()),
249 client_rtt: socket_rtt(self.front_socket()),
250 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
251 service_time: metrics.service_time(),
252 response_time: metrics.backend_response_time(),
253 request_time: metrics.request_time(),
254 bytes_in: metrics.bin,
255 bytes_out: metrics.bout,
256 user_agent: None,
257 otel: None,
258 );
259 }
260
261 pub fn log_request_success(&self, metrics: &SessionMetrics) {
262 self.log_request(metrics, false, None);
263 }
264
265 pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
266 incr!("pipe.errors");
267 error!(
268 "{} Could not process request properly got: {}",
269 log_context!(self),
270 message
271 );
272 self.print_state(self.protocol_string());
273 self.log_request(metrics, true, Some(message));
274 }
275
276 pub fn check_connections(&self) -> bool {
279 let request_is_inflight = self.frontend_buffer.available_data() > 0
280 || self.frontend_readiness.event.is_readable();
281 let response_is_inflight =
282 self.backend_buffer.available_data() > 0 || self.backend_readiness.event.is_readable();
283 match (self.frontend_status, self.backend_status) {
284 (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
285 (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
286 (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
287 request_is_inflight || response_is_inflight
292 }
293 (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
294
295 (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
296 request_is_inflight || response_is_inflight
299 }
300 (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
301 (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
302 request_is_inflight || response_is_inflight
303 }
304 (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
305
306 (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
307 (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
308 (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
309 (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
310
311 (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
312 (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
313 (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
314 (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
315 }
316 }
317
318 pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
319 self.log_request_success(metrics);
320 self.frontend_status = ConnectionStatus::Closed;
321 SessionResult::Close
322 }
323
324 pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
325 self.backend_status = ConnectionStatus::Closed;
326 if self.backend_buffer.available_data() == 0 {
327 if self.backend_readiness.event.is_readable() {
328 self.backend_readiness.interest.insert(Ready::READABLE);
329 debug!(
330 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
331 log_context!(self)
332 );
333 SessionResult::Continue
334 } else {
335 self.log_request_success(metrics);
336 SessionResult::Close
337 }
338 } else {
339 debug!(
340 "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
341 log_context!(self)
342 );
343 self.frontend_readiness.interest.insert(Ready::WRITABLE);
344 if self.backend_readiness.event.is_readable() {
345 self.backend_readiness.interest.insert(Ready::READABLE);
346 }
347 SessionResult::Continue
348 }
349 }
350
351 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
353 self.reset_timeouts();
354
355 trace!("pipe readable");
356 if self.frontend_buffer.available_space() == 0 {
357 self.frontend_readiness.interest.remove(Ready::READABLE);
358 self.backend_readiness.interest.insert(Ready::WRITABLE);
359 return SessionResult::Continue;
360 }
361
362 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
363 debug!("{} Read {} bytes", log_context!(self), sz);
364
365 if sz > 0 {
366 self.frontend_buffer.fill(sz);
368
369 count!("bytes_in", sz as i64);
370 metrics.bin += sz;
371
372 if self.frontend_buffer.available_space() == 0 {
373 self.frontend_readiness.interest.remove(Ready::READABLE);
374 }
375 self.backend_readiness.interest.insert(Ready::WRITABLE);
376 } else {
377 self.frontend_readiness.event.remove(Ready::READABLE);
378
379 if res == SocketResult::Continue {
380 self.frontend_status = match self.frontend_status {
381 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
382 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
383 s => s,
384 };
385 }
386 }
387
388 if !self.check_connections() {
389 self.frontend_readiness.reset();
390 self.backend_readiness.reset();
391 self.log_request_success(metrics);
392 return SessionResult::Close;
393 }
394
395 match res {
396 SocketResult::Error => {
397 self.frontend_readiness.reset();
398 self.backend_readiness.reset();
399 self.log_request_error(metrics, "front socket read error");
400 return SessionResult::Close;
401 }
402 SocketResult::Closed => {
403 self.frontend_readiness.reset();
404 self.backend_readiness.reset();
405 self.log_request_success(metrics);
406 return SessionResult::Close;
407 }
408 SocketResult::WouldBlock => {
409 self.frontend_readiness.event.remove(Ready::READABLE);
410 }
411 SocketResult::Continue => {}
412 };
413
414 self.backend_readiness.interest.insert(Ready::WRITABLE);
415 SessionResult::Continue
416 }
417
418 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
420 trace!("{} Pipe writable", log_context!(self));
421 if self.backend_buffer.available_data() == 0 {
422 self.backend_readiness.interest.insert(Ready::READABLE);
423 self.frontend_readiness.interest.remove(Ready::WRITABLE);
424 return SessionResult::Continue;
425 }
426
427 let mut sz = 0usize;
428 let mut res = SocketResult::Continue;
429 while res == SocketResult::Continue {
430 if self.backend_buffer.available_data() == 0 {
432 count!("bytes_out", sz as i64);
433 metrics.bout += sz;
434 self.backend_readiness.interest.insert(Ready::READABLE);
435 self.frontend_readiness.interest.remove(Ready::WRITABLE);
436 return SessionResult::Continue;
437 }
438 let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
439 res = current_res;
440 self.backend_buffer.consume(current_sz);
441 sz += current_sz;
442
443 if current_sz == 0 && res == SocketResult::Continue {
444 self.frontend_status = match self.frontend_status {
445 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
446 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
447 s => s,
448 };
449 }
450
451 if !self.check_connections() {
452 metrics.bout += sz;
453 count!("bytes_out", sz as i64);
454 self.frontend_readiness.reset();
455 self.backend_readiness.reset();
456 self.log_request_success(metrics);
457 return SessionResult::Close;
458 }
459 }
460
461 if sz > 0 {
462 count!("bytes_out", sz as i64);
463 self.backend_readiness.interest.insert(Ready::READABLE);
464 metrics.bout += sz;
465 }
466
467 debug!(
468 "{} Wrote {} bytes of {}",
469 log_context!(self),
470 sz,
471 self.backend_buffer.available_data()
472 );
473
474 match res {
475 SocketResult::Error => {
476 self.frontend_readiness.reset();
477 self.backend_readiness.reset();
478 self.log_request_error(metrics, "front socket write error");
479 return SessionResult::Close;
480 }
481 SocketResult::Closed => {
482 self.frontend_readiness.reset();
483 self.backend_readiness.reset();
484 self.log_request_success(metrics);
485 return SessionResult::Close;
486 }
487 SocketResult::WouldBlock => {
488 self.frontend_readiness.event.remove(Ready::WRITABLE);
489 }
490 SocketResult::Continue => {}
491 }
492
493 SessionResult::Continue
494 }
495
496 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
498 trace!("pipe back_writable");
499
500 if self.frontend_buffer.available_data() == 0 {
501 self.frontend_readiness.interest.insert(Ready::READABLE);
502 self.backend_readiness.interest.remove(Ready::WRITABLE);
503 return SessionResult::Continue;
504 }
505
506 let output_size = self.frontend_buffer.available_data();
507
508 let mut sz = 0usize;
509 let mut socket_res = SocketResult::Continue;
510
511 if let Some(ref mut backend) = self.backend_socket {
512 while socket_res == SocketResult::Continue {
513 if self.frontend_buffer.available_data() == 0 {
515 self.frontend_readiness.interest.insert(Ready::READABLE);
516 self.backend_readiness.interest.remove(Ready::WRITABLE);
517 count!("back_bytes_out", sz as i64);
518 metrics.backend_bout += sz;
519 return SessionResult::Continue;
520 }
521
522 let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
523 socket_res = current_res;
524 self.frontend_buffer.consume(current_sz);
525 sz += current_sz;
526
527 if current_sz == 0 && current_res == SocketResult::Continue {
528 self.backend_status = match self.backend_status {
529 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
530 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
531 s => s,
532 };
533 }
534 }
535 }
536
537 count!("back_bytes_out", sz as i64);
538 metrics.backend_bout += sz;
539
540 if !self.check_connections() {
541 self.frontend_readiness.reset();
542 self.backend_readiness.reset();
543 self.log_request_success(metrics);
544 return SessionResult::Close;
545 }
546
547 debug!(
548 "{} Wrote {} bytes of {}",
549 log_context!(self),
550 sz,
551 output_size
552 );
553
554 match socket_res {
555 SocketResult::Error => {
556 self.frontend_readiness.reset();
557 self.backend_readiness.reset();
558 self.log_request_error(metrics, "back socket write error");
559 return SessionResult::Close;
560 }
561 SocketResult::Closed => {
562 self.frontend_readiness.reset();
563 self.backend_readiness.reset();
564 self.log_request_success(metrics);
565 return SessionResult::Close;
566 }
567 SocketResult::WouldBlock => {
568 self.backend_readiness.event.remove(Ready::WRITABLE);
569 }
570 SocketResult::Continue => {}
571 }
572 SessionResult::Continue
573 }
574
575 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
577 self.reset_timeouts();
578
579 trace!("{} Pipe backend_readable", log_context!(self));
580 if self.backend_buffer.available_space() == 0 {
581 self.backend_readiness.interest.remove(Ready::READABLE);
582 return SessionResult::Continue;
583 }
584
585 if let Some(ref mut backend) = self.backend_socket {
586 let (size, remaining) = backend.socket_read(self.backend_buffer.space());
587 self.backend_buffer.fill(size);
588
589 debug!("{} Read {} bytes", log_context!(self), size);
590
591 if remaining != SocketResult::Continue || size == 0 {
592 self.backend_readiness.event.remove(Ready::READABLE);
593 }
594 if size > 0 {
595 self.frontend_readiness.interest.insert(Ready::WRITABLE);
596 metrics.backend_bin += size;
597 }
598
599 if size == 0 && remaining == SocketResult::Closed {
600 self.backend_status = match self.backend_status {
601 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
602 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
603 s => s,
604 };
605
606 if !self.check_connections() {
607 self.frontend_readiness.reset();
608 self.backend_readiness.reset();
609 self.log_request_success(metrics);
610 return SessionResult::Close;
611 }
612 }
613
614 match remaining {
615 SocketResult::Error => {
616 self.frontend_readiness.reset();
617 self.backend_readiness.reset();
618 self.log_request_error(metrics, "back socket read error");
619 return SessionResult::Close;
620 }
621 SocketResult::Closed => {
622 if !self.check_connections() {
623 self.frontend_readiness.reset();
624 self.backend_readiness.reset();
625 self.log_request_success(metrics);
626 return SessionResult::Close;
627 }
628 }
629 SocketResult::WouldBlock => {
630 self.backend_readiness.event.remove(Ready::READABLE);
631 }
632 SocketResult::Continue => {}
633 }
634 }
635
636 SessionResult::Continue
637 }
638
639 pub fn log_context(&self) -> LogContext<'_> {
640 LogContext {
641 request_id: self.request_id,
642 cluster_id: self.cluster_id.as_deref(),
643 backend_id: self.backend_id.as_deref(),
644 }
645 }
646
647 fn log_endpoint(&self) -> EndpointRecord<'_> {
648 match &self.websocket_context {
649 WebSocketContext::Http {
650 method,
651 authority,
652 path,
653 status,
654 reason,
655 } => EndpointRecord::Http {
656 method: method.as_deref(),
657 authority: authority.as_deref(),
658 path: path.as_deref(),
659 status: status.to_owned(),
660 reason: reason.as_deref(),
661 },
662 WebSocketContext::Tcp => EndpointRecord::Tcp,
663 }
664 }
665}
666
667impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
668 fn ready(
669 &mut self,
670 _session: Rc<RefCell<dyn crate::ProxySession>>,
671 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
672 metrics: &mut SessionMetrics,
673 ) -> SessionResult {
674 let mut counter = 0;
675
676 if self.frontend_readiness.event.is_hup() {
677 return SessionResult::Close;
678 }
679
680 while counter < MAX_LOOP_ITERATIONS {
681 let frontend_interest = self.frontend_readiness.filter_interest();
682 let backend_interest = self.backend_readiness.filter_interest();
683
684 trace!(
685 "{} Frontend interest({:?}), backend interest({:?})",
686 log_context!(self),
687 frontend_interest,
688 backend_interest
689 );
690 if frontend_interest.is_empty() && backend_interest.is_empty() {
691 break;
692 }
693
694 if self.backend_readiness.event.is_hup()
695 && self.frontend_readiness.interest.is_writable()
696 && !self.frontend_readiness.event.is_writable()
697 {
698 break;
699 }
700
701 if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
702 return SessionResult::Close;
703 }
704
705 if backend_interest.is_writable()
706 && self.backend_writable(metrics) == SessionResult::Close
707 {
708 return SessionResult::Close;
709 }
710
711 if backend_interest.is_readable()
712 && self.backend_readable(metrics) == SessionResult::Close
713 {
714 return SessionResult::Close;
715 }
716
717 if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
718 return SessionResult::Close;
719 }
720
721 if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
722 return SessionResult::Close;
723 }
724
725 if frontend_interest.is_error() {
726 error!(
727 "{} Frontend socket error, disconnecting",
728 log_context!(self)
729 );
730
731 self.frontend_readiness.interest = Ready::EMPTY;
732 self.backend_readiness.interest = Ready::EMPTY;
733
734 return SessionResult::Close;
735 }
736
737 if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
738 self.frontend_readiness.interest = Ready::EMPTY;
739 self.backend_readiness.interest = Ready::EMPTY;
740
741 error!("{} Backend socket error, disconnecting", log_context!(self));
742 return SessionResult::Close;
743 }
744
745 counter += 1;
746 }
747
748 if counter >= MAX_LOOP_ITERATIONS {
749 error!(
750 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
751 log_context!(self),
752 MAX_LOOP_ITERATIONS
753 );
754
755 incr!("http.infinite_loop.error");
756 self.print_state(self.protocol_string());
757
758 return SessionResult::Close;
759 }
760
761 SessionResult::Continue
762 }
763
764 fn update_readiness(&mut self, token: Token, events: Ready) {
765 if self.frontend_token == token {
766 self.frontend_readiness.event |= events;
767 } else if self.backend_token == Some(token) {
768 self.backend_readiness.event |= events;
769 }
770 }
771
772 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
773 if self.frontend_token == token {
775 self.log_request_error(metrics, "frontend socket timeout");
776 if let Some(timeout) = self.container_frontend_timeout.as_mut() {
777 timeout.triggered()
778 }
779 return StateResult::CloseSession;
780 }
781
782 if self.backend_token == Some(token) {
783 if let Some(timeout) = self.container_backend_timeout.as_mut() {
785 timeout.triggered()
786 }
787
788 self.log_request_error(metrics, "backend socket timeout");
789 return StateResult::CloseSession;
790 }
791
792 error!("{} Got timeout for an invalid token", log_context!(self));
793 self.log_request_error(metrics, "invalid token timeout");
794 StateResult::CloseSession
795 }
796
797 fn cancel_timeouts(&mut self) {
798 self.container_frontend_timeout.as_mut().map(|t| t.cancel());
799 self.container_backend_timeout.as_mut().map(|t| t.cancel());
800 }
801
802 fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
803 if let Some(backend) = self.backend.as_mut() {
804 let mut backend = backend.borrow_mut();
805 backend.active_requests = backend.active_requests.saturating_sub(1);
806 }
807 }
808
809 fn print_state(&self, context: &str) {
810 error!(
811 "\
812{} {} Session(Pipe)
813\tFrontend:
814\t\ttoken: {:?}\treadiness: {:?}
815\tBackend:
816\t\ttoken: {:?}\treadiness: {:?}",
817 log_context!(self),
818 context,
819 self.frontend_token,
820 self.frontend_readiness,
821 self.backend_token,
822 self.backend_readiness
823 );
824 }
825}