sozu_lib/protocol/
pipe.rs

1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
2
3use mio::{Token, net::TcpStream};
4use rusty_ulid::Ulid;
5use sozu_command::{
6    config::MAX_LOOP_ITERATIONS,
7    logging::{EndpointRecord, LogContext},
8};
9
10use crate::{
11    L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
12    backends::Backend,
13    pool::Checkout,
14    protocol::{SessionState, http::parser::Method},
15    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
16    sozu_command::ready::Ready,
17    timer::TimeoutContainer,
18};
19
/// Builds the prefix put in front of the log lines emitted by this module.
///
/// This macro is defined uniquely in this module to help the tracking of pipelining
/// issues inside Sōzu.
///
/// It expands to a `format!` call, hence to a freshly allocated `String`, made of
/// the `PIPE` tag, the value of `$self.log_context()`, the stored session address
/// (or `<none>`), the frontend token and readiness, and the backend token
/// (or `<none>`) and readiness.
///
/// `$self` must expose `log_context()`, `session_address`, `frontend_token`,
/// `frontend_readiness`, `backend_token` and `backend_readiness`.
macro_rules! log_context {
    ($self:expr) => {
        format!(
            "PIPE\t{}\tSession(address={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
            $self.log_context(),
            $self.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.frontend_token.0,
            $self.frontend_readiness,
            $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
            $self.backend_readiness,
        )
    };
}
35
/// Kind of answer a session is currently producing.
///
/// `Debug`, `Clone` and `Copy` are derived on top of the equality traits so the
/// status can be printed in logs, used in `assert_eq!` and passed by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    /// Regular processing of the session.
    Normal,
    /// A default answer is being served.
    DefaultAnswer,
}
41
/// State of one endpoint (frontend or backend) of the pipe.
///
/// The read and write directions are tracked separately, which lets
/// `check_connections` decide whether a half-closed session is still worth
/// keeping open.
#[derive(Copy, Clone, Debug)]
enum ConnectionStatus {
    /// Initial state of an endpoint that has a socket.
    Normal,
    /// Reached from `Normal` after a write of zero bytes on the endpoint.
    ReadOpen,
    /// Reached from `Normal` after a read of zero bytes on the endpoint.
    WriteOpen,
    /// Reached from `ReadOpen` after an empty read, from `WriteOpen` after an
    /// empty write, or on hang up. Also the initial state of a backend without socket.
    Closed,
}
49
/// Endpoint information kept for the access logs of the piped session.
///
/// matches sozu_command_lib::logging::access_logs::EndpointRecords
///
/// It is turned into an `EndpointRecord` by `Pipe::log_endpoint`.
pub enum WebSocketContext {
    /// The pipe carries traffic that started as an HTTP exchange; every field is
    /// optional and forwarded as is to the access log.
    Http {
        method: Option<Method>,
        authority: Option<String>,
        path: Option<String>,
        status: Option<u16>,
        reason: Option<String>,
    },
    /// The pipe carries plain TCP traffic, there is no endpoint detail to log.
    Tcp,
}
61
/// Session state that blindly forwards bytes between a frontend and a backend.
///
/// `Front` is the frontend socket wrapper, `L` the listener the session came from.
pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
    /// Bytes read from the backend, waiting to be written to the frontend.
    backend_buffer: Checkout,
    /// Backend identifier, only used in the log context.
    backend_id: Option<String>,
    /// Interest and received events of the backend socket.
    pub backend_readiness: Readiness,
    /// Socket to the backend, if any.
    backend_socket: Option<TcpStream>,
    /// Half-close tracking of the backend side.
    backend_status: ConnectionStatus,
    /// Token of the backend socket, `None` until `set_back_token` is called.
    backend_token: Option<Token>,
    /// Backend whose `active_requests` counter is decremented on `close`.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster identifier, only used in the log context.
    cluster_id: Option<String>,
    /// Backend timeout, reset on each read and cancelled by `cancel_timeouts`.
    pub container_backend_timeout: Option<TimeoutContainer>,
    /// Frontend timeout, reset on each read and cancelled by `cancel_timeouts`.
    pub container_frontend_timeout: Option<TimeoutContainer>,
    /// Bytes read from the frontend, waiting to be written to the backend.
    frontend_buffer: Checkout,
    /// Interest and received events of the frontend socket.
    pub frontend_readiness: Readiness,
    /// Half-close tracking of the frontend side.
    frontend_status: ConnectionStatus,
    /// Token of the frontend socket.
    frontend_token: Token,
    /// Frontend socket wrapper.
    frontend: Front,
    /// Listener of the session, queried for its address and tags in access logs.
    listener: Rc<RefCell<L>>,
    /// Protocol of the session, selects the protocol label of the access logs.
    protocol: Protocol,
    /// Request identifier, only used in the log context.
    request_id: Ulid,
    /// Client address; when `None`, the peer address of the frontend socket is used.
    session_address: Option<SocketAddr>,
    /// Endpoint details reported in the access logs.
    websocket_context: WebSocketContext,
}
84
85impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
86    /// Instantiate a new Pipe SessionState with:
87    ///
88    /// - frontend_interest: READABLE | WRITABLE | HUP | ERROR
89    /// - frontend_event: EMPTY
90    /// - backend_interest: READABLE | WRITABLE | HUP | ERROR
91    /// - backend_event: EMPTY
92    ///
93    /// Remember to set the events from the previous State!
94    #[allow(clippy::too_many_arguments)]
95    pub fn new(
96        backend_buffer: Checkout,
97        backend_id: Option<String>,
98        backend_socket: Option<TcpStream>,
99        backend: Option<Rc<RefCell<Backend>>>,
100        container_backend_timeout: Option<TimeoutContainer>,
101        container_frontend_timeout: Option<TimeoutContainer>,
102        cluster_id: Option<String>,
103        frontend_buffer: Checkout,
104        frontend_token: Token,
105        frontend: Front,
106        listener: Rc<RefCell<L>>,
107        protocol: Protocol,
108        request_id: Ulid,
109        session_address: Option<SocketAddr>,
110        websocket_context: WebSocketContext,
111    ) -> Pipe<Front, L> {
112        let frontend_status = ConnectionStatus::Normal;
113        let backend_status = if backend_socket.is_none() {
114            ConnectionStatus::Closed
115        } else {
116            ConnectionStatus::Normal
117        };
118
119        let session = Pipe {
120            backend_buffer,
121            backend_id,
122            backend_readiness: Readiness {
123                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
124                event: Ready::EMPTY,
125            },
126            backend_socket,
127            backend_status,
128            backend_token: None,
129            backend,
130            cluster_id,
131            container_backend_timeout,
132            container_frontend_timeout,
133            frontend_buffer,
134            frontend_readiness: Readiness {
135                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
136                event: Ready::EMPTY,
137            },
138            frontend_status,
139            frontend_token,
140            frontend,
141            listener,
142            protocol,
143            request_id,
144            session_address,
145            websocket_context,
146        };
147
148        trace!("created pipe");
149        session
150    }
151
    /// Returns a shared reference to the TCP stream of the frontend.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend.socket_ref()
    }
155
    /// Returns an exclusive reference to the TCP stream of the frontend.
    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
        self.frontend.socket_mut()
    }
159
    /// Returns a shared reference to the backend socket, `None` when there is none.
    pub fn back_socket(&self) -> Option<&TcpStream> {
        self.backend_socket.as_ref()
    }
163
    /// Returns an exclusive reference to the backend socket, `None` when there is none.
    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
        self.backend_socket.as_mut()
    }
167
    /// Installs `socket` as the backend socket, replacing any previous one.
    ///
    /// The backend status goes back to `Normal`, whatever it was before.
    pub fn set_back_socket(&mut self, socket: TcpStream) {
        self.backend_socket = Some(socket);
        self.backend_status = ConnectionStatus::Normal;
    }
172
173    pub fn back_token(&self) -> Vec<Token> {
174        self.backend_token.iter().cloned().collect()
175    }
176
177    fn reset_timeouts(&mut self) {
178        if let Some(t) = self.container_frontend_timeout.as_mut() {
179            if !t.reset() {
180                error!(
181                    "{} Could not reset front timeout (pipe)",
182                    log_context!(self)
183                );
184            }
185        }
186
187        if let Some(t) = self.container_backend_timeout.as_mut() {
188            if !t.reset() {
189                error!("{} Could not reset back timeout (pipe)", log_context!(self));
190            }
191        }
192    }
193
    /// Replaces the cluster identifier reported in the log context.
    pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
        self.cluster_id = cluster_id;
    }
197
    /// Replaces the backend identifier reported in the log context.
    pub fn set_backend_id(&mut self, backend_id: Option<String>) {
        self.backend_id = backend_id;
    }
201
    /// Records the token of the backend socket, so that `update_readiness` and
    /// `timeout` can recognize backend events.
    pub fn set_back_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
205
206    pub fn get_session_address(&self) -> Option<SocketAddr> {
207        self.session_address
208            .or_else(|| self.frontend.socket_ref().peer_addr().ok())
209    }
210
211    pub fn get_backend_address(&self) -> Option<SocketAddr> {
212        self.backend_socket
213            .as_ref()
214            .and_then(|backend| backend.peer_addr().ok())
215    }
216
    /// Returns the protocol label written in access logs and state dumps.
    ///
    /// `TCP` sessions are labelled `TCP`, `HTTP` sessions `WS`, and `HTTPS`
    /// sessions `WSS-` followed by the transport version reported by the frontend.
    ///
    /// # Panics
    ///
    /// Hits `unreachable!()` for any other session protocol, and for an `HTTPS`
    /// session whose frontend reports a transport protocol not listed below.
    fn protocol_string(&self) -> &'static str {
        match self.protocol {
            Protocol::TCP => "TCP",
            Protocol::HTTP => "WS",
            // the frontend is only asked for its transport on HTTPS sessions
            Protocol::HTTPS => match self.frontend.protocol() {
                TransportProtocol::Ssl2 => "WSS-SSL2",
                TransportProtocol::Ssl3 => "WSS-SSL3",
                TransportProtocol::Tls1_0 => "WSS-TLS1.0",
                TransportProtocol::Tls1_1 => "WSS-TLS1.1",
                TransportProtocol::Tls1_2 => "WSS-TLS1.2",
                TransportProtocol::Tls1_3 => "WSS-TLS1.3",
                _ => unreachable!(),
            },
            _ => unreachable!(),
        }
    }
233
    /// Registers the end of the session in `metrics` and emits an access log.
    ///
    /// - `metrics`: source of the timings and byte counters of the log
    /// - `error`: forwarded as first argument of `log_access!`
    /// - `message`: optional message attached to the log
    ///
    /// The `unsent-access-logs` counter is incremented by the `on_failure`
    /// handler of `log_access!`.
    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        // the borrow is held for the whole function, the tags are read from it below
        let listener = self.listener.borrow();
        let context = self.log_context();
        let endpoint = self.log_endpoint();
        metrics.register_end_of_session(&context);
        log_access!(
            error,
            on_failure: { incr!("unsent-access-logs") },
            message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint,
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            // a pipe never fills these two fields
            user_agent: None,
            otel: None,
        );
    }
260
    /// Emits an access log for a session that ended without error, with no message.
    pub fn log_request_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, false, None);
    }
264
    /// Reports a session that ended on an error described by `message`.
    ///
    /// In order: increments the `pipe.errors` counter, logs the error, dumps the
    /// state of the session with `print_state`, then emits an access log flagged
    /// as an error and carrying `message`.
    pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
        incr!("pipe.errors");
        error!(
            "{} Could not process request properly got: {}",
            log_context!(self),
            message
        );
        self.print_state(self.protocol_string());
        self.log_request(metrics, true, Some(message));
    }
275
276    /// Wether the session should be kept open, depending on endpoints status
277    /// and buffer usage (both in memory and in kernel)
278    pub fn check_connections(&self) -> bool {
279        let request_is_inflight = self.frontend_buffer.available_data() > 0
280            || self.frontend_readiness.event.is_readable();
281        let response_is_inflight =
282            self.backend_buffer.available_data() > 0 || self.backend_readiness.event.is_readable();
283        match (self.frontend_status, self.backend_status) {
284            (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
285            (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
286            (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
287                // technically we should keep it open, but we'll assume that if the front
288                // is not readable and there is no in flight data front -> back or back -> front,
289                // we'll close the session, otherwise it interacts badly with HTTP connections
290                // with Connection: close header and no Content-length
291                request_is_inflight || response_is_inflight
292            }
293            (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
294
295            (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
296                // technically we should keep it open, but we'll assume that if the back
297                // is not readable and there is no in flight data back -> front or front -> back, we'll close the session
298                request_is_inflight || response_is_inflight
299            }
300            (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
301            (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
302                request_is_inflight || response_is_inflight
303            }
304            (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
305
306            (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
307            (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
308            (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
309            (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
310
311            (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
312            (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
313            (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
314            (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
315        }
316    }
317
    /// Handles a hang up of the frontend.
    ///
    /// Emits a success access log, marks the frontend as `Closed` and always
    /// asks for the session to be closed.
    pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        self.log_request_success(metrics);
        self.frontend_status = ConnectionStatus::Closed;
        SessionResult::Close
    }
323
324    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
325        self.backend_status = ConnectionStatus::Closed;
326        if self.backend_buffer.available_data() == 0 {
327            if self.backend_readiness.event.is_readable() {
328                self.backend_readiness.interest.insert(Ready::READABLE);
329                debug!(
330                    "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
331                    log_context!(self)
332                );
333                SessionResult::Continue
334            } else {
335                self.log_request_success(metrics);
336                SessionResult::Close
337            }
338        } else {
339            debug!(
340                "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
341                log_context!(self)
342            );
343            self.frontend_readiness.interest.insert(Ready::WRITABLE);
344            if self.backend_readiness.event.is_readable() {
345                self.backend_readiness.interest.insert(Ready::READABLE);
346            }
347            SessionResult::Continue
348        }
349    }
350
    /// Read content from the session
    ///
    /// Reads from the frontend socket into the frontend buffer, updates the
    /// `bytes_in` counter and `metrics.bin`, and asks for the backend to become
    /// writable so the data can be forwarded.
    ///
    /// Returns `SessionResult::Close` when `check_connections` says the session
    /// is over, or when the read ended with `Error` or `Closed`;
    /// `SessionResult::Continue` otherwise.
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        self.reset_timeouts();

        trace!("pipe readable");
        // buffer full: stop reading until the backend has consumed some of it
        if self.frontend_buffer.available_space() == 0 {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
        debug!("{} Read {} bytes", log_context!(self), sz);

        if sz > 0 {
            //FIXME: replace with copy()
            self.frontend_buffer.fill(sz);

            count!("bytes_in", sz as i64);
            metrics.bin += sz;

            if self.frontend_buffer.available_space() == 0 {
                self.frontend_readiness.interest.remove(Ready::READABLE);
            }
            self.backend_readiness.interest.insert(Ready::WRITABLE);
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);

            // an empty read that is not reported as an error, a close or a
            // would-block moves the frontend one step towards `Closed`
            if res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }
        }

        // checked before looking at the socket result: a finished session is
        // logged as a success even if the read failed
        if !self.check_connections() {
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }

        match res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "front socket read error");
                return SessionResult::Close;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        self.backend_readiness.interest.insert(Ready::WRITABLE);
        SessionResult::Continue
    }
417
418    // Forward content to session
419    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
420        trace!("{} Pipe writable", log_context!(self));
421        if self.backend_buffer.available_data() == 0 {
422            self.backend_readiness.interest.insert(Ready::READABLE);
423            self.frontend_readiness.interest.remove(Ready::WRITABLE);
424            return SessionResult::Continue;
425        }
426
427        let mut sz = 0usize;
428        let mut res = SocketResult::Continue;
429        while res == SocketResult::Continue {
430            // no more data in buffer, stop here
431            if self.backend_buffer.available_data() == 0 {
432                count!("bytes_out", sz as i64);
433                metrics.bout += sz;
434                self.backend_readiness.interest.insert(Ready::READABLE);
435                self.frontend_readiness.interest.remove(Ready::WRITABLE);
436                return SessionResult::Continue;
437            }
438            let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
439            res = current_res;
440            self.backend_buffer.consume(current_sz);
441            sz += current_sz;
442
443            if current_sz == 0 && res == SocketResult::Continue {
444                self.frontend_status = match self.frontend_status {
445                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
446                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
447                    s => s,
448                };
449            }
450
451            if !self.check_connections() {
452                metrics.bout += sz;
453                count!("bytes_out", sz as i64);
454                self.frontend_readiness.reset();
455                self.backend_readiness.reset();
456                self.log_request_success(metrics);
457                return SessionResult::Close;
458            }
459        }
460
461        if sz > 0 {
462            count!("bytes_out", sz as i64);
463            self.backend_readiness.interest.insert(Ready::READABLE);
464            metrics.bout += sz;
465        }
466
467        debug!(
468            "{} Wrote {} bytes of {}",
469            log_context!(self),
470            sz,
471            self.backend_buffer.available_data()
472        );
473
474        match res {
475            SocketResult::Error => {
476                self.frontend_readiness.reset();
477                self.backend_readiness.reset();
478                self.log_request_error(metrics, "front socket write error");
479                return SessionResult::Close;
480            }
481            SocketResult::Closed => {
482                self.frontend_readiness.reset();
483                self.backend_readiness.reset();
484                self.log_request_success(metrics);
485                return SessionResult::Close;
486            }
487            SocketResult::WouldBlock => {
488                self.frontend_readiness.event.remove(Ready::WRITABLE);
489            }
490            SocketResult::Continue => {}
491        }
492
493        SessionResult::Continue
494    }
495
    /// Forward content to cluster
    ///
    /// Writes the content of the frontend buffer to the backend socket until the
    /// buffer is empty or the socket stops accepting data, and accounts for the
    /// written bytes in the `back_bytes_out` counter and `metrics.backend_bout`.
    ///
    /// Returns `SessionResult::Close` when `check_connections` says the session
    /// is over, or when the write ended with `Error` or `Closed`;
    /// `SessionResult::Continue` otherwise.
    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("pipe back_writable");

        // nothing to send: wait for the frontend to produce data
        if self.frontend_buffer.available_data() == 0 {
            self.frontend_readiness.interest.insert(Ready::READABLE);
            self.backend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        // amount there was to send, only used by the debug log below
        let output_size = self.frontend_buffer.available_data();

        let mut sz = 0usize;
        let mut socket_res = SocketResult::Continue;

        // without a backend socket nothing is written and the buffer is left untouched
        if let Some(ref mut backend) = self.backend_socket {
            while socket_res == SocketResult::Continue {
                // no more data in buffer, stop here
                if self.frontend_buffer.available_data() == 0 {
                    self.frontend_readiness.interest.insert(Ready::READABLE);
                    self.backend_readiness.interest.remove(Ready::WRITABLE);
                    count!("back_bytes_out", sz as i64);
                    metrics.backend_bout += sz;
                    return SessionResult::Continue;
                }

                let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
                socket_res = current_res;
                self.frontend_buffer.consume(current_sz);
                sz += current_sz;

                // an empty write that is not reported as an error, a close or a
                // would-block moves the backend one step towards `Closed`
                if current_sz == 0 && current_res == SocketResult::Continue {
                    self.backend_status = match self.backend_status {
                        ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
                        ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
                        s => s,
                    };
                }
            }
        }

        count!("back_bytes_out", sz as i64);
        metrics.backend_bout += sz;

        // unlike in `writable`, the connections are only checked once the loop is over
        if !self.check_connections() {
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }

        debug!(
            "{} Wrote {} bytes of {}",
            log_context!(self),
            sz,
            output_size
        );

        match socket_res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "back socket write error");
                return SessionResult::Close;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
            SocketResult::WouldBlock => {
                self.backend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }
        SessionResult::Continue
    }
574
575    // Read content from cluster
576    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
577        self.reset_timeouts();
578
579        trace!("{} Pipe backend_readable", log_context!(self));
580        if self.backend_buffer.available_space() == 0 {
581            self.backend_readiness.interest.remove(Ready::READABLE);
582            return SessionResult::Continue;
583        }
584
585        if let Some(ref mut backend) = self.backend_socket {
586            let (size, remaining) = backend.socket_read(self.backend_buffer.space());
587            self.backend_buffer.fill(size);
588
589            debug!("{} Read {} bytes", log_context!(self), size);
590
591            if remaining != SocketResult::Continue || size == 0 {
592                self.backend_readiness.event.remove(Ready::READABLE);
593            }
594            if size > 0 {
595                self.frontend_readiness.interest.insert(Ready::WRITABLE);
596                metrics.backend_bin += size;
597            }
598
599            if size == 0 && remaining == SocketResult::Closed {
600                self.backend_status = match self.backend_status {
601                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
602                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
603                    s => s,
604                };
605
606                if !self.check_connections() {
607                    self.frontend_readiness.reset();
608                    self.backend_readiness.reset();
609                    self.log_request_success(metrics);
610                    return SessionResult::Close;
611                }
612            }
613
614            match remaining {
615                SocketResult::Error => {
616                    self.frontend_readiness.reset();
617                    self.backend_readiness.reset();
618                    self.log_request_error(metrics, "back socket read error");
619                    return SessionResult::Close;
620                }
621                SocketResult::Closed => {
622                    if !self.check_connections() {
623                        self.frontend_readiness.reset();
624                        self.backend_readiness.reset();
625                        self.log_request_success(metrics);
626                        return SessionResult::Close;
627                    }
628                }
629                SocketResult::WouldBlock => {
630                    self.backend_readiness.event.remove(Ready::READABLE);
631                }
632                SocketResult::Continue => {}
633            }
634        }
635
636        SessionResult::Continue
637    }
638
    /// Builds the `LogContext` of the session: the request identifier plus the
    /// cluster and backend identifiers, borrowed from `self` when they are set.
    pub fn log_context(&self) -> LogContext<'_> {
        LogContext {
            request_id: self.request_id,
            cluster_id: self.cluster_id.as_deref(),
            backend_id: self.backend_id.as_deref(),
        }
    }
646
647    fn log_endpoint(&self) -> EndpointRecord<'_> {
648        match &self.websocket_context {
649            WebSocketContext::Http {
650                method,
651                authority,
652                path,
653                status,
654                reason,
655            } => EndpointRecord::Http {
656                method: method.as_deref(),
657                authority: authority.as_deref(),
658                path: path.as_deref(),
659                status: status.to_owned(),
660                reason: reason.as_deref(),
661            },
662            WebSocketContext::Tcp => EndpointRecord::Tcp,
663        }
664    }
665}
666
667impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
    /// Processes the pending events of both sockets.
    ///
    /// Loops over the frontend and backend interests, dispatching to `readable`,
    /// `backend_writable`, `backend_readable`, `writable` and `backend_hup`, until
    /// no interest is left, one of the handlers asks to close the session, or
    /// `MAX_LOOP_ITERATIONS` iterations have been run.
    ///
    /// Returns `SessionResult::Close` when the frontend hung up, when a handler
    /// asked for it, on a frontend socket error, or when the iteration limit is
    /// reached; `SessionResult::Continue` otherwise.
    fn ready(
        &mut self,
        _session: Rc<RefCell<dyn crate::ProxySession>>,
        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> SessionResult {
        let mut counter = 0;

        // a frontend that hung up ends the session right away, without access log
        if self.frontend_readiness.event.is_hup() {
            return SessionResult::Close;
        }

        while counter < MAX_LOOP_ITERATIONS {
            let frontend_interest = self.frontend_readiness.filter_interest();
            let backend_interest = self.backend_readiness.filter_interest();

            trace!(
                "{} Frontend interest({:?}), backend interest({:?})",
                log_context!(self),
                frontend_interest,
                backend_interest
            );
            // nothing left to process
            if frontend_interest.is_empty() && backend_interest.is_empty() {
                break;
            }

            // the backend hung up and data is waiting for a frontend that is not
            // writable yet: stop here instead of spinning
            if self.backend_readiness.event.is_hup()
                && self.frontend_readiness.interest.is_writable()
                && !self.frontend_readiness.event.is_writable()
            {
                break;
            }

            if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
                return SessionResult::Close;
            }

            if backend_interest.is_writable()
                && self.backend_writable(metrics) == SessionResult::Close
            {
                return SessionResult::Close;
            }

            if backend_interest.is_readable()
                && self.backend_readable(metrics) == SessionResult::Close
            {
                return SessionResult::Close;
            }

            if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
                return SessionResult::Close;
            }

            if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
                return SessionResult::Close;
            }

            if frontend_interest.is_error() {
                error!(
                    "{} Frontend socket error, disconnecting",
                    log_context!(self)
                );

                self.frontend_readiness.interest = Ready::EMPTY;
                self.backend_readiness.interest = Ready::EMPTY;

                return SessionResult::Close;
            }

            // a backend error is handled like a hang up: the session survives as
            // long as `backend_hup` finds data left to forward
            if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
                self.frontend_readiness.interest = Ready::EMPTY;
                self.backend_readiness.interest = Ready::EMPTY;

                error!("{} Backend socket error, disconnecting", log_context!(self));
                return SessionResult::Close;
            }

            counter += 1;
        }

        // the loop was left because of the iteration limit, not because of a `break`
        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self),
                MAX_LOOP_ITERATIONS
            );

            incr!("http.infinite_loop.error");
            self.print_state(self.protocol_string());

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
763
764    fn update_readiness(&mut self, token: Token, events: Ready) {
765        if self.frontend_token == token {
766            self.frontend_readiness.event |= events;
767        } else if self.backend_token == Some(token) {
768            self.backend_readiness.event |= events;
769        }
770    }
771
772    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
773        //info!("got timeout for token: {:?}", token);
774        if self.frontend_token == token {
775            self.log_request_error(metrics, "frontend socket timeout");
776            if let Some(timeout) = self.container_frontend_timeout.as_mut() {
777                timeout.triggered()
778            }
779            return StateResult::CloseSession;
780        }
781
782        if self.backend_token == Some(token) {
783            //info!("backend timeout triggered for token {:?}", token);
784            if let Some(timeout) = self.container_backend_timeout.as_mut() {
785                timeout.triggered()
786            }
787
788            self.log_request_error(metrics, "backend socket timeout");
789            return StateResult::CloseSession;
790        }
791
792        error!("{} Got timeout for an invalid token", log_context!(self));
793        self.log_request_error(metrics, "invalid token timeout");
794        StateResult::CloseSession
795    }
796
797    fn cancel_timeouts(&mut self) {
798        self.container_frontend_timeout.as_mut().map(|t| t.cancel());
799        self.container_backend_timeout.as_mut().map(|t| t.cancel());
800    }
801
802    fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
803        if let Some(backend) = self.backend.as_mut() {
804            let mut backend = backend.borrow_mut();
805            backend.active_requests = backend.active_requests.saturating_sub(1);
806        }
807    }
808
    /// Dumps the state of the session in an error log.
    ///
    /// The log holds the usual log prefix, `context` (callers in this file pass
    /// the protocol label), and the token and readiness of both the frontend and
    /// the backend.
    fn print_state(&self, context: &str) {
        // the leading `\` keeps the first line of the message free of a newline
        error!(
            "\
{} {} Session(Pipe)
\tFrontend:
\t\ttoken: {:?}\treadiness: {:?}
\tBackend:
\t\ttoken: {:?}\treadiness: {:?}",
            log_context!(self),
            context,
            self.frontend_token,
            self.frontend_readiness,
            self.backend_token,
            self.backend_readiness
        );
    }
825}