sozu_lib/protocol/
pipe.rs

1use std::{cell::RefCell, net::SocketAddr, rc::Rc};
2
3use mio::{net::TcpStream, Token};
4use rusty_ulid::Ulid;
5use sozu_command::{
6    config::MAX_LOOP_ITERATIONS,
7    logging::{EndpointRecord, LogContext},
8};
9
10use crate::{
11    backends::Backend,
12    pool::Checkout,
13    protocol::{http::parser::Method, SessionState},
14    socket::{stats::socket_rtt, SocketHandler, SocketResult, TransportProtocol},
15    sozu_command::ready::Ready,
16    timer::TimeoutContainer,
17    L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
18};
19
20/// This macro is defined uniquely in this module to help the tracking of pipelining
21/// issues inside Sōzu
22macro_rules! log_context {
23    ($self:expr) => {
24        format!(
25            "PIPE\t{}\tSession(address={}, frontend={}, readiness={}, backend={}, readiness={})\t >>>",
26            $self.log_context(),
27            $self.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
28            $self.frontend_token.0,
29            $self.frontend_readiness,
30            $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
31            $self.backend_readiness,
32        )
33    };
34}
35
36#[derive(PartialEq, Eq)]
37pub enum SessionStatus {
38    Normal,
39    DefaultAnswer,
40}
41
42#[derive(Copy, Clone, Debug)]
43enum ConnectionStatus {
44    Normal,
45    ReadOpen,
46    WriteOpen,
47    Closed,
48}
49
50/// matches sozu_command_lib::logging::access_logs::EndpointRecords
51pub enum WebSocketContext {
52    Http {
53        method: Option<Method>,
54        authority: Option<String>,
55        path: Option<String>,
56        status: Option<u16>,
57        reason: Option<String>,
58    },
59    Tcp,
60}
61
62pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
63    backend_buffer: Checkout,
64    backend_id: Option<String>,
65    pub backend_readiness: Readiness,
66    backend_socket: Option<TcpStream>,
67    backend_status: ConnectionStatus,
68    backend_token: Option<Token>,
69    pub backend: Option<Rc<RefCell<Backend>>>,
70    cluster_id: Option<String>,
71    pub container_backend_timeout: Option<TimeoutContainer>,
72    pub container_frontend_timeout: Option<TimeoutContainer>,
73    frontend_buffer: Checkout,
74    pub frontend_readiness: Readiness,
75    frontend_status: ConnectionStatus,
76    frontend_token: Token,
77    frontend: Front,
78    listener: Rc<RefCell<L>>,
79    protocol: Protocol,
80    request_id: Ulid,
81    session_address: Option<SocketAddr>,
82    websocket_context: WebSocketContext,
83}
84
85impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
86    /// Instantiate a new Pipe SessionState with:
87    ///
88    /// - frontend_interest: READABLE | WRITABLE | HUP | ERROR
89    /// - frontend_event: EMPTY
90    /// - backend_interest: READABLE | WRITABLE | HUP | ERROR
91    /// - backend_event: EMPTY
92    ///
93    /// Remember to set the events from the previous State!
94    #[allow(clippy::too_many_arguments)]
95    pub fn new(
96        backend_buffer: Checkout,
97        backend_id: Option<String>,
98        backend_socket: Option<TcpStream>,
99        backend: Option<Rc<RefCell<Backend>>>,
100        container_backend_timeout: Option<TimeoutContainer>,
101        container_frontend_timeout: Option<TimeoutContainer>,
102        cluster_id: Option<String>,
103        frontend_buffer: Checkout,
104        frontend_token: Token,
105        frontend: Front,
106        listener: Rc<RefCell<L>>,
107        protocol: Protocol,
108        request_id: Ulid,
109        session_address: Option<SocketAddr>,
110        websocket_context: WebSocketContext,
111    ) -> Pipe<Front, L> {
112        let frontend_status = ConnectionStatus::Normal;
113        let backend_status = if backend_socket.is_none() {
114            ConnectionStatus::Closed
115        } else {
116            ConnectionStatus::Normal
117        };
118
119        let session = Pipe {
120            backend_buffer,
121            backend_id,
122            backend_readiness: Readiness {
123                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
124                event: Ready::EMPTY,
125            },
126            backend_socket,
127            backend_status,
128            backend_token: None,
129            backend,
130            cluster_id,
131            container_backend_timeout,
132            container_frontend_timeout,
133            frontend_buffer,
134            frontend_readiness: Readiness {
135                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
136                event: Ready::EMPTY,
137            },
138            frontend_status,
139            frontend_token,
140            frontend,
141            listener,
142            protocol,
143            request_id,
144            session_address,
145            websocket_context,
146        };
147
148        trace!("created pipe");
149        session
150    }
151
152    pub fn front_socket(&self) -> &TcpStream {
153        self.frontend.socket_ref()
154    }
155
156    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
157        self.frontend.socket_mut()
158    }
159
160    pub fn back_socket(&self) -> Option<&TcpStream> {
161        self.backend_socket.as_ref()
162    }
163
164    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
165        self.backend_socket.as_mut()
166    }
167
168    pub fn set_back_socket(&mut self, socket: TcpStream) {
169        self.backend_socket = Some(socket);
170        self.backend_status = ConnectionStatus::Normal;
171    }
172
173    pub fn back_token(&self) -> Vec<Token> {
174        self.backend_token.iter().cloned().collect()
175    }
176
177    fn reset_timeouts(&mut self) {
178        if let Some(t) = self.container_frontend_timeout.as_mut() {
179            if !t.reset() {
180                error!(
181                    "{} Could not reset front timeout (pipe)",
182                    log_context!(self)
183                );
184            }
185        }
186
187        if let Some(t) = self.container_backend_timeout.as_mut() {
188            if !t.reset() {
189                error!("{} Could not reset back timeout (pipe)", log_context!(self));
190            }
191        }
192    }
193
194    pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
195        self.cluster_id = cluster_id;
196    }
197
198    pub fn set_backend_id(&mut self, backend_id: Option<String>) {
199        self.backend_id = backend_id;
200    }
201
202    pub fn set_back_token(&mut self, token: Token) {
203        self.backend_token = Some(token);
204    }
205
206    pub fn get_session_address(&self) -> Option<SocketAddr> {
207        self.session_address
208            .or_else(|| self.frontend.socket_ref().peer_addr().ok())
209    }
210
211    pub fn get_backend_address(&self) -> Option<SocketAddr> {
212        self.backend_socket
213            .as_ref()
214            .and_then(|backend| backend.peer_addr().ok())
215    }
216
217    fn protocol_string(&self) -> &'static str {
218        match self.protocol {
219            Protocol::TCP => "TCP",
220            Protocol::HTTP => "WS",
221            Protocol::HTTPS => match self.frontend.protocol() {
222                TransportProtocol::Ssl2 => "WSS-SSL2",
223                TransportProtocol::Ssl3 => "WSS-SSL3",
224                TransportProtocol::Tls1_0 => "WSS-TLS1.0",
225                TransportProtocol::Tls1_1 => "WSS-TLS1.1",
226                TransportProtocol::Tls1_2 => "WSS-TLS1.2",
227                TransportProtocol::Tls1_3 => "WSS-TLS1.3",
228                _ => unreachable!(),
229            },
230            _ => unreachable!(),
231        }
232    }
233
234    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
235        let listener = self.listener.borrow();
236        let context = self.log_context();
237        let endpoint = self.log_endpoint();
238        metrics.register_end_of_session(&context);
239        log_access!(
240            error,
241            on_failure: { incr!("unsent-access-logs") },
242            message,
243            context,
244            session_address: self.get_session_address(),
245            backend_address: self.get_backend_address(),
246            protocol: self.protocol_string(),
247            endpoint,
248            tags: listener.get_tags(&listener.get_addr().to_string()),
249            client_rtt: socket_rtt(self.front_socket()),
250            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
251            service_time: metrics.service_time(),
252            response_time: metrics.backend_response_time(),
253            request_time: metrics.request_time(),
254            bytes_in: metrics.bin,
255            bytes_out: metrics.bout,
256            user_agent: None
257        );
258    }
259
260    pub fn log_request_success(&self, metrics: &SessionMetrics) {
261        self.log_request(metrics, false, None);
262    }
263
264    pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
265        incr!("pipe.errors");
266        error!(
267            "{} Could not process request properly got: {}",
268            log_context!(self),
269            message
270        );
271        self.print_state(self.protocol_string());
272        self.log_request(metrics, true, Some(message));
273    }
274
275    /// Wether the session should be kept open, depending on endpoints status
276    /// and buffer usage (both in memory and in kernel)
277    pub fn check_connections(&self) -> bool {
278        let request_is_inflight = self.frontend_buffer.available_data() > 0
279            || self.frontend_readiness.event.is_readable();
280        let response_is_inflight =
281            self.backend_buffer.available_data() > 0 || self.backend_readiness.event.is_readable();
282        match (self.frontend_status, self.backend_status) {
283            (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
284            (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
285            (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
286                // technically we should keep it open, but we'll assume that if the front
287                // is not readable and there is no in flight data front -> back or back -> front,
288                // we'll close the session, otherwise it interacts badly with HTTP connections
289                // with Connection: close header and no Content-length
290                request_is_inflight || response_is_inflight
291            }
292            (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
293
294            (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
295                // technically we should keep it open, but we'll assume that if the back
296                // is not readable and there is no in flight data back -> front or front -> back, we'll close the session
297                request_is_inflight || response_is_inflight
298            }
299            (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
300            (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
301                request_is_inflight || response_is_inflight
302            }
303            (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
304
305            (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
306            (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
307            (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
308            (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
309
310            (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
311            (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
312            (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
313            (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
314        }
315    }
316
317    pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
318        self.log_request_success(metrics);
319        self.frontend_status = ConnectionStatus::Closed;
320        SessionResult::Close
321    }
322
323    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
324        self.backend_status = ConnectionStatus::Closed;
325        if self.backend_buffer.available_data() == 0 {
326            if self.backend_readiness.event.is_readable() {
327                self.backend_readiness.interest.insert(Ready::READABLE);
328                debug!("{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.", log_context!(self));
329                SessionResult::Continue
330            } else {
331                self.log_request_success(metrics);
332                SessionResult::Close
333            }
334        } else {
335            debug!("{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.", log_context!(self));
336            self.frontend_readiness.interest.insert(Ready::WRITABLE);
337            if self.backend_readiness.event.is_readable() {
338                self.backend_readiness.interest.insert(Ready::READABLE);
339            }
340            SessionResult::Continue
341        }
342    }
343
344    // Read content from the session
345    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
346        self.reset_timeouts();
347
348        trace!("pipe readable");
349        if self.frontend_buffer.available_space() == 0 {
350            self.frontend_readiness.interest.remove(Ready::READABLE);
351            self.backend_readiness.interest.insert(Ready::WRITABLE);
352            return SessionResult::Continue;
353        }
354
355        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
356        debug!("{} Read {} bytes", log_context!(self), sz);
357
358        if sz > 0 {
359            //FIXME: replace with copy()
360            self.frontend_buffer.fill(sz);
361
362            count!("bytes_in", sz as i64);
363            metrics.bin += sz;
364
365            if self.frontend_buffer.available_space() == 0 {
366                self.frontend_readiness.interest.remove(Ready::READABLE);
367            }
368            self.backend_readiness.interest.insert(Ready::WRITABLE);
369        } else {
370            self.frontend_readiness.event.remove(Ready::READABLE);
371
372            if res == SocketResult::Continue {
373                self.frontend_status = match self.frontend_status {
374                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
375                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
376                    s => s,
377                };
378            }
379        }
380
381        if !self.check_connections() {
382            self.frontend_readiness.reset();
383            self.backend_readiness.reset();
384            self.log_request_success(metrics);
385            return SessionResult::Close;
386        }
387
388        match res {
389            SocketResult::Error => {
390                self.frontend_readiness.reset();
391                self.backend_readiness.reset();
392                self.log_request_error(metrics, "front socket read error");
393                return SessionResult::Close;
394            }
395            SocketResult::Closed => {
396                self.frontend_readiness.reset();
397                self.backend_readiness.reset();
398                self.log_request_success(metrics);
399                return SessionResult::Close;
400            }
401            SocketResult::WouldBlock => {
402                self.frontend_readiness.event.remove(Ready::READABLE);
403            }
404            SocketResult::Continue => {}
405        };
406
407        self.backend_readiness.interest.insert(Ready::WRITABLE);
408        SessionResult::Continue
409    }
410
411    // Forward content to session
412    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
413        trace!("{} Pipe writable", log_context!(self));
414        if self.backend_buffer.available_data() == 0 {
415            self.backend_readiness.interest.insert(Ready::READABLE);
416            self.frontend_readiness.interest.remove(Ready::WRITABLE);
417            return SessionResult::Continue;
418        }
419
420        let mut sz = 0usize;
421        let mut res = SocketResult::Continue;
422        while res == SocketResult::Continue {
423            // no more data in buffer, stop here
424            if self.backend_buffer.available_data() == 0 {
425                count!("bytes_out", sz as i64);
426                metrics.bout += sz;
427                self.backend_readiness.interest.insert(Ready::READABLE);
428                self.frontend_readiness.interest.remove(Ready::WRITABLE);
429                return SessionResult::Continue;
430            }
431            let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
432            res = current_res;
433            self.backend_buffer.consume(current_sz);
434            sz += current_sz;
435
436            if current_sz == 0 && res == SocketResult::Continue {
437                self.frontend_status = match self.frontend_status {
438                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
439                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
440                    s => s,
441                };
442            }
443
444            if !self.check_connections() {
445                metrics.bout += sz;
446                count!("bytes_out", sz as i64);
447                self.frontend_readiness.reset();
448                self.backend_readiness.reset();
449                self.log_request_success(metrics);
450                return SessionResult::Close;
451            }
452        }
453
454        if sz > 0 {
455            count!("bytes_out", sz as i64);
456            self.backend_readiness.interest.insert(Ready::READABLE);
457            metrics.bout += sz;
458        }
459
460        debug!(
461            "{} Wrote {} bytes of {}",
462            log_context!(self),
463            sz,
464            self.backend_buffer.available_data()
465        );
466
467        match res {
468            SocketResult::Error => {
469                self.frontend_readiness.reset();
470                self.backend_readiness.reset();
471                self.log_request_error(metrics, "front socket write error");
472                return SessionResult::Close;
473            }
474            SocketResult::Closed => {
475                self.frontend_readiness.reset();
476                self.backend_readiness.reset();
477                self.log_request_success(metrics);
478                return SessionResult::Close;
479            }
480            SocketResult::WouldBlock => {
481                self.frontend_readiness.event.remove(Ready::WRITABLE);
482            }
483            SocketResult::Continue => {}
484        }
485
486        SessionResult::Continue
487    }
488
489    // Forward content to cluster
490    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
491        trace!("pipe back_writable");
492
493        if self.frontend_buffer.available_data() == 0 {
494            self.frontend_readiness.interest.insert(Ready::READABLE);
495            self.backend_readiness.interest.remove(Ready::WRITABLE);
496            return SessionResult::Continue;
497        }
498
499        let output_size = self.frontend_buffer.available_data();
500
501        let mut sz = 0usize;
502        let mut socket_res = SocketResult::Continue;
503
504        if let Some(ref mut backend) = self.backend_socket {
505            while socket_res == SocketResult::Continue {
506                // no more data in buffer, stop here
507                if self.frontend_buffer.available_data() == 0 {
508                    self.frontend_readiness.interest.insert(Ready::READABLE);
509                    self.backend_readiness.interest.remove(Ready::WRITABLE);
510                    count!("back_bytes_out", sz as i64);
511                    metrics.backend_bout += sz;
512                    return SessionResult::Continue;
513                }
514
515                let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
516                socket_res = current_res;
517                self.frontend_buffer.consume(current_sz);
518                sz += current_sz;
519
520                if current_sz == 0 && current_res == SocketResult::Continue {
521                    self.backend_status = match self.backend_status {
522                        ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
523                        ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
524                        s => s,
525                    };
526                }
527            }
528        }
529
530        count!("back_bytes_out", sz as i64);
531        metrics.backend_bout += sz;
532
533        if !self.check_connections() {
534            self.frontend_readiness.reset();
535            self.backend_readiness.reset();
536            self.log_request_success(metrics);
537            return SessionResult::Close;
538        }
539
540        debug!(
541            "{} Wrote {} bytes of {}",
542            log_context!(self),
543            sz,
544            output_size
545        );
546
547        match socket_res {
548            SocketResult::Error => {
549                self.frontend_readiness.reset();
550                self.backend_readiness.reset();
551                self.log_request_error(metrics, "back socket write error");
552                return SessionResult::Close;
553            }
554            SocketResult::Closed => {
555                self.frontend_readiness.reset();
556                self.backend_readiness.reset();
557                self.log_request_success(metrics);
558                return SessionResult::Close;
559            }
560            SocketResult::WouldBlock => {
561                self.backend_readiness.event.remove(Ready::WRITABLE);
562            }
563            SocketResult::Continue => {}
564        }
565        SessionResult::Continue
566    }
567
568    // Read content from cluster
569    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
570        self.reset_timeouts();
571
572        trace!("{} Pipe backend_readable", log_context!(self));
573        if self.backend_buffer.available_space() == 0 {
574            self.backend_readiness.interest.remove(Ready::READABLE);
575            return SessionResult::Continue;
576        }
577
578        if let Some(ref mut backend) = self.backend_socket {
579            let (size, remaining) = backend.socket_read(self.backend_buffer.space());
580            self.backend_buffer.fill(size);
581
582            debug!("{} Read {} bytes", log_context!(self), size);
583
584            if remaining != SocketResult::Continue || size == 0 {
585                self.backend_readiness.event.remove(Ready::READABLE);
586            }
587            if size > 0 {
588                self.frontend_readiness.interest.insert(Ready::WRITABLE);
589                metrics.backend_bin += size;
590            }
591
592            if size == 0 && remaining == SocketResult::Closed {
593                self.backend_status = match self.backend_status {
594                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
595                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
596                    s => s,
597                };
598
599                if !self.check_connections() {
600                    self.frontend_readiness.reset();
601                    self.backend_readiness.reset();
602                    self.log_request_success(metrics);
603                    return SessionResult::Close;
604                }
605            }
606
607            match remaining {
608                SocketResult::Error => {
609                    self.frontend_readiness.reset();
610                    self.backend_readiness.reset();
611                    self.log_request_error(metrics, "back socket read error");
612                    return SessionResult::Close;
613                }
614                SocketResult::Closed => {
615                    if !self.check_connections() {
616                        self.frontend_readiness.reset();
617                        self.backend_readiness.reset();
618                        self.log_request_success(metrics);
619                        return SessionResult::Close;
620                    }
621                }
622                SocketResult::WouldBlock => {
623                    self.backend_readiness.event.remove(Ready::READABLE);
624                }
625                SocketResult::Continue => {}
626            }
627        }
628
629        SessionResult::Continue
630    }
631
632    pub fn log_context(&self) -> LogContext {
633        LogContext {
634            request_id: self.request_id,
635            cluster_id: self.cluster_id.as_deref(),
636            backend_id: self.backend_id.as_deref(),
637        }
638    }
639
640    fn log_endpoint(&self) -> EndpointRecord {
641        match &self.websocket_context {
642            WebSocketContext::Http {
643                method,
644                authority,
645                path,
646                status,
647                reason,
648            } => EndpointRecord::Http {
649                method: method.as_deref(),
650                authority: authority.as_deref(),
651                path: path.as_deref(),
652                status: status.to_owned(),
653                reason: reason.as_deref(),
654            },
655            WebSocketContext::Tcp => EndpointRecord::Tcp,
656        }
657    }
658}
659
660impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
661    fn ready(
662        &mut self,
663        _session: Rc<RefCell<dyn crate::ProxySession>>,
664        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
665        metrics: &mut SessionMetrics,
666    ) -> SessionResult {
667        let mut counter = 0;
668
669        if self.frontend_readiness.event.is_hup() {
670            return SessionResult::Close;
671        }
672
673        while counter < MAX_LOOP_ITERATIONS {
674            let frontend_interest = self.frontend_readiness.filter_interest();
675            let backend_interest = self.backend_readiness.filter_interest();
676
677            trace!(
678                "{} Frontend interest({:?}), backend interest({:?})",
679                log_context!(self),
680                frontend_interest,
681                backend_interest
682            );
683            if frontend_interest.is_empty() && backend_interest.is_empty() {
684                break;
685            }
686
687            if self.backend_readiness.event.is_hup()
688                && self.frontend_readiness.interest.is_writable()
689                && !self.frontend_readiness.event.is_writable()
690            {
691                break;
692            }
693
694            if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
695                return SessionResult::Close;
696            }
697
698            if backend_interest.is_writable()
699                && self.backend_writable(metrics) == SessionResult::Close
700            {
701                return SessionResult::Close;
702            }
703
704            if backend_interest.is_readable()
705                && self.backend_readable(metrics) == SessionResult::Close
706            {
707                return SessionResult::Close;
708            }
709
710            if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
711                return SessionResult::Close;
712            }
713
714            if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
715                return SessionResult::Close;
716            }
717
718            if frontend_interest.is_error() {
719                error!(
720                    "{} Frontend socket error, disconnecting",
721                    log_context!(self)
722                );
723
724                self.frontend_readiness.interest = Ready::EMPTY;
725                self.backend_readiness.interest = Ready::EMPTY;
726
727                return SessionResult::Close;
728            }
729
730            if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
731                self.frontend_readiness.interest = Ready::EMPTY;
732                self.backend_readiness.interest = Ready::EMPTY;
733
734                error!("{} Backend socket error, disconnecting", log_context!(self));
735                return SessionResult::Close;
736            }
737
738            counter += 1;
739        }
740
741        if counter >= MAX_LOOP_ITERATIONS {
742            error!(
743                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
744                log_context!(self), MAX_LOOP_ITERATIONS
745            );
746
747            incr!("http.infinite_loop.error");
748            self.print_state(self.protocol_string());
749
750            return SessionResult::Close;
751        }
752
753        SessionResult::Continue
754    }
755
756    fn update_readiness(&mut self, token: Token, events: Ready) {
757        if self.frontend_token == token {
758            self.frontend_readiness.event |= events;
759        } else if self.backend_token == Some(token) {
760            self.backend_readiness.event |= events;
761        }
762    }
763
764    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
765        //info!("got timeout for token: {:?}", token);
766        if self.frontend_token == token {
767            self.log_request_error(metrics, "frontend socket timeout");
768            if let Some(timeout) = self.container_frontend_timeout.as_mut() {
769                timeout.triggered()
770            }
771            return StateResult::CloseSession;
772        }
773
774        if self.backend_token == Some(token) {
775            //info!("backend timeout triggered for token {:?}", token);
776            if let Some(timeout) = self.container_backend_timeout.as_mut() {
777                timeout.triggered()
778            }
779
780            self.log_request_error(metrics, "backend socket timeout");
781            return StateResult::CloseSession;
782        }
783
784        error!("{} Got timeout for an invalid token", log_context!(self));
785        self.log_request_error(metrics, "invalid token timeout");
786        StateResult::CloseSession
787    }
788
789    fn cancel_timeouts(&mut self) {
790        self.container_frontend_timeout.as_mut().map(|t| t.cancel());
791        self.container_backend_timeout.as_mut().map(|t| t.cancel());
792    }
793
794    fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
795        if let Some(backend) = self.backend.as_mut() {
796            let mut backend = backend.borrow_mut();
797            backend.active_requests = backend.active_requests.saturating_sub(1);
798        }
799    }
800
801    fn print_state(&self, context: &str) {
802        error!(
803            "\
804{} {} Session(Pipe)
805\tFrontend:
806\t\ttoken: {:?}\treadiness: {:?}
807\tBackend:
808\t\ttoken: {:?}\treadiness: {:?}",
809            log_context!(self),
810            context,
811            self.frontend_token,
812            self.frontend_readiness,
813            self.backend_token,
814            self.backend_readiness
815        );
816    }
817}