Skip to main content

sozu_lib/protocol/
pipe.rs

1//! Transparent byte-stream forwarder (TCP + WebSocket post-upgrade).
2//!
3//! Forwards bytes between front and back through fixed-size buffers without
4//! payload inspection. Readiness is managed via direct mio interest toggles
5//! (no `signal_pending_write` / `arm_writable` here — those belong to the
6//! mux H2 path). Used as the post-handshake state for raw TCP listeners and
7//! after a successful WebSocket upgrade on the H1 path.
8
9use std::{cell::RefCell, net::SocketAddr, rc::Rc};
10
11use mio::{Token, net::TcpStream};
12use rusty_ulid::Ulid;
13use sozu_command::{
14    config::MAX_LOOP_ITERATIONS,
15    logging::{EndpointRecord, LogContext, ansi_palette},
16};
17
18use crate::metrics::names;
19use crate::{
20    L7Proxy, ListenerHandler, Protocol, Readiness, SessionMetrics, SessionResult, StateResult,
21    backends::Backend,
22    pool::Checkout,
23    protocol::{SessionState, http::parser::Method},
24    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
25    sozu_command::ready::Ready,
26    timer::TimeoutContainer,
27};
28
29#[cfg(all(target_os = "linux", feature = "splice"))]
30use crate::splice::{self, SplicePipe};
31
/// Builds the colored `PIPE` prefix prepended to log lines in this module,
/// to help trace pipelining issues inside Sōzu. It is defined only here.
///
/// The `[ulid - - -]` context produced by `log_context()` comes first, which
/// keeps these lines aligned with `MUX-*` and `SOCKET` log lines. The palette
/// uses bold bright-white for the protocol label (the same for every
/// protocol), light grey for the `Session` keyword, gray for keys and bright
/// white for values.
macro_rules! log_context {
    ($self:expr) => {{
        // Palette slots, in order: protocol label, reset, `Session` keyword,
        // keys, values.
        let (open, reset, grey, gray, white) = ansi_palette();
        let ctx = $self.log_context();
        let address = match $self.session_address {
            Some(addr) => addr.to_string(),
            None => "<none>".to_string(),
        };
        let frontend = $self.frontend_token.0;
        let backend = match $self.backend_token {
            Some(token) => token.0.to_string(),
            None => "<none>".to_string(),
        };
        format!(
            "{gray}{ctx}{reset}\t{open}PIPE{reset}\t{grey}Session{reset}({gray}address{reset}={white}{address}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}frontend_status{reset}={white}{frontend_status:?}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_status{reset}={white}{backend_status:?}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = ctx,
            address = address,
            frontend = frontend,
            frontend_readiness = $self.frontend_readiness,
            frontend_status = $self.frontend_status,
            backend = backend,
            backend_status = $self.backend_status,
            backend_readiness = $self.backend_readiness,
        )
    }};
}
59
/// Coarse status of a session: regular traffic (`Normal`) or a
/// `DefaultAnswer`.
///
/// NOTE(review): not referenced in this part of the file; `DefaultAnswer`
/// presumably marks sessions answered with a proxy-generated default
/// response — confirm against callers.
///
/// Derives `Debug`/`Clone`/`Copy` like `ConnectionStatus`, so the status can
/// be logged and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SessionStatus {
    Normal,
    DefaultAnswer,
}
65
/// Half-close state of one endpoint (frontend or backend), consumed by
/// `check_connections` to decide whether the pipe still has work to do.
#[derive(Debug, Clone, Copy)]
enum ConnectionStatus {
    /// Both directions of the endpoint are still usable.
    Normal,
    /// Only reading from the endpoint is still possible: a write to it made
    /// no progress (0 bytes written while the socket reported `Continue`).
    ReadOpen,
    /// Only writing to the endpoint is still possible: reading from it hit
    /// end-of-stream (0 bytes read while the socket reported `Continue`).
    WriteOpen,
    /// Neither direction is usable; also the initial backend state when no
    /// backend socket was supplied.
    Closed,
}
73
74/// matches sozu_command_lib::logging::access_logs::EndpointRecords
75pub enum WebSocketContext {
76    Http {
77        method: Option<Method>,
78        authority: Option<String>,
79        path: Option<String>,
80        status: Option<u16>,
81        reason: Option<String>,
82    },
83    Tcp,
84}
85
/// Byte forwarder between a frontend socket and an optional backend socket.
///
/// Bytes read from the frontend are staged in `frontend_buffer` and written
/// to the backend. Bytes waiting to be written to the frontend are staged in
/// `backend_buffer`. Each side tracks its mio readiness (`*_readiness`) and
/// half-close state (`*_status`) independently.
// Field order is also drop order; keep it stable.
pub struct Pipe<Front: SocketHandler, L: ListenerHandler> {
    /// Bytes waiting to be written to the frontend (drained by `writable`).
    backend_buffer: Checkout,
    /// Identifier of the backend this session is connected to, if any.
    backend_id: Option<String>,
    /// mio interest/event set for the backend socket.
    pub backend_readiness: Readiness,
    /// Backend socket; when `None` at construction the backend starts `Closed`.
    backend_socket: Option<TcpStream>,
    /// Half-close state of the backend side.
    backend_status: ConnectionStatus,
    /// mio token of the backend socket, assigned via `set_back_token`.
    backend_token: Option<Token>,
    /// Shared backend descriptor.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Cluster this session is routed to, if any.
    cluster_id: Option<String>,
    /// Backend-side timeout, re-armed by `reset_timeouts`.
    pub container_backend_timeout: Option<TimeoutContainer>,
    /// Frontend-side timeout, re-armed by `reset_timeouts`.
    pub container_frontend_timeout: Option<TimeoutContainer>,
    /// Bytes read from the frontend, waiting to be written to the backend.
    frontend_buffer: Checkout,
    /// mio interest/event set for the frontend socket.
    pub frontend_readiness: Readiness,
    /// Half-close state of the frontend side.
    frontend_status: ConnectionStatus,
    /// mio token of the frontend socket.
    frontend_token: Token,
    /// Frontend socket handler.
    frontend: Front,
    /// Listener handle; its address and tags feed the access log.
    listener: Rc<RefCell<L>>,
    /// Selects the access-log protocol label and, with the Linux `splice`
    /// feature, whether the zero-copy path is used (`Protocol::TCP` only).
    protocol: Protocol,
    /// Connection/session ULID inherited from the parent mux or handshake.
    /// Emitted in the first slot of the legacy log-context bracket.
    session_id: Ulid,
    /// Request ULID. NOTE(review): not read in the visible part of this file;
    /// presumably consumed by `log_context()` — confirm.
    request_id: Ulid,
    /// Explicit client address. When `None`, `get_session_address` falls back
    /// to the frontend socket's peer address.
    session_address: Option<SocketAddr>,
    /// Endpoint details for the access log. NOTE(review): presumably consumed
    /// by `log_endpoint()` — confirm.
    websocket_context: WebSocketContext,
    /// Connection-scoped TLS metadata captured at handshake completion,
    /// inherited from the upstream mux `HttpContext` when `Pipe` is created
    /// via WSS upgrade. `None` on plaintext paths (plain TCP, plain WS,
    /// proxy-protocol) where no TLS was terminated by Sōzu.
    tls_version: Option<&'static str>,
    /// Negotiated TLS cipher suite; `None` on plaintext paths.
    tls_cipher: Option<&'static str>,
    /// Negotiated SNI hostname, pre-lowercased, no port. `None` on plaintext
    /// paths or when the client omitted the SNI extension.
    tls_sni: Option<String>,
    /// Negotiated ALPN protocol; `None` on plaintext paths.
    tls_alpn: Option<&'static str>,
    /// Kernel-pipe pair used for zero-copy `splice(2)` forwarding on
    /// `Protocol::TCP` listeners. Allocated in `new()` for `Protocol::TCP`
    /// only. `None` for WebSocket-after-upgrade paths or when allocation
    /// failed (the buffered path is used instead).
    #[cfg(all(target_os = "linux", feature = "splice"))]
    splice_pipe: Option<SplicePipe>,
}
127
128impl<Front: SocketHandler, L: ListenerHandler> Pipe<Front, L> {
129    /// Instantiate a new Pipe SessionState with:
130    ///
131    /// - frontend_interest: READABLE | WRITABLE | HUP | ERROR
132    /// - frontend_event: EMPTY
133    /// - backend_interest: READABLE | WRITABLE | HUP | ERROR
134    /// - backend_event: EMPTY
135    ///
136    /// Remember to set the events from the previous State!
137    #[allow(clippy::too_many_arguments)]
138    pub fn new(
139        backend_buffer: Checkout,
140        backend_id: Option<String>,
141        backend_socket: Option<TcpStream>,
142        backend: Option<Rc<RefCell<Backend>>>,
143        container_backend_timeout: Option<TimeoutContainer>,
144        container_frontend_timeout: Option<TimeoutContainer>,
145        cluster_id: Option<String>,
146        frontend_buffer: Checkout,
147        frontend_token: Token,
148        frontend: Front,
149        listener: Rc<RefCell<L>>,
150        protocol: Protocol,
151        session_id: Ulid,
152        request_id: Ulid,
153        session_address: Option<SocketAddr>,
154        websocket_context: WebSocketContext,
155    ) -> Pipe<Front, L> {
156        let frontend_status = ConnectionStatus::Normal;
157        let backend_status = if backend_socket.is_none() {
158            ConnectionStatus::Closed
159        } else {
160            ConnectionStatus::Normal
161        };
162
163        let session = Pipe {
164            backend_buffer,
165            backend_id,
166            backend_readiness: Readiness {
167                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
168                event: Ready::EMPTY,
169            },
170            backend_socket,
171            backend_status,
172            backend_token: None,
173            backend,
174            cluster_id,
175            container_backend_timeout,
176            container_frontend_timeout,
177            frontend_buffer,
178            frontend_readiness: Readiness {
179                interest: Ready::READABLE | Ready::WRITABLE | Ready::HUP | Ready::ERROR,
180                event: Ready::EMPTY,
181            },
182            frontend_status,
183            frontend_token,
184            frontend,
185            listener,
186            protocol,
187            session_id,
188            request_id,
189            session_address,
190            websocket_context,
191            tls_version: None,
192            tls_cipher: None,
193            tls_sni: None,
194            tls_alpn: None,
195            #[cfg(all(target_os = "linux", feature = "splice"))]
196            splice_pipe: if protocol == Protocol::TCP {
197                SplicePipe::new()
198            } else {
199                None
200            },
201        };
202
203        trace!("{} created pipe", log_context!(session));
204        session
205    }
206
207    /// Stamp connection-scoped TLS metadata captured at handshake time onto
208    /// the pipe for access-log emission. Called from the HTTPS→WSS upgrade
209    /// path in `https.rs::upgrade_mux` after the `Pipe` has been built from
210    /// the prior mux `HttpContext`. Leaves plaintext paths (plain TCP, plain
211    /// WS, proxy-protocol) untouched so their access logs continue to emit
212    /// `None` for all TLS fields.
213    pub fn set_tls_metadata(
214        &mut self,
215        version: Option<&'static str>,
216        cipher: Option<&'static str>,
217        sni: Option<String>,
218        alpn: Option<&'static str>,
219    ) {
220        self.tls_version = version;
221        self.tls_cipher = cipher;
222        self.tls_sni = sni;
223        self.tls_alpn = alpn;
224    }
225
226    pub fn front_socket(&self) -> &TcpStream {
227        self.frontend.socket_ref()
228    }
229
230    pub fn front_socket_mut(&mut self) -> &mut TcpStream {
231        self.frontend.socket_mut()
232    }
233
234    pub fn back_socket(&self) -> Option<&TcpStream> {
235        self.backend_socket.as_ref()
236    }
237
238    pub fn back_socket_mut(&mut self) -> Option<&mut TcpStream> {
239        self.backend_socket.as_mut()
240    }
241
242    pub fn set_back_socket(&mut self, socket: TcpStream) {
243        self.backend_socket = Some(socket);
244        self.backend_status = ConnectionStatus::Normal;
245    }
246
247    pub fn back_token(&self) -> Vec<Token> {
248        self.backend_token.iter().cloned().collect()
249    }
250
251    fn reset_timeouts(&mut self) {
252        if let Some(t) = self.container_frontend_timeout.as_mut() {
253            if !t.reset() {
254                error!(
255                    "{} Could not reset front timeout (pipe)",
256                    log_context!(self)
257                );
258            }
259        }
260
261        if let Some(t) = self.container_backend_timeout.as_mut() {
262            if !t.reset() {
263                error!("{} Could not reset back timeout (pipe)", log_context!(self));
264            }
265        }
266    }
267
268    pub fn set_cluster_id(&mut self, cluster_id: Option<String>) {
269        self.cluster_id = cluster_id;
270    }
271
272    pub fn set_backend_id(&mut self, backend_id: Option<String>) {
273        self.backend_id = backend_id;
274    }
275
276    pub fn set_back_token(&mut self, token: Token) {
277        self.backend_token = Some(token);
278    }
279
280    pub fn get_session_address(&self) -> Option<SocketAddr> {
281        self.session_address
282            .or_else(|| self.frontend.socket_ref().peer_addr().ok())
283    }
284
285    pub fn get_backend_address(&self) -> Option<SocketAddr> {
286        self.backend_socket
287            .as_ref()
288            .and_then(|backend| backend.peer_addr().ok())
289    }
290
291    fn protocol_string(&self) -> &'static str {
292        match self.protocol {
293            Protocol::TCP => "TCP",
294            Protocol::HTTP => "WS",
295            Protocol::HTTPS => match self.frontend.protocol() {
296                TransportProtocol::Ssl2 => "WSS-SSL2",
297                TransportProtocol::Ssl3 => "WSS-SSL3",
298                TransportProtocol::Tls1_0 => "WSS-TLS1.0",
299                TransportProtocol::Tls1_1 => "WSS-TLS1.1",
300                TransportProtocol::Tls1_2 => "WSS-TLS1.2",
301                TransportProtocol::Tls1_3 => "WSS-TLS1.3",
302                _ => unreachable!(),
303            },
304            _ => unreachable!(),
305        }
306    }
307
    /// Register the end of the session with `metrics`, then emit its
    /// access-log record.
    ///
    /// `error` selects the failure flavour of the record. `message` carries an
    /// optional human-readable reason. The `on_failure` hook bumps the
    /// `access_logs::UNSENT` counter (presumably when the record cannot be
    /// delivered — confirm against `log_access!`).
    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
        let listener = self.listener.borrow();
        let context = self.log_context();
        let endpoint = self.log_endpoint();
        metrics.register_end_of_session(&context);
        log_access!(
            error,
            on_failure: { incr!(names::access_logs::UNSENT) },
            message,
            context,
            session_address: self.get_session_address(),
            backend_address: self.get_backend_address(),
            protocol: self.protocol_string(),
            endpoint,
            tags: listener.get_tags(&listener.get_addr().to_string()),
            client_rtt: socket_rtt(self.front_socket()),
            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
            service_time: metrics.service_time(),
            response_time: metrics.backend_response_time(),
            request_time: metrics.request_time(),
            start_time_ns: metrics.start_wall_ns(),
            bytes_in: metrics.bin,
            bytes_out: metrics.bout,
            // HTTP request metadata is not tracked by `Pipe`; always absent.
            user_agent: None,
            x_request_id: None,
            // Pipe is post-upgrade; the TLS metadata was captured once at
            // handshake and plumbed through via `set_tls_metadata`.
            // NOTE(review): this comment historically named
            // `https.rs::upgrade_handshake`, while `set_tls_metadata`'s doc
            // names `https.rs::upgrade_mux` as the caller — confirm which is
            // current. Plaintext paths leave these fields as `None`, matching
            // the TCP log shape.
            tls_version: self.tls_version,
            tls_cipher: self.tls_cipher,
            tls_sni: self.tls_sni.as_deref(),
            tls_alpn: self.tls_alpn,
            // Not tracked by `Pipe`; always absent.
            xff_chain: None,
            otel: None,
        );
    }
345
346    pub fn log_request_success(&self, metrics: &SessionMetrics) {
347        self.log_request(metrics, false, None);
348    }
349
350    pub fn log_request_error(&self, metrics: &SessionMetrics, message: &str) {
351        incr!(names::pipe::ERRORS);
352        error!(
353            "{} Could not process request properly got: {}",
354            log_context!(self),
355            message
356        );
357        self.print_state(self.protocol_string());
358        self.log_request(metrics, true, Some(message));
359    }
360
361    /// Access-log wrapper for benign idle-timeout tear-downs.
362    ///
363    /// Unlike `log_request_error`, this path logs at `debug!` and skips the
364    /// state dump — an idle pipe hitting its front/back_timeout is expected
365    /// behaviour (e.g. a WebSocket with no keepalive) and should not pollute
366    /// the error stream.
367    pub fn log_request_timeout(&self, metrics: &SessionMetrics, message: &str) {
368        debug!("{} pipe timeout: {}", log_context!(self), message);
369        self.log_request(metrics, true, Some(message));
370    }
371
372    /// Bytes currently sitting inside the `splice` frontend→backend
373    /// kernel pipe (`0` if splice is disabled or the pipe was not
374    /// allocated). Counted as "request in flight" by `check_connections`
375    /// so a half-closed session stays alive until the kernel drains.
376    #[cfg(all(target_os = "linux", feature = "splice"))]
377    fn splice_in_pending(&self) -> usize {
378        self.splice_pipe
379            .as_ref()
380            .map(|p| p.in_pipe_pending)
381            .unwrap_or(0)
382    }
383    #[cfg(not(all(target_os = "linux", feature = "splice")))]
384    fn splice_in_pending(&self) -> usize {
385        0
386    }
387
388    /// Bytes currently sitting inside the `splice` backend→frontend
389    /// kernel pipe. Counterpart to `splice_in_pending` for the response
390    /// direction.
391    #[cfg(all(target_os = "linux", feature = "splice"))]
392    fn splice_out_pending(&self) -> usize {
393        self.splice_pipe
394            .as_ref()
395            .map(|p| p.out_pipe_pending)
396            .unwrap_or(0)
397    }
398    #[cfg(not(all(target_os = "linux", feature = "splice")))]
399    fn splice_out_pending(&self) -> usize {
400        0
401    }
402
403    /// Realised kernel-pipe capacity per direction (`0` if splice is
404    /// disabled). Drives the "pipe is full" backpressure check in the
405    /// splice readable methods and the per-call `len` for `splice_in`.
406    #[cfg(all(target_os = "linux", feature = "splice"))]
407    fn splice_capacity(&self) -> usize {
408        self.splice_pipe.as_ref().map(|p| p.capacity).unwrap_or(0)
409    }
410
411    /// Tear down both readiness trackers ahead of a `SessionResult::Close`.
412    ///
413    /// This is the *write-only-shutdown discipline* (CLAUDE.md gotcha: never
414    /// `shutdown(Shutdown::Both)` on a TLS frontend — it emits a TCP RST that
415    /// truncates the already-queued response). `Pipe` never issues an explicit
416    /// `shutdown`; it closes purely by clearing interest+event so the event
417    /// loop stops driving I/O and lets the kernel flush queued bytes, with the
418    /// peer close arriving via the normal read path. The post-condition
419    /// asserts both trackers are fully cleared.
420    fn reset_readiness_for_close(&mut self) {
421        self.frontend_readiness.reset();
422        self.backend_readiness.reset();
423        debug_assert!(
424            self.frontend_readiness.interest.is_empty() && self.frontend_readiness.event.is_empty(),
425            "frontend readiness must be fully cleared on close (write-only-shutdown discipline)"
426        );
427        debug_assert!(
428            self.backend_readiness.interest.is_empty() && self.backend_readiness.event.is_empty(),
429            "backend readiness must be fully cleared on close (write-only-shutdown discipline)"
430        );
431    }
432
    /// Whether the session should be kept open, depending on endpoint status
    /// and buffer usage (both in memory and in the kernel).
    ///
    /// A direction counts as "in flight" in any of three cases:
    /// - its source-side Checkout buffer holds bytes;
    /// - the source socket still has a readable event pending;
    /// - the matching splice kernel pipe is non-empty.
    ///
    /// The verdict is a full `(frontend_status, backend_status)` table. In it,
    /// `ReadOpen` means only reads are still possible on that endpoint,
    /// `WriteOpen` means only writes, and `Closed` means neither.
    pub fn check_connections(&self) -> bool {
        // In-flight accounting must never see more *buffered* bytes than the
        // backing Checkout buffer can hold. We intentionally do NOT bound the
        // splice-pending counters by the pipe `capacity`: a kernel pipe buffers
        // well beyond its nominal `F_GETPIPE_SZ` when `splice(2)` moves
        // skb-backed GRO segments, so `splice_*_pending` legitimately exceeds it
        // (see `splice_readable`). A violation here means a `fill`/`consume`
        // elsewhere desynced the counters, corrupting the keep-alive decision.
        debug_assert!(
            self.frontend_buffer.available_data() <= self.frontend_buffer.capacity(),
            "frontend buffered data exceeds its capacity"
        );
        debug_assert!(
            self.backend_buffer.available_data() <= self.backend_buffer.capacity(),
            "backend buffered data exceeds its capacity"
        );

        // frontend -> backend direction
        let request_is_inflight = self.frontend_buffer.available_data() > 0
            || self.frontend_readiness.event.is_readable()
            || self.splice_in_pending() > 0;
        // backend -> frontend direction
        let response_is_inflight = self.backend_buffer.available_data() > 0
            || self.backend_readiness.event.is_readable()
            || self.splice_out_pending() > 0;
        match (self.frontend_status, self.backend_status) {
            (ConnectionStatus::Normal, ConnectionStatus::Normal) => true,
            (ConnectionStatus::Normal, ConnectionStatus::ReadOpen) => true,
            (ConnectionStatus::Normal, ConnectionStatus::WriteOpen) => {
                // technically we should keep it open, but we'll assume that if the front
                // is not readable and there is no in flight data front -> back or back -> front,
                // we'll close the session, otherwise it interacts badly with HTTP connections
                // with Connection: close header and no Content-length
                request_is_inflight || response_is_inflight
            }
            // Backend gone: stay only while response bytes can still reach the front.
            (ConnectionStatus::Normal, ConnectionStatus::Closed) => response_is_inflight,

            (ConnectionStatus::WriteOpen, ConnectionStatus::Normal) => {
                // technically we should keep it open, but we'll assume that if the back
                // is not readable and there is no in flight data back -> front or front -> back, we'll close the session
                request_is_inflight || response_is_inflight
            }
            (ConnectionStatus::WriteOpen, ConnectionStatus::ReadOpen) => true,
            (ConnectionStatus::WriteOpen, ConnectionStatus::WriteOpen) => {
                request_is_inflight || response_is_inflight
            }
            (ConnectionStatus::WriteOpen, ConnectionStatus::Closed) => response_is_inflight,

            (ConnectionStatus::ReadOpen, ConnectionStatus::Normal) => true,
            // Neither endpoint can be written to any more.
            (ConnectionStatus::ReadOpen, ConnectionStatus::ReadOpen) => false,
            (ConnectionStatus::ReadOpen, ConnectionStatus::WriteOpen) => true,
            (ConnectionStatus::ReadOpen, ConnectionStatus::Closed) => false,

            // Frontend gone: stay only while request bytes can still reach the back.
            (ConnectionStatus::Closed, ConnectionStatus::Normal) => request_is_inflight,
            (ConnectionStatus::Closed, ConnectionStatus::ReadOpen) => false,
            (ConnectionStatus::Closed, ConnectionStatus::WriteOpen) => request_is_inflight,
            (ConnectionStatus::Closed, ConnectionStatus::Closed) => false,
        }
    }
492
493    pub fn frontend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
494        self.log_request_success(metrics);
495        self.frontend_status = ConnectionStatus::Closed;
496        SessionResult::Close
497    }
498
499    pub fn backend_hup(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
500        self.backend_status = ConnectionStatus::Closed;
501        // The backend hung up: its status is now terminal regardless of which
502        // keep-alive branch we take below.
503        debug_assert!(
504            matches!(self.backend_status, ConnectionStatus::Closed),
505            "backend_hup must mark the backend Closed"
506        );
507        let pipe_has_data = self.splice_out_pending() > 0;
508        if self.backend_buffer.available_data() == 0 && !pipe_has_data {
509            // No buffered or in-kernel response data: there is nothing left to
510            // drain toward the frontend on this no-data branch.
511            debug_assert_eq!(
512                self.backend_buffer.available_data(),
513                0,
514                "no-data branch entered with response bytes still buffered"
515            );
516            if self.backend_readiness.event.is_readable() {
517                self.backend_readiness.interest.insert(Ready::READABLE);
518                debug!(
519                    "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in kernel.",
520                    log_context!(self)
521                );
522                SessionResult::Continue
523            } else {
524                self.log_request_success(metrics);
525                SessionResult::Close
526            }
527        } else {
528            debug!(
529                "{} Pipe::backend_hup: backend connection closed, keeping alive due to inflight data in buffers.",
530                log_context!(self)
531            );
532            self.frontend_readiness.interest.insert(Ready::WRITABLE);
533            if self.backend_readiness.event.is_readable() {
534                self.backend_readiness.interest.insert(Ready::READABLE);
535            }
536            SessionResult::Continue
537        }
538    }
539
    /// Handle frontend readability: read bytes from the frontend socket into
    /// `frontend_buffer` and arm the backend for writing.
    ///
    /// On Linux with the `splice` feature, `Protocol::TCP` sessions with an
    /// allocated kernel pipe are delegated to `splice_readable`.
    ///
    /// Returns `SessionResult::Close` after clearing readiness and logging in
    /// three cases: `check_connections` says the session is finished, the read
    /// reported `Error` (logged as an error), or it reported `Closed`.
    /// Otherwise returns `SessionResult::Continue`.
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        #[cfg(all(target_os = "linux", feature = "splice"))]
        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
            return self.splice_readable(metrics);
        }

        self.reset_timeouts();

        trace!("{} pipe readable", log_context!(self));
        // Buffer full: stop reading until the backend side drains it.
        if self.frontend_buffer.available_space() == 0 {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let space_before = self.frontend_buffer.available_space();
        let data_before = self.frontend_buffer.available_data();
        let bin_before = metrics.bin;
        let (sz, res) = self.frontend.socket_read(self.frontend_buffer.space());
        // `socket_read` fills `buf[..]` and returns `min(read, buf.len())`; it
        // can never report more bytes than the space slice it was handed.
        debug_assert!(
            sz <= space_before,
            "frontend socket_read reported more bytes ({sz}) than the buffer space offered ({space_before})"
        );
        debug!("{} Read {} bytes", log_context!(self), sz);

        if sz > 0 {
            //FIXME: replace with copy()
            self.frontend_buffer.fill(sz);
            // `fill(sz)` with `sz <= available_space` moves exactly `sz` bytes
            // from free space into readable data — no truncation, no growth.
            debug_assert_eq!(
                self.frontend_buffer.available_data(),
                data_before + sz,
                "fill must grow readable data by exactly the bytes read"
            );

            count!(names::backend::BYTES_IN, sz as i64);
            metrics.bin += sz;
            // Front→proxy ingress metric advances by exactly the bytes read.
            debug_assert_eq!(
                metrics.bin,
                bin_before + sz,
                "metrics.bin must advance by exactly the bytes read"
            );

            if self.frontend_buffer.available_space() == 0 {
                self.frontend_readiness.interest.remove(Ready::READABLE);
            }
            self.backend_readiness.interest.insert(Ready::WRITABLE);
        } else {
            self.frontend_readiness.event.remove(Ready::READABLE);

            // A zero-byte read still reporting `Continue` is treated as
            // end-of-stream: the frontend can no longer be read from.
            if res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }
        }

        if !self.check_connections() {
            self.reset_readiness_for_close();
            self.log_request_success(metrics);
            return SessionResult::Close;
        }

        // NOTE(review): if this read returned `sz > 0` together with `Closed`
        // or `Error`, the bytes just placed in `frontend_buffer` are not
        // forwarded to the backend before the session closes — confirm this
        // is intended.
        match res {
            SocketResult::Error => {
                self.reset_readiness_for_close();
                self.log_request_error(metrics, "front socket read error");
                return SessionResult::Close;
            }
            SocketResult::Closed => {
                self.reset_readiness_for_close();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        self.backend_readiness.interest.insert(Ready::WRITABLE);
        SessionResult::Continue
    }
630
    /// Handle frontend writability: flush `backend_buffer` to the frontend
    /// socket until the buffer is empty or the socket stops reporting
    /// `Continue`.
    ///
    /// On Linux with the `splice` feature, `Protocol::TCP` sessions with an
    /// allocated kernel pipe are delegated to `splice_writable`. Every exit
    /// path that wrote bytes adds them to `metrics.bout` and the `BYTES_OUT`
    /// counter exactly once.
    ///
    /// Returns `SessionResult::Close` in three cases: `check_connections`
    /// fails, the write reported `Error` (logged as an error), or it reported
    /// `Closed`. Otherwise returns `SessionResult::Continue`.
    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        #[cfg(all(target_os = "linux", feature = "splice"))]
        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
            return self.splice_writable(metrics);
        }

        trace!("{} Pipe writable", log_context!(self));
        // Nothing to send: go back to reading from the backend.
        if self.backend_buffer.available_data() == 0 {
            self.backend_readiness.interest.insert(Ready::READABLE);
            self.frontend_readiness.interest.remove(Ready::WRITABLE);
            return SessionResult::Continue;
        }

        let queued_total = self.backend_buffer.available_data();
        let mut sz = 0usize;
        let mut res = SocketResult::Continue;
        // NOTE(review): this loop has no iteration cap (`MAX_LOOP_ITERATIONS`
        // is imported in this file). If `socket_write` kept returning
        // `(0, Continue)`, the frontend would settle in `ReadOpen`.
        // `check_connections` stays `true` for `(ReadOpen, Normal)`, so the
        // loop would not terminate. Confirm `socket_write` never yields
        // `(0, Continue)` for a non-empty slice.
        while res == SocketResult::Continue {
            // no more data in buffer, stop here
            if self.backend_buffer.available_data() == 0 {
                count!(names::backend::BYTES_OUT, sz as i64);
                metrics.bout += sz;
                self.backend_readiness.interest.insert(Ready::READABLE);
                self.frontend_readiness.interest.remove(Ready::WRITABLE);
                return SessionResult::Continue;
            }
            let queued = self.backend_buffer.available_data();
            let (current_sz, current_res) = self.frontend.socket_write(self.backend_buffer.data());
            // A partial write can never report more than was queued: the
            // socket writes from `data()` and returns `min(written, data.len())`.
            debug_assert!(
                current_sz <= queued,
                "frontend socket_write reported {current_sz} bytes but only {queued} were queued"
            );
            res = current_res;
            let consumed = self.backend_buffer.consume(current_sz);
            // `consume` drops exactly the written bytes (we already proved
            // `current_sz <= available_data`, so no clamping occurs).
            debug_assert_eq!(
                consumed, current_sz,
                "consume must drop exactly the bytes written to the frontend"
            );
            sz += current_sz;
            // Cumulative transfer never overruns what was queued at entry.
            debug_assert!(
                sz <= queued_total,
                "cumulative frontend write ({sz}) exceeded the queued backend data ({queued_total})"
            );

            // A zero-byte write still reporting `Continue` means the frontend
            // can no longer be written to.
            if current_sz == 0 && res == SocketResult::Continue {
                self.frontend_status = match self.frontend_status {
                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
                    s => s,
                };
            }

            if !self.check_connections() {
                metrics.bout += sz;
                count!(names::backend::BYTES_OUT, sz as i64);
                self.reset_readiness_for_close();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
        }

        if sz > 0 {
            count!(names::backend::BYTES_OUT, sz as i64);
            self.backend_readiness.interest.insert(Ready::READABLE);
            metrics.bout += sz;
        }

        // The second value is the data still queued, not the total at entry.
        debug!(
            "{} Wrote {} bytes of {}",
            log_context!(self),
            sz,
            self.backend_buffer.available_data()
        );

        match res {
            SocketResult::Error => {
                self.reset_readiness_for_close();
                self.log_request_error(metrics, "front socket write error");
                return SessionResult::Close;
            }
            SocketResult::Closed => {
                self.reset_readiness_for_close();
                self.log_request_success(metrics);
                return SessionResult::Close;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::WRITABLE);
            }
            SocketResult::Continue => {}
        }

        SessionResult::Continue
    }
729
730    // Forward content to cluster
731    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
732        #[cfg(all(target_os = "linux", feature = "splice"))]
733        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
734            return self.splice_backend_writable(metrics);
735        }
736
737        trace!("{} pipe back_writable", log_context!(self));
738
739        if self.frontend_buffer.available_data() == 0 {
740            self.frontend_readiness.interest.insert(Ready::READABLE);
741            self.backend_readiness.interest.remove(Ready::WRITABLE);
742            return SessionResult::Continue;
743        }
744
745        let output_size = self.frontend_buffer.available_data();
746
747        let mut sz = 0usize;
748        let mut socket_res = SocketResult::Continue;
749
750        if let Some(ref mut backend) = self.backend_socket {
751            while socket_res == SocketResult::Continue {
752                // no more data in buffer, stop here
753                if self.frontend_buffer.available_data() == 0 {
754                    self.frontend_readiness.interest.insert(Ready::READABLE);
755                    self.backend_readiness.interest.remove(Ready::WRITABLE);
756                    count!(names::backend::BACK_BYTES_OUT, sz as i64);
757                    metrics.backend_bout += sz;
758                    return SessionResult::Continue;
759                }
760
761                let queued = self.frontend_buffer.available_data();
762                let (current_sz, current_res) = backend.socket_write(self.frontend_buffer.data());
763                // A partial write can never report more than was queued.
764                debug_assert!(
765                    current_sz <= queued,
766                    "backend socket_write reported {current_sz} bytes but only {queued} were queued"
767                );
768                socket_res = current_res;
769                let consumed = self.frontend_buffer.consume(current_sz);
770                debug_assert_eq!(
771                    consumed, current_sz,
772                    "consume must drop exactly the bytes written to the backend"
773                );
774                sz += current_sz;
775                // Cumulative transfer never overruns the data queued at entry.
776                debug_assert!(
777                    sz <= output_size,
778                    "cumulative backend write ({sz}) exceeded the queued frontend data ({output_size})"
779                );
780
781                if current_sz == 0 && current_res == SocketResult::Continue {
782                    self.backend_status = match self.backend_status {
783                        ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
784                        ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
785                        s => s,
786                    };
787                }
788            }
789        }
790
791        let backend_bout_before = metrics.backend_bout;
792        count!(names::backend::BACK_BYTES_OUT, sz as i64);
793        metrics.backend_bout += sz;
794        // Proxy→backend egress metric advances by exactly the bytes written.
795        debug_assert_eq!(
796            metrics.backend_bout,
797            backend_bout_before + sz,
798            "metrics.backend_bout must advance by exactly the bytes written"
799        );
800
801        if !self.check_connections() {
802            self.reset_readiness_for_close();
803            self.log_request_success(metrics);
804            return SessionResult::Close;
805        }
806
807        debug!(
808            "{} Wrote {} bytes of {}",
809            log_context!(self),
810            sz,
811            output_size
812        );
813
814        match socket_res {
815            SocketResult::Error => {
816                self.reset_readiness_for_close();
817                self.log_request_error(metrics, "back socket write error");
818                return SessionResult::Close;
819            }
820            SocketResult::Closed => {
821                self.reset_readiness_for_close();
822                self.log_request_success(metrics);
823                return SessionResult::Close;
824            }
825            SocketResult::WouldBlock => {
826                self.backend_readiness.event.remove(Ready::WRITABLE);
827            }
828            SocketResult::Continue => {}
829        }
830        SessionResult::Continue
831    }
832
833    // Read content from cluster
834    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
835        #[cfg(all(target_os = "linux", feature = "splice"))]
836        if self.protocol == Protocol::TCP && self.splice_pipe.is_some() {
837            return self.splice_backend_readable(metrics);
838        }
839
840        self.reset_timeouts();
841
842        trace!("{} Pipe backend_readable", log_context!(self));
843        if self.backend_buffer.available_space() == 0 {
844            self.backend_readiness.interest.remove(Ready::READABLE);
845            return SessionResult::Continue;
846        }
847
848        let space_before = self.backend_buffer.available_space();
849        let data_before = self.backend_buffer.available_data();
850        let backend_bin_before = metrics.backend_bin;
851        if let Some(ref mut backend) = self.backend_socket {
852            let (size, remaining) = backend.socket_read(self.backend_buffer.space());
853            // `socket_read` reports at most the space slice it was handed.
854            debug_assert!(
855                size <= space_before,
856                "backend socket_read reported more bytes ({size}) than the buffer space offered ({space_before})"
857            );
858            self.backend_buffer.fill(size);
859            // `fill(size)` with `size <= available_space` moves exactly `size`
860            // bytes from free space into readable data.
861            debug_assert_eq!(
862                self.backend_buffer.available_data(),
863                data_before + size,
864                "fill must grow readable data by exactly the bytes read"
865            );
866
867            debug!("{} Read {} bytes", log_context!(self), size);
868
869            if remaining != SocketResult::Continue || size == 0 {
870                self.backend_readiness.event.remove(Ready::READABLE);
871            }
872            if size > 0 {
873                self.frontend_readiness.interest.insert(Ready::WRITABLE);
874                count!(names::backend::BACK_BYTES_IN, size as i64);
875                metrics.backend_bin += size;
876                // Backend→proxy ingress metric advances by exactly bytes read.
877                debug_assert_eq!(
878                    metrics.backend_bin,
879                    backend_bin_before + size,
880                    "metrics.backend_bin must advance by exactly the bytes read"
881                );
882            }
883
884            if size == 0 && remaining == SocketResult::Closed {
885                self.backend_status = match self.backend_status {
886                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
887                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
888                    s => s,
889                };
890
891                if !self.check_connections() {
892                    self.reset_readiness_for_close();
893                    self.log_request_success(metrics);
894                    return SessionResult::Close;
895                }
896            }
897
898            match remaining {
899                SocketResult::Error => {
900                    self.reset_readiness_for_close();
901                    self.log_request_error(metrics, "back socket read error");
902                    return SessionResult::Close;
903                }
904                SocketResult::Closed => {
905                    if !self.check_connections() {
906                        self.reset_readiness_for_close();
907                        self.log_request_success(metrics);
908                        return SessionResult::Close;
909                    }
910                }
911                SocketResult::WouldBlock => {
912                    self.backend_readiness.event.remove(Ready::READABLE);
913                }
914                SocketResult::Continue => {}
915            }
916        }
917
918        SessionResult::Continue
919    }
920
921    /// Zero-copy fast path of `readable`: pull bytes off the frontend
922    /// socket into the kernel `in_pipe` via `splice(2)`, then mark the
923    /// backend writable so the data drains in the next event loop tick.
924    ///
925    /// Mirrors `readable`'s `ConnectionStatus` transitions and metric
926    /// emissions exactly so observability and the `check_connections`
927    /// state machine behave the same with or without the feature flag.
928    #[cfg(all(target_os = "linux", feature = "splice"))]
929    fn splice_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
930        self.reset_timeouts();
931
932        trace!("{} pipe splice_readable", log_context!(self));
933        let capacity = self.splice_capacity();
934        if self.splice_in_pending() >= capacity {
935            // Pipe is full — stop reading and let the backend drain it.
936            self.frontend_readiness.interest.remove(Ready::READABLE);
937            self.backend_readiness.interest.insert(Ready::WRITABLE);
938            return SessionResult::Continue;
939        }
940
941        let pending_before = self.splice_in_pending();
942        let bin_before = metrics.bin;
943        let pipe_write_end = self.splice_pipe.as_ref().unwrap().in_pipe[1];
944        let (sz, res) = splice::splice_in(self.frontend.socket_ref(), pipe_write_end, capacity);
945        // `splice_in` is asked for at most `capacity` bytes, so the kernel can
946        // never report moving more than that in one call. We deliberately do
947        // NOT assert `in_pipe_pending <= capacity`: a kernel pipe buffers well
948        // beyond its nominal `F_GETPIPE_SZ` when `splice(2)` moves skb-backed
949        // segments — a GRO super-packet on loopback hands a single ring slot far
950        // more than a page — so byte-occupancy legitimately exceeds `capacity`.
951        // `capacity` is the per-call `len` and a soft backpressure threshold,
952        // not a hard occupancy bound.
953        debug_assert!(
954            sz <= capacity,
955            "splice_in reported {sz} bytes but was capped at len {capacity}"
956        );
957        debug!("{} Spliced {} bytes from frontend", log_context!(self), sz);
958
959        if sz > 0 {
960            self.splice_pipe.as_mut().unwrap().in_pipe_pending += sz;
961            // Pending advanced by exactly the spliced bytes (tracks real
962            // kernel-pipe occupancy; see the capacity note above).
963            debug_assert_eq!(
964                self.splice_in_pending(),
965                pending_before + sz,
966                "in_pipe_pending must grow by exactly the spliced bytes"
967            );
968            count!(names::backend::BYTES_IN, sz as i64);
969            metrics.bin += sz;
970            debug_assert_eq!(
971                metrics.bin,
972                bin_before + sz,
973                "metrics.bin must advance by exactly the spliced bytes"
974            );
975            self.backend_readiness.interest.insert(Ready::WRITABLE);
976        } else {
977            self.frontend_readiness.event.remove(Ready::READABLE);
978
979            if res == SocketResult::Continue {
980                self.frontend_status = match self.frontend_status {
981                    ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
982                    ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
983                    s => s,
984                };
985            }
986        }
987
988        if !self.check_connections() {
989            self.reset_readiness_for_close();
990            self.log_request_success(metrics);
991            return SessionResult::Close;
992        }
993
994        match res {
995            SocketResult::Error => {
996                self.reset_readiness_for_close();
997                self.log_request_error(metrics, "splice front socket read error");
998                return SessionResult::Close;
999            }
1000            SocketResult::Closed => {
1001                self.reset_readiness_for_close();
1002                self.log_request_success(metrics);
1003                return SessionResult::Close;
1004            }
1005            SocketResult::WouldBlock => {
1006                self.frontend_readiness.event.remove(Ready::READABLE);
1007            }
1008            SocketResult::Continue => {}
1009        }
1010
1011        self.backend_readiness.interest.insert(Ready::WRITABLE);
1012        SessionResult::Continue
1013    }
1014
1015    /// Zero-copy fast path of `writable`: drain the backend→frontend
1016    /// kernel `out_pipe` toward the frontend socket via `splice(2)`.
1017    /// Mirrors `writable`'s loop, status transitions, and metric
1018    /// emissions.
1019    #[cfg(all(target_os = "linux", feature = "splice"))]
1020    fn splice_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1021        trace!("{} Pipe splice_writable", log_context!(self));
1022        if self.splice_out_pending() == 0 {
1023            self.backend_readiness.interest.insert(Ready::READABLE);
1024            self.frontend_readiness.interest.remove(Ready::WRITABLE);
1025            return SessionResult::Continue;
1026        }
1027
1028        let mut sz = 0usize;
1029        let mut res = SocketResult::Continue;
1030        while res == SocketResult::Continue {
1031            let pending = self.splice_out_pending();
1032            // no more data in pipe, stop here
1033            if pending == 0 {
1034                count!(names::backend::BYTES_OUT, sz as i64);
1035                metrics.bout += sz;
1036                self.backend_readiness.interest.insert(Ready::READABLE);
1037                self.frontend_readiness.interest.remove(Ready::WRITABLE);
1038                return SessionResult::Continue;
1039            }
1040
1041            let pipe_read_end = self.splice_pipe.as_ref().unwrap().out_pipe[0];
1042            let (current_sz, current_res) =
1043                splice::splice_out(pipe_read_end, self.frontend.socket_ref(), pending);
1044            // `splice_out` was asked for `pending` bytes and can drain no more
1045            // than the pipe holds; draining more than `pending` would underflow
1046            // `out_pipe_pending` below.
1047            debug_assert!(
1048                current_sz <= pending,
1049                "splice_out drained {current_sz} bytes but only {pending} were pending (would underflow)"
1050            );
1051            res = current_res;
1052            if current_sz > 0 {
1053                self.splice_pipe.as_mut().unwrap().out_pipe_pending -= current_sz;
1054                debug_assert_eq!(
1055                    self.splice_out_pending(),
1056                    pending - current_sz,
1057                    "out_pipe_pending must shrink by exactly the drained bytes"
1058                );
1059            }
1060            sz += current_sz;
1061
1062            if current_sz == 0 && res == SocketResult::Continue {
1063                self.frontend_status = match self.frontend_status {
1064                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
1065                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
1066                    s => s,
1067                };
1068            }
1069
1070            if !self.check_connections() {
1071                metrics.bout += sz;
1072                count!(names::backend::BYTES_OUT, sz as i64);
1073                self.reset_readiness_for_close();
1074                self.log_request_success(metrics);
1075                return SessionResult::Close;
1076            }
1077        }
1078
1079        if sz > 0 {
1080            count!(names::backend::BYTES_OUT, sz as i64);
1081            self.backend_readiness.interest.insert(Ready::READABLE);
1082            metrics.bout += sz;
1083        }
1084
1085        debug!(
1086            "{} Spliced {} bytes (out_pipe_pending={})",
1087            log_context!(self),
1088            sz,
1089            self.splice_out_pending()
1090        );
1091
1092        match res {
1093            SocketResult::Error => {
1094                self.reset_readiness_for_close();
1095                self.log_request_error(metrics, "splice front socket write error");
1096                return SessionResult::Close;
1097            }
1098            SocketResult::Closed => {
1099                self.reset_readiness_for_close();
1100                self.log_request_success(metrics);
1101                return SessionResult::Close;
1102            }
1103            SocketResult::WouldBlock => {
1104                self.frontend_readiness.event.remove(Ready::WRITABLE);
1105            }
1106            SocketResult::Continue => {}
1107        }
1108
1109        SessionResult::Continue
1110    }
1111
1112    /// Zero-copy fast path of `backend_writable`: drain the
1113    /// frontend→backend kernel `in_pipe` toward the backend socket via
1114    /// `splice(2)`. Mirrors `backend_writable`'s loop, status
1115    /// transitions, and metric emissions.
1116    #[cfg(all(target_os = "linux", feature = "splice"))]
1117    fn splice_backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1118        trace!("{} pipe splice_backend_writable", log_context!(self));
1119
1120        if self.splice_in_pending() == 0 {
1121            self.frontend_readiness.interest.insert(Ready::READABLE);
1122            self.backend_readiness.interest.remove(Ready::WRITABLE);
1123            return SessionResult::Continue;
1124        }
1125
1126        let output_size = self.splice_in_pending();
1127        let mut sz = 0usize;
1128        let mut socket_res = SocketResult::Continue;
1129
1130        while socket_res == SocketResult::Continue {
1131            let pending = self.splice_in_pending();
1132            // no more data in pipe, stop here
1133            if pending == 0 {
1134                self.frontend_readiness.interest.insert(Ready::READABLE);
1135                self.backend_readiness.interest.remove(Ready::WRITABLE);
1136                count!(names::backend::BACK_BYTES_OUT, sz as i64);
1137                metrics.backend_bout += sz;
1138                return SessionResult::Continue;
1139            }
1140
1141            let pipe_read_end = self.splice_pipe.as_ref().unwrap().in_pipe[0];
1142            let (current_sz, current_res) = match self.backend_socket.as_ref() {
1143                Some(b) => splice::splice_out(pipe_read_end, b, pending),
1144                None => break,
1145            };
1146            // Draining more than `pending` would underflow `in_pipe_pending`.
1147            debug_assert!(
1148                current_sz <= pending,
1149                "splice_out drained {current_sz} bytes but only {pending} were pending (would underflow)"
1150            );
1151            socket_res = current_res;
1152            if current_sz > 0 {
1153                self.splice_pipe.as_mut().unwrap().in_pipe_pending -= current_sz;
1154                debug_assert_eq!(
1155                    self.splice_in_pending(),
1156                    pending - current_sz,
1157                    "in_pipe_pending must shrink by exactly the drained bytes"
1158                );
1159            }
1160            sz += current_sz;
1161            // Cumulative drain never exceeds what was pending at entry.
1162            debug_assert!(
1163                sz <= output_size,
1164                "cumulative splice drain ({sz}) exceeded the bytes pending at entry ({output_size})"
1165            );
1166
1167            if current_sz == 0 && current_res == SocketResult::Continue {
1168                self.backend_status = match self.backend_status {
1169                    ConnectionStatus::Normal => ConnectionStatus::ReadOpen,
1170                    ConnectionStatus::WriteOpen => ConnectionStatus::Closed,
1171                    s => s,
1172                };
1173            }
1174        }
1175
1176        count!(names::backend::BACK_BYTES_OUT, sz as i64);
1177        metrics.backend_bout += sz;
1178
1179        if !self.check_connections() {
1180            self.reset_readiness_for_close();
1181            self.log_request_success(metrics);
1182            return SessionResult::Close;
1183        }
1184
1185        debug!(
1186            "{} Spliced {} bytes of {}",
1187            log_context!(self),
1188            sz,
1189            output_size
1190        );
1191
1192        match socket_res {
1193            SocketResult::Error => {
1194                self.reset_readiness_for_close();
1195                self.log_request_error(metrics, "splice back socket write error");
1196                return SessionResult::Close;
1197            }
1198            SocketResult::Closed => {
1199                self.reset_readiness_for_close();
1200                self.log_request_success(metrics);
1201                return SessionResult::Close;
1202            }
1203            SocketResult::WouldBlock => {
1204                self.backend_readiness.event.remove(Ready::WRITABLE);
1205            }
1206            SocketResult::Continue => {}
1207        }
1208        SessionResult::Continue
1209    }
1210
1211    /// Zero-copy fast path of `backend_readable`: pull bytes off the
1212    /// backend socket into the kernel `out_pipe` via `splice(2)`, then
1213    /// mark the frontend writable so the data drains in the next event
1214    /// loop tick. Mirrors `backend_readable`'s status transitions and
1215    /// metric emissions.
1216    #[cfg(all(target_os = "linux", feature = "splice"))]
1217    fn splice_backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
1218        self.reset_timeouts();
1219
1220        trace!("{} Pipe splice_backend_readable", log_context!(self));
1221        let capacity = self.splice_capacity();
1222        if self.splice_out_pending() >= capacity {
1223            // Pipe is full — stop reading and let the frontend drain it.
1224            self.backend_readiness.interest.remove(Ready::READABLE);
1225            self.frontend_readiness.interest.insert(Ready::WRITABLE);
1226            return SessionResult::Continue;
1227        }
1228
1229        let pending_before = self.splice_out_pending();
1230        let backend_bin_before = metrics.backend_bin;
1231        let pipe_write_end = self.splice_pipe.as_ref().unwrap().out_pipe[1];
1232        let (size, remaining) = match self.backend_socket.as_ref() {
1233            Some(b) => splice::splice_in(b, pipe_write_end, capacity),
1234            None => return SessionResult::Continue,
1235        };
1236        // `splice_in` is capped at `len = capacity`, so the kernel never reports
1237        // moving more than that per call. As in `splice_readable`, we do NOT
1238        // assert `out_pipe_pending <= capacity`: a kernel pipe holds well beyond
1239        // its nominal `F_GETPIPE_SZ` when `splice(2)` moves skb-backed (GRO)
1240        // segments, so byte-occupancy legitimately exceeds `capacity` — it is
1241        // only the per-call `len` and a soft backpressure threshold.
1242        debug_assert!(
1243            size <= capacity,
1244            "splice_in reported {size} bytes but was capped at len {capacity}"
1245        );
1246
1247        debug!("{} Spliced {} bytes from backend", log_context!(self), size);
1248
1249        if remaining != SocketResult::Continue || size == 0 {
1250            self.backend_readiness.event.remove(Ready::READABLE);
1251        }
1252        if size > 0 {
1253            self.splice_pipe.as_mut().unwrap().out_pipe_pending += size;
1254            debug_assert_eq!(
1255                self.splice_out_pending(),
1256                pending_before + size,
1257                "out_pipe_pending must grow by exactly the spliced bytes"
1258            );
1259            self.frontend_readiness.interest.insert(Ready::WRITABLE);
1260            count!(names::backend::BACK_BYTES_IN, size as i64);
1261            metrics.backend_bin += size;
1262            debug_assert_eq!(
1263                metrics.backend_bin,
1264                backend_bin_before + size,
1265                "metrics.backend_bin must advance by exactly the spliced bytes"
1266            );
1267        }
1268
1269        if size == 0 && remaining == SocketResult::Closed {
1270            self.backend_status = match self.backend_status {
1271                ConnectionStatus::Normal => ConnectionStatus::WriteOpen,
1272                ConnectionStatus::ReadOpen => ConnectionStatus::Closed,
1273                s => s,
1274            };
1275
1276            if !self.check_connections() {
1277                self.reset_readiness_for_close();
1278                self.log_request_success(metrics);
1279                return SessionResult::Close;
1280            }
1281        }
1282
1283        match remaining {
1284            SocketResult::Error => {
1285                self.reset_readiness_for_close();
1286                self.log_request_error(metrics, "splice back socket read error");
1287                return SessionResult::Close;
1288            }
1289            SocketResult::Closed => {
1290                if !self.check_connections() {
1291                    self.reset_readiness_for_close();
1292                    self.log_request_success(metrics);
1293                    return SessionResult::Close;
1294                }
1295            }
1296            SocketResult::WouldBlock => {
1297                self.backend_readiness.event.remove(Ready::READABLE);
1298            }
1299            SocketResult::Continue => {}
1300        }
1301
1302        SessionResult::Continue
1303    }
1304
1305    pub fn log_context(&self) -> LogContext<'_> {
1306        LogContext {
1307            session_id: self.session_id,
1308            request_id: Some(self.request_id),
1309            cluster_id: self.cluster_id.as_deref(),
1310            backend_id: self.backend_id.as_deref(),
1311        }
1312    }
1313
1314    fn log_endpoint(&self) -> EndpointRecord<'_> {
1315        match &self.websocket_context {
1316            WebSocketContext::Http {
1317                method,
1318                authority,
1319                path,
1320                status,
1321                reason,
1322            } => EndpointRecord::Http {
1323                method: method.as_deref(),
1324                authority: authority.as_deref(),
1325                path: path.as_deref(),
1326                status: status.to_owned(),
1327                reason: reason.as_deref(),
1328            },
1329            WebSocketContext::Tcp => EndpointRecord::Tcp,
1330        }
1331    }
1332}
1333
1334impl<Front: SocketHandler, L: ListenerHandler> SessionState for Pipe<Front, L> {
1335    fn ready(
1336        &mut self,
1337        _session: Rc<RefCell<dyn crate::ProxySession>>,
1338        _proxy: Rc<RefCell<dyn crate::L7Proxy>>,
1339        metrics: &mut SessionMetrics,
1340    ) -> SessionResult {
1341        let mut counter = 0;
1342
1343        if self.frontend_readiness.event.is_hup() {
1344            return SessionResult::Close;
1345        }
1346
1347        while counter < MAX_LOOP_ITERATIONS {
1348            let frontend_interest = self.frontend_readiness.filter_interest();
1349            let backend_interest = self.backend_readiness.filter_interest();
1350
1351            trace!(
1352                "{} Frontend interest({:?}), backend interest({:?})",
1353                log_context!(self),
1354                frontend_interest,
1355                backend_interest
1356            );
1357            if frontend_interest.is_empty() && backend_interest.is_empty() {
1358                break;
1359            }
1360
1361            if self.backend_readiness.event.is_hup()
1362                && self.frontend_readiness.interest.is_writable()
1363                && !self.frontend_readiness.event.is_writable()
1364            {
1365                break;
1366            }
1367
1368            if frontend_interest.is_readable() && self.readable(metrics) == SessionResult::Close {
1369                return SessionResult::Close;
1370            }
1371
1372            if backend_interest.is_writable()
1373                && self.backend_writable(metrics) == SessionResult::Close
1374            {
1375                return SessionResult::Close;
1376            }
1377
1378            if backend_interest.is_readable()
1379                && self.backend_readable(metrics) == SessionResult::Close
1380            {
1381                return SessionResult::Close;
1382            }
1383
1384            if frontend_interest.is_writable() && self.writable(metrics) == SessionResult::Close {
1385                return SessionResult::Close;
1386            }
1387
1388            if backend_interest.is_hup() && self.backend_hup(metrics) == SessionResult::Close {
1389                return SessionResult::Close;
1390            }
1391
1392            if frontend_interest.is_error() {
1393                error!(
1394                    "{} Frontend socket error, disconnecting",
1395                    log_context!(self)
1396                );
1397
1398                self.frontend_readiness.interest = Ready::EMPTY;
1399                self.backend_readiness.interest = Ready::EMPTY;
1400
1401                return SessionResult::Close;
1402            }
1403
1404            if backend_interest.is_error() && self.backend_hup(metrics) == SessionResult::Close {
1405                self.frontend_readiness.interest = Ready::EMPTY;
1406                self.backend_readiness.interest = Ready::EMPTY;
1407
1408                error!("{} Backend socket error, disconnecting", log_context!(self));
1409                return SessionResult::Close;
1410            }
1411
1412            counter += 1;
1413        }
1414
1415        if counter >= MAX_LOOP_ITERATIONS {
1416            error!(
1417                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
1418                log_context!(self),
1419                MAX_LOOP_ITERATIONS
1420            );
1421
1422            incr!(names::http::INFINITE_LOOP_ERROR);
1423            self.print_state(self.protocol_string());
1424
1425            return SessionResult::Close;
1426        }
1427
1428        SessionResult::Continue
1429    }
1430
1431    fn update_readiness(&mut self, token: Token, events: Ready) {
1432        if self.frontend_token == token {
1433            self.frontend_readiness.event |= events;
1434        } else if self.backend_token == Some(token) {
1435            self.backend_readiness.event |= events;
1436        }
1437    }
1438
1439    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
1440        //info!("got timeout for token: {:?}", token);
1441        if self.frontend_token == token {
1442            self.log_request_timeout(metrics, "frontend socket timeout");
1443            if let Some(timeout) = self.container_frontend_timeout.as_mut() {
1444                timeout.triggered()
1445            }
1446            return StateResult::CloseSession;
1447        }
1448
1449        if self.backend_token == Some(token) {
1450            //info!("backend timeout triggered for token {:?}", token);
1451            if let Some(timeout) = self.container_backend_timeout.as_mut() {
1452                timeout.triggered()
1453            }
1454
1455            self.log_request_timeout(metrics, "backend socket timeout");
1456            return StateResult::CloseSession;
1457        }
1458
1459        error!("{} Got timeout for an invalid token", log_context!(self));
1460        self.log_request_error(metrics, "invalid token timeout");
1461        StateResult::CloseSession
1462    }
1463
1464    fn cancel_timeouts(&mut self) {
1465        self.container_frontend_timeout.as_mut().map(|t| t.cancel());
1466        self.container_backend_timeout.as_mut().map(|t| t.cancel());
1467    }
1468
1469    fn close(&mut self, _proxy: Rc<RefCell<dyn L7Proxy>>, _metrics: &mut SessionMetrics) {
1470        if let Some(backend) = self.backend.as_mut() {
1471            let mut backend = backend.borrow_mut();
1472            backend.active_requests = backend.active_requests.saturating_sub(1);
1473        }
1474    }
1475
1476    fn print_state(&self, context: &str) {
1477        error!(
1478            "\
1479{} {} Session(Pipe)
1480\tFrontend:
1481\t\ttoken: {:?}\treadiness: {:?}
1482\tBackend:
1483\t\ttoken: {:?}\treadiness: {:?}",
1484            log_context!(self),
1485            context,
1486            self.frontend_token,
1487            self.frontend_readiness,
1488            self.backend_token,
1489            self.backend_readiness
1490        );
1491    }
1492}