1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use rusty_ulid::Ulid;
5use sozu_command::{
6 config::MAX_LOOP_ITERATIONS,
7 logging::{EndpointRecord, LogContext},
8};
9
10use crate::{
11 backends::Backend,
12 pool::Checkout,
13 protocol::{http::parser::Method, SessionState},
14 socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
15 sozu_command::ready::Ready,
16 timer::TimeoutContainer,
17 L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
18};
19
/// Builds the prefix put in front of every log line emitted by a pipe.
///
/// The argument must expose a `log_context()` method as well as the
/// `session_address`, `frontend_token`, `frontend_readiness`, `backend_token`
/// and `backend_readiness` fields. A missing session address or backend token
/// is rendered as `<none>`.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "PIPE\t{}\tSession(address={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
            $self.log_context(),
            $self.session_address.map_or_else(|| "<none>".to_string(), |addr| addr.to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness,
            $self.backend_token.map_or_else(|| "<none>".to_string(), |token| token.0.to_string()),
            $self.backend_readiness,
        )
    };
}
35
/// Status of a session.
///
/// NOTE(review): this enum is not referenced anywhere else in this file; the
/// exact meaning of each variant should be confirmed against its users.
///
/// `Debug`, `Clone` and `Copy` are derived in addition to the comparison
/// traits so that this public, field-less enum can be logged and passed by
/// value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
41
/// Half-close state of one side (frontend or backend) of the pipe.
#[derive(Clone, Copy, Debug)]
enum ConnectionStatus {
    /// Both directions are usable.
    Normal,
    /// Writing to this socket made no progress (zero-byte write): only its
    /// read direction is still considered open.
    ReadOpen,
    /// Reading from this socket reached its end (zero-byte read): only its
    /// write direction is still considered open.
    WriteOpen,
    /// Neither direction is usable.
    Closed,
}
49
50pub enum WebSocketContext {
52 Http {
53 method: Option<Method>,
54 authority: Option<String>,
55 path: Option<String>,
56 status: Option<u16>,
57 reason: Option<String>,
58 },
59 Tcp,
60}
61
62pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
63 backend_buffer: Checkout,
64 backend_id: Option<String>,
65 pub backend_readiness: Readiness,
66 backend_socket: Option<TcpStream>,
67 backend_status: ConnectionStatus,
68 backend_token: Option<Token>,
69 pub backend: Option<Rc<RefCell<Backend>>>,
70 cluster_id: Option<String>,
71 pub container_backend_timeout: Option<TimeoutContainer>,
72 pub container_frontend_timeout: Option<TimeoutContainer>,
73 frontend_buffer: Checkout,
74 pub frontend_readiness: Readiness,
75 frontend_status: ConnectionStatus,
76 frontend_token: Token,
77 frontend: Front,
78 listener: Rc<RefCell<L>>,
79 protocol: Protocol,
80 request_id: Ulid,
81 session_address: Option<SocketAddr>,
82 websocket_context: WebSocketContext,
83}
84
85impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
86 #[allow(clippy::too_many_arguments)]
95 pub fn new(
96 backend_buffer: Checkout,
97 backend_id: Option<String>,
98 backend_socket: Option<TcpStream>,
99 backend: Option<Rc<RefCell<Backend>>>,
100 container_backend_timeout: Option<TimeoutContainer>,
101 container_frontend_timeout: Option<TimeoutContainer>,
102 cluster_id: Option<String>,
103 frontend_buffer: Checkout,
104 frontend_token: Token,
105 frontend: Front,
106 listener: Rc<RefCell<L>>,
107 protocol: Protocol,
108 request_id: Ulid,
109 session_address: Option<SocketAddr>,
110 websocket_context: WebSocketContext,
111 ) -> Pipe<Front, L> {
112 let frontend_status = ConnectionStatus::Normal;
113 let backend_status = if backend_socket.is_none() {
114 ConnectionStatus::Closed
115 } else {
116 ConnectionStatus::Normal
117 };
118
119 let session = Pipe {
120 backend_buffer,
121 backend_id,
122 backend_readiness: Readiness {
123 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
124 event: Ready::EMPTY,
125 },
126 backend_socket,
127 backend_status,
128 backend_token: None,
129 backend,
130 cluster_id,
131 container_backend_timeout,
132 container_frontend_timeout,
133 frontend_buffer,
134 frontend_readiness: Readiness {
135 interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
136 event: Ready::EMPTY,
137 },
138 frontend_status,
139 frontend_token,
140 frontend,
141 listener,
142 protocol,
143 request_id,
144 session_address,
145 websocket_context,
146 };
147
148 trace!("created pipe");
149 session
150 }
151
152 pub fn front_socket(&self) -> &TcpStream {
153 self.frontend.socket_ref()
154 }
155
156 pub fn front_socket_mut(&mut self) -> &mut TcpStream {
157 self.frontend.socket_mut()
158 }
159
160 pub fn back_socket(&self) -> Option<&TcpStream> {
161 self.backend_socket.as_ref()
162 }
163
164 pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
165 self.backend_socket.as_mut()
166 }
167
168 pub fn set_back_socket(&mut self, socket: TcpStream) {
169 self.backend_socket = Some(socket);
170 self.backend_status = ConnectionStatus::Normal;
171 }
172
173 pub fn back_token(&self) -> Vec<Token> {
174 self.backend_token.iter().cloned().collect()
175 }
176
177 fn reset_timeouts(&mut self) {
178 if let Some(t) = self.container_frontend_timeout.as_mut() {
179 if !t.reset() {
180 error!(
181 "{} Could not reset front timeout (pipe)",
182 log_context!(self)
183 );
184 }
185 }
186
187 if let Some(t) = self.container_backend_timeout.as_mut() {
188 if !t.reset() {
189 error!("{} Could not reset back timeout (pipe)", log_context!(self));
190 }
191 }
192 }
193
194 pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
195 self.cluster_id = cluster_id;
196 }
197
198 pub fn set_backend_id(&mut self, backend_id: Option<String>) {
199 self.backend_id = backend_id;
200 }
201
202 pub fn set_back_token(&mut self, token: Token) {
203 self.backend_token = Some(token);
204 }
205
206 pub fn get_session_address(&self) -> Option<SocketAddr> {
207 self.session_address
208 .or_else(|| self.frontend.socket_ref().peer_addr().ok())
209 }
210
211 pub fn get_backend_address(&self) -> Option<SocketAddr> {
212 self.backend_socket
213 .as_ref()
214 .and_then(|backend| backend.peer_addr().ok())
215 }
216
217 fn protocol_string(&self) -> &'static str {
218 match self.protocol {
219 Protocol::TCP => "TCP",
220 Protocol::HTTP => "WS",
221 Protocol::HTTPS => match self.frontend.protocol() {
222 TransportProtocol::Ssl2 => "WSS-SSL2",
223 TransportProtocol::Ssl3 => "WSS-SSL3",
224 TransportProtocol::Tls1_0 => "WSS-TLS1.0",
225 TransportProtocol::Tls1_1 => "WSS-TLS1.1",
226 TransportProtocol::Tls1_2 => "WSS-TLS1.2",
227 TransportProtocol::Tls1_3 => "WSS-TLS1.3",
228 _ => unreachable!(),
229 },
230 _ => unreachable!(),
231 }
232 }
233
234 pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
235 let listener = self.listener.borrow();
236 let context = self.log_context();
237 let endpoint = self.log_endpoint();
238 metrics.register_end_of_session(&context);
239 log_access!(
240 error,
241 on_failure: { incr!("unsent-access-logs") },
242 message,
243 context,
244 session_address: self.get_session_address(),
245 backend_address: self.get_backend_address(),
246 protocol: self.protocol_string(),
247 endpoint,
248 tags: listener.get_tags(&listener.get_addr().to_string()),
249 client_rtt: socket_rtt(self.front_socket()),
250 server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
251 service_time: metrics.service_time(),
252 response_time: metrics.backend_response_time(),
253 request_time: metrics.request_time(),
254 bytes_in: metrics.bin,
255 bytes_out: metrics.bout,
256 user_agent: None
257 );
258 }
259
260 pub fn log_request_success(&self, metrics: &SessionMetrics) {
261 self.log_request(metrics, false, None);
262 }
263
264 pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
265 incr!("pipe.errors");
266 error!(
267 "{} Could not process request properly got: {}",
268 log_context!(self),
269 message
270 );
271 self.print_state(self.protocol_string());
272 self.log_request(metrics, true, Some(message));
273 }
274
275 pub fn check_connections(&self) -> bool {
278 let request_is_inflight = self.frontend_buffer.available_data() > 0
279 || self.frontend_readiness.event.is_readable();
280 let response_is_inflight =
281 self.backend_buffer.available_data() > 0 || self.backend_readiness.event.is_readable();
282 match (self.frontend_status, self.backend_status) {
283 (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
284 (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
285 (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
286 request_is_inflight || response_is_inflight
291 }
292 (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
293
294 (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
295 request_is_inflight || response_is_inflight
298 }
299 (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
300 (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
301 request_is_inflight || response_is_inflight
302 }
303 (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
304
305 (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
306 (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
307 (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
308 (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
309
310 (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
311 (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
312 (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
313 (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
314 }
315 }
316
317 pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
318 self.log_request_success(metrics);
319 self.frontend_status = ConnectionStatus::Closed;
320 SessionResult::Close
321 }
322
323 pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
324 self.backend_status = ConnectionStatus::Closed;
325 if self.backend_buffer.available_data() == 0 {
326 if self.backend_readiness.event.is_readable() {
327 self.backend_readiness.interest.insert(Ready::READABLE);
328 debug!("{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.", log_context!(self));
329 SessionResult::Continue
330 } else {
331 self.log_request_success(metrics);
332 SessionResult::Close
333 }
334 } else {
335 debug!("{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.", log_context!(self));
336 self.frontend_readiness.interest.insert(Ready::WRITABLE);
337 if self.backend_readiness.event.is_readable() {
338 self.backend_readiness.interest.insert(Ready::READABLE);
339 }
340 SessionResult::Continue
341 }
342 }
343
344 pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
346 self.reset_timeouts();
347
348 trace!("pipe readable");
349 if self.frontend_buffer.available_space() == 0 {
350 self.frontend_readiness.interest.remove(Ready::READABLE);
351 self.backend_readiness.interest.insert(Ready::WRITABLE);
352 return SessionResult::Continue;
353 }
354
355 let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
356 debug!("{} Read {} bytes", log_context!(self), sz);
357
358 if sz > 0 {
359 self.frontend_buffer.fill(sz);
361
362 count!("bytes_in", sz as i64);
363 metrics.bin += sz;
364
365 if self.frontend_buffer.available_space() == 0 {
366 self.frontend_readiness.interest.remove(Ready::READABLE);
367 }
368 self.backend_readiness.interest.insert(Ready::WRITABLE);
369 } else {
370 self.frontend_readiness.event.remove(Ready::READABLE);
371
372 if res == SocketResult::Continue {
373 self.frontend_status = match self.frontend_status {
374 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
375 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
376 s => s,
377 };
378 }
379 }
380
381 if !self.check_connections() {
382 self.frontend_readiness.reset();
383 self.backend_readiness.reset();
384 self.log_request_success(metrics);
385 return SessionResult::Close;
386 }
387
388 match res {
389 SocketResult::Error => {
390 self.frontend_readiness.reset();
391 self.backend_readiness.reset();
392 self.log_request_error(metrics, "front socket read error");
393 return SessionResult::Close;
394 }
395 SocketResult::Closed => {
396 self.frontend_readiness.reset();
397 self.backend_readiness.reset();
398 self.log_request_success(metrics);
399 return SessionResult::Close;
400 }
401 SocketResult::WouldBlock => {
402 self.frontend_readiness.event.remove(Ready::READABLE);
403 }
404 SocketResult::Continue => {}
405 };
406
407 self.backend_readiness.interest.insert(Ready::WRITABLE);
408 SessionResult::Continue
409 }
410
411 pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
413 trace!("{} Pipe writable", log_context!(self));
414 if self.backend_buffer.available_data() == 0 {
415 self.backend_readiness.interest.insert(Ready::READABLE);
416 self.frontend_readiness.interest.remove(Ready::WRITABLE);
417 return SessionResult::Continue;
418 }
419
420 let mut sz = 0usize;
421 let mut res = SocketResult::Continue;
422 while res == SocketResult::Continue {
423 if self.backend_buffer.available_data() == 0 {
425 count!("bytes_out", sz as i64);
426 metrics.bout += sz;
427 self.backend_readiness.interest.insert(Ready::READABLE);
428 self.frontend_readiness.interest.remove(Ready::WRITABLE);
429 return SessionResult::Continue;
430 }
431 let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
432 res = current_res;
433 self.backend_buffer.consume(current_sz);
434 sz += current_sz;
435
436 if current_sz == 0 && res == SocketResult::Continue {
437 self.frontend_status = match self.frontend_status {
438 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
439 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
440 s => s,
441 };
442 }
443
444 if !self.check_connections() {
445 metrics.bout += sz;
446 count!("bytes_out", sz as i64);
447 self.frontend_readiness.reset();
448 self.backend_readiness.reset();
449 self.log_request_success(metrics);
450 return SessionResult::Close;
451 }
452 }
453
454 if sz > 0 {
455 count!("bytes_out", sz as i64);
456 self.backend_readiness.interest.insert(Ready::READABLE);
457 metrics.bout += sz;
458 }
459
460 debug!(
461 "{} Wrote {} bytes of {}",
462 log_context!(self),
463 sz,
464 self.backend_buffer.available_data()
465 );
466
467 match res {
468 SocketResult::Error => {
469 self.frontend_readiness.reset();
470 self.backend_readiness.reset();
471 self.log_request_error(metrics, "front socket write error");
472 return SessionResult::Close;
473 }
474 SocketResult::Closed => {
475 self.frontend_readiness.reset();
476 self.backend_readiness.reset();
477 self.log_request_success(metrics);
478 return SessionResult::Close;
479 }
480 SocketResult::WouldBlock => {
481 self.frontend_readiness.event.remove(Ready::WRITABLE);
482 }
483 SocketResult::Continue => {}
484 }
485
486 SessionResult::Continue
487 }
488
489 pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
491 trace!("pipe back_writable");
492
493 if self.frontend_buffer.available_data() == 0 {
494 self.frontend_readiness.interest.insert(Ready::READABLE);
495 self.backend_readiness.interest.remove(Ready::WRITABLE);
496 return SessionResult::Continue;
497 }
498
499 let output_size = self.frontend_buffer.available_data();
500
501 let mut sz = 0usize;
502 let mut socket_res = SocketResult::Continue;
503
504 if let Some(ref mut backend) = self.backend_socket {
505 while socket_res == SocketResult::Continue {
506 if self.frontend_buffer.available_data() == 0 {
508 self.frontend_readiness.interest.insert(Ready::READABLE);
509 self.backend_readiness.interest.remove(Ready::WRITABLE);
510 count!("back_bytes_out", sz as i64);
511 metrics.backend_bout += sz;
512 return SessionResult::Continue;
513 }
514
515 let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
516 socket_res = current_res;
517 self.frontend_buffer.consume(current_sz);
518 sz += current_sz;
519
520 if current_sz == 0 && current_res == SocketResult::Continue {
521 self.backend_status = match self.backend_status {
522 ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
523 ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
524 s => s,
525 };
526 }
527 }
528 }
529
530 count!("back_bytes_out", sz as i64);
531 metrics.backend_bout += sz;
532
533 if !self.check_connections() {
534 self.frontend_readiness.reset();
535 self.backend_readiness.reset();
536 self.log_request_success(metrics);
537 return SessionResult::Close;
538 }
539
540 debug!(
541 "{} Wrote {} bytes of {}",
542 log_context!(self),
543 sz,
544 output_size
545 );
546
547 match socket_res {
548 SocketResult::Error => {
549 self.frontend_readiness.reset();
550 self.backend_readiness.reset();
551 self.log_request_error(metrics, "back socket write error");
552 return SessionResult::Close;
553 }
554 SocketResult::Closed => {
555 self.frontend_readiness.reset();
556 self.backend_readiness.reset();
557 self.log_request_success(metrics);
558 return SessionResult::Close;
559 }
560 SocketResult::WouldBlock => {
561 self.backend_readiness.event.remove(Ready::WRITABLE);
562 }
563 SocketResult::Continue => {}
564 }
565 SessionResult::Continue
566 }
567
568 pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
570 self.reset_timeouts();
571
572 trace!("{} Pipe backend_readable", log_context!(self));
573 if self.backend_buffer.available_space() == 0 {
574 self.backend_readiness.interest.remove(Ready::READABLE);
575 return SessionResult::Continue;
576 }
577
578 if let Some(ref mut backend) = self.backend_socket {
579 let (size, remaining) = backend.socket_read(self.backend_buffer.space());
580 self.backend_buffer.fill(size);
581
582 debug!("{} Read {} bytes", log_context!(self), size);
583
584 if remaining != SocketResult::Continue || size == 0 {
585 self.backend_readiness.event.remove(Ready::READABLE);
586 }
587 if size > 0 {
588 self.frontend_readiness.interest.insert(Ready::WRITABLE);
589 metrics.backend_bin += size;
590 }
591
592 if size == 0 && remaining == SocketResult::Closed {
593 self.backend_status = match self.backend_status {
594 ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
595 ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
596 s => s,
597 };
598
599 if !self.check_connections() {
600 self.frontend_readiness.reset();
601 self.backend_readiness.reset();
602 self.log_request_success(metrics);
603 return SessionResult::Close;
604 }
605 }
606
607 match remaining {
608 SocketResult::Error => {
609 self.frontend_readiness.reset();
610 self.backend_readiness.reset();
611 self.log_request_error(metrics, "back socket read error");
612 return SessionResult::Close;
613 }
614 SocketResult::Closed => {
615 if !self.check_connections() {
616 self.frontend_readiness.reset();
617 self.backend_readiness.reset();
618 self.log_request_success(metrics);
619 return SessionResult::Close;
620 }
621 }
622 SocketResult::WouldBlock => {
623 self.backend_readiness.event.remove(Ready::READABLE);
624 }
625 SocketResult::Continue => {}
626 }
627 }
628
629 SessionResult::Continue
630 }
631
632 pub fn log_context(&self) -> LogContext {
633 LogContext {
634 request_id: self.request_id,
635 cluster_id: self.cluster_id.as_deref(),
636 backend_id: self.backend_id.as_deref(),
637 }
638 }
639
640 fn log_endpoint(&self) -> EndpointRecord {
641 match &self.websocket_context {
642 WebSocketContext::Http {
643 method,
644 authority,
645 path,
646 status,
647 reason,
648 } => EndpointRecord::Http {
649 method: method.as_deref(),
650 authority: authority.as_deref(),
651 path: path.as_deref(),
652 status: status.to_owned(),
653 reason: reason.as_deref(),
654 },
655 WebSocketContext::Tcp => EndpointRecord::Tcp,
656 }
657 }
658}
659
660impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
661 fn ready(
662 &mut self,
663 _session: Rc<RefCell<dyn crate::ProxySession>>,
664 _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
665 metrics: &mut SessionMetrics,
666 ) -> SessionResult {
667 let mut counter = 0;
668
669 if self.frontend_readiness.event.is_hup() {
670 return SessionResult::Close;
671 }
672
673 while counter < MAX_LOOP_ITERATIONS {
674 let frontend_interest = self.frontend_readiness.filter_interest();
675 let backend_interest = self.backend_readiness.filter_interest();
676
677 trace!(
678 "{} Frontend interest({:?}), backend interest({:?})",
679 log_context!(self),
680 frontend_interest,
681 backend_interest
682 );
683 if frontend_interest.is_empty() && backend_interest.is_empty() {
684 break;
685 }
686
687 if self.backend_readiness.event.is_hup()
688 && self.frontend_readiness.interest.is_writable()
689 && !self.frontend_readiness.event.is_writable()
690 {
691 break;
692 }
693
694 if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
695 return SessionResult::Close;
696 }
697
698 if backend_interest.is_writable()
699 && self.backend_writable(metrics) == SessionResult::Close
700 {
701 return SessionResult::Close;
702 }
703
704 if backend_interest.is_readable()
705 && self.backend_readable(metrics) == SessionResult::Close
706 {
707 return SessionResult::Close;
708 }
709
710 if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
711 return SessionResult::Close;
712 }
713
714 if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
715 return SessionResult::Close;
716 }
717
718 if frontend_interest.is_error() {
719 error!(
720 "{} Frontend socket error, disconnecting",
721 log_context!(self)
722 );
723
724 self.frontend_readiness.interest = Ready::EMPTY;
725 self.backend_readiness.interest = Ready::EMPTY;
726
727 return SessionResult::Close;
728 }
729
730 if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
731 self.frontend_readiness.interest = Ready::EMPTY;
732 self.backend_readiness.interest = Ready::EMPTY;
733
734 error!("{} Backend socket error, disconnecting", log_context!(self));
735 return SessionResult::Close;
736 }
737
738 counter += 1;
739 }
740
741 if counter >= MAX_LOOP_ITERATIONS {
742 error!(
743 "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
744 log_context!(self), MAX_LOOP_ITERATIONS
745 );
746
747 incr!("http.infinite_loop.error");
748 self.print_state(self.protocol_string());
749
750 return SessionResult::Close;
751 }
752
753 SessionResult::Continue
754 }
755
756 fn update_readiness(&mut self, token: Token, events: Ready) {
757 if self.frontend_token == token {
758 self.frontend_readiness.event |= events;
759 } else if self.backend_token == Some(token) {
760 self.backend_readiness.event |= events;
761 }
762 }
763
764 fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
765 if self.frontend_token == token {
767 self.log_request_error(metrics, "frontend socket timeout");
768 if let Some(timeout) = self.container_frontend_timeout.as_mut() {
769 timeout.triggered()
770 }
771 return StateResult::CloseSession;
772 }
773
774 if self.backend_token == Some(token) {
775 if let Some(timeout) = self.container_backend_timeout.as_mut() {
777 timeout.triggered()
778 }
779
780 self.log_request_error(metrics, "backend socket timeout");
781 return StateResult::CloseSession;
782 }
783
784 error!("{} Got timeout for an invalid token", log_context!(self));
785 self.log_request_error(metrics, "invalid token timeout");
786 StateResult::CloseSession
787 }
788
789 fn cancel_timeouts(&mut self) {
790 self.container_frontend_timeout.as_mut().map(|t| t.cancel());
791 self.container_backend_timeout.as_mut().map(|t| t.cancel());
792 }
793
794 fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
795 if let Some(backend) = self.backend.as_mut() {
796 let mut backend = backend.borrow_mut();
797 backend.active_requests = backend.active_requests.saturating_sub(1);
798 }
799 }
800
801 fn print_state(&self, context: &str) {
802 error!(
803 "\
804{} {} Session(Pipe)
805\tFrontend:
806\t\ttoken: {:?}\treadiness: {:?}
807\tBackend:
808\t\ttoken: {:?}\treadiness: {:?}",
809 log_context!(self),
810 context,
811 self.frontend_token,
812 self.frontend_readiness,
813 self.backend_token,
814 self.backend_readiness
815 );
816 }
817}