Skip to main content

sozu_lib/protocol/
pipe.rs

1//! Transparent byte-stream forwarder (TCP + WebSocket post-upgrade).
2//!
3//! Forwards bytes between front and back through fixed-size buffers without
4//! payload inspection. Readiness is managed via direct mio interest toggles
5//! (no `signal_pending_write` / `arm_writable` here — those belong to the
6//! mux H2 path). Used as the post-handshake state for raw TCP listeners and
7//! after a successful WebSocket upgrade on the H1 path.
8
9use std::{cell::RefCell, net::SocketAddr, rc::Rc};
10
11use mio::{Token, net::TcpStream};
12use rusty_ulid::Ulid;
13use sozu_command::{
14    config::MAX_LOOP_ITERATIONS,
15    logging::{EndpointRecord, LogContext, ansi_palette},
16};
17
18use crate::metrics::names;
19use crate::{
20    L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
21    backends::Backend,
22    pool::Checkout,
23    protocol::{SessionState, http::parser::Method},
24    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
25    sozu_command::ready::Ready,
26    timer::TimeoutContainer,
27};
28
29#[cfg(all(target_os = "linux", feature = "splice"))]
30use crate::splice::{self, SplicePipe};
31
/// Builds the colored context prefix used by every `PIPE` log line.
///
/// Defined only in this module to help track pipelining issues inside Sōzu.
/// Colors come from `ansi_palette()`: bold bright-white (uniform across every
/// protocol) for the `PIPE` label, light grey for the `Session` keyword, gray
/// for keys and bright white for values. The `[ulid - - -]` context returned by
/// `log_context()` comes first so the line stays aligned with `MUX-*` and
/// `SOCKET` log lines. A missing session address or backend token prints as
/// `<none>`.
macro_rules! log_context {
    ($self:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Bind every value up front so the format string can capture them by name.
        let ctx = $self.log_context();
        let address = $self
            .session_address
            .map(|addr| addr.to_string())
            .unwrap_or_else(|| "<none>".to_string());
        let frontend = $self.frontend_token.0;
        let frontend_readiness = &$self.frontend_readiness;
        let frontend_status = &$self.frontend_status;
        let backend = $self
            .backend_token
            .map(|token| token.0.to_string())
            .unwrap_or_else(|| "<none>".to_string());
        let backend_status = &$self.backend_status;
        let backend_readiness = &$self.backend_readiness;
        format!(
            "{gray}{ctx}{reset}\t{open}PIPE{reset}\t{grey}Session{reset}({gray}address{reset}={white}{address}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}frontend_status{reset}={white}{frontend_status:?}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_status{reset}={white}{backend_status:?}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>"
        )
    }};
}
59
/// Two-state session status marker: `Normal` or `DefaultAnswer`.
///
/// NOTE(review): not referenced in the visible part of this module; presumably
/// consumed by callers elsewhere — confirm before relying on its meaning.
#[derive(PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
65
/// Which directions of one socket (frontend or backend) are still usable.
///
/// A zero-byte read moves `Normal` to `WriteOpen` and `ReadOpen` to `Closed`.
/// A zero-byte write moves `Normal` to `ReadOpen` and `WriteOpen` to `Closed`.
#[derive(Copy, Clone, Debug)]
enum ConnectionStatus {
    /// Both reading and writing are still possible.
    Normal,
    /// Only reading is still possible (the write side is finished).
    ReadOpen,
    /// Only writing is still possible (the read side is finished).
    WriteOpen,
    /// Neither direction is usable.
    Closed,
}
73
/// Endpoint description carried by the pipe for access logging.
///
/// Meant to match the shape of `EndpointRecord` (imported from
/// `sozu_command::logging`): either the HTTP exchange that led to the
/// WebSocket upgrade, or a plain TCP endpoint.
/// NOTE(review): presumably converted by `log_endpoint()`, which is not
/// visible here — confirm.
pub enum WebSocketContext {
    Http {
        method: Option<Method>,
        authority: Option<String>,
        path: Option<String>,
        status: Option<u16>,
        reason: Option<String>,
    },
    Tcp,
}
85
/// Session state that copies bytes verbatim between a frontend socket and an
/// optional backend socket.
///
/// Request bytes flow `frontend` → `frontend_buffer` → backend socket.
/// Response bytes flow backend socket → `backend_buffer` → `frontend`.
pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
    /// Bytes read from the backend, waiting to be written to the frontend.
    backend_buffer: Checkout,
    /// Backend identifier, settable via `set_backend_id`.
    backend_id: Option<String>,
    /// mio interest/event set for the backend socket.
    pub backend_readiness: Readiness,
    /// `None` until a backend socket is provided (see `set_back_socket`).
    backend_socket: Option<TcpStream>,
    /// Which directions of the backend socket are still usable.
    backend_status: ConnectionStatus,
    /// mio token of the backend socket, set via `set_back_token`.
    backend_token: Option<Token>,
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster identifier, settable via `set_cluster_id`.
    cluster_id: Option<String>,
    /// Idle timeouts, re-armed by `reset_timeouts` on each buffered read.
    pub container_backend_timeout: Option<TimeoutContainer>,
    pub container_frontend_timeout: Option<TimeoutContainer>,
    /// Bytes read from the frontend, waiting to be written to the backend.
    frontend_buffer: Checkout,
    /// mio interest/event set for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Which directions of the frontend socket are still usable.
    frontend_status: ConnectionStatus,
    /// mio token of the frontend socket.
    frontend_token: Token,
    /// Client-facing socket handler.
    frontend: Front,
    /// Listener the session belongs to; its tags are attached to access logs.
    listener: Rc<RefCell<L>>,
    /// Picks the access-log protocol label (`TCP`, `WS`, `WSS-*`) and, with
    /// the `splice` feature on Linux, whether a kernel pipe is used.
    protocol: Protocol,
    /// Connection/session ULID inherited from the parent mux or handshake.
    /// Emitted in the first slot of the legacy log-context bracket.
    session_id: Ulid,
    /// Request ULID supplied by the caller of `new()`.
    request_id: Ulid,
    /// Client address override; when `None`, `get_session_address` falls
    /// back to the frontend socket's peer address.
    session_address: Option<SocketAddr>,
    /// Endpoint description for access logs (HTTP upgrade exchange or TCP).
    websocket_context: WebSocketContext,
    /// Connection-scoped TLS metadata captured at handshake completion,
    /// inherited from the upstream mux `HttpContext` when `Pipe` is created
    /// via WSS upgrade. `None` on plaintext paths (plain TCP, plain WS,
    /// proxy-protocol) where no TLS was terminated by Sōzu.
    tls_version: Option<&'static str>,
    tls_cipher: Option<&'static str>,
    /// Negotiated SNI hostname, pre-lowercased, no port. `None` on plaintext
    /// paths or when the client omitted the SNI extension.
    tls_sni: Option<String>,
    tls_alpn: Option<&'static str>,
    /// Kernel-pipe pair used for zero-copy `splice(2)` forwarding on
    /// `Protocol::TCP` listeners. Requested eagerly in `new()` for TCP
    /// sessions only; `None` for WebSocket-after-upgrade paths or when
    /// `SplicePipe::new()` returned `None`, in which case the buffered path
    /// is used (the I/O methods check `splice_pipe.is_some()`).
    #[cfg(all(target_os = "linux", feature = "splice"))]
    splice_pipe: Option<SplicePipe>,
}
127
128impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
129    /// Instantiate a new Pipe SessionState with:
130    ///
131    /// - frontend_interest: READABLE | WRITABLE | HUP | ERROR
132    /// - frontend_event: EMPTY
133    /// - backend_interest: READABLE | WRITABLE | HUP | ERROR
134    /// - backend_event: EMPTY
135    ///
136    /// Remember to set the events from the previous State!
137    #[allow(clippy::too_many_arguments)]
138    pub fn new(
139        backend_buffer: Checkout,
140        backend_id: Option<String>,
141        backend_socket: Option<TcpStream>,
142        backend: Option<Rc<RefCell<Backend>>>,
143        container_backend_timeout: Option<TimeoutContainer>,
144        container_frontend_timeout: Option<TimeoutContainer>,
145        cluster_id: Option<String>,
146        frontend_buffer: Checkout,
147        frontend_token: Token,
148        frontend: Front,
149        listener: Rc<RefCell<L>>,
150        protocol: Protocol,
151        session_id: Ulid,
152        request_id: Ulid,
153        session_address: Option<SocketAddr>,
154        websocket_context: WebSocketContext,
155    ) -> Pipe<Front, L> {
156        let frontend_status = ConnectionStatus::Normal;
157        let backend_status = if backend_socket.is_none() {
158            ConnectionStatus::Closed
159        } else {
160            ConnectionStatus::Normal
161        };
162
163        let session = Pipe {
164            backend_buffer,
165            backend_id,
166            backend_readiness: Readiness {
167                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
168                event: Ready::EMPTY,
169            },
170            backend_socket,
171            backend_status,
172            backend_token: None,
173            backend,
174            cluster_id,
175            container_backend_timeout,
176            container_frontend_timeout,
177            frontend_buffer,
178            frontend_readiness: Readiness {
179                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
180                event: Ready::EMPTY,
181            },
182            frontend_status,
183            frontend_token,
184            frontend,
185            listener,
186            protocol,
187            session_id,
188            request_id,
189            session_address,
190            websocket_context,
191            tls_version: None,
192            tls_cipher: None,
193            tls_sni: None,
194            tls_alpn: None,
195            #[cfg(all(target_os = "linux", feature = "splice"))]
196            splice_pipe: if protocol == Protocol::TCP {
197                SplicePipe::new()
198            } else {
199                None
200            },
201        };
202
203        trace!("{} created pipe", log_context!(session));
204        session
205    }
206
207    /// Stamp connection-scoped TLS metadata captured at handshake time onto
208    /// the pipe for access-log emission. Called from the HTTPS→WSS upgrade
209    /// path in `https.rs::upgrade_mux` after the `Pipe` has been built from
210    /// the prior mux `HttpContext`. Leaves plaintext paths (plain TCP, plain
211    /// WS, proxy-protocol) untouched so their access logs continue to emit
212    /// `None` for all TLS fields.
213    pub fn set_tls_metadata(
214        &mut self,
215        version: Option<&'static str>,
216        cipher: Option<&'static str>,
217        sni: Option<String>,
218        alpn: Option<&'static str>,
219    ) {
220        self.tls_version = version;
221        self.tls_cipher = cipher;
222        self.tls_sni = sni;
223        self.tls_alpn = alpn;
224    }
225
    /// Shared reference to the frontend's underlying `TcpStream`.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend.socket_ref()
    }
229
    /// Mutable reference to the frontend's underlying `TcpStream`.
    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
        self.frontend.socket_mut()
    }
233
    /// Backend socket, or `None` when no backend socket has been attached.
    pub fn back_socket(&self) -> Option<&TcpStream> {
        self.backend_socket.as_ref()
    }
237
    /// Mutable backend socket, or `None` when no backend socket has been attached.
    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
        self.backend_socket.as_mut()
    }
241
242    pub fn set_back_socket(&mut self, socket: TcpStream) {
243        self.backend_socket = Some(socket);
244        self.backend_status = ConnectionStatus::Normal;
245    }
246
247    pub fn back_token(&self) -> Vec<Token> {
248        self.backend_token.iter().cloned().collect()
249    }
250
251    fn reset_timeouts(&mut self) {
252        if let Some(t) = self.container_frontend_timeout.as_mut() {
253            if !t.reset() {
254                error!(
255                    "{} Could not reset front timeout (pipe)",
256                    log_context!(self)
257                );
258            }
259        }
260
261        if let Some(t) = self.container_backend_timeout.as_mut() {
262            if !t.reset() {
263                error!("{} Could not reset back timeout (pipe)", log_context!(self));
264            }
265        }
266    }
267
    /// Replace the cluster identifier stored on this pipe.
    pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
        self.cluster_id = cluster_id;
    }
271
    /// Replace the backend identifier stored on this pipe.
    pub fn set_backend_id(&mut self, backend_id: Option<String>) {
        self.backend_id = backend_id;
    }
275
    /// Record the mio token of the backend socket.
    pub fn set_back_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
279
280    pub fn get_session_address(&self) -> Option<SocketAddr> {
281        self.session_address
282            .or_else(|| self.frontend.socket_ref().peer_addr().ok())
283    }
284
285    pub fn get_backend_address(&self) -> Option<SocketAddr> {
286        self.backend_socket
287            .as_ref()
288            .and_then(|backend| backend.peer_addr().ok())
289    }
290
291    fn protocol_string(&self) -> &'static str {
292        match self.protocol {
293            Protocol::TCP => "TCP",
294            Protocol::HTTP => "WS",
295            Protocol::HTTPS => match self.frontend.protocol() {
296                TransportProtocol::Ssl2 => "WSS-SSL2",
297                TransportProtocol::Ssl3 => "WSS-SSL3",
298                TransportProtocol::Tls1_0 => "WSS-TLS1.0",
299                TransportProtocol::Tls1_1 => "WSS-TLS1.1",
300                TransportProtocol::Tls1_2 => "WSS-TLS1.2",
301                TransportProtocol::Tls1_3 => "WSS-TLS1.3",
302                _ => unreachable!(),
303            },
304            _ => unreachable!(),
305        }
306    }
307
    /// Record end-of-session in `metrics` and emit this pipe's access log.
    ///
    /// `error` flags the log entry as a failure and `message` is an optional
    /// description. If emission fails, the `access_logs::UNSENT` counter is
    /// incremented. HTTP-only fields (`user_agent`, `x_request_id`,
    /// `xff_chain`, `otel`) are always `None` for a pipe.
    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        let endpoint = self.log_endpoint();
        metrics.register_end_of_session(&context);
        log_access!(
            error,
            on_failure: { incr!(names::access_logs::UNSENT) },
            message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint,
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            user_agent: None,
            x_request_id: None,
            // Pipe is post-upgrade: the TLS metadata was captured once at
            // handshake time and stamped onto the pipe via
            // `set_tls_metadata`. Plaintext paths leave these fields as
            // `None`, matching the TCP log shape.
            tls_version: self.tls_version,
            tls_cipher: self.tls_cipher,
            tls_sni: self.tls_sni.as_deref(),
            tls_alpn: self.tls_alpn,
            xff_chain: None,
            otel: None,
        );
    }
344
    /// Emit a non-error access log without a message.
    pub fn log_request_success(&self, metrics: &SessionMetrics) {
        self.log_request(metrics, false, None);
    }
348
    /// Report a failed session.
    ///
    /// Increments `pipe::ERRORS`, logs `message` at `error!` level with the
    /// session context, dumps the state via `print_state`, and emits an
    /// access log flagged as an error that carries `message`.
    pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
        incr!(names::pipe::ERRORS);
        error!(
            "{} Could not process request properly got: {}",
            log_context!(self),
            message
        );
        self.print_state(self.protocol_string());
        self.log_request(metrics, true, Some(message));
    }
359
    /// Access-log wrapper for benign idle-timeout tear-downs.
    ///
    /// Unlike `log_request_error`, this path logs at `debug!`, does not bump
    /// `pipe::ERRORS` and skips the state dump. An idle pipe hitting its
    /// front/back timeout is expected behaviour (e.g. a WebSocket with no
    /// keepalive) and should not pollute the error stream. The access log is
    /// still flagged as an error (`error = true`) and carries `message`.
    pub fn log_request_timeout(&self, metrics: &SessionMetrics, message: &str) {
        debug!("{} pipe timeout: {}", log_context!(self), message);
        self.log_request(metrics, true, Some(message));
    }
370
371    /// Bytes currently sitting inside the `splice` frontend→backend
372    /// kernel pipe (`0` if splice is disabled or the pipe was not
373    /// allocated). Counted as "request in flight" by `check_connections`
374    /// so a half-closed session stays alive until the kernel drains.
375    #[cfg(all(target_os = "linux", feature = "splice"))]
376    fn splice_in_pending(&self) -> usize {
377        self.splice_pipe
378            .as_ref()
379            .map(|p| p.in_pipe_pending)
380            .unwrap_or(0)
381    }
382    #[cfg(not(all(target_os = "linux", feature = "splice")))]
383    fn splice_in_pending(&self) -> usize {
384        0
385    }
386
387    /// Bytes currently sitting inside the `splice` backend→frontend
388    /// kernel pipe. Counterpart to `splice_in_pending` for the response
389    /// direction.
390    #[cfg(all(target_os = "linux", feature = "splice"))]
391    fn splice_out_pending(&self) -> usize {
392        self.splice_pipe
393            .as_ref()
394            .map(|p| p.out_pipe_pending)
395            .unwrap_or(0)
396    }
397    #[cfg(not(all(target_os = "linux", feature = "splice")))]
398    fn splice_out_pending(&self) -> usize {
399        0
400    }
401
402    /// Realised kernel-pipe capacity per direction (`0` if splice is
403    /// disabled). Drives the "pipe is full" backpressure check in the
404    /// splice readable methods and the per-call `len` for `splice_in`.
405    #[cfg(all(target_os = "linux", feature = "splice"))]
406    fn splice_capacity(&self) -> usize {
407        self.splice_pipe.as_ref().map(|p| p.capacity).unwrap_or(0)
408    }
409
410    /// Wether the session should be kept open, depending on endpoints status
411    /// and buffer usage (both in memory and in kernel)
412    pub fn check_connections(&self) -> bool {
413        let request_is_inflight = self.frontend_buffer.available_data() > 0
414            || self.frontend_readiness.event.is_readable()
415            || self.splice_in_pending() > 0;
416        let response_is_inflight = self.backend_buffer.available_data() > 0
417            || self.backend_readiness.event.is_readable()
418            || self.splice_out_pending() > 0;
419        match (self.frontend_status, self.backend_status) {
420            (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
421            (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
422            (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
423                // technically we should keep it open, but we'll assume that if the front
424                // is not readable and there is no in flight data front -> back or back -> front,
425                // we'll close the session, otherwise it interacts badly with HTTP connections
426                // with Connection: close header and no Content-length
427                request_is_inflight || response_is_inflight
428            }
429            (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,
430
431            (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
432                // technically we should keep it open, but we'll assume that if the back
433                // is not readable and there is no in flight data back -> front or front -> back, we'll close the session
434                request_is_inflight || response_is_inflight
435            }
436            (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
437            (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
438                request_is_inflight || response_is_inflight
439            }
440            (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,
441
442            (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
443            (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
444            (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
445            (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,
446
447            (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
448            (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
449            (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
450            (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
451        }
452    }
453
    /// Handle a hang-up on the frontend socket.
    ///
    /// Emits the success access log, marks the frontend `Closed` and closes
    /// the session unconditionally. Unlike `backend_hup`, it does not check
    /// for data still in flight.
    pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        self.log_request_success(metrics);
        self.frontend_status = ConnectionStatus::Closed;
        SessionResult::Close
    }
459
460    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
461        self.backend_status = ConnectionStatus::Closed;
462        let pipe_has_data = self.splice_out_pending() > 0;
463        if self.backend_buffer.available_data() == 0 && !pipe_has_data {
464            if self.backend_readiness.event.is_readable() {
465                self.backend_readiness.interest.insert(Ready::READABLE);
466                debug!(
467                    "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
468                    log_context!(self)
469                );
470                SessionResult::Continue
471            } else {
472                self.log_request_success(metrics);
473                SessionResult::Close
474            }
475        } else {
476            debug!(
477                "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
478                log_context!(self)
479            );
480            self.frontend_readiness.interest.insert(Ready::WRITABLE);
481            if self.backend_readiness.event.is_readable() {
482                self.backend_readiness.interest.insert(Ready::READABLE);
483            }
484            SessionResult::Continue
485        }
486    }
487
    /// Read bytes from the frontend socket into `frontend_buffer` (client →
    /// backend direction).
    ///
    /// On `Protocol::TCP` sessions that own a `splice` kernel pipe, this
    /// delegates to `splice_readable`. Otherwise it:
    /// - re-arms both idle timeouts,
    /// - stops reading and arms the backend for writing when the buffer is
    ///   full,
    /// - adds read bytes to `metrics.bin` and `backend::BYTES_IN`,
    /// - on a zero-byte read reported as `SocketResult::Continue`, marks the
    ///   frontend read side finished (`Normal` → `WriteOpen`,
    ///   `ReadOpen` → `Closed`),
    /// - returns `SessionResult::Close` when `check_connections` fails or the
    ///   socket reported an error or a close.
    ///
    /// NOTE(review): `backend_readable` performs the equivalent status
    /// transition on `SocketResult::Closed` instead of `Continue`. Confirm
    /// which result `socket_read` reports at EOF for the frontend type.
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        #[cfg(all(target_os = "linux", feature = "splice"))]
        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
            return self.splice_readable(metrics);
        }

        self.reset_timeouts();

        trace!("{} pipe readable", log_context!(self));
        // Buffer full: stop reading until the backend drains some of it.
        if self.frontend_buffer.available_space() == 0 {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
        debug!("{} Read {} bytes", log_context!(self), sz);

        if sz > 0 {
            //FIXME: replace with copy()
            self.frontend_buffer.fill(sz);

            count!(names::backend::BYTES_IN, sz as i64);
            metrics.bin += sz;

            if self.frontend_buffer.available_space() == 0 {
                self.frontend_readiness.interest.remove(Ready::READABLE);
            }
            self.backend_readiness.interest.insert(Ready::WRITABLE);
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);

            // Nothing read while the socket reports no error: the read side is done.
            if res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }
        }

        // NOTE(review): this runs before `res` is inspected, so a read error
        // can be logged as a success here when no data can flow anymore.
        if !self.check_connections() {
            self.frontend_readiness.reset();
            self.backend_readiness.reset();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }

        match res {
            SocketResult::Error => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_error(metrics, "front socket read error");
                return SessionResult::Close;
            }
            SocketResult::Closed => {
                self.frontend_readiness.reset();
                self.backend_readiness.reset();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        self.backend_readiness.interest.insert(Ready::WRITABLE);
        SessionResult::Continue
    }
559
560    // Forward content to session
561    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
562        #[cfg(all(target_os = "linux", feature = "splice"))]
563        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
564            return self.splice_writable(metrics);
565        }
566
567        trace!("{} Pipe writable", log_context!(self));
568        if self.backend_buffer.available_data() == 0 {
569            self.backend_readiness.interest.insert(Ready::READABLE);
570            self.frontend_readiness.interest.remove(Ready::WRITABLE);
571            return SessionResult::Continue;
572        }
573
574        let mut sz = 0usize;
575        let mut res = SocketResult::Continue;
576        while res == SocketResult::Continue {
577            // no more data in buffer, stop here
578            if self.backend_buffer.available_data() == 0 {
579                count!(names::backend::BYTES_OUT, sz as i64);
580                metrics.bout += sz;
581                self.backend_readiness.interest.insert(Ready::READABLE);
582                self.frontend_readiness.interest.remove(Ready::WRITABLE);
583                return SessionResult::Continue;
584            }
585            let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
586            res = current_res;
587            self.backend_buffer.consume(current_sz);
588            sz += current_sz;
589
590            if current_sz == 0 && res == SocketResult::Continue {
591                self.frontend_status = match self.frontend_status {
592                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
593                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
594                    s => s,
595                };
596            }
597
598            if !self.check_connections() {
599                metrics.bout += sz;
600                count!(names::backend::BYTES_OUT, sz as i64);
601                self.frontend_readiness.reset();
602                self.backend_readiness.reset();
603                self.log_request_success(metrics);
604                return SessionResult::Close;
605            }
606        }
607
608        if sz > 0 {
609            count!(names::backend::BYTES_OUT, sz as i64);
610            self.backend_readiness.interest.insert(Ready::READABLE);
611            metrics.bout += sz;
612        }
613
614        debug!(
615            "{} Wrote {} bytes of {}",
616            log_context!(self),
617            sz,
618            self.backend_buffer.available_data()
619        );
620
621        match res {
622            SocketResult::Error => {
623                self.frontend_readiness.reset();
624                self.backend_readiness.reset();
625                self.log_request_error(metrics, "front socket write error");
626                return SessionResult::Close;
627            }
628            SocketResult::Closed => {
629                self.frontend_readiness.reset();
630                self.backend_readiness.reset();
631                self.log_request_success(metrics);
632                return SessionResult::Close;
633            }
634            SocketResult::WouldBlock => {
635                self.frontend_readiness.event.remove(Ready::WRITABLE);
636            }
637            SocketResult::Continue => {}
638        }
639
640        SessionResult::Continue
641    }
642
643    // Forward content to cluster
644    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
645        #[cfg(all(target_os = "linux", feature = "splice"))]
646        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
647            return self.splice_backend_writable(metrics);
648        }
649
650        trace!("{} pipe back_writable", log_context!(self));
651
652        if self.frontend_buffer.available_data() == 0 {
653            self.frontend_readiness.interest.insert(Ready::READABLE);
654            self.backend_readiness.interest.remove(Ready::WRITABLE);
655            return SessionResult::Continue;
656        }
657
658        let output_size = self.frontend_buffer.available_data();
659
660        let mut sz = 0usize;
661        let mut socket_res = SocketResult::Continue;
662
663        if let Some(ref mut backend) = self.backend_socket {
664            while socket_res == SocketResult::Continue {
665                // no more data in buffer, stop here
666                if self.frontend_buffer.available_data() == 0 {
667                    self.frontend_readiness.interest.insert(Ready::READABLE);
668                    self.backend_readiness.interest.remove(Ready::WRITABLE);
669                    count!(names::backend::BACK_BYTES_OUT, sz as i64);
670                    metrics.backend_bout += sz;
671                    return SessionResult::Continue;
672                }
673
674                let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
675                socket_res = current_res;
676                self.frontend_buffer.consume(current_sz);
677                sz += current_sz;
678
679                if current_sz == 0 && current_res == SocketResult::Continue {
680                    self.backend_status = match self.backend_status {
681                        ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
682                        ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
683                        s => s,
684                    };
685                }
686            }
687        }
688
689        count!(names::backend::BACK_BYTES_OUT, sz as i64);
690        metrics.backend_bout += sz;
691
692        if !self.check_connections() {
693            self.frontend_readiness.reset();
694            self.backend_readiness.reset();
695            self.log_request_success(metrics);
696            return SessionResult::Close;
697        }
698
699        debug!(
700            "{} Wrote {} bytes of {}",
701            log_context!(self),
702            sz,
703            output_size
704        );
705
706        match socket_res {
707            SocketResult::Error => {
708                self.frontend_readiness.reset();
709                self.backend_readiness.reset();
710                self.log_request_error(metrics, "back socket write error");
711                return SessionResult::Close;
712            }
713            SocketResult::Closed => {
714                self.frontend_readiness.reset();
715                self.backend_readiness.reset();
716                self.log_request_success(metrics);
717                return SessionResult::Close;
718            }
719            SocketResult::WouldBlock => {
720                self.backend_readiness.event.remove(Ready::WRITABLE);
721            }
722            SocketResult::Continue => {}
723        }
724        SessionResult::Continue
725    }
726
    /// Read bytes from the backend socket into `backend_buffer` (backend →
    /// client direction).
    ///
    /// On `Protocol::TCP` sessions that own a `splice` kernel pipe, this
    /// delegates to `splice_backend_readable`. Otherwise it:
    /// - re-arms both idle timeouts,
    /// - stops reading when the buffer is full,
    /// - arms the frontend for writing and adds read bytes to
    ///   `metrics.backend_bin` and `backend::BACK_BYTES_IN`,
    /// - on a zero-byte read reported as `SocketResult::Closed`, marks the
    ///   backend read side finished (`Normal` → `WriteOpen`,
    ///   `ReadOpen` → `Closed`),
    /// - returns `SessionResult::Close` on a socket error, or when
    ///   `check_connections` fails after the backend reported a close.
    ///
    /// Does nothing beyond the timeout reset when no backend socket is attached.
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        #[cfg(all(target_os = "linux", feature = "splice"))]
        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
            return self.splice_backend_readable(metrics);
        }

        self.reset_timeouts();

        trace!("{} Pipe backend_readable", log_context!(self));
        // Buffer full: stop reading until the frontend drains some of it.
        if self.backend_buffer.available_space() == 0 {
            self.backend_readiness.interest.remove(Ready::READABLE);
            return SessionResult::Continue;
        }

        if let Some(ref mut backend) = self.backend_socket {
            let (size, remaining) = backend.socket_read(self.backend_buffer.space());
            self.backend_buffer.fill(size);

            debug!("{} Read {} bytes", log_context!(self), size);

            // Anything but a clean, productive read consumes the readable event.
            if remaining != SocketResult::Continue || size == 0 {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            if size > 0 {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                count!(names::backend::BACK_BYTES_IN, size as i64);
                metrics.backend_bin += size;
            }

            if size == 0 && remaining == SocketResult::Closed {
                self.backend_status = match self.backend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };

                if !self.check_connections() {
                    self.frontend_readiness.reset();
                    self.backend_readiness.reset();
                    self.log_request_success(metrics);
                    return SessionResult::Close;
                }
            }

            match remaining {
                SocketResult::Error => {
                    self.frontend_readiness.reset();
                    self.backend_readiness.reset();
                    self.log_request_error(metrics, "back socket read error");
                    return SessionResult::Close;
                }
                SocketResult::Closed => {
                    // Bytes may have arrived together with the close; keep the
                    // session only while `check_connections` allows it.
                    if !self.check_connections() {
                        self.frontend_readiness.reset();
                        self.backend_readiness.reset();
                        self.log_request_success(metrics);
                        return SessionResult::Close;
                    }
                }
                SocketResult::WouldBlock => {
                    self.backend_readiness.event.remove(Ready::READABLE);
                }
                SocketResult::Continue => {}
            }
        }

        SessionResult::Continue
    }
796
797    /// Zero-copy fast path of `readable`: pull bytes off the frontend
798    /// socket into the kernel `in_pipe` via `splice(2)`, then mark the
799    /// backend writable so the data drains in the next event loop tick.
800    ///
801    /// Mirrors `readable`'s `ConnectionStatus` transitions and metric
802    /// emissions exactly so observability and the `check_connections`
803    /// state machine behave the same with or without the feature flag.
804    #[cfg(all(target_os = "linux", feature = "splice"))]
805    fn splice_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
806        self.reset_timeouts();
807
808        trace!("{} pipe splice_readable", log_context!(self));
809        let capacity = self.splice_capacity();
810        if self.splice_in_pending() >= capacity {
811            // Pipe is full — stop reading and let the backend drain it.
812            self.frontend_readiness.interest.remove(Ready::READABLE);
813            self.backend_readiness.interest.insert(Ready::WRITABLE);
814            return SessionResult::Continue;
815        }
816
817        let pipe_write_end = self.splice_pipe.as_ref().unwrap().in_pipe[1];
818        let (sz, res) = splice::splice_in(self.frontend.socket_ref(), pipe_write_end, capacity);
819        debug!("{} Spliced {} bytes from frontend", log_context!(self), sz);
820
821        if sz > 0 {
822            self.splice_pipe.as_mut().unwrap().in_pipe_pending += sz;
823            count!(names::backend::BYTES_IN, sz as i64);
824            metrics.bin += sz;
825            self.backend_readiness.interest.insert(Ready::WRITABLE);
826        } else {
827            self.frontend_readiness.event.remove(Ready::READABLE);
828
829            if res == SocketResult::Continue {
830                self.frontend_status = match self.frontend_status {
831                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
832                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
833                    s => s,
834                };
835            }
836        }
837
838        if !self.check_connections() {
839            self.frontend_readiness.reset();
840            self.backend_readiness.reset();
841            self.log_request_success(metrics);
842            return SessionResult::Close;
843        }
844
845        match res {
846            SocketResult::Error => {
847                self.frontend_readiness.reset();
848                self.backend_readiness.reset();
849                self.log_request_error(metrics, "splice front socket read error");
850                return SessionResult::Close;
851            }
852            SocketResult::Closed => {
853                self.frontend_readiness.reset();
854                self.backend_readiness.reset();
855                self.log_request_success(metrics);
856                return SessionResult::Close;
857            }
858            SocketResult::WouldBlock => {
859                self.frontend_readiness.event.remove(Ready::READABLE);
860            }
861            SocketResult::Continue => {}
862        }
863
864        self.backend_readiness.interest.insert(Ready::WRITABLE);
865        SessionResult::Continue
866    }
867
868    /// Zero-copy fast path of `writable`: drain the backend→frontend
869    /// kernel `out_pipe` toward the frontend socket via `splice(2)`.
870    /// Mirrors `writable`'s loop, status transitions, and metric
871    /// emissions.
872    #[cfg(all(target_os = "linux", feature = "splice"))]
873    fn splice_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
874        trace!("{} Pipe splice_writable", log_context!(self));
875        if self.splice_out_pending() == 0 {
876            self.backend_readiness.interest.insert(Ready::READABLE);
877            self.frontend_readiness.interest.remove(Ready::WRITABLE);
878            return SessionResult::Continue;
879        }
880
881        let mut sz = 0usize;
882        let mut res = SocketResult::Continue;
883        while res == SocketResult::Continue {
884            let pending = self.splice_out_pending();
885            // no more data in pipe, stop here
886            if pending == 0 {
887                count!(names::backend::BYTES_OUT, sz as i64);
888                metrics.bout += sz;
889                self.backend_readiness.interest.insert(Ready::READABLE);
890                self.frontend_readiness.interest.remove(Ready::WRITABLE);
891                return SessionResult::Continue;
892            }
893
894            let pipe_read_end = self.splice_pipe.as_ref().unwrap().out_pipe[0];
895            let (current_sz, current_res) =
896                splice::splice_out(pipe_read_end, self.frontend.socket_ref(), pending);
897            res = current_res;
898            if current_sz > 0 {
899                self.splice_pipe.as_mut().unwrap().out_pipe_pending -= current_sz;
900            }
901            sz += current_sz;
902
903            if current_sz == 0 && res == SocketResult::Continue {
904                self.frontend_status = match self.frontend_status {
905                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
906                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
907                    s => s,
908                };
909            }
910
911            if !self.check_connections() {
912                metrics.bout += sz;
913                count!(names::backend::BYTES_OUT, sz as i64);
914                self.frontend_readiness.reset();
915                self.backend_readiness.reset();
916                self.log_request_success(metrics);
917                return SessionResult::Close;
918            }
919        }
920
921        if sz > 0 {
922            count!(names::backend::BYTES_OUT, sz as i64);
923            self.backend_readiness.interest.insert(Ready::READABLE);
924            metrics.bout += sz;
925        }
926
927        debug!(
928            "{} Spliced {} bytes (out_pipe_pending={})",
929            log_context!(self),
930            sz,
931            self.splice_out_pending()
932        );
933
934        match res {
935            SocketResult::Error => {
936                self.frontend_readiness.reset();
937                self.backend_readiness.reset();
938                self.log_request_error(metrics, "splice front socket write error");
939                return SessionResult::Close;
940            }
941            SocketResult::Closed => {
942                self.frontend_readiness.reset();
943                self.backend_readiness.reset();
944                self.log_request_success(metrics);
945                return SessionResult::Close;
946            }
947            SocketResult::WouldBlock => {
948                self.frontend_readiness.event.remove(Ready::WRITABLE);
949            }
950            SocketResult::Continue => {}
951        }
952
953        SessionResult::Continue
954    }
955
956    /// Zero-copy fast path of `backend_writable`: drain the
957    /// frontend→backend kernel `in_pipe` toward the backend socket via
958    /// `splice(2)`. Mirrors `backend_writable`'s loop, status
959    /// transitions, and metric emissions.
960    #[cfg(all(target_os = "linux", feature = "splice"))]
961    fn splice_backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
962        trace!("{} pipe splice_backend_writable", log_context!(self));
963
964        if self.splice_in_pending() == 0 {
965            self.frontend_readiness.interest.insert(Ready::READABLE);
966            self.backend_readiness.interest.remove(Ready::WRITABLE);
967            return SessionResult::Continue;
968        }
969
970        let output_size = self.splice_in_pending();
971        let mut sz = 0usize;
972        let mut socket_res = SocketResult::Continue;
973
974        while socket_res == SocketResult::Continue {
975            let pending = self.splice_in_pending();
976            // no more data in pipe, stop here
977            if pending == 0 {
978                self.frontend_readiness.interest.insert(Ready::READABLE);
979                self.backend_readiness.interest.remove(Ready::WRITABLE);
980                count!(names::backend::BACK_BYTES_OUT, sz as i64);
981                metrics.backend_bout += sz;
982                return SessionResult::Continue;
983            }
984
985            let pipe_read_end = self.splice_pipe.as_ref().unwrap().in_pipe[0];
986            let (current_sz, current_res) = match self.backend_socket.as_ref() {
987                Some(b) => splice::splice_out(pipe_read_end, b, pending),
988                None => break,
989            };
990            socket_res = current_res;
991            if current_sz > 0 {
992                self.splice_pipe.as_mut().unwrap().in_pipe_pending -= current_sz;
993            }
994            sz += current_sz;
995
996            if current_sz == 0 && current_res == SocketResult::Continue {
997                self.backend_status = match self.backend_status {
998                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
999                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
1000                    s => s,
1001                };
1002            }
1003        }
1004
1005        count!(names::backend::BACK_BYTES_OUT, sz as i64);
1006        metrics.backend_bout += sz;
1007
1008        if !self.check_connections() {
1009            self.frontend_readiness.reset();
1010            self.backend_readiness.reset();
1011            self.log_request_success(metrics);
1012            return SessionResult::Close;
1013        }
1014
1015        debug!(
1016            "{} Spliced {} bytes of {}",
1017            log_context!(self),
1018            sz,
1019            output_size
1020        );
1021
1022        match socket_res {
1023            SocketResult::Error => {
1024                self.frontend_readiness.reset();
1025                self.backend_readiness.reset();
1026                self.log_request_error(metrics, "splice back socket write error");
1027                return SessionResult::Close;
1028            }
1029            SocketResult::Closed => {
1030                self.frontend_readiness.reset();
1031                self.backend_readiness.reset();
1032                self.log_request_success(metrics);
1033                return SessionResult::Close;
1034            }
1035            SocketResult::WouldBlock => {
1036                self.backend_readiness.event.remove(Ready::WRITABLE);
1037            }
1038            SocketResult::Continue => {}
1039        }
1040        SessionResult::Continue
1041    }
1042
1043    /// Zero-copy fast path of `backend_readable`: pull bytes off the
1044    /// backend socket into the kernel `out_pipe` via `splice(2)`, then
1045    /// mark the frontend writable so the data drains in the next event
1046    /// loop tick. Mirrors `backend_readable`'s status transitions and
1047    /// metric emissions.
1048    #[cfg(all(target_os = "linux", feature = "splice"))]
1049    fn splice_backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1050        self.reset_timeouts();
1051
1052        trace!("{} Pipe splice_backend_readable", log_context!(self));
1053        let capacity = self.splice_capacity();
1054        if self.splice_out_pending() >= capacity {
1055            // Pipe is full — stop reading and let the frontend drain it.
1056            self.backend_readiness.interest.remove(Ready::READABLE);
1057            self.frontend_readiness.interest.insert(Ready::WRITABLE);
1058            return SessionResult::Continue;
1059        }
1060
1061        let pipe_write_end = self.splice_pipe.as_ref().unwrap().out_pipe[1];
1062        let (size, remaining) = match self.backend_socket.as_ref() {
1063            Some(b) => splice::splice_in(b, pipe_write_end, capacity),
1064            None => return SessionResult::Continue,
1065        };
1066
1067        debug!("{} Spliced {} bytes from backend", log_context!(self), size);
1068
1069        if remaining != SocketResult::Continue || size == 0 {
1070            self.backend_readiness.event.remove(Ready::READABLE);
1071        }
1072        if size > 0 {
1073            self.splice_pipe.as_mut().unwrap().out_pipe_pending += size;
1074            self.frontend_readiness.interest.insert(Ready::WRITABLE);
1075            count!(names::backend::BACK_BYTES_IN, size as i64);
1076            metrics.backend_bin += size;
1077        }
1078
1079        if size == 0 && remaining == SocketResult::Closed {
1080            self.backend_status = match self.backend_status {
1081                ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
1082                ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
1083                s => s,
1084            };
1085
1086            if !self.check_connections() {
1087                self.frontend_readiness.reset();
1088                self.backend_readiness.reset();
1089                self.log_request_success(metrics);
1090                return SessionResult::Close;
1091            }
1092        }
1093
1094        match remaining {
1095            SocketResult::Error => {
1096                self.frontend_readiness.reset();
1097                self.backend_readiness.reset();
1098                self.log_request_error(metrics, "splice back socket read error");
1099                return SessionResult::Close;
1100            }
1101            SocketResult::Closed => {
1102                if !self.check_connections() {
1103                    self.frontend_readiness.reset();
1104                    self.backend_readiness.reset();
1105                    self.log_request_success(metrics);
1106                    return SessionResult::Close;
1107                }
1108            }
1109            SocketResult::WouldBlock => {
1110                self.backend_readiness.event.remove(Ready::READABLE);
1111            }
1112            SocketResult::Continue => {}
1113        }
1114
1115        SessionResult::Continue
1116    }
1117
1118    pub fn log_context(&self) -> LogContext<'_> {
1119        LogContext {
1120            session_id: self.session_id,
1121            request_id: Some(self.request_id),
1122            cluster_id: self.cluster_id.as_deref(),
1123            backend_id: self.backend_id.as_deref(),
1124        }
1125    }
1126
1127    fn log_endpoint(&self) -> EndpointRecord<'_> {
1128        match &self.websocket_context {
1129            WebSocketContext::Http {
1130                method,
1131                authority,
1132                path,
1133                status,
1134                reason,
1135            } => EndpointRecord::Http {
1136                method: method.as_deref(),
1137                authority: authority.as_deref(),
1138                path: path.as_deref(),
1139                status: status.to_owned(),
1140                reason: reason.as_deref(),
1141            },
1142            WebSocketContext::Tcp => EndpointRecord::Tcp,
1143        }
1144    }
1145}
1146
1147impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
1148    fn ready(
1149        &mut self,
1150        _session: Rc<RefCell<dyn crate::ProxySession>>,
1151        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
1152        metrics: &mut SessionMetrics,
1153    ) -> SessionResult {
1154        let mut counter = 0;
1155
1156        if self.frontend_readiness.event.is_hup() {
1157            return SessionResult::Close;
1158        }
1159
1160        while counter < MAX_LOOP_ITERATIONS {
1161            let frontend_interest = self.frontend_readiness.filter_interest();
1162            let backend_interest = self.backend_readiness.filter_interest();
1163
1164            trace!(
1165                "{} Frontend interest({:?}), backend interest({:?})",
1166                log_context!(self),
1167                frontend_interest,
1168                backend_interest
1169            );
1170            if frontend_interest.is_empty() && backend_interest.is_empty() {
1171                break;
1172            }
1173
1174            if self.backend_readiness.event.is_hup()
1175                && self.frontend_readiness.interest.is_writable()
1176                && !self.frontend_readiness.event.is_writable()
1177            {
1178                break;
1179            }
1180
1181            if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
1182                return SessionResult::Close;
1183            }
1184
1185            if backend_interest.is_writable()
1186                && self.backend_writable(metrics) == SessionResult::Close
1187            {
1188                return SessionResult::Close;
1189            }
1190
1191            if backend_interest.is_readable()
1192                && self.backend_readable(metrics) == SessionResult::Close
1193            {
1194                return SessionResult::Close;
1195            }
1196
1197            if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
1198                return SessionResult::Close;
1199            }
1200
1201            if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
1202                return SessionResult::Close;
1203            }
1204
1205            if frontend_interest.is_error() {
1206                error!(
1207                    "{} Frontend socket error, disconnecting",
1208                    log_context!(self)
1209                );
1210
1211                self.frontend_readiness.interest = Ready::EMPTY;
1212                self.backend_readiness.interest = Ready::EMPTY;
1213
1214                return SessionResult::Close;
1215            }
1216
1217            if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
1218                self.frontend_readiness.interest = Ready::EMPTY;
1219                self.backend_readiness.interest = Ready::EMPTY;
1220
1221                error!("{} Backend socket error, disconnecting", log_context!(self));
1222                return SessionResult::Close;
1223            }
1224
1225            counter += 1;
1226        }
1227
1228        if counter >= MAX_LOOP_ITERATIONS {
1229            error!(
1230                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1231                log_context!(self),
1232                MAX_LOOP_ITERATIONS
1233            );
1234
1235            incr!(names::http::INFINITE_LOOP_ERROR);
1236            self.print_state(self.protocol_string());
1237
1238            return SessionResult::Close;
1239        }
1240
1241        SessionResult::Continue
1242    }
1243
1244    fn update_readiness(&mut self, token: Token, events: Ready) {
1245        if self.frontend_token == token {
1246            self.frontend_readiness.event |= events;
1247        } else if self.backend_token == Some(token) {
1248            self.backend_readiness.event |= events;
1249        }
1250    }
1251
1252    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1253        //info!("got timeout for token: {:?}", token);
1254        if self.frontend_token == token {
1255            self.log_request_timeout(metrics, "frontend socket timeout");
1256            if let Some(timeout) = self.container_frontend_timeout.as_mut() {
1257                timeout.triggered()
1258            }
1259            return StateResult::CloseSession;
1260        }
1261
1262        if self.backend_token == Some(token) {
1263            //info!("backend timeout triggered for token {:?}", token);
1264            if let Some(timeout) = self.container_backend_timeout.as_mut() {
1265                timeout.triggered()
1266            }
1267
1268            self.log_request_timeout(metrics, "backend socket timeout");
1269            return StateResult::CloseSession;
1270        }
1271
1272        error!("{} Got timeout for an invalid token", log_context!(self));
1273        self.log_request_error(metrics, "invalid token timeout");
1274        StateResult::CloseSession
1275    }
1276
1277    fn cancel_timeouts(&mut self) {
1278        self.container_frontend_timeout.as_mut().map(|t| t.cancel());
1279        self.container_backend_timeout.as_mut().map(|t| t.cancel());
1280    }
1281
1282    fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1283        if let Some(backend) = self.backend.as_mut() {
1284            let mut backend = backend.borrow_mut();
1285            backend.active_requests = backend.active_requests.saturating_sub(1);
1286        }
1287    }
1288
1289    fn print_state(&self, context: &str) {
1290        error!(
1291            "\
1292{} {} Session(Pipe)
1293\tFrontend:
1294\t\ttoken: {:?}\treadiness: {:?}
1295\tBackend:
1296\t\ttoken: {:?}\treadiness: {:?}",
1297            log_context!(self),
1298            context,
1299            self.frontend_token,
1300            self.frontend_readiness,
1301            self.backend_token,
1302            self.backend_readiness
1303        );
1304    }
1305}