Skip to main content

sozu_lib/protocol/kawa_h1/
mod.rs

1//! HTTP/1.1 session state on top of Kawa.
2//!
3//! Owns the H1 session state machine: Kawa-driven request/response parsing,
4//! header rewriting via `editor.rs`, default-answer rendering via
5//! `answers.rs`, and route lookup via the listener
6//! (`cluster_id_from_request`). Long-form lifecycle:
7//! `lib/src/protocol/kawa_h1/LIFECYCLE.md` (created in this changeset).
8
9pub mod answers;
10pub mod diagnostics;
11pub mod editor;
12pub mod parser;
13
14use std::{
15    cell::RefCell,
16    io::ErrorKind,
17    net::{Shutdown, SocketAddr},
18    rc::{Rc, Weak},
19    time::{Duration, Instant},
20};
21
22use mio::{Interest, Token, net::TcpStream};
23use rusty_ulid::Ulid;
24use sozu_command::{
25    config::MAX_LOOP_ITERATIONS,
26    logging::EndpointRecord,
27    proto::command::{Event, EventKind, ListenerType},
28};
29
30// use time::{Duration, Instant};
31use crate::metrics::names;
32use crate::{
33    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
34    FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession,
35    Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult,
36    StateResult,
37    backends::{Backend, BackendError},
38    pool::{Checkout, Pool},
39    protocol::{
40        SessionState,
41        http::{
42            answers::DefaultAnswerStream,
43            diagnostics::{diagnostic_400_502, diagnostic_413_507},
44            editor::HttpContext,
45            parser::Method,
46        },
47        pipe::WebSocketContext,
48    },
49    retry::RetryPolicy,
50    server::{CONN_RETRIES, push_event},
51    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
52    sozu_command::{
53        logging::{LogContext, ansi_palette},
54        ready::Ready,
55    },
56    timer::TimeoutContainer,
57};
58
/// This macro is defined uniquely in this module to help the tracking of
/// kawa h1 issues inside Sōzu. It expands to a `format!` call, i.e. it yields
/// the log prefix as a `String`. Colored output: bold bright-white protocol
/// label (uniform across every protocol), light grey `Session` keyword, gray
/// keys, bright white values. The `[ulid - - -]` context comes first to stay
/// aligned with `MUX-*` and `SOCKET` log lines.
///
/// Two invocation forms:
/// - `log_context!(self)` — reads the response parsing phase via
///   [`Http::response_parsing_phase`]. Use when no `&mut self.response_stream`
///   borrow is currently live.
/// - `log_context!(self, response_phase)` — caller supplies an already-resolved
///   `Option<kawa::ParsingPhase>`. Use when a mutable borrow of
///   `self.response_stream` (or its `BackendAnswer` inner) is live — pass
///   `Some(inner.parsing_phase)` through the existing mutable reference.
macro_rules! log_context {
    // One-argument form: resolve the response phase through `$self`, then
    // forward to the two-argument form.
    ($self:expr) => {
        log_context!($self, $self.response_parsing_phase())
    };
    // Two-argument form: only touches fields of `$self` other than
    // `response_stream`, so it can be used while that field is mutably borrowed.
    ($self:expr, $response_phase:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "{gray}{ctx}{reset}\t{open}KAWA-H1{reset}\t{grey}Session{reset}({gray}public{reset}={white}{public}{reset}, {gray}session{reset}={white}{session}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}request_parsing_phase{reset}={white}{request_parsing_phase:?}{reset}, {gray}response_parsing_phase{reset}={white}{response_parsing_phase:?}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.context.log_context(),
            public = $self.context.public_address.to_string(),
            // A missing session address or backend token is rendered as `<none>`.
            session = $self.context.session_address.map(|addr| addr.to_string()).unwrap_or_else(|| "<none>".to_string()),
            frontend = $self.frontend_token.0,
            request_parsing_phase = $self.request_stream.parsing_phase,
            response_parsing_phase = $response_phase,
            frontend_readiness = $self.frontend_readiness,
            backend = $self.backend_token.map(|token| token.0.to_string()).unwrap_or_else(|| "<none>".to_string()),
            backend_readiness = $self.backend_readiness,
        )
    }};
}
98
/// Generic HTTP message representation provided by the Kawa crate, using a
/// Sōzu pool [`Checkout`] as its backing buffer.
type GenericHttpStream = kawa::Kawa<Checkout>;
101
/// Lets kawa use a pool [`Checkout`] as buffer storage by exposing the byte
/// slices returned by the checkout's inner `extra()` / `extra_mut()` accessors.
impl kawa::AsBuffer for Checkout {
    /// Read-only view of the buffer; delegates to `self.inner.extra()`.
    fn as_buffer(&self) -> &[u8] {
        self.inner.extra()
    }
    /// Mutable view of the buffer; delegates to `self.inner.extra_mut()`.
    fn as_mut_buffer(&mut self) -> &mut [u8] {
        self.inner.extra_mut()
    }
}
110
/// Parameters of a default answer, i.e. a response installed through
/// `set_answer` instead of being relayed from a backend. Each variant is
/// named after the HTTP status code that `From<&DefaultAnswer> for u16` maps
/// it to; the fields are the values carried along for that answer (the
/// rendering itself lives outside this excerpt).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DefaultAnswer {
    /// 301 redirect towards `location`.
    Answer301 {
        location: String,
    },
    /// RFC 9110 §15.4.3 — temporary redirect; user agents may rewrite POST
    /// to GET on follow. Surfaced by `RedirectPolicy::Found` (#1009).
    Answer302 {
        location: String,
    },
    /// RFC 9110 §15.4.9 — permanent redirect that PRESERVES the request
    /// method on follow (no GET-rewrite on POST). Surfaced by
    /// `RedirectPolicy::PermanentRedirect` (#1009).
    Answer308 {
        location: String,
    },
    /// Request parsing failure. `readable` builds it from the kawa error
    /// marker (`phase`) and the four strings returned by
    /// `diagnostic_400_502`.
    Answer400 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// 401 answer, optionally carrying a challenge.
    Answer401 {
        /// Optional `WWW-Authenticate` value (e.g. `Basic realm="…"`).
        /// When `None` or empty the realm header line is elided by the
        /// template engine so the response stays a bare 401.
        www_authenticate: Option<String>,
    },
    /// 404 answer; carries no parameters.
    Answer404 {},
    /// 408 answer; `duration` is an already formatted string.
    /// NOTE(review): presumably the timeout that elapsed — confirm against
    /// the call site, which is outside this excerpt.
    Answer408 {
        duration: String,
    },
    /// Built by `readable` when the request buffer is full before the
    /// request reached its main phase: `capacity` is the buffer capacity,
    /// `phase` the parsing phase marker at that point and `message` the
    /// output of `diagnostic_413_507`.
    Answer413 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        capacity: usize,
    },
    /// RFC 9110 §15.5.20 — returned when the request's `:authority` / `Host`
    /// host does not match the TLS SNI negotiated for this connection.
    /// The peer may retry on a fresh TLS connection that negotiates an SNI
    /// matching the authority.
    Answer421 {},
    /// RFC 6585 §4 — emitted when the per-(cluster, source-IP) connection
    /// limit is reached. `retry_after` is the suggested wait, in seconds;
    /// `None` (or `Some(0)`) tells the template engine to omit the header
    /// — `Retry-After: 0` invites an immediate retry that defeats the
    /// limit.
    Answer429 {
        retry_after: Option<u32>,
    },
    /// Same field set as `Answer400`.
    /// NOTE(review): presumably the response-side parsing failure; it is
    /// built outside this excerpt — confirm.
    Answer502 {
        message: String,
        phase: kawa::ParsingPhaseMarker,
        successfully_parsed: String,
        partially_parsed: String,
        invalid: String,
    },
    /// 503 answer with a free-form `message`.
    Answer503 {
        message: String,
    },
    /// 504 answer; `duration` is an already formatted string.
    /// NOTE(review): presumably the backend timeout that elapsed — confirm.
    Answer504 {
        duration: String,
    },
    /// Same field set as `Answer413`.
    /// NOTE(review): presumably the response-side "buffer full" counterpart;
    /// it is built outside this excerpt — confirm.
    Answer507 {
        phase: kawa::ParsingPhaseMarker,
        message: String,
        capacity: usize,
    },
}
181
182impl From<&DefaultAnswer> for u16 {
183    fn from(answer: &DefaultAnswer) -> u16 {
184        match answer {
185            DefaultAnswer::Answer301 { .. } => 301,
186            DefaultAnswer::Answer302 { .. } => 302,
187            DefaultAnswer::Answer308 { .. } => 308,
188            DefaultAnswer::Answer400 { .. } => 400,
189            DefaultAnswer::Answer401 { .. } => 401,
190            DefaultAnswer::Answer404 { .. } => 404,
191            DefaultAnswer::Answer408 { .. } => 408,
192            DefaultAnswer::Answer413 { .. } => 413,
193            DefaultAnswer::Answer421 { .. } => 421,
194            DefaultAnswer::Answer429 { .. } => 429,
195            DefaultAnswer::Answer502 { .. } => 502,
196            DefaultAnswer::Answer503 { .. } => 503,
197            DefaultAnswer::Answer504 { .. } => 504,
198            DefaultAnswer::Answer507 { .. } => 507,
199        }
200    }
201}
202
/// Tag describing which stage of the request/response exchange a timeout
/// relates to.
///
/// NOTE(review): the code that selects a variant is outside this excerpt; the
/// only hint visible here is that `keepalive_count` gates the
/// `WaitingForNewRequest` case. Confirm the exact meaning of each variant
/// against the timeout handler.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    Request,
    Response,
    WaitingForNewRequest,
    WaitingForResponse,
}
210
/// What is currently being written back to the frontend.
pub enum ResponseStream {
    /// A response handled through a kawa stream; this is the variant
    /// `Http::new` starts with.
    BackendAnswer(GenericHttpStream),
    /// A default answer served instead of a backend response. While this
    /// variant is set, `readable` refuses to read from the frontend and
    /// `writable` delegates to `writable_default_answer`.
    /// NOTE(review): the `u16` is presumably the HTTP status code — confirm
    /// against `set_answer`.
    DefaultAnswer(u16, DefaultAnswerStream),
}
215
/// HTTP/1.1 session state: the frontend socket, the optional backend
/// connection, the request/response streams and the readiness/timeout
/// bookkeeping around them.
///
/// Http will be contained in State which itself is contained by Session
pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
    /// Shared handle on the `answers::HttpAnswers` instance.
    answers: Rc<RefCell<answers::HttpAnswers>>,
    /// Backend currently attached to the session, if any; `reset` decrements
    /// its `active_requests` counter.
    pub backend: Option<Rc<RefCell<Backend>>>,
    /// Starts as `BackendConnectionStatus::NotConnected` in `new`.
    backend_connection_status: BackendConnectionStatus,
    pub backend_readiness: Readiness,
    pub backend_socket: Option<TcpStream>,
    backend_stop: Option<Instant>,
    pub backend_token: Option<Token>,
    /// Created empty with the configured connect timeout in `new`; cancelled
    /// by `reset` while waiting for the next request.
    pub container_backend_timeout: TimeoutContainer,
    /// Reset on every `readable` call; its duration is set back to
    /// `configured_frontend_timeout` by `reset` and once a request has
    /// started being parsed.
    pub container_frontend_timeout: TimeoutContainer,
    configured_backend_timeout: Duration,
    configured_connect_timeout: Duration,
    configured_frontend_timeout: Duration,
    /// attempts to connect to the backends during the session
    connection_attempts: u8,
    pub frontend_readiness: Readiness,
    pub frontend_socket: Front,
    frontend_token: Token,
    /// Number of times `reset` ran on this connection, i.e. completed
    /// keep-alive round trips; `0` means the first request is still pending.
    keepalive_count: usize,
    listener: Rc<RefCell<L>>,
    /// Request side, parsed by kawa from the bytes read on the frontend socket.
    pub request_stream: GenericHttpStream,
    /// Response side: either a backend answer or a default answer.
    pub response_stream: ResponseStream,
    /// The HTTP context was separated from the State for borrowing reasons.
    /// Calling a kawa parser mutably borrows the State through request_stream or response_stream,
    /// so Http can't be borrowed again to be used in callbacks. HttpContext is an independent
    /// subsection of Http that can be mutably borrowed for parser callbacks.
    pub context: HttpContext,
}
245
246impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
247    /// Instantiate a new HTTP SessionState with:
248    ///
249    /// - frontend_interest: READABLE | HUP | ERROR
250    /// - frontend_event: EMPTY
251    /// - backend_interest: EMPTY
252    /// - backend_event: EMPTY
253    ///
254    /// Remember to set the events from the previous State!
255    #[allow(clippy::too_many_arguments)]
256    pub fn new(
257        answers: Rc<RefCell<answers::HttpAnswers>>,
258        configured_backend_timeout: Duration,
259        configured_connect_timeout: Duration,
260        configured_frontend_timeout: Duration,
261        container_frontend_timeout: TimeoutContainer,
262        frontend_socket: Front,
263        frontend_token: Token,
264        listener: Rc<RefCell<L>>,
265        pool: Weak<RefCell<Pool>>,
266        protocol: Protocol,
267        public_address: SocketAddr,
268        session_id: Ulid,
269        request_id: Ulid,
270        session_address: Option<SocketAddr>,
271        sticky_name: String,
272    ) -> Result<Http<Front, L>, AcceptError> {
273        let (front_buffer, back_buffer) = match pool.upgrade() {
274            Some(pool) => {
275                let mut pool = pool.borrow_mut();
276                match (pool.checkout(), pool.checkout()) {
277                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
278                    _ => return Err(AcceptError::BufferCapacityReached),
279                }
280            }
281            None => return Err(AcceptError::BufferCapacityReached),
282        };
283        let sozu_id_header = listener.borrow().get_sozu_id_header().to_string();
284        let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
285        let send_x_real_ip = listener.borrow().get_send_x_real_ip();
286        Ok(Http {
287            answers,
288            backend_connection_status: BackendConnectionStatus::NotConnected,
289            backend_readiness: Readiness::new(),
290            backend_socket: None,
291            backend_stop: None,
292            backend_token: None,
293            backend: None,
294            configured_backend_timeout,
295            configured_connect_timeout,
296            configured_frontend_timeout,
297            connection_attempts: 0,
298            container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
299            container_frontend_timeout,
300            frontend_readiness: Readiness {
301                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
302                event: Ready::EMPTY,
303            },
304            frontend_socket,
305            frontend_token,
306            keepalive_count: 0,
307            listener,
308            request_stream: GenericHttpStream::new(
309                kawa::Kind::Request,
310                kawa::Buffer::new(front_buffer),
311            ),
312            response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
313                kawa::Kind::Response,
314                kawa::Buffer::new(back_buffer),
315            )),
316            context: HttpContext::new(
317                session_id,
318                request_id,
319                protocol,
320                public_address,
321                session_address,
322                sticky_name,
323                sozu_id_header,
324                elide_x_real_ip,
325                send_x_real_ip,
326            ),
327        })
328    }
329
330    /// Reset the connection in case of keep-alive to be ready for the next request
331    pub fn reset(&mut self) {
332        trace!("{} ============== reset", log_context!(self));
333        let response_stream = match &mut self.response_stream {
334            ResponseStream::BackendAnswer(response_stream) => response_stream,
335            _ => return,
336        };
337
338        self.context.id = Ulid::generate();
339        self.context.reset();
340
341        self.request_stream.clear();
342        response_stream.clear();
343        // Pair-assert the keep-alive counter advances by exactly one per
344        // completed request: it gates the `WaitingForNewRequest` timeout-status
345        // branch and the keepalive-vs-error accounting in `readable`. A
346        // double-increment or stall would mis-attribute the next idle close.
347        let keepalive_before = self.keepalive_count;
348        self.keepalive_count += 1;
349        debug_assert_eq!(
350            self.keepalive_count,
351            keepalive_before + 1,
352            "reset() must advance keepalive_count by exactly one"
353        );
354        // The response side has just been cleared, so it is back to its
355        // initial parsing phase — `writable` must not see a half-consumed
356        // response when the next request starts.
357        debug_assert!(
358            response_stream.is_initial(),
359            "response stream must be reset to initial on keep-alive"
360        );
361        gauge_add!(names::http::ACTIVE_REQUESTS, -1);
362
363        if let Some(backend) = &mut self.backend {
364            let mut backend = backend.borrow_mut();
365            backend.active_requests = backend.active_requests.saturating_sub(1);
366        }
367
368        // reset the front timeout and cancel the back timeout while we are
369        // waiting for a new request
370        self.container_backend_timeout.cancel();
371        self.container_frontend_timeout
372            .set_duration(self.configured_frontend_timeout);
373        self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
374        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
375
376        // We are resetting the offset of request and response stream buffers
377        // We do have to keep cursor position on the request, if there is data
378        // in the request stream to preserve http pipelining.
379
380        // Print the left-over response buffer output to track in which case it
381        // may happens
382        let response_phase = response_stream.parsing_phase;
383        let response_storage = &mut response_stream.storage;
384        if !response_storage.is_empty() {
385            warn!(
386                "{} Leftover fragment from response: {}",
387                log_context!(self, Some(response_phase)),
388                parser::view(
389                    response_storage.used(),
390                    16,
391                    &[response_storage.start, response_storage.end,],
392                )
393            );
394        }
395
396        response_storage.clear();
397        if !self.request_stream.storage.is_empty() {
398            self.frontend_readiness.event.insert(Ready::READABLE);
399        } else {
400            self.request_stream.storage.clear();
401        }
402    }
403
    /// Read bytes from the frontend socket into the request buffer and run
    /// the kawa HTTP/1 parser over them.
    ///
    /// Outcomes, as visible in this body:
    /// - a default answer is already installed: reading is refused, the
    ///   frontend interest is switched from READABLE to WRITABLE and
    ///   `Continue` is returned;
    /// - the request buffer is full: READABLE interest is dropped; in the
    ///   main phase the backend is asked to drain it (WRITABLE interest),
    ///   otherwise a 413 default answer is set; `Continue` is returned;
    /// - the socket reports `Error` or `Closed`: `CloseSession`;
    /// - the parser ends in an error phase: `CloseSession` if the response
    ///   stream is marked `consumed`, otherwise a 400 default answer is set
    ///   and `Continue` is returned;
    /// - the request enters its main phase during this call:
    ///   `ConnectBackend`;
    /// - anything else: `Continue`.
    ///
    /// `metrics.bin` is increased by the number of bytes read.
    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
        trace!("{} ============== readable", log_context!(self));
        if !self.container_frontend_timeout.reset() {
            error!(
                "could not reset front timeout {:?}",
                self.configured_frontend_timeout
            );
            self.print_state(self.protocol_string());
        }

        // From here on `response_stream` is a live mutable borrow of
        // `self.response_stream`: logging uses the two-argument form of
        // `log_context!` until the borrow is no longer needed.
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            ResponseStream::DefaultAnswer(..) => {
                error!(
                    "{} Sending default answer, should not read from frontend socket",
                    log_context!(self)
                );

                self.frontend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return StateResult::Continue;
            }
        };

        if self.request_stream.storage.is_full() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
            if self.request_stream.is_main_phase() {
                self.backend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // client has filled its buffer and we can't empty it
                self.set_answer(DefaultAnswer::Answer413 {
                    capacity: self.request_stream.storage.capacity(),
                    phase: self.request_stream.parsing_phase.marker(),
                    message: diagnostic_413_507(self.request_stream.parsing_phase),
                });
            }
            return StateResult::Continue;
        }

        let available_space = self.request_stream.storage.available_space();
        let (size, socket_state) = self
            .frontend_socket
            .socket_read(self.request_stream.storage.space());

        debug!(
            "{} Read {} bytes",
            log_context!(self, Some(response_stream.parsing_phase)),
            size
        );

        // A read can never deliver more bytes than the buffer slice we handed
        // it; `storage.fill(size)` below trusts this to advance the write
        // cursor, and an over-long `size` would push the cursor past the
        // buffer end (OOB on the next parse).
        debug_assert!(
            size <= available_space,
            "socket_read returned {size} bytes for a {available_space}-byte buffer slice"
        );

        if size > 0 {
            let used_before = self.request_stream.storage.used().len();
            self.request_stream.storage.fill(size);
            debug_assert_eq!(
                self.request_stream.storage.used().len(),
                used_before + size,
                "storage.fill(size) must grow the used region by exactly the bytes read"
            );
            count!(names::backend::BYTES_IN, size as i64);
            metrics.bin += size;
            // if self.kawa_request.storage.is_full() {
            //     self.frontend_readiness.interest.remove(Ready::READABLE);
            // }
        } else {
            // Nothing was read: drop the READABLE event so this handler is
            // not invoked again until the socket signals new data.
            self.frontend_readiness.event.remove(Ready::READABLE);
        }

        match socket_state {
            SocketResult::Error | SocketResult::Closed => {
                if self.request_stream.is_initial() {
                    // count an error if we were waiting for the first request
                    // otherwise, if we already had one completed request and response,
                    // and are waiting for the next one, we do not count a socket
                    // closing abruptly as an error
                    if self.keepalive_count == 0 {
                        self.frontend_socket.read_error();
                    }
                } else {
                    self.frontend_socket.read_error();
                    self.log_request_error(
                        metrics,
                        &format!(
                            "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
                            self.frontend_readiness,
                            self.backend_readiness,
                        )
                    );
                }
                return StateResult::CloseSession;
            }
            SocketResult::WouldBlock => {
                self.frontend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        };

        trace!(
            "{} ============== readable_parse",
            log_context!(self, Some(response_stream.parsing_phase))
        );
        // Snapshot the parser state so the transitions caused by this parse
        // call (request started / main phase reached) can be detected below.
        let was_initial = self.request_stream.is_initial();
        let was_not_proxying = !self.request_stream.is_main_phase();

        kawa::h1::parse(&mut self.request_stream, &mut self.context);
        // kawa::debug_kawa(&self.request_stream);

        if was_initial && !self.request_stream.is_initial() {
            // if it was the first request, the front timeout duration
            // was set to request_timeout, which is much lower. For future
            // requests on this connection, we can wait a bit more
            self.container_frontend_timeout
                .set_duration(self.configured_frontend_timeout);
            gauge_add!(names::http::ACTIVE_REQUESTS, 1);
            incr!(names::http::REQUESTS);
        }

        if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
            incr!(names::http::FRONTEND_PARSE_ERRORS);
            warn!(
                "{} Parsing request error in {:?}: {}",
                log_context!(self, Some(response_stream.parsing_phase)),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        let kawa = &self.request_stream;
                        parser::view(
                            kawa.storage.used(),
                            16,
                            &[
                                kawa.storage.start,
                                kawa.storage.head,
                                index as usize,
                                kawa.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            // When the response stream is already marked `consumed` no 400 is
            // set; the session is simply closed.
            if response_stream.consumed {
                self.log_request_error(metrics, "Parsing error on the request");
                return StateResult::CloseSession;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, &self.request_stream);
                self.set_answer(DefaultAnswer::Answer400 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return StateResult::Continue;
            }
        }

        if self.request_stream.is_main_phase() {
            self.backend_readiness.interest.insert(Ready::WRITABLE);
            if was_not_proxying {
                // Sozu tries to connect only once all the headers were gathered and edited
                // this could be improved
                trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
                return StateResult::ConnectBackend;
            }
        }
        // A fully parsed request needs no more frontend reads.
        if self.request_stream.is_terminated() {
            self.frontend_readiness.interest.remove(Ready::READABLE);
        }

        StateResult::Continue
    }
584
585    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
586        trace!("{} ============== writable", log_context!(self));
587        let response_stream = match &mut self.response_stream {
588            ResponseStream::BackendAnswer(response_stream) => response_stream,
589            _ => return self.writable_default_answer(metrics),
590        };
591
592        response_stream.prepare(&mut kawa::h1::BlockConverter);
593
594        let bufs = response_stream.as_io_slice();
595        if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
596            self.frontend_readiness.interest.remove(Ready::WRITABLE);
597            // do not shortcut, response might have been terminated without anything more to send
598        }
599
600        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
601
602        debug!(
603            "{} Wrote {} bytes",
604            log_context!(self, Some(response_stream.parsing_phase)),
605            size
606        );
607
608        if size > 0 {
609            // A vectored write can only have flushed bytes that were actually
610            // queued in `bufs`; consuming more than we sent would advance the
611            // kawa output cursor past the data the peer received and truncate
612            // the next chunk. (`as_io_slice` is built from the same stream we
613            // now consume from.)
614            let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
615            debug_assert!(
616                size <= queued,
617                "socket_write_vectored reported {size} bytes for {queued} queued"
618            );
619            response_stream.consume(size);
620            count!(names::backend::BYTES_OUT, size as i64);
621            metrics.bout += size;
622            self.backend_readiness.interest.insert(Ready::READABLE);
623        }
624
625        match socket_state {
626            SocketResult::Error | SocketResult::Closed => {
627                self.frontend_socket.write_error();
628                self.log_request_error(
629                    metrics,
630                    &format!(
631                        "front socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
632                        self.frontend_readiness,
633                        self.backend_readiness,
634                    ),
635                );
636                return StateResult::CloseSession;
637            }
638            SocketResult::WouldBlock => {
639                self.frontend_readiness.event.remove(Ready::WRITABLE);
640            }
641            SocketResult::Continue => {}
642        }
643
644        if self.frontend_socket.socket_wants_write() {
645            return StateResult::Continue;
646        }
647
648        if response_stream.is_terminated() && response_stream.is_completed() {
649            if self.context.closing {
650                debug!("{} closing proxy, no keep alive", log_context!(self));
651                self.log_request_success(metrics);
652                return StateResult::CloseSession;
653            }
654
655            match response_stream.detached.status_line {
656                kawa::StatusLine::Response { code: 101, .. } => {
657                    trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
658                    self.log_request_success(metrics);
659                    return StateResult::Upgrade;
660                }
661                kawa::StatusLine::Response { code: 100, .. } => {
662                    trace!(
663                        "{} ============== HANDLE CONTINUE!",
664                        log_context!(self, Some(response_stream.parsing_phase))
665                    );
666                    response_stream.clear();
667                    self.log_request_success(metrics);
668                    return StateResult::Continue;
669                }
670                kawa::StatusLine::Response { code: 103, .. } => {
671                    self.backend_readiness.event.insert(Ready::READABLE);
672                    trace!(
673                        "{} ============== HANDLE EARLY HINT!",
674                        log_context!(self, Some(response_stream.parsing_phase))
675                    );
676                    response_stream.clear();
677                    self.log_request_success(metrics);
678                    return StateResult::Continue;
679                }
680                _ => (),
681            }
682
683            let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
684            let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
685            if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
686                && request_length_known
687            {
688                error!(
689                    "{} Response terminated before request, this case is not handled properly yet",
690                    log_context!(self)
691                );
692                incr!(names::http::EARLY_RESPONSE_CLOSE);
693                // FIXME: this will cause problems with pipelining
694                // return StateResult::CloseSession;
695            }
696
697            // FIXME: we could get smarter about this
698            // with no keepalive on backend, we could open a new backend ConnectionError
699            // with no keepalive on front but keepalive on backend, we could have
700            // a pool of connections
701            trace!(
702                "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
703                log_context!(self),
704                self.context.keep_alive_frontend,
705                self.context.keep_alive_backend,
706                response_length_known
707            );
708
709            self.log_request_success(metrics);
710            return match (
711                self.context.keep_alive_frontend,
712                self.context.keep_alive_backend,
713                response_length_known,
714            ) {
715                (true, true, true) => {
716                    debug!("{} Keep alive frontend/backend", log_context!(self));
717                    metrics.reset();
718                    self.reset();
719                    StateResult::Continue
720                }
721                (true, false, true) => {
722                    debug!("{} Keep alive frontend", log_context!(self));
723                    metrics.reset();
724                    self.reset();
725                    StateResult::CloseBackend
726                }
727                _ => {
728                    debug!("{} No keep alive", log_context!(self));
729                    StateResult::CloseSession
730                }
731            };
732        }
733        StateResult::Continue
734    }
735
736    fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
737        trace!(
738            "{} ============== writable_default_answer",
739            log_context!(self)
740        );
741        let response_stream = match &mut self.response_stream {
742            ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
743            _ => return StateResult::CloseSession,
744        };
745        let bufs = response_stream.as_io_slice();
746        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
747
748        // Never consume more of the rendered default answer than the socket
749        // actually accepted, or the next flush would skip bytes and ship a
750        // truncated error page.
751        let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
752        debug_assert!(
753            size <= queued,
754            "default-answer write reported {size} bytes for {queued} queued"
755        );
756        count!(names::backend::BYTES_OUT, size as i64);
757        metrics.bout += size;
758        response_stream.consume(size);
759
760        if size == 0 || socket_state != SocketResult::Continue {
761            self.frontend_readiness.event.remove(Ready::WRITABLE);
762        }
763
764        if response_stream.is_completed() {
765            save_http_status_metric(self.context.status, self.context.log_context());
766            self.log_default_answer_success(metrics);
767            self.frontend_readiness.reset();
768            self.backend_readiness.reset();
769            return StateResult::CloseSession;
770        }
771
772        if socket_state == SocketResult::Error {
773            self.frontend_socket.write_error();
774            self.log_request_error(
775                metrics,
776                "error writing default answer to front socket, closing",
777            );
778            StateResult::CloseSession
779        } else {
780            StateResult::Continue
781        }
782    }
783
784    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
785        trace!("{} ============== backend_writable", log_context!(self));
786        if let ResponseStream::DefaultAnswer(..) = self.response_stream {
787            error!(
788                "{}\tsending default answer, should not write to back",
789                log_context!(self)
790            );
791            self.backend_readiness.interest.remove(Ready::WRITABLE);
792            self.frontend_readiness.interest.insert(Ready::WRITABLE);
793            return SessionResult::Continue;
794        }
795
796        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
797            backend_socket
798        } else {
799            self.log_request_error(metrics, "back socket not found, closing session");
800            return SessionResult::Close;
801        };
802
803        self.request_stream.prepare(&mut kawa::h1::BlockConverter);
804
805        let bufs = self.request_stream.as_io_slice();
806        if bufs.is_empty() {
807            self.backend_readiness.interest.remove(Ready::WRITABLE);
808            return SessionResult::Continue;
809        }
810
811        let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
812        debug!("{} Wrote {} bytes", log_context!(self), size);
813
814        if size > 0 {
815            // Consume only what the backend socket actually accepted from the
816            // queued request slices — over-consuming would drop request bytes
817            // the backend never saw (silent request truncation / smuggling).
818            let queued = bufs.iter().map(|b| b.len()).sum::<usize>();
819            debug_assert!(
820                size <= queued,
821                "backend socket_write_vectored reported {size} bytes for {queued} queued"
822            );
823            self.request_stream.consume(size);
824            count!(names::backend::BACK_BYTES_OUT, size as i64);
825            metrics.backend_bout += size;
826            self.frontend_readiness.interest.insert(Ready::READABLE);
827            self.backend_readiness.interest.insert(Ready::READABLE);
828        } else {
829            self.backend_readiness.event.remove(Ready::WRITABLE);
830        }
831
832        match socket_state {
833            // the back socket is not writable anymore, so we can drop
834            // the front buffer, no more data can be transmitted.
835            // But the socket might still be readable, or if it is
836            // closed, we might still have some data in the buffer.
837            // As an example, we can get an early response for a large
838            // POST request to refuse it and prevent all of the data
839            // from being transmitted, with the backend server closing
840            // the socket right after sending the response
841            // FIXME: shouldn't we test for response_state then?
842            SocketResult::Error | SocketResult::Closed => {
843                self.frontend_readiness.interest.remove(Ready::READABLE);
844                self.backend_readiness.interest.remove(Ready::WRITABLE);
845                return SessionResult::Continue;
846            }
847            SocketResult::WouldBlock => {
848                self.backend_readiness.event.remove(Ready::WRITABLE);
849            }
850            SocketResult::Continue => {}
851        }
852
853        if self.request_stream.is_terminated() && self.request_stream.is_completed() {
854            self.backend_readiness.interest.remove(Ready::WRITABLE);
855
856            // cancel the front timeout while we are waiting for the server to answer
857            self.container_frontend_timeout.cancel();
858            self.container_backend_timeout.reset();
859        }
860        SessionResult::Continue
861    }
862
    /// Reads response bytes from the backend socket into the response buffer
    /// and runs the kawa H1 parser over them.
    ///
    /// Returns `SessionResult::Close` when no backend socket is attached, when
    /// the socket read reports an error, or when a parsing error is detected
    /// while `response_stream.consumed` is set. Every other path returns
    /// `SessionResult::Continue`, including the ones that queue a 502 or 507
    /// default answer through `set_answer`.
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_readable", log_context!(self));
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset back timeout {:?}",
                log_context!(self),
                self.configured_backend_timeout
            );
            self.print_state(self.protocol_string());
        }

        // Only a backend answer may be filled from the backend socket; when a
        // default answer is queued, steer the session back to flushing it.
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => {
                error!(
                    "{} Sending default answer, should not read from backend socket",
                    log_context!(self),
                );

                self.backend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return SessionResult::Continue;
            }
        };

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        // A full buffer cannot take more bytes: stop reading. In the main
        // phase the frontend is asked to drain it; otherwise answer with a 507.
        if response_stream.storage.is_full() {
            self.backend_readiness.interest.remove(Ready::READABLE);
            if response_stream.is_main_phase() {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // server has filled its buffer and we can't empty it
                let capacity = response_stream.storage.capacity();
                let phase = response_stream.parsing_phase.marker();
                let message = diagnostic_413_507(response_stream.parsing_phase);
                self.set_answer(DefaultAnswer::Answer507 {
                    capacity,
                    phase,
                    message,
                });
            }
            return SessionResult::Continue;
        }

        let available_space = response_stream.storage.available_space();
        let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
        debug!(
            "{} Read {} bytes",
            log_context!(self, Some(response_stream.parsing_phase)),
            size
        );

        // Same buffer-slice bound as the frontend read path: a backend read
        // can never exceed the space slice we handed it, or `fill(size)` would
        // push the response write cursor past the buffer end.
        debug_assert!(
            size <= available_space,
            "backend socket_read returned {size} bytes for a {available_space}-byte slice"
        );

        if size > 0 {
            let used_before = response_stream.storage.used().len();
            response_stream.storage.fill(size);
            debug_assert_eq!(
                response_stream.storage.used().len(),
                used_before + size,
                "response storage.fill(size) must grow the used region by exactly the bytes read"
            );
            count!(names::backend::BACK_BYTES_IN, size as i64);
            metrics.backend_bin += size;

            // In case the response starts while the request is not tagged as "terminated",
            // we place the timeout responsibility on the backend.
            // This can happen when:
            // - the request is malformed and doesn't have length information
            // - kawa fails to detect a properly terminated request (e.g. a GET request with no body and no length)
            // - the response can start before the end of the request (e.g. stream processing like compression)
            self.container_frontend_timeout.cancel();
        } else {
            // Nothing was read: clear the READABLE event.
            self.backend_readiness.event.remove(Ready::READABLE);
        }

        // TODO: close delimited and backend_hup should be handled better
        match socket_state {
            SocketResult::Error => {
                backend_socket.read_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "back socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return SessionResult::Close;
            }
            // Both states clear the READABLE event; bytes already read are
            // still parsed below.
            SocketResult::WouldBlock | SocketResult::Closed => {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        }

        trace!(
            "{} ============== backend_readable_parse",
            log_context!(self, Some(response_stream.parsing_phase))
        );
        kawa::h1::parse(response_stream, &mut self.context);
        // kawa::debug_kawa(&self.response_stream);

        if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
            incr!(names::http::BACKEND_PARSE_ERRORS);
            warn!(
                "{} Parsing response error in {:?}: {}",
                log_context!(self, Some(response_stream.parsing_phase)),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        parser::view(
                            response_stream.storage.used(),
                            16,
                            &[
                                response_stream.storage.start,
                                response_stream.storage.head,
                                index as usize,
                                response_stream.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            // NOTE(review): `consumed` presumably means part of the response
            // was already forwarded to the client, so a 502 can no longer be
            // substituted; confirm against kawa.
            if response_stream.consumed {
                return SessionResult::Close;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, response_stream);
                self.set_answer(DefaultAnswer::Answer502 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return SessionResult::Continue;
            }
        }

        // In the main phase, ask for frontend writability so the response can
        // be forwarded.
        if response_stream.is_main_phase() {
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
        }
        // A terminated response stops the backend timer, records when the
        // backend finished (read back by `is_valid_backend_socket`), and ends
        // interest in backend reads.
        if response_stream.is_terminated() {
            metrics.backend_stop();
            self.backend_stop = Some(Instant::now());
            self.backend_readiness.interest.remove(Ready::READABLE);
        }
        SessionResult::Continue
    }
1030}
1031
1032impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
1033    fn log_endpoint(&self) -> EndpointRecord<'_> {
1034        EndpointRecord::Http {
1035            method: self.context.method.as_deref(),
1036            authority: self.context.authority.as_deref(),
1037            path: self.context.path.as_deref(),
1038            reason: self.context.reason.as_deref(),
1039            status: self.context.status,
1040        }
1041    }
1042
1043    /// Returns the kawa `ParsingPhase` of the response side, if a backend answer
1044    /// has been accepted. Used by `log_context!` to surface H1 wedged-response
1045    /// states without poking inside the `ResponseStream` enum at every log site.
1046    fn response_parsing_phase(&self) -> Option<kawa::ParsingPhase> {
1047        match &self.response_stream {
1048            ResponseStream::BackendAnswer(inner) => Some(inner.parsing_phase),
1049            _ => None,
1050        }
1051    }
1052
1053    pub fn get_session_address(&self) -> Option<SocketAddr> {
1054        self.context
1055            .session_address
1056            .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
1057    }
1058
1059    pub fn get_backend_address(&self) -> Option<SocketAddr> {
1060        self.backend
1061            .as_ref()
1062            .map(|backend| backend.borrow().address)
1063            .or_else(|| {
1064                self.backend_socket
1065                    .as_ref()
1066                    .and_then(|backend| backend.peer_addr().ok())
1067            })
1068    }
1069
1070    // The protocol name used in the access logs
1071    fn protocol_string(&self) -> &'static str {
1072        match self.context.protocol {
1073            Protocol::HTTP => "HTTP",
1074            Protocol::HTTPS => match self.frontend_socket.protocol() {
1075                TransportProtocol::Ssl2 => "HTTPS-SSL2",
1076                TransportProtocol::Ssl3 => "HTTPS-SSL3",
1077                TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
1078                TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
1079                TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
1080                TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
1081                _ => unreachable!(),
1082            },
1083            _ => unreachable!(),
1084        }
1085    }
1086
1087    /// Format the context of the websocket into a loggable String
1088    pub fn websocket_context(&self) -> WebSocketContext {
1089        WebSocketContext::Http {
1090            method: self.context.method.clone(),
1091            authority: self.context.authority.clone(),
1092            path: self.context.path.clone(),
1093            reason: self.context.reason.clone(),
1094            status: self.context.status,
1095        }
1096    }
1097
1098    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
1099        let listener = self.listener.borrow();
1100        let tags = self.context.authority.as_ref().and_then(|host| {
1101            let hostname = match host.split_once(':') {
1102                None => host,
1103                Some((hostname, _)) => hostname,
1104            };
1105            listener.get_tags(hostname)
1106        });
1107
1108        let context = self.context.log_context();
1109        metrics.register_end_of_session(&context);
1110
1111        log_access! {
1112            error,
1113            on_failure: { incr!(names::access_logs::UNSENT) },
1114            message,
1115            context,
1116            session_address: self.get_session_address(),
1117            backend_address: self.get_backend_address(),
1118            protocol: self.protocol_string(),
1119            endpoint: self.log_endpoint(),
1120            tags,
1121            client_rtt: socket_rtt(self.front_socket()),
1122            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
1123            service_time: metrics.service_time(),
1124            response_time: metrics.backend_response_time(),
1125            request_time: metrics.request_time(),
1126            start_time_ns: metrics.start_wall_ns(),
1127            bytes_in: metrics.bin,
1128            bytes_out: metrics.bout,
1129            user_agent: self.context.user_agent.as_deref(),
1130            x_request_id: self.context.x_request_id.as_deref(),
1131            tls_version: self.context.tls_version,
1132            tls_cipher: self.context.tls_cipher,
1133            tls_sni: self.context.tls_server_name.as_deref(),
1134            tls_alpn: self.context.tls_alpn,
1135            xff_chain: self.context.xff_chain.as_deref(),
1136            #[cfg(feature = "opentelemetry")]
1137            otel: self.context.otel.as_ref(),
1138            #[cfg(not(feature = "opentelemetry"))]
1139            otel: None,
1140        };
1141    }
1142
1143    pub fn log_request_success(&self, metrics: &SessionMetrics) {
1144        save_http_status_metric(self.context.status, self.context.log_context());
1145        self.log_request(metrics, false, None);
1146    }
1147
1148    pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
1149        // Surface the timeout discriminator (set by `timeout()` before
1150        // `set_answer`) on the access-log `message` field. Non-timeout
1151        // default-answer paths leave `access_log_message` as `None` and
1152        // emit `message: None` exactly as before.
1153        self.log_request(metrics, false, self.context.access_log_message);
1154    }
1155    pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
1156        // Labelled with `(cluster_id, backend_id)` so per-cluster dashboards
1157        // can attribute error rates. The labels are dropped centrally by
1158        // `filter_labels_for_detail` (`metrics/mod.rs`) when
1159        // `metrics.detail` is `process` / `frontend` / `cluster`, so default
1160        // setups still see the same proxy-wide counter — no double-counting,
1161        // no surprise cardinality.
1162        incr!(
1163            "http.errors",
1164            self.context.cluster_id.as_deref(),
1165            self.context.backend_id.as_deref()
1166        );
1167        error!(
1168            "{} Could not process request properly got: {}",
1169            log_context!(self),
1170            message
1171        );
1172        self.print_state(self.protocol_string());
1173        self.log_request(metrics, true, Some(message));
1174    }
1175
1176    pub fn set_answer(&mut self, answer: DefaultAnswer) {
1177        let status = u16::from(&answer);
1178        if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
1179            error!(
1180                "already set the default answer to {}, trying to set to {}",
1181                old_status, status
1182            );
1183        } else {
1184            match answer {
1185                DefaultAnswer::Answer301 { .. } => incr!(
1186                    "http.301.redirection",
1187                    self.context.cluster_id.as_deref(),
1188                    self.context.backend_id.as_deref()
1189                ),
1190                DefaultAnswer::Answer302 { .. } => incr!(
1191                    "http.302.redirection",
1192                    self.context.cluster_id.as_deref(),
1193                    self.context.backend_id.as_deref()
1194                ),
1195                DefaultAnswer::Answer308 { .. } => incr!(
1196                    "http.308.redirection",
1197                    self.context.cluster_id.as_deref(),
1198                    self.context.backend_id.as_deref()
1199                ),
1200                DefaultAnswer::Answer400 { .. } => incr!(names::http::ERR_400),
1201                DefaultAnswer::Answer401 { .. } => incr!(
1202                    "http.401.errors",
1203                    self.context.cluster_id.as_deref(),
1204                    self.context.backend_id.as_deref()
1205                ),
1206                DefaultAnswer::Answer404 { .. } => incr!(names::http::ERR_404),
1207                DefaultAnswer::Answer408 { .. } => incr!(
1208                    "http.408.errors",
1209                    self.context.cluster_id.as_deref(),
1210                    self.context.backend_id.as_deref()
1211                ),
1212                DefaultAnswer::Answer413 { .. } => incr!(
1213                    "http.413.errors",
1214                    self.context.cluster_id.as_deref(),
1215                    self.context.backend_id.as_deref()
1216                ),
1217                DefaultAnswer::Answer421 { .. } => incr!(
1218                    "http.421.errors",
1219                    self.context.cluster_id.as_deref(),
1220                    self.context.backend_id.as_deref()
1221                ),
1222                DefaultAnswer::Answer429 { .. } => incr!(
1223                    "connections.rejected_per_cluster_ip",
1224                    self.context.cluster_id.as_deref(),
1225                    self.context.backend_id.as_deref()
1226                ),
1227                DefaultAnswer::Answer502 { .. } => incr!(
1228                    "http.502.errors",
1229                    self.context.cluster_id.as_deref(),
1230                    self.context.backend_id.as_deref()
1231                ),
1232                DefaultAnswer::Answer503 { .. } => incr!(
1233                    "http.503.errors",
1234                    self.context.cluster_id.as_deref(),
1235                    self.context.backend_id.as_deref()
1236                ),
1237                DefaultAnswer::Answer504 { .. } => incr!(
1238                    "http.504.errors",
1239                    self.context.cluster_id.as_deref(),
1240                    self.context.backend_id.as_deref()
1241                ),
1242                DefaultAnswer::Answer507 { .. } => incr!(
1243                    "http.507.errors",
1244                    self.context.cluster_id.as_deref(),
1245                    self.context.backend_id.as_deref()
1246                ),
1247            };
1248        }
1249
1250        let (resolved_status, keep_alive, mut kawa) = self.answers.borrow().get(
1251            answer,
1252            self.context.id.to_string(),
1253            self.context.cluster_id.as_deref(),
1254            self.context.backend_id.as_deref(),
1255            self.get_route(),
1256        );
1257        // The answer engine only ever resolves to a template whose status line
1258        // parsed as a 3-digit HTTP code (`Template::new` enforces it); a value
1259        // outside `100..=999` here would mean a malformed template slipped
1260        // through and the bytes on the wire would carry a bogus status.
1261        debug_assert!(
1262            (100..=999).contains(&resolved_status),
1263            "default answer must resolve to a 3-digit HTTP status, got {resolved_status}"
1264        );
1265        kawa.prepare(&mut kawa::h1::BlockConverter);
1266        // The template's resolved status may differ from the requested
1267        // code when a fallback template is selected (e.g. unknown status).
1268        let status = resolved_status;
1269        self.context.status = Some(status);
1270        self.context.reason = None;
1271        // Honour the rendered template's `Connection` header. Built-in
1272        // error templates carry `Connection: close`, but operators can
1273        // ship a custom template that keeps the frontend connection alive.
1274        if !keep_alive {
1275            self.context.keep_alive_frontend = false;
1276        }
1277        self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
1278        self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
1279        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
1280        // Post: the session is now in the default-answer regime —
1281        // `writable_default_answer` flushes the queued bytes and the frontend
1282        // is steered to WRITABLE (never READABLE) while the backend is parked.
1283        // This mirrors the `DefaultAnswer` arm of `check_invariants`.
1284        debug_assert!(
1285            matches!(self.response_stream, ResponseStream::DefaultAnswer(s, _) if s == status),
1286            "set_answer must leave the response stream as the queued DefaultAnswer"
1287        );
1288        debug_assert!(
1289            self.frontend_readiness.interest.is_writable()
1290                && !self.frontend_readiness.interest.is_readable(),
1291            "default answer must steer the frontend to WRITABLE only"
1292        );
1293    }
1294
1295    pub fn test_backend_socket(&self) -> bool {
1296        match self.backend_socket {
1297            Some(ref s) => {
1298                let mut tmp = [0u8; 1];
1299                let res = s.peek(&mut tmp[..]);
1300
1301                match res {
1302                    // if the socket is half open, it will report 0 bytes read (EOF)
1303                    Ok(0) => false,
1304                    Ok(_) => true,
1305                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1306                }
1307            }
1308            None => false,
1309        }
1310    }
1311
1312    pub fn is_valid_backend_socket(&self) -> bool {
1313        // if socket was last used in the last second, test it
1314        match self.backend_stop.as_ref() {
1315            Some(stop_instant) => {
1316                let now = Instant::now();
1317                let dur = now - *stop_instant;
1318                if dur > Duration::from_secs(1) {
1319                    return self.test_backend_socket();
1320                }
1321            }
1322            None => return self.test_backend_socket(),
1323        }
1324
1325        true
1326    }
1327
1328    pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
1329        self.backend_socket = Some(socket);
1330        self.backend = backend;
1331    }
1332
    /// Stores `cluster_id` in the session context, replacing any previous value.
    pub fn set_cluster_id(&mut self, cluster_id: String) {
        self.context.cluster_id = Some(cluster_id);
    }
1336
    /// Stores `backend_id` in the session context, replacing any previous value.
    pub fn set_backend_id(&mut self, backend_id: String) {
        self.context.backend_id = Some(backend_id);
    }
1340
    /// Records the token associated with the backend connection.
    pub fn set_backend_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
1344
    /// Forgets the backend token without touching the socket or the backend.
    pub fn clear_backend_token(&mut self) {
        self.backend_token = None;
    }
1348
1349    pub fn set_backend_timeout(&mut self, dur: Duration) {
1350        if let Some(token) = self.backend_token.as_ref() {
1351            self.container_backend_timeout.set_duration(dur);
1352            self.container_backend_timeout.set(*token);
1353        }
1354    }
1355
    /// Returns the frontend TCP stream, as exposed by `frontend_socket.socket_ref()`.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend_socket.socket_ref()
    }
1359
1360    /// WARNING: this function removes the backend entry in the session manager
1361    /// IF the backend_token is set, so that entry can be reused for new backend.
1362    /// I don't think this is a good idea, but it is a quick fix
1363    fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1364        self.container_backend_timeout.cancel();
1365        debug!(
1366            "{}\tPROXY [{}->{}] CLOSED BACKEND",
1367            log_context!(self),
1368            self.frontend_token.0,
1369            self.backend_token
1370                .map(|t| format!("{}", t.0))
1371                .unwrap_or_else(|| "-".to_string())
1372        );
1373
1374        let proxy = proxy.borrow();
1375        if let Some(socket) = &mut self.backend_socket.take() {
1376            if let Err(e) = proxy.deregister_socket(socket) {
1377                error!(
1378                    "{} Error deregistering back socket({:?}): {:?}",
1379                    log_context!(self),
1380                    socket,
1381                    e
1382                );
1383            }
1384            // SAFETY (TLS-truncation invariant): `Shutdown::Both` is permitted
1385            // here because this is the *backend* socket of an H1 session — Sōzu
1386            // currently speaks plaintext H1 to backends, so there is no
1387            // outbound TLS write buffer that the kernel could discard with a
1388            // RST. If backend-TLS lands later (#1218), this MUST move to
1389            // `Shutdown::Write` (or `socket.send_close_notify()`) to avoid the
1390            // canonical truncation anti-pattern documented in CLAUDE.md.
1391            if let Err(e) = socket.shutdown(Shutdown::Both) {
1392                if e.kind() != ErrorKind::NotConnected {
1393                    error!(
1394                        "{} Error shutting down back socket({:?}): {:?}",
1395                        log_context!(self),
1396                        socket,
1397                        e
1398                    );
1399                }
1400            }
1401        }
1402
1403        if let Some(token) = self.backend_token.take() {
1404            proxy.remove_session(token);
1405
1406            if self.backend_connection_status != BackendConnectionStatus::NotConnected {
1407                self.backend_readiness.event = Ready::EMPTY;
1408            }
1409
1410            if self.backend_connection_status == BackendConnectionStatus::Connected {
1411                gauge_add!(names::backend::CONNECTIONS, -1);
1412                gauge_add!(
1413                    names::backend::CONNECTIONS_PER_BACKEND,
1414                    -1,
1415                    self.context.cluster_id.as_deref(),
1416                    metrics.backend_id.as_deref()
1417                );
1418            }
1419
1420            self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);
1421
1422            if let Some(backend) = self.backend.take() {
1423                backend.borrow_mut().dec_connections();
1424            }
1425        }
1426        // Post: the backend token is fully released — `close_backend` is the
1427        // sole site that decrements the `CONNECTIONS` gauge, paired with the
1428        // `+1` in `set_backend_connected`. Leaving a token behind would orphan
1429        // the slab entry and skip the next session's gauge decrement.
1430        debug_assert!(
1431            self.backend_token.is_none(),
1432            "close_backend must release the backend token"
1433        );
1434        debug_assert_ne!(
1435            self.backend_connection_status,
1436            BackendConnectionStatus::Connected,
1437            "close_backend must leave the backend out of the Connected state"
1438        );
1439    }
1440
1441    /// Check the number of connection attempts against authorized connection retries
1442    fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1443        if self.connection_attempts >= CONN_RETRIES {
1444            incr!(
1445                "backend.connect.retries_exhausted",
1446                self.context.cluster_id.as_deref(),
1447                self.context.backend_id.as_deref()
1448            );
1449            warn!(
1450                "{} Max connection attempt reached ({})",
1451                log_context!(self),
1452                self.connection_attempts,
1453            );
1454
1455            self.set_answer(DefaultAnswer::Answer503 {
1456                message: format!(
1457                    "Max connection attempt reached: {}",
1458                    self.connection_attempts
1459                ),
1460            });
1461            return Err(BackendConnectionError::MaxConnectionRetries(None));
1462        }
1463        // Post (Ok branch): we only fall through when there is still budget for
1464        // another connect attempt. `connect_to_backend` relies on this — it
1465        // proceeds straight to the dial after a clean breaker check.
1466        debug_assert!(
1467            self.connection_attempts < CONN_RETRIES,
1468            "check_circuit_breaker must only return Ok with retry budget remaining"
1469        );
1470        Ok(())
1471    }
1472
1473    fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1474        let is_valid_backend_socket = self.is_valid_backend_socket();
1475
1476        if !is_valid_backend_socket {
1477            return false;
1478        }
1479
1480        //matched on keepalive
1481        metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1482
1483        metrics.backend_start();
1484        metrics.backend_connected();
1485        if let Some(b) = self.backend.as_mut() {
1486            b.borrow_mut().active_requests += 1;
1487        }
1488        true
1489    }
1490
1491    // -> host, path, method
1492    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1493        let given_method = self
1494            .context
1495            .method
1496            .as_ref()
1497            .ok_or(RetrieveClusterError::NoMethod)?;
1498        let given_authority = self
1499            .context
1500            .authority
1501            .as_deref()
1502            .ok_or(RetrieveClusterError::NoHost)?;
1503        let given_path = self
1504            .context
1505            .path
1506            .as_deref()
1507            .ok_or(RetrieveClusterError::NoPath)?;
1508
1509        Ok((given_authority, given_path, given_method))
1510    }
1511
1512    pub fn get_route(&self) -> String {
1513        if let Some(method) = &self.context.method {
1514            if let Some(authority) = &self.context.authority {
1515                if let Some(path) = &self.context.path {
1516                    return format!("{method} {authority}{path}");
1517                }
1518                return format!("{method} {authority}");
1519            }
1520            return format!("{method}");
1521        }
1522        String::new()
1523    }
1524
    /// Resolve the id of the cluster that should serve the current request.
    ///
    /// The lookup runs in four steps, each of which installs a default answer
    /// on the session before bailing out:
    ///
    /// 1. `extract_route` provides `(host, uri, method)`; if a component is
    ///    missing a 400 is set and the extraction error is returned as is.
    /// 2. The listener is asked for the matching frontend; a malformed
    ///    authority sets a 400, an unmatched one sets a 404, and the error is
    ///    wrapped in `RetrieveClusterError::RetrieveFrontend`.
    /// 3. A route that carries no `cluster_id` sets a 401 and returns
    ///    `RetrieveClusterError::UnauthorizedRoute`.
    /// 4. If the proxy is a plain HTTP listener and the cluster has
    ///    `https_redirect` enabled, a 301 to `https://{host}{uri}` is set and
    ///    `RetrieveClusterError::UnauthorizedRoute` is returned as well.
    ///
    /// When none of the above applies, the cluster id is returned.
    fn cluster_id_from_request(
        &mut self,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<String, RetrieveClusterError> {
        // `host`, `uri` and `method` borrow from `self.context`; every error
        // path below returns right after `set_answer`, so those borrows are
        // never used across the mutation.
        let (host, uri, method) = match self.extract_route() {
            Ok(tuple) => tuple,
            Err(cluster_error) => {
                self.set_answer(DefaultAnswer::Answer400 {
                    message: "Could not extract the route after connection started, this should not happen.".into(),
                    phase: self.request_stream.parsing_phase.marker(),
                    successfully_parsed: "null".into(),
                    partially_parsed: "null".into(),
                    invalid: "null".into(),
                });
                return Err(cluster_error);
            }
        };

        // The listener borrow is a temporary that ends with this statement,
        // before any `set_answer` call below.
        let route_result = self
            .listener
            .borrow()
            .frontend_from_request(host, uri, method);

        let route = match route_result {
            Ok(route) => route,
            Err(frontend_error) => {
                // RFC 9110 §15.5.1: a malformed authority is a 400. A
                // syntactically valid authority that simply has no matching
                // frontend stays on the historical 404 path.
                match &frontend_error {
                    FrontendFromRequestError::HostParse { .. }
                    | FrontendFromRequestError::InvalidCharsAfterHost(_) => {
                        self.set_answer(DefaultAnswer::Answer400 {
                            message: frontend_error.to_string(),
                            phase: self.request_stream.parsing_phase.marker(),
                            successfully_parsed: "null".into(),
                            partially_parsed: "null".into(),
                            invalid: "null".into(),
                        });
                    }
                    FrontendFromRequestError::NoClusterFound(_) => {
                        self.set_answer(DefaultAnswer::Answer404 {});
                    }
                }
                return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
            }
        };

        // `RouteResult` already carries the full routing decision, but the
        // mux integration that consumes every field is not in place yet.
        // Until it lands, this legacy kawa_h1 path collapses `RouteResult`
        // down to its `cluster_id` and treats an absent cluster as the
        // historical `Route::Deny`.
        let cluster_id = match route.cluster_id {
            Some(cluster_id) => cluster_id,
            None => {
                self.set_answer(DefaultAnswer::Answer401 {
                    www_authenticate: None,
                });
                return Err(RetrieveClusterError::UnauthorizedRoute);
            }
        };

        // Only plain HTTP listeners redirect; a cluster id that is unknown to
        // the proxy counts as "no redirect".
        let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
            && proxy
                .borrow()
                .clusters()
                .get(&cluster_id)
                .map(|cluster| cluster.https_redirect)
                .unwrap_or(false);

        if frontend_should_redirect_https {
            self.set_answer(DefaultAnswer::Answer301 {
                location: format!("https://{host}{uri}"),
            });
            // The redirect is reported through the same error variant as a
            // denied route; the 301 answer set above is what distinguishes it.
            return Err(RetrieveClusterError::UnauthorizedRoute);
        }

        Ok(cluster_id)
    }
1605
1606    pub fn backend_from_request(
1607        &mut self,
1608        cluster_id: &str,
1609        frontend_should_stick: bool,
1610        proxy: Rc<RefCell<dyn L7Proxy>>,
1611        metrics: &mut SessionMetrics,
1612    ) -> Result<TcpStream, BackendConnectionError> {
1613        let (backend, conn) = self
1614            .get_backend_for_sticky_session(
1615                frontend_should_stick,
1616                self.context.sticky_session_found.as_deref(),
1617                cluster_id,
1618                proxy,
1619            )
1620            .map_err(|backend_error| {
1621                // some backend errors are actually retryable
1622                // TODO: maybe retry or return a different default answer
1623                self.set_answer(DefaultAnswer::Answer503 {
1624                    message: backend_error.to_string(),
1625                });
1626                BackendConnectionError::Backend(backend_error)
1627            })?;
1628
1629        if frontend_should_stick {
1630            // update sticky name in case it changed I guess?
1631            self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1632
1633            self.context.sticky_session = Some(
1634                backend
1635                    .borrow()
1636                    .sticky_id
1637                    .clone()
1638                    .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1639            );
1640        }
1641
1642        metrics.backend_id = Some(backend.borrow().backend_id.clone());
1643        metrics.backend_start();
1644        self.set_backend_id(backend.borrow().backend_id.clone());
1645
1646        self.backend = Some(backend);
1647        Ok(conn)
1648    }
1649
1650    fn get_backend_for_sticky_session(
1651        &self,
1652        frontend_should_stick: bool,
1653        sticky_session: Option<&str>,
1654        cluster_id: &str,
1655        proxy: Rc<RefCell<dyn L7Proxy>>,
1656    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1657        match (frontend_should_stick, sticky_session) {
1658            (true, Some(sticky_session)) => proxy
1659                .borrow()
1660                .backends()
1661                .borrow_mut()
1662                .backend_from_sticky_session(cluster_id, sticky_session),
1663            _ => proxy
1664                .borrow()
1665                .backends()
1666                .borrow_mut()
1667                .backend_from_cluster_id(cluster_id),
1668        }
1669    }
1670
    /// Route the current request to a cluster and make sure the session holds
    /// a backend connection for it.
    ///
    /// Sequence:
    /// 1. run the circuit breaker (`check_circuit_breaker`);
    /// 2. resolve the cluster id (`cluster_id_from_request`);
    /// 3. if the session is already `Connected` to that cluster and the held
    ///    backend is still known to the proxy and passes
    ///    `check_backend_connection`, return `BackendConnectAction::Reuse`;
    /// 4. otherwise pick a backend with `backend_from_request`, register the
    ///    new socket, arm the connect timeout and return
    ///    `BackendConnectAction::Replace` when the session already owned a
    ///    backend token on entry, or `BackendConnectAction::New` when a token
    ///    had to be allocated through `add_session`.
    ///
    /// # Errors
    ///
    /// Propagates the circuit breaker error, wraps routing failures in
    /// `BackendConnectionError::RetrieveClusterError`, and propagates the
    /// error of `backend_from_request`. Socket option and registration
    /// failures are only logged.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        // Snapshot the state on entry: both values are compared against / reused
        // after the fields have been overwritten or taken below.
        let old_cluster_id = self.context.cluster_id.clone();
        let old_backend_token = self.backend_token;

        self.check_circuit_breaker()?;

        let cluster_id = self
            .cluster_id_from_request(proxy.clone())
            .map_err(BackendConnectionError::RetrieveClusterError)?;

        trace!(
            "{} Connect_to_backend: {:?} {:?} {:?}",
            log_context!(self),
            self.context.cluster_id,
            cluster_id,
            self.backend_connection_status
        );
        // check if we can reuse the backend connection
        if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
            && self.backend_connection_status == BackendConnectionStatus::Connected
        {
            // The held backend must still be registered for this cluster.
            let has_backend = self
                .backend
                .as_ref()
                .map(|backend| {
                    let backend = backend.borrow();
                    proxy
                        .borrow()
                        .backends()
                        .borrow()
                        .has_backend(&cluster_id, &backend)
                })
                .unwrap_or(false);

            if has_backend && self.check_backend_connection(metrics) {
                return Ok(BackendConnectAction::Reuse);
            } else if self.backend_token.take().is_some() {
                // The token is taken out first so that it is kept for the
                // `Replace` path below (via `old_backend_token`).
                // NOTE(review): with the token already taken, the token-gated
                // section of `close_backend` (session removal, `CONNECTIONS`
                // gauge decrement, status reset to `NotConnected`,
                // `dec_connections`) is skipped, while this branch is only
                // entered in the `Connected` state. That looks like it could
                // leak the gauge and trip `close_backend`'s `Connected`
                // debug assertion — confirm against the start of
                // `close_backend`.
                self.close_backend(proxy.clone(), metrics);
            }
        }

        //replacing with a connection to another cluster
        if old_cluster_id.is_some()
            && old_cluster_id.as_ref() != Some(&cluster_id)
            && self.backend_token.take().is_some()
        {
            // NOTE(review): same take-then-close pattern as above — verify the
            // bookkeeping in `close_backend` still runs as intended.
            self.close_backend(proxy.clone(), metrics);
        }

        self.context.cluster_id = Some(cluster_id.clone());

        // A cluster unknown to the proxy is treated as non-sticky.
        let frontend_should_stick = proxy
            .borrow()
            .clusters()
            .get(&cluster_id)
            .map(|cluster| cluster.sticky_session)
            .unwrap_or(false);

        let mut socket =
            self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
        // Best effort: a failure to set TCP_NODELAY is logged, not fatal.
        if let Err(e) = socket.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on backend socket({:?}): {:?}",
                log_context!(self),
                socket,
                e
            );
        }

        // Wait for the connection attempt to complete (writable) or fail.
        self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());

        match old_backend_token {
            // The session already owned a token: register the new socket
            // under it.
            Some(backend_token) => {
                self.set_backend_token(backend_token);
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::Replace)
            }
            // No token yet: allocate one by adding the session to the proxy.
            None => {
                let backend_token = proxy.borrow().add_session(session_rc);

                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_token(backend_token);
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::New)
            }
        }
    }
1793
1794    fn set_backend_connected(
1795        &mut self,
1796        connected: BackendConnectionStatus,
1797        metrics: &mut SessionMetrics,
1798    ) {
1799        let last = self.backend_connection_status;
1800        self.backend_connection_status = connected;
1801
1802        if connected == BackendConnectionStatus::Connected {
1803            // Gauge ±1 pairing: the `CONNECTIONS` gauge is incremented here and
1804            // decremented exactly once in `close_backend`. Transitioning into
1805            // `Connected` from an already-`Connected` state would double-count
1806            // and leave the gauge permanently inflated (it never underflows,
1807            // but it would never return to zero either). The only legitimate
1808            // predecessors are `NotConnected` and `Connecting(_)`.
1809            debug_assert_ne!(
1810                last,
1811                BackendConnectionStatus::Connected,
1812                "set_backend_connected(Connected) must not run twice without a close in between"
1813            );
1814            gauge_add!(names::backend::CONNECTIONS, 1);
1815            gauge_add!(
1816                names::backend::CONNECTIONS_PER_BACKEND,
1817                1,
1818                self.context.cluster_id.as_deref(),
1819                metrics.backend_id.as_deref()
1820            );
1821
1822            // the back timeout was of connect_timeout duration before,
1823            // now that we're connected, move to backend_timeout duration
1824            self.set_backend_timeout(self.configured_backend_timeout);
1825            // if we are not waiting for the backend response, its timeout is concelled
1826            // it should be set when the request has been entirely transmitted
1827            if !self.backend_readiness.interest.is_readable() {
1828                self.container_backend_timeout.cancel();
1829            }
1830
1831            if let Some(backend) = &self.backend {
1832                let mut backend = backend.borrow_mut();
1833
1834                if backend.retry_policy.is_down() {
1835                    incr!(
1836                        "backend.up",
1837                        self.context.cluster_id.as_deref(),
1838                        metrics.backend_id.as_deref()
1839                    );
1840                    gauge!(
1841                        names::backend::AVAILABLE,
1842                        1,
1843                        self.context.cluster_id.as_deref(),
1844                        metrics.backend_id.as_deref()
1845                    );
1846
1847                    info!(
1848                        "{} backend server {} at {} is up",
1849                        log_context!(self),
1850                        backend.backend_id,
1851                        backend.address
1852                    );
1853
1854                    push_event(Event {
1855                        kind: EventKind::BackendUp as i32,
1856                        backend_id: Some(backend.backend_id.to_owned()),
1857                        address: Some(backend.address.into()),
1858                        cluster_id: None,
1859                        metric_detail: None,
1860                    });
1861                }
1862
1863                if let BackendConnectionStatus::Connecting(start) = last {
1864                    backend.set_connection_time(Instant::now() - start);
1865                }
1866
1867                //successful connection, reset failure counter
1868                backend.failures = 0;
1869                backend.active_requests += 1;
1870                backend.retry_policy.succeed();
1871            }
1872        }
1873    }
1874
1875    fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1876        if let Some(backend) = &self.backend {
1877            let mut backend = backend.borrow_mut();
1878            backend.failures += 1;
1879
1880            let already_unavailable = backend.retry_policy.is_down();
1881            backend.retry_policy.fail();
1882            incr!(
1883                "backend.connections.error",
1884                self.context.cluster_id.as_deref(),
1885                metrics.backend_id.as_deref()
1886            );
1887
1888            if !already_unavailable && backend.retry_policy.is_down() {
1889                error!(
1890                    "{} backend server {} at {} is down",
1891                    log_context!(self),
1892                    backend.backend_id,
1893                    backend.address
1894                );
1895
1896                incr!(
1897                    "backend.down",
1898                    self.context.cluster_id.as_deref(),
1899                    metrics.backend_id.as_deref()
1900                );
1901                gauge!(
1902                    names::backend::AVAILABLE,
1903                    0,
1904                    self.context.cluster_id.as_deref(),
1905                    metrics.backend_id.as_deref()
1906                );
1907
1908                push_event(Event {
1909                    kind: EventKind::BackendDown as i32,
1910                    backend_id: Some(backend.backend_id.to_owned()),
1911                    address: Some(backend.address.into()),
1912                    cluster_id: None,
1913                    metric_detail: None,
1914                });
1915            }
1916        }
1917    }
1918
    /// Decide what to do when the backend socket reports a hang-up.
    ///
    /// Outcomes:
    /// - `CloseBackend` when the session is not relaying a backend answer,
    ///   when the response is already terminated, or when both request and
    ///   response are still in their initial state (hang-up between
    ///   keep-alive requests);
    /// - `Continue` when readable data is still pending on the backend, when
    ///   a started response is force-terminated so the frontend can flush it,
    ///   or when a 503 default answer is installed because the request was
    ///   already partly transmitted.
    ///
    /// `_metrics` is currently unused.
    pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
        // Only a response coming from the backend needs the analysis below.
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => return StateResult::CloseBackend,
        };

        // there might still be data we can read on the socket
        if self.backend_readiness.event.is_readable()
            && self.backend_readiness.interest.is_readable()
        {
            return StateResult::Continue;
        }

        // the backend has finished answering, we can close
        if response_stream.is_terminated() {
            return StateResult::CloseBackend;
        }
        // Reaching the match means the response is neither fully readable nor
        // already terminated — the early returns above peeled those cases off.
        // The `(_, false)` arm relies on this when it force-terminates a
        // length-less response: terminating an already-terminated stream would
        // be a no-op masking a real double-close.
        debug_assert!(
            !response_stream.is_terminated(),
            "backend_hup reached its decision match with an already-terminated response"
        );
        // Decision table on (request is initial, response is initial).
        match (
            self.request_stream.is_initial(),
            response_stream.is_initial(),
        ) {
            // backend stopped before response is finished,
            // or maybe it was malformed in the first place (no Content-Length)
            (_, false) => {
                error!(
                    "{} Backend closed before session is over",
                    log_context!(self, Some(response_stream.parsing_phase)),
                );

                trace!(
                    "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
                    log_context!(self, Some(response_stream.parsing_phase))
                );

                response_stream.parsing_phase = kawa::ParsingPhase::Terminated;

                // Post: the forced termination took — required for the
                // length-less / truncated-response close path to make progress.
                debug_assert!(
                    response_stream.is_terminated(),
                    "forced backend-hup termination must mark the response terminated"
                );

                // writable() will be called again and finish the session properly
                // for this reason, writable must not short cut
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                // Post: the frontend is now armed to write so `writable` can
                // flush whatever the backend already sent and then close.
                debug_assert!(
                    self.frontend_readiness.interest.is_writable(),
                    "backend-hup termination must arm the frontend for the final flush"
                );
                StateResult::Continue
            }
            // probably backend hup between keep alive request, change backend
            (true, true) => {
                trace!(
                    "{} Backend hanged up in between requests",
                    log_context!(self)
                );
                StateResult::CloseBackend
            }
            // the frontend already transmitted data so we can't redirect
            (false, true) => {
                error!(
                    "{}  Frontend transmitted data but the back closed",
                    log_context!(self)
                );

                self.set_answer(DefaultAnswer::Answer503 {
                    message: "Backend closed after consuming part of the request".into(),
                });

                // Stop listening to the backend entirely; the default answer
                // set above is what remains to be served.
                self.backend_readiness.interest = Ready::EMPTY;
                StateResult::Continue
            }
        }
    }
2006
2007    /// The main session loop, processing all events triggered by mio since last time
2008    /// and proxying http traffic. The main flow can be summed up by:
2009    ///
2010    /// - if connecting an back has event:
2011    ///   - if backend hanged up, try again
2012    ///   - else, set as connected
2013    /// - while front or back has event:
2014    ///   - read request on front
2015    ///   - write request to back
2016    ///   - read response on back
2017    ///   - write response to front
2018    fn ready_inner(
2019        &mut self,
2020        session: Rc<RefCell<dyn crate::ProxySession>>,
2021        proxy: Rc<RefCell<dyn L7Proxy>>,
2022        metrics: &mut SessionMetrics,
2023    ) -> SessionResult {
2024        let mut counter = 0;
2025
2026        if self.backend_connection_status.is_connecting()
2027            && !self.backend_readiness.event.is_empty()
2028        {
2029            if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
2030                //retry connecting the backend
2031                warn!(
2032                    "{} Error connecting to backend, trying again, attempt {}",
2033                    log_context!(self),
2034                    self.connection_attempts
2035                );
2036
2037                // Each backend HUP-retry bumps the attempt counter by exactly
2038                // one; the very next `connect_to_backend` runs the circuit
2039                // breaker, which converts the `== CONN_RETRIES` case into a 503
2040                // instead of dialling again — so the counter is never observed
2041                // above `CONN_RETRIES` once the loop settles (see
2042                // `check_invariants`).
2043                let attempts_before = self.connection_attempts;
2044                self.connection_attempts += 1;
2045                debug_assert_eq!(
2046                    self.connection_attempts,
2047                    attempts_before + 1,
2048                    "a backend retry must bump connection_attempts by exactly one"
2049                );
2050                debug_assert!(
2051                    self.connection_attempts <= CONN_RETRIES,
2052                    "connection_attempts ({}) must not exceed the breaker budget on retry",
2053                    self.connection_attempts
2054                );
2055                self.fail_backend_connection(metrics);
2056
2057                self.backend_connection_status =
2058                    BackendConnectionStatus::Connecting(Instant::now());
2059
2060                // trigger a backend reconnection
2061                self.close_backend(proxy.clone(), metrics);
2062
2063                let connection_result =
2064                    self.connect_to_backend(session.clone(), proxy.clone(), metrics);
2065                if let Err(err) = &connection_result {
2066                    match err {
2067                        // Already logged at warn! + metered at check_circuit_breaker;
2068                        // avoid double-emission.
2069                        BackendConnectionError::MaxConnectionRetries(_) => trace!(
2070                            "{} Error connecting to backend: {}",
2071                            log_context!(self),
2072                            err
2073                        ),
2074                        _ => warn!(
2075                            "{} Error connecting to backend: {}",
2076                            log_context!(self),
2077                            err
2078                        ),
2079                    }
2080                }
2081
2082                if let Some(session_result) = handle_connection_result(connection_result) {
2083                    return session_result;
2084                }
2085            } else {
2086                metrics.backend_connected();
2087                self.connection_attempts = 0;
2088                self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
2089                // we might get an early response from the backend, so we want to look
2090                // at readable events
2091                self.backend_readiness.interest.insert(Ready::READABLE);
2092            }
2093        }
2094
2095        if self.frontend_readiness.event.is_hup() {
2096            if !self.request_stream.is_initial() {
2097                self.log_request_error(metrics, "Client disconnected abruptly");
2098            }
2099            return SessionResult::Close;
2100        }
2101
2102        while counter < MAX_LOOP_ITERATIONS {
2103            let frontend_interest = self.frontend_readiness.filter_interest();
2104            let backend_interest = self.backend_readiness.filter_interest();
2105
2106            trace!(
2107                "{} Frontend interest({:?}) and backend interest({:?})",
2108                log_context!(self),
2109                frontend_interest,
2110                backend_interest,
2111            );
2112
2113            if frontend_interest.is_empty() && backend_interest.is_empty() {
2114                break;
2115            }
2116
2117            if self.backend_readiness.event.is_hup()
2118                && self.frontend_readiness.interest.is_writable()
2119                && !self.frontend_readiness.event.is_writable()
2120            {
2121                break;
2122            }
2123
2124            if frontend_interest.is_readable() {
2125                let state_result = self.readable(metrics);
2126                trace!(
2127                    "{} frontend_readable: {:?}",
2128                    log_context!(self),
2129                    state_result
2130                );
2131
2132                match state_result {
2133                    StateResult::Continue => {}
2134                    StateResult::ConnectBackend => {
2135                        let connection_result =
2136                            self.connect_to_backend(session.clone(), proxy.clone(), metrics);
2137                        if let Err(err) = &connection_result {
2138                            match err {
2139                                // Already logged at warn! + metered at check_circuit_breaker;
2140                                // avoid double-emission.
2141                                BackendConnectionError::MaxConnectionRetries(_) => trace!(
2142                                    "{} Error connecting to backend: {}",
2143                                    log_context!(self),
2144                                    err
2145                                ),
2146                                _ => warn!(
2147                                    "{} Error connecting to backend: {}",
2148                                    log_context!(self),
2149                                    err
2150                                ),
2151                            }
2152                        }
2153
2154                        if let Some(session_result) = handle_connection_result(connection_result) {
2155                            return session_result;
2156                        }
2157                    }
2158                    StateResult::CloseBackend => unreachable!(),
2159                    StateResult::CloseSession => return SessionResult::Close,
2160                    StateResult::Upgrade => return SessionResult::Upgrade,
2161                }
2162            }
2163
2164            if backend_interest.is_writable() {
2165                let session_result = self.backend_writable(metrics);
2166                trace!(
2167                    "{} backend_writable: {:?}",
2168                    log_context!(self),
2169                    session_result
2170                );
2171                if session_result != SessionResult::Continue {
2172                    return session_result;
2173                }
2174            }
2175
2176            if backend_interest.is_readable() {
2177                let session_result = self.backend_readable(metrics);
2178                trace!(
2179                    "{} backend_readable: {:?}",
2180                    log_context!(self),
2181                    session_result
2182                );
2183                if session_result != SessionResult::Continue {
2184                    return session_result;
2185                }
2186            }
2187
2188            if frontend_interest.is_writable() {
2189                let state_result = self.writable(metrics);
2190                trace!(
2191                    "{} frontend_writable: {:?}",
2192                    log_context!(self),
2193                    state_result
2194                );
2195                match state_result {
2196                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
2197                    StateResult::CloseSession => return SessionResult::Close,
2198                    StateResult::Upgrade => return SessionResult::Upgrade,
2199                    StateResult::Continue => {}
2200                    StateResult::ConnectBackend => unreachable!(),
2201                }
2202            }
2203
2204            if frontend_interest.is_error() {
2205                error!(
2206                    "{} frontend socket error, disconnecting",
2207                    log_context!(self)
2208                );
2209
2210                return SessionResult::Close;
2211            }
2212
2213            if backend_interest.is_hup() || backend_interest.is_error() {
2214                let state_result = self.backend_hup(metrics);
2215
2216                trace!("{} backend_hup: {:?}", log_context!(self), state_result);
2217                match state_result {
2218                    StateResult::Continue => {}
2219                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
2220                    StateResult::CloseSession => return SessionResult::Close,
2221                    StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
2222                }
2223            }
2224
2225            counter += 1;
2226        }
2227
2228        if counter >= MAX_LOOP_ITERATIONS {
2229            error!(
2230                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
2231                log_context!(self),
2232                MAX_LOOP_ITERATIONS
2233            );
2234
2235            incr!(names::http::INFINITE_LOOP_ERROR);
2236            self.print_state(self.protocol_string());
2237
2238            return SessionResult::Close;
2239        }
2240
2241        // Post: a `Continue` exit means the loop drained all pending interest
2242        // within the iteration budget (the `>= MAX_LOOP_ITERATIONS` arm above
2243        // returns `Close` otherwise). This pins the no-infinite-loop contract.
2244        debug_assert!(
2245            counter < MAX_LOOP_ITERATIONS,
2246            "ready_inner must only Continue when the loop settled within the iteration budget"
2247        );
2248        SessionResult::Continue
2249    }
2250
2251    pub fn timeout_status(&self) -> TimeoutStatus {
2252        if self.request_stream.is_main_phase() {
2253            match &self.response_stream {
2254                ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
2255                    TimeoutStatus::WaitingForResponse
2256                }
2257                _ => TimeoutStatus::Response,
2258            }
2259        } else if self.keepalive_count > 0 {
2260            TimeoutStatus::WaitingForNewRequest
2261        } else {
2262            TimeoutStatus::Request
2263        }
2264    }
2265
2266    /// Cross-field invariant sweep for the H1 session state machine.
2267    ///
2268    /// Run-to-completion postcondition: called via a `#[cfg(debug_assertions)]`
2269    /// guard at the end of the [`SessionState::ready`] entry point, after the
2270    /// inner loop has settled the session into a stable, between-events state.
2271    /// It encodes relationships that must hold for ANY `Http` regardless of the
2272    /// path (request, response, default answer, backend retry) that drove the
2273    /// loop:
2274    ///
2275    /// - the circuit-breaker counter never exceeds the configured retry budget
2276    ///   (`check_circuit_breaker` short-circuits at `CONN_RETRIES`, so a higher
2277    ///   value would mean a retry slipped past the breaker);
2278    /// - a `Connected` backend always names a live socket — `close_backend`
2279    ///   takes the socket and drops back to `NotConnected` atomically, so any
2280    ///   `Connected`-without-socket state is a teardown-ordering bug that would
2281    ///   make `backend_writable`/`backend_readable` log "back socket not found";
2282    /// - while a default answer is queued the frontend is steered toward
2283    ///   WRITABLE and never asked to read more request bytes (the read path
2284    ///   `error!`s and bails if it sees a `DefaultAnswer`), and the backend is
2285    ///   never asked to read or write (we have nothing left to proxy);
2286    /// - `context.status` agrees with the queued default-answer status, so the
2287    ///   access-log / `save_http_status_metric` code never reports a different
2288    ///   code than the bytes on the wire.
2289    ///
2290    /// Network input never reaches a hard `assert!` here — invalid traffic is
2291    /// turned into a default answer or a close upstream; these `debug_assert!`s
2292    /// fire only on our own logic bugs and compile out in release.
2293    #[cfg(debug_assertions)]
2294    fn check_invariants(&self) {
2295        debug_assert!(
2296            self.connection_attempts <= CONN_RETRIES,
2297            "connection_attempts ({}) must stay within the circuit-breaker budget ({CONN_RETRIES})",
2298            self.connection_attempts
2299        );
2300        if self.backend_connection_status == BackendConnectionStatus::Connected {
2301            debug_assert!(
2302                self.backend_socket.is_some(),
2303                "a Connected backend must own a live socket (teardown-ordering bug)"
2304            );
2305        }
2306        if let ResponseStream::DefaultAnswer(status, _) = &self.response_stream {
2307            debug_assert!(
2308                !self.frontend_readiness.interest.is_readable(),
2309                "frontend must not be asked to read more request bytes while a default answer is queued"
2310            );
2311            debug_assert!(
2312                !self.backend_readiness.interest.is_readable()
2313                    && !self.backend_readiness.interest.is_writable(),
2314                "backend must be idle (no read/write interest) while a default answer is queued"
2315            );
2316            debug_assert_eq!(
2317                self.context.status,
2318                Some(*status),
2319                "context.status must match the queued default-answer status"
2320            );
2321        }
2322    }
2323}
2324
2325impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
2326    fn ready(
2327        &mut self,
2328        session: Rc<RefCell<dyn crate::ProxySession>>,
2329        proxy: Rc<RefCell<dyn L7Proxy>>,
2330        metrics: &mut SessionMetrics,
2331    ) -> SessionResult {
2332        let session_result = self.ready_inner(session, proxy, metrics);
2333        if session_result == SessionResult::Upgrade {
2334            let response_storage = match &mut self.response_stream {
2335                ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
2336                _ => return SessionResult::Close,
2337            };
2338
2339            // sync the underlying Checkout buffers, if they contain remaining data
2340            // it will be processed once upgraded to websocket
2341            self.request_stream.storage.buffer.sync(
2342                self.request_stream.storage.end,
2343                self.request_stream.storage.head,
2344            );
2345            response_storage
2346                .buffer
2347                .sync(response_storage.end, response_storage.head);
2348        }
2349        // Run-to-completion postcondition: the session has settled into a
2350        // stable between-events state. Compiled out in release.
2351        #[cfg(debug_assertions)]
2352        self.check_invariants();
2353        session_result
2354    }
2355
2356    fn update_readiness(&mut self, token: Token, events: Ready) {
2357        if self.frontend_token == token {
2358            self.frontend_readiness.event |= events;
2359        } else if self.backend_token == Some(token) {
2360            self.backend_readiness.event |= events;
2361        }
2362    }
2363
2364    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
2365        self.close_backend(proxy, metrics);
2366        self.frontend_socket.socket_close();
2367        let _ = self.frontend_socket.socket_write_vectored(&[]);
2368
2369        //if the state was initial, the connection was already reset
2370        if !self.request_stream.is_initial() {
2371            gauge_add!(names::http::ACTIVE_REQUESTS, -1);
2372
2373            if let Some(b) = self.backend.as_mut() {
2374                let mut backend = b.borrow_mut();
2375                backend.active_requests = backend.active_requests.saturating_sub(1);
2376            }
2377        }
2378    }
2379
2380    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
2381        //info!("got timeout for token: {:?}", token);
2382        if self.frontend_token == token {
2383            self.container_frontend_timeout.triggered();
2384            return match self.timeout_status() {
2385                // we do not have a complete answer
2386                TimeoutStatus::Request => {
2387                    self.context.access_log_message = Some("client_timeout");
2388                    self.set_answer(DefaultAnswer::Answer408 {
2389                        duration: self.container_frontend_timeout.to_string(),
2390                    });
2391                    self.writable(metrics)
2392                }
2393                // we have a complete answer but the response did not start
2394                TimeoutStatus::WaitingForResponse => {
2395                    // this case is ambiguous, as it is the frontend timeout that triggers while we were waiting for response
2396                    // the timeout responsibility should have switched before
2397                    self.context.access_log_message = Some("client_timeout_during_response");
2398                    self.set_answer(DefaultAnswer::Answer504 {
2399                        duration: self.container_backend_timeout.to_string(),
2400                    });
2401                    self.writable(metrics)
2402                }
2403                // we have a complete answer and the start of a response, but the request was not tagged as terminated
2404                // for now we place responsibility of timeout on the backend in those cases, so we ignore this
2405                TimeoutStatus::Response => StateResult::Continue,
2406                // timeout in keep-alive, simply close the connection
2407                TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
2408            };
2409        }
2410
2411        if self.backend_token == Some(token) {
2412            //info!("backend timeout triggered for token {:?}", token);
2413            self.container_backend_timeout.triggered();
2414            return match self.timeout_status() {
2415                TimeoutStatus::Request => {
2416                    error!(
2417                        "{} got backend timeout while waiting for a request, this should not happen",
2418                        log_context!(self)
2419                    );
2420                    // Operator-visible cause is the same as the regular
2421                    // backend-timeout arm — backend silent before we
2422                    // could send the request. The error! above keeps the
2423                    // internal-invariant signal for sozu maintainers.
2424                    self.context.access_log_message = Some("backend_timeout");
2425                    self.set_answer(DefaultAnswer::Answer504 {
2426                        duration: self.container_backend_timeout.to_string(),
2427                    });
2428                    self.writable(metrics)
2429                }
2430                TimeoutStatus::WaitingForResponse => {
2431                    self.context.access_log_message = Some("backend_timeout");
2432                    self.set_answer(DefaultAnswer::Answer504 {
2433                        duration: self.container_backend_timeout.to_string(),
2434                    });
2435                    self.writable(metrics)
2436                }
2437                TimeoutStatus::Response => {
2438                    error!(
2439                        "backend {:?} timeout while receiving response (cluster {:?})",
2440                        self.context.backend_id, self.context.cluster_id
2441                    );
2442                    self.context.access_log_message = Some("backend_response_timeout");
2443                    StateResult::CloseSession
2444                }
2445                // in keep-alive, we place responsibility of timeout on the frontend, so we ignore this
2446                TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
2447            };
2448        }
2449
2450        error!("{} Got timeout for an invalid token", log_context!(self));
2451        StateResult::CloseSession
2452    }
2453
2454    fn cancel_timeouts(&mut self) {
2455        self.container_backend_timeout.cancel();
2456        self.container_frontend_timeout.cancel();
2457    }
2458
2459    fn print_state(&self, context: &str) {
2460        error!(
2461            "\
2462{} {} Session(Kawa)
2463\tFrontend:
2464\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2465\tBackend:
2466\t\ttoken: {:?}\treadiness: {:?}",
2467            log_context!(self),
2468            context,
2469            self.frontend_token,
2470            self.frontend_readiness,
2471            self.request_stream.parsing_phase,
2472            self.backend_token,
2473            self.backend_readiness,
2474            // self.response_stream.parsing_phase
2475        );
2476    }
2477
2478    fn shutting_down(&mut self) -> SessionIsToBeClosed {
2479        if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2480        // && self.response_stream.storage.is_empty()
2481        {
2482            true
2483        } else {
2484            self.context.closing = true;
2485            false
2486        }
2487    }
2488}
2489
2490fn handle_connection_result(
2491    connection_result: Result<BackendConnectAction, BackendConnectionError>,
2492) -> Option<SessionResult> {
2493    match connection_result {
2494        // reuse connection or send a default answer, we can continue
2495        Ok(BackendConnectAction::Reuse) => None,
2496        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2497            // we must wait for an event
2498            Some(SessionResult::Continue)
2499        }
2500        Err(_) => {
2501            // All BackendConnectionError already set a default answer
2502            // the session must continue to serve it
2503            // - NotFound: not used for http (only tcp)
2504            // - RetrieveClusterError: 301/400/401/404,
2505            // - MaxConnectionRetries: 503,
2506            // - Backend: 503,
2507            // - MaxSessionsMemory: not checked in connect_to_backend (TODO: check it?)
2508            None
2509        }
2510    }
2511}
2512
2513/// Save the HTTP status code of the backend response.
2514///
2515/// Emits two counters per response with a known status:
2516///
2517/// 1. The bucket counter `http.status.{1xx,…,5xx,other}` (always),
2518/// 2. A per-code counter `http.status.{200,301,…}` when the code is on the
2519///    short-list maintained by [`crate::metrics::http_status_code_metric_name`].
2520///
2521/// Status-less responses still emit `http.status.none` upstream of this
2522/// function (see `generate_access_log` for the H2 path); the H1 entry point
2523/// only carries `Some(status)` so the `else` branch is unnecessary here.
2524fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2525    if let Some(status) = status {
2526        // Every status reaching this bucketer originates either from a parsed
2527        // backend response status line or a validated answer template, both of
2528        // which are 3-digit HTTP codes. A value outside `100..=999` would fall
2529        // into the `STATUS_OTHER` catch-all below and signal a status that was
2530        // never a real HTTP code — a logic bug upstream, not hostile traffic.
2531        debug_assert!(
2532            (100..=999).contains(&status),
2533            "save_http_status_metric got a non-3-digit status: {status}"
2534        );
2535        match status {
2536            100..=199 => {
2537                incr!(
2538                    names::http::STATUS_1XX,
2539                    context.cluster_id,
2540                    context.backend_id
2541                );
2542            }
2543            200..=299 => {
2544                incr!(
2545                    names::http::STATUS_2XX,
2546                    context.cluster_id,
2547                    context.backend_id
2548                );
2549            }
2550            300..=399 => {
2551                incr!(
2552                    names::http::STATUS_3XX,
2553                    context.cluster_id,
2554                    context.backend_id
2555                );
2556            }
2557            400..=499 => {
2558                incr!(
2559                    names::http::STATUS_4XX,
2560                    context.cluster_id,
2561                    context.backend_id
2562                );
2563            }
2564            500..=599 => {
2565                incr!(
2566                    names::http::STATUS_5XX,
2567                    context.cluster_id,
2568                    context.backend_id
2569                );
2570            }
2571            _ => {
2572                // http responses with other codes (protocol error)
2573                incr!(names::http::STATUS_OTHER);
2574            }
2575        }
2576
2577        if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
2578            incr!(per_code, context.cluster_id, context.backend_id);
2579        }
2580    }
2581}