// sozu_lib/protocol/kawa_h1/mod.rs

//! HTTP/1.1 session state on top of Kawa.
//!
//! Owns the H1 session state machine: Kawa-driven request/response parsing,
//! header rewriting via `editor.rs`, default-answer rendering via
//! `answers.rs`, and route lookup via the listener
//! (`cluster_id_from_request`). Long-form lifecycle:
//! `lib/src/protocol/kawa_h1/LIFECYCLE.md` (created in this changeset).
8
9pub mod answers;
10pub mod diagnostics;
11pub mod editor;
12pub mod parser;
13
14use std::{
15    cell::RefCell,
16    io::ErrorKind,
17    net::{Shutdown, SocketAddr},
18    rc::{Rc, Weak},
19    time::{Duration, Instant},
20};
21
22use mio::{Interest, Token, net::TcpStream};
23use rusty_ulid::Ulid;
24use sozu_command::{
25    config::MAX_LOOP_ITERATIONS,
26    logging::EndpointRecord,
27    proto::command::{Event, EventKind, ListenerType},
28};
29
30// use time::{Duration, Instant};
31use crate::metrics::names;
32use crate::{
33    AcceptError, BackendConnectAction, BackendConnectionError, BackendConnectionStatus,
34    FrontendFromRequestError, L7ListenerHandler, L7Proxy, ListenerHandler, Protocol, ProxySession,
35    Readiness, RetrieveClusterError, SessionIsToBeClosed, SessionMetrics, SessionResult,
36    StateResult,
37    backends::{Backend, BackendError},
38    pool::{Checkout, Pool},
39    protocol::{
40        SessionState,
41        http::{
42            answers::DefaultAnswerStream,
43            diagnostics::{diagnostic_400_502, diagnostic_413_507},
44            editor::HttpContext,
45            parser::Method,
46        },
47        pipe::WebSocketContext,
48    },
49    retry::RetryPolicy,
50    server::{CONN_RETRIES, push_event},
51    socket::{SocketHandler, SocketResult, TransportProtocol, stats::socket_rtt},
52    sozu_command::{
53        logging::{LogContext, ansi_palette},
54        ready::Ready,
55    },
56    timer::TimeoutContainer,
57};
58
/// Module-local helper that renders the prefix of every kawa h1 log line, so
/// H1 issues are easy to track inside Sōzu. Output is colored: a bold
/// bright-white protocol label (the same for every protocol), a light grey
/// `Session` keyword, gray keys and bright white values. The `[ulid - - -]`
/// context is emitted first so lines stay aligned with `MUX-*` and `SOCKET`
/// logs.
///
/// Two invocation forms:
/// - `log_context!(self)` — resolves the response parsing phase through
///   [`Http::response_parsing_phase`]. Only usable while no
///   `&mut self.response_stream` borrow is live.
/// - `log_context!(self, response_phase)` — the caller passes an
///   already-resolved `Option<kawa::ParsingPhase>`. Needed while a mutable
///   borrow of `self.response_stream` (or of its `BackendAnswer` payload) is
///   live: pass `Some(inner.parsing_phase)` read through that reference.
macro_rules! log_context {
    ($self:expr) => {
        log_context!($self, $self.response_parsing_phase())
    };
    ($self:expr, $response_phase:expr) => {{
        let (open, reset, grey, gray, white) = ansi_palette();
        // Optional values are rendered as "<none>" when absent.
        let session = $self
            .context
            .session_address
            .map_or_else(|| "<none>".to_string(), |addr| addr.to_string());
        let backend = $self
            .backend_token
            .map_or_else(|| "<none>".to_string(), |token| token.0.to_string());
        format!(
            "{gray}{ctx}{reset}\t{open}KAWA-H1{reset}\t{grey}Session{reset}({gray}public{reset}={white}{public}{reset}, {gray}session{reset}={white}{session}{reset}, {gray}frontend{reset}={white}{frontend}{reset}, {gray}request_parsing_phase{reset}={white}{request_parsing_phase:?}{reset}, {gray}response_parsing_phase{reset}={white}{response_parsing_phase:?}{reset}, {gray}frontend_readiness{reset}={white}{frontend_readiness}{reset}, {gray}backend{reset}={white}{backend}{reset}, {gray}backend_readiness{reset}={white}{backend_readiness}{reset})\t >>>",
            open = open,
            reset = reset,
            grey = grey,
            gray = gray,
            white = white,
            ctx = $self.context.log_context(),
            public = $self.context.public_address.to_string(),
            session = session,
            frontend = $self.frontend_token.0,
            request_parsing_phase = $self.request_stream.parsing_phase,
            response_parsing_phase = $response_phase,
            frontend_readiness = $self.frontend_readiness,
            backend = backend,
            backend_readiness = $self.backend_readiness,
        )
    }};
}
98
99/// Generic Http representation using the Kawa crate using the Checkout of Sozu as buffer
100type GenericHttpStream = kawa::Kawa<Checkout>;
101
102impl kawa::AsBuffer for Checkout {
103    fn as_buffer(&self) -> &[u8] {
104        self.inner.extra()
105    }
106    fn as_mut_buffer(&mut self) -> &mut [u8] {
107        self.inner.extra_mut()
108    }
109}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
112pub enum DefaultAnswer {
113    Answer301 {
114        location: String,
115    },
116    /// RFC 9110 §15.4.3 — temporary redirect; user agents may rewrite POST
117    /// to GET on follow. Surfaced by `RedirectPolicy::Found` (#1009).
118    Answer302 {
119        location: String,
120    },
121    /// RFC 9110 §15.4.9 — permanent redirect that PRESERVES the request
122    /// method on follow (no GET-rewrite on POST). Surfaced by
123    /// `RedirectPolicy::PermanentRedirect` (#1009).
124    Answer308 {
125        location: String,
126    },
127    Answer400 {
128        message: String,
129        phase: kawa::ParsingPhaseMarker,
130        successfully_parsed: String,
131        partially_parsed: String,
132        invalid: String,
133    },
134    Answer401 {
135        /// Optional `WWW-Authenticate` value (e.g. `Basic realm="…"`).
136        /// When `None` or empty the realm header line is elided by the
137        /// template engine so the response stays a bare 401.
138        www_authenticate: Option<String>,
139    },
140    Answer404 {},
141    Answer408 {
142        duration: String,
143    },
144    Answer413 {
145        message: String,
146        phase: kawa::ParsingPhaseMarker,
147        capacity: usize,
148    },
149    /// RFC 9110 §15.5.20 — returned when the request's `:authority` / `Host`
150    /// host does not match the TLS SNI negotiated for this connection.
151    /// The peer may retry on a fresh TLS connection that negotiates an SNI
152    /// matching the authority.
153    Answer421 {},
154    /// RFC 6585 §4 — emitted when the per-(cluster, source-IP) connection
155    /// limit is reached. `retry_after` is the suggested wait, in seconds;
156    /// `None` (or `Some(0)`) tells the template engine to omit the header
157    /// — `Retry-After: 0` invites an immediate retry that defeats the
158    /// limit.
159    Answer429 {
160        retry_after: Option<u32>,
161    },
162    Answer502 {
163        message: String,
164        phase: kawa::ParsingPhaseMarker,
165        successfully_parsed: String,
166        partially_parsed: String,
167        invalid: String,
168    },
169    Answer503 {
170        message: String,
171    },
172    Answer504 {
173        duration: String,
174    },
175    Answer507 {
176        phase: kawa::ParsingPhaseMarker,
177        message: String,
178        capacity: usize,
179    },
180}
181
182impl From<&DefaultAnswer> for u16 {
183    fn from(answer: &DefaultAnswer) -> u16 {
184        match answer {
185            DefaultAnswer::Answer301 { .. } => 301,
186            DefaultAnswer::Answer302 { .. } => 302,
187            DefaultAnswer::Answer308 { .. } => 308,
188            DefaultAnswer::Answer400 { .. } => 400,
189            DefaultAnswer::Answer401 { .. } => 401,
190            DefaultAnswer::Answer404 { .. } => 404,
191            DefaultAnswer::Answer408 { .. } => 408,
192            DefaultAnswer::Answer413 { .. } => 413,
193            DefaultAnswer::Answer421 { .. } => 421,
194            DefaultAnswer::Answer429 { .. } => 429,
195            DefaultAnswer::Answer502 { .. } => 502,
196            DefaultAnswer::Answer503 { .. } => 503,
197            DefaultAnswer::Answer504 { .. } => 504,
198            DefaultAnswer::Answer507 { .. } => 507,
199        }
200    }
201}
202
/// Point of the request/response exchange a session is at when a timeout is
/// evaluated.
// NOTE(review): the exact meaning of each variant is decided by code outside
// this excerpt; the comments below only restate the variant names.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutStatus {
    /// A request is in progress.
    Request,
    /// A response is in progress.
    Response,
    /// Idle, waiting for a new request.
    WaitingForNewRequest,
    /// Waiting for a response.
    WaitingForResponse,
}
210
211pub enum ResponseStream {
212    BackendAnswer(GenericHttpStream),
213    DefaultAnswer(u16, DefaultAnswerStream),
214}
215
216/// Http will be contained in State which itself is contained by Session
217pub struct Http<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> {
218    answers: Rc<RefCell<answers::HttpAnswers>>,
219    pub backend: Option<Rc<RefCell<Backend>>>,
220    backend_connection_status: BackendConnectionStatus,
221    pub backend_readiness: Readiness,
222    pub backend_socket: Option<TcpStream>,
223    backend_stop: Option<Instant>,
224    pub backend_token: Option<Token>,
225    pub container_backend_timeout: TimeoutContainer,
226    pub container_frontend_timeout: TimeoutContainer,
227    configured_backend_timeout: Duration,
228    configured_connect_timeout: Duration,
229    configured_frontend_timeout: Duration,
230    /// attempts to connect to the backends during the session
231    connection_attempts: u8,
232    pub frontend_readiness: Readiness,
233    pub frontend_socket: Front,
234    frontend_token: Token,
235    keepalive_count: usize,
236    listener: Rc<RefCell<L>>,
237    pub request_stream: GenericHttpStream,
238    pub response_stream: ResponseStream,
239    /// The HTTP context was separated from the State for borrowing reasons.
240    /// Calling a kawa parser mutably borrows the State through request_stream or response_stream,
241    /// so Http can't be borrowed again to be used in callbacks. HttContext is an independant
242    /// subsection of Http that can be mutably borrowed for parser callbacks.
243    pub context: HttpContext,
244}
245
246impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
247    /// Instantiate a new HTTP SessionState with:
248    ///
249    /// - frontend_interest: READABLE | HUP | ERROR
250    /// - frontend_event: EMPTY
251    /// - backend_interest: EMPTY
252    /// - backend_event: EMPTY
253    ///
254    /// Remember to set the events from the previous State!
255    #[allow(clippy::too_many_arguments)]
256    pub fn new(
257        answers: Rc<RefCell<answers::HttpAnswers>>,
258        configured_backend_timeout: Duration,
259        configured_connect_timeout: Duration,
260        configured_frontend_timeout: Duration,
261        container_frontend_timeout: TimeoutContainer,
262        frontend_socket: Front,
263        frontend_token: Token,
264        listener: Rc<RefCell<L>>,
265        pool: Weak<RefCell<Pool>>,
266        protocol: Protocol,
267        public_address: SocketAddr,
268        session_id: Ulid,
269        request_id: Ulid,
270        session_address: Option<SocketAddr>,
271        sticky_name: String,
272    ) -> Result<Http<Front, L>, AcceptError> {
273        let (front_buffer, back_buffer) = match pool.upgrade() {
274            Some(pool) => {
275                let mut pool = pool.borrow_mut();
276                match (pool.checkout(), pool.checkout()) {
277                    (Some(front_buffer), Some(back_buffer)) => (front_buffer, back_buffer),
278                    _ => return Err(AcceptError::BufferCapacityReached),
279                }
280            }
281            None => return Err(AcceptError::BufferCapacityReached),
282        };
283        let sozu_id_header = listener.borrow().get_sozu_id_header().to_string();
284        let elide_x_real_ip = listener.borrow().get_elide_x_real_ip();
285        let send_x_real_ip = listener.borrow().get_send_x_real_ip();
286        Ok(Http {
287            answers,
288            backend_connection_status: BackendConnectionStatus::NotConnected,
289            backend_readiness: Readiness::new(),
290            backend_socket: None,
291            backend_stop: None,
292            backend_token: None,
293            backend: None,
294            configured_backend_timeout,
295            configured_connect_timeout,
296            configured_frontend_timeout,
297            connection_attempts: 0,
298            container_backend_timeout: TimeoutContainer::new_empty(configured_connect_timeout),
299            container_frontend_timeout,
300            frontend_readiness: Readiness {
301                interest: Ready::READABLE | Ready::HUP | Ready::ERROR,
302                event: Ready::EMPTY,
303            },
304            frontend_socket,
305            frontend_token,
306            keepalive_count: 0,
307            listener,
308            request_stream: GenericHttpStream::new(
309                kawa::Kind::Request,
310                kawa::Buffer::new(front_buffer),
311            ),
312            response_stream: ResponseStream::BackendAnswer(GenericHttpStream::new(
313                kawa::Kind::Response,
314                kawa::Buffer::new(back_buffer),
315            )),
316            context: HttpContext::new(
317                session_id,
318                request_id,
319                protocol,
320                public_address,
321                session_address,
322                sticky_name,
323                sozu_id_header,
324                elide_x_real_ip,
325                send_x_real_ip,
326            ),
327        })
328    }
329
330    /// Reset the connection in case of keep-alive to be ready for the next request
331    pub fn reset(&mut self) {
332        trace!("{} ============== reset", log_context!(self));
333        let response_stream = match &mut self.response_stream {
334            ResponseStream::BackendAnswer(response_stream) => response_stream,
335            _ => return,
336        };
337
338        self.context.id = Ulid::generate();
339        self.context.reset();
340
341        self.request_stream.clear();
342        response_stream.clear();
343        self.keepalive_count += 1;
344        gauge_add!(names::http::ACTIVE_REQUESTS, -1);
345
346        if let Some(backend) = &mut self.backend {
347            let mut backend = backend.borrow_mut();
348            backend.active_requests = backend.active_requests.saturating_sub(1);
349        }
350
351        // reset the front timeout and cancel the back timeout while we are
352        // waiting for a new request
353        self.container_backend_timeout.cancel();
354        self.container_frontend_timeout
355            .set_duration(self.configured_frontend_timeout);
356        self.frontend_readiness.interest = Ready::READABLE | Ready::HUP | Ready::ERROR;
357        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
358
359        // We are resetting the offset of request and response stream buffers
360        // We do have to keep cursor position on the request, if there is data
361        // in the request stream to preserve http pipelining.
362
363        // Print the left-over response buffer output to track in which case it
364        // may happens
365        let response_phase = response_stream.parsing_phase;
366        let response_storage = &mut response_stream.storage;
367        if !response_storage.is_empty() {
368            warn!(
369                "{} Leftover fragment from response: {}",
370                log_context!(self, Some(response_phase)),
371                parser::view(
372                    response_storage.used(),
373                    16,
374                    &[response_storage.start, response_storage.end,],
375                )
376            );
377        }
378
379        response_storage.clear();
380        if !self.request_stream.storage.is_empty() {
381            self.frontend_readiness.event.insert(Ready::READABLE);
382        } else {
383            self.request_stream.storage.clear();
384        }
385    }
386
387    pub fn readable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
388        trace!("{} ============== readable", log_context!(self));
389        if !self.container_frontend_timeout.reset() {
390            error!(
391                "could not reset front timeout {:?}",
392                self.configured_frontend_timeout
393            );
394            self.print_state(self.protocol_string());
395        }
396
397        let response_stream = match &mut self.response_stream {
398            ResponseStream::BackendAnswer(response_stream) => response_stream,
399            ResponseStream::DefaultAnswer(..) => {
400                error!(
401                    "{} Sending default answer, should not read from frontend socket",
402                    log_context!(self)
403                );
404
405                self.frontend_readiness.interest.remove(Ready::READABLE);
406                self.frontend_readiness.interest.insert(Ready::WRITABLE);
407                return StateResult::Continue;
408            }
409        };
410
411        if self.request_stream.storage.is_full() {
412            self.frontend_readiness.interest.remove(Ready::READABLE);
413            if self.request_stream.is_main_phase() {
414                self.backend_readiness.interest.insert(Ready::WRITABLE);
415            } else {
416                // client has filled its buffer and we can't empty it
417                self.set_answer(DefaultAnswer::Answer413 {
418                    capacity: self.request_stream.storage.capacity(),
419                    phase: self.request_stream.parsing_phase.marker(),
420                    message: diagnostic_413_507(self.request_stream.parsing_phase),
421                });
422            }
423            return StateResult::Continue;
424        }
425
426        let (size, socket_state) = self
427            .frontend_socket
428            .socket_read(self.request_stream.storage.space());
429
430        debug!(
431            "{} Read {} bytes",
432            log_context!(self, Some(response_stream.parsing_phase)),
433            size
434        );
435
436        if size > 0 {
437            self.request_stream.storage.fill(size);
438            count!(names::backend::BYTES_IN, size as i64);
439            metrics.bin += size;
440            // if self.kawa_request.storage.is_full() {
441            //     self.frontend_readiness.interest.remove(Ready::READABLE);
442            // }
443        } else {
444            self.frontend_readiness.event.remove(Ready::READABLE);
445        }
446
447        match socket_state {
448            SocketResult::Error | SocketResult::Closed => {
449                if self.request_stream.is_initial() {
450                    // count an error if we were waiting for the first request
451                    // otherwise, if we already had one completed request and response,
452                    // and are waiting for the next one, we do not count a socket
453                    // closing abruptly as an error
454                    if self.keepalive_count == 0 {
455                        self.frontend_socket.read_error();
456                    }
457                } else {
458                    self.frontend_socket.read_error();
459                    self.log_request_error(
460                        metrics,
461                        &format!(
462                            "front socket {socket_state:?}, closing the session. Readiness: {:?} -> {:?}, read {size} bytes",
463                            self.frontend_readiness,
464                            self.backend_readiness,
465                        )
466                    );
467                }
468                return StateResult::CloseSession;
469            }
470            SocketResult::WouldBlock => {
471                self.frontend_readiness.event.remove(Ready::READABLE);
472            }
473            SocketResult::Continue => {}
474        };
475
476        trace!(
477            "{} ============== readable_parse",
478            log_context!(self, Some(response_stream.parsing_phase))
479        );
480        let was_initial = self.request_stream.is_initial();
481        let was_not_proxying = !self.request_stream.is_main_phase();
482
483        kawa::h1::parse(&mut self.request_stream, &mut self.context);
484        // kawa::debug_kawa(&self.request_stream);
485
486        if was_initial && !self.request_stream.is_initial() {
487            // if it was the first request, the front timeout duration
488            // was set to request_timeout, which is much lower. For future
489            // requests on this connection, we can wait a bit more
490            self.container_frontend_timeout
491                .set_duration(self.configured_frontend_timeout);
492            gauge_add!(names::http::ACTIVE_REQUESTS, 1);
493            incr!(names::http::REQUESTS);
494        }
495
496        if let kawa::ParsingPhase::Error { marker, kind } = self.request_stream.parsing_phase {
497            incr!(names::http::FRONTEND_PARSE_ERRORS);
498            warn!(
499                "{} Parsing request error in {:?}: {}",
500                log_context!(self, Some(response_stream.parsing_phase)),
501                marker,
502                match kind {
503                    kawa::ParsingErrorKind::Consuming { index } => {
504                        let kawa = &self.request_stream;
505                        parser::view(
506                            kawa.storage.used(),
507                            16,
508                            &[
509                                kawa.storage.start,
510                                kawa.storage.head,
511                                index as usize,
512                                kawa.storage.end,
513                            ],
514                        )
515                    }
516                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
517                }
518            );
519            if response_stream.consumed {
520                self.log_request_error(metrics, "Parsing error on the request");
521                return StateResult::CloseSession;
522            } else {
523                let (message, successfully_parsed, partially_parsed, invalid) =
524                    diagnostic_400_502(marker, kind, &self.request_stream);
525                self.set_answer(DefaultAnswer::Answer400 {
526                    message,
527                    phase: marker,
528                    successfully_parsed,
529                    partially_parsed,
530                    invalid,
531                });
532                return StateResult::Continue;
533            }
534        }
535
536        if self.request_stream.is_main_phase() {
537            self.backend_readiness.interest.insert(Ready::WRITABLE);
538            if was_not_proxying {
539                // Sozu tries to connect only once all the headers were gathered and edited
540                // this could be improved
541                trace!("{} ============== HANDLE CONNECTION!", log_context!(self));
542                return StateResult::ConnectBackend;
543            }
544        }
545        if self.request_stream.is_terminated() {
546            self.frontend_readiness.interest.remove(Ready::READABLE);
547        }
548
549        StateResult::Continue
550    }
551
552    pub fn writable(&mut self, metrics: &mut SessionMetrics) -> StateResult {
553        trace!("{} ============== writable", log_context!(self));
554        let response_stream = match &mut self.response_stream {
555            ResponseStream::BackendAnswer(response_stream) => response_stream,
556            _ => return self.writable_default_answer(metrics),
557        };
558
559        response_stream.prepare(&mut kawa::h1::BlockConverter);
560
561        let bufs = response_stream.as_io_slice();
562        if bufs.is_empty() && !self.frontend_socket.socket_wants_write() {
563            self.frontend_readiness.interest.remove(Ready::WRITABLE);
564            // do not shortcut, response might have been terminated without anything more to send
565        }
566
567        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
568
569        debug!(
570            "{} Wrote {} bytes",
571            log_context!(self, Some(response_stream.parsing_phase)),
572            size
573        );
574
575        if size > 0 {
576            response_stream.consume(size);
577            count!(names::backend::BYTES_OUT, size as i64);
578            metrics.bout += size;
579            self.backend_readiness.interest.insert(Ready::READABLE);
580        }
581
582        match socket_state {
583            SocketResult::Error | SocketResult::Closed => {
584                self.frontend_socket.write_error();
585                self.log_request_error(
586                    metrics,
587                    &format!(
588                        "front socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
589                        self.frontend_readiness,
590                        self.backend_readiness,
591                    ),
592                );
593                return StateResult::CloseSession;
594            }
595            SocketResult::WouldBlock => {
596                self.frontend_readiness.event.remove(Ready::WRITABLE);
597            }
598            SocketResult::Continue => {}
599        }
600
601        if self.frontend_socket.socket_wants_write() {
602            return StateResult::Continue;
603        }
604
605        if response_stream.is_terminated() && response_stream.is_completed() {
606            if self.context.closing {
607                debug!("{} closing proxy, no keep alive", log_context!(self));
608                self.log_request_success(metrics);
609                return StateResult::CloseSession;
610            }
611
612            match response_stream.detached.status_line {
613                kawa::StatusLine::Response { code: 101, .. } => {
614                    trace!("{} ============== HANDLE UPGRADE!", log_context!(self));
615                    self.log_request_success(metrics);
616                    return StateResult::Upgrade;
617                }
618                kawa::StatusLine::Response { code: 100, .. } => {
619                    trace!(
620                        "{} ============== HANDLE CONTINUE!",
621                        log_context!(self, Some(response_stream.parsing_phase))
622                    );
623                    response_stream.clear();
624                    self.log_request_success(metrics);
625                    return StateResult::Continue;
626                }
627                kawa::StatusLine::Response { code: 103, .. } => {
628                    self.backend_readiness.event.insert(Ready::READABLE);
629                    trace!(
630                        "{} ============== HANDLE EARLY HINT!",
631                        log_context!(self, Some(response_stream.parsing_phase))
632                    );
633                    response_stream.clear();
634                    self.log_request_success(metrics);
635                    return StateResult::Continue;
636                }
637                _ => (),
638            }
639
640            let response_length_known = response_stream.body_size != kawa::BodySize::Empty;
641            let request_length_known = self.request_stream.body_size != kawa::BodySize::Empty;
642            if !(self.request_stream.is_terminated() && self.request_stream.is_completed())
643                && request_length_known
644            {
645                error!(
646                    "{} Response terminated before request, this case is not handled properly yet",
647                    log_context!(self)
648                );
649                incr!(names::http::EARLY_RESPONSE_CLOSE);
650                // FIXME: this will cause problems with pipelining
651                // return StateResult::CloseSession;
652            }
653
654            // FIXME: we could get smarter about this
655            // with no keepalive on backend, we could open a new backend ConnectionError
656            // with no keepalive on front but keepalive on backend, we could have
657            // a pool of connections
658            trace!(
659                "{} ============== HANDLE KEEP-ALIVE: {} {} {}",
660                log_context!(self),
661                self.context.keep_alive_frontend,
662                self.context.keep_alive_backend,
663                response_length_known
664            );
665
666            self.log_request_success(metrics);
667            return match (
668                self.context.keep_alive_frontend,
669                self.context.keep_alive_backend,
670                response_length_known,
671            ) {
672                (true, true, true) => {
673                    debug!("{} Keep alive frontend/backend", log_context!(self));
674                    metrics.reset();
675                    self.reset();
676                    StateResult::Continue
677                }
678                (true, false, true) => {
679                    debug!("{} Keep alive frontend", log_context!(self));
680                    metrics.reset();
681                    self.reset();
682                    StateResult::CloseBackend
683                }
684                _ => {
685                    debug!("{} No keep alive", log_context!(self));
686                    StateResult::CloseSession
687                }
688            };
689        }
690        StateResult::Continue
691    }
692
693    fn writable_default_answer(&mut self, metrics: &mut SessionMetrics) -> StateResult {
694        trace!(
695            "{} ============== writable_default_answer",
696            log_context!(self)
697        );
698        let response_stream = match &mut self.response_stream {
699            ResponseStream::DefaultAnswer(_, response_stream) => response_stream,
700            _ => return StateResult::CloseSession,
701        };
702        let bufs = response_stream.as_io_slice();
703        let (size, socket_state) = self.frontend_socket.socket_write_vectored(&bufs);
704
705        count!(names::backend::BYTES_OUT, size as i64);
706        metrics.bout += size;
707        response_stream.consume(size);
708
709        if size == 0 || socket_state != SocketResult::Continue {
710            self.frontend_readiness.event.remove(Ready::WRITABLE);
711        }
712
713        if response_stream.is_completed() {
714            save_http_status_metric(self.context.status, self.context.log_context());
715            self.log_default_answer_success(metrics);
716            self.frontend_readiness.reset();
717            self.backend_readiness.reset();
718            return StateResult::CloseSession;
719        }
720
721        if socket_state == SocketResult::Error {
722            self.frontend_socket.write_error();
723            self.log_request_error(
724                metrics,
725                "error writing default answer to front socket, closing",
726            );
727            StateResult::CloseSession
728        } else {
729            StateResult::Continue
730        }
731    }
732
733    pub fn backend_writable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
734        trace!("{} ============== backend_writable", log_context!(self));
735        if let ResponseStream::DefaultAnswer(..) = self.response_stream {
736            error!(
737                "{}\tsending default answer, should not write to back",
738                log_context!(self)
739            );
740            self.backend_readiness.interest.remove(Ready::WRITABLE);
741            self.frontend_readiness.interest.insert(Ready::WRITABLE);
742            return SessionResult::Continue;
743        }
744
745        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
746            backend_socket
747        } else {
748            self.log_request_error(metrics, "back socket not found, closing session");
749            return SessionResult::Close;
750        };
751
752        self.request_stream.prepare(&mut kawa::h1::BlockConverter);
753
754        let bufs = self.request_stream.as_io_slice();
755        if bufs.is_empty() {
756            self.backend_readiness.interest.remove(Ready::WRITABLE);
757            return SessionResult::Continue;
758        }
759
760        let (size, socket_state) = backend_socket.socket_write_vectored(&bufs);
761        debug!("{} Wrote {} bytes", log_context!(self), size);
762
763        if size > 0 {
764            self.request_stream.consume(size);
765            count!(names::backend::BACK_BYTES_OUT, size as i64);
766            metrics.backend_bout += size;
767            self.frontend_readiness.interest.insert(Ready::READABLE);
768            self.backend_readiness.interest.insert(Ready::READABLE);
769        } else {
770            self.backend_readiness.event.remove(Ready::WRITABLE);
771        }
772
773        match socket_state {
774            // the back socket is not writable anymore, so we can drop
775            // the front buffer, no more data can be transmitted.
776            // But the socket might still be readable, or if it is
777            // closed, we might still have some data in the buffer.
778            // As an example, we can get an early response for a large
779            // POST request to refuse it and prevent all of the data
780            // from being transmitted, with the backend server closing
781            // the socket right after sending the response
782            // FIXME: shouldn't we test for response_state then?
783            SocketResult::Error | SocketResult::Closed => {
784                self.frontend_readiness.interest.remove(Ready::READABLE);
785                self.backend_readiness.interest.remove(Ready::WRITABLE);
786                return SessionResult::Continue;
787            }
788            SocketResult::WouldBlock => {
789                self.backend_readiness.event.remove(Ready::WRITABLE);
790            }
791            SocketResult::Continue => {}
792        }
793
794        if self.request_stream.is_terminated() && self.request_stream.is_completed() {
795            self.backend_readiness.interest.remove(Ready::WRITABLE);
796
797            // cancel the front timeout while we are waiting for the server to answer
798            self.container_frontend_timeout.cancel();
799            self.container_backend_timeout.reset();
800        }
801        SessionResult::Continue
802    }
803
    /// Reads response bytes from the backend socket into the response buffer
    /// and runs the kawa H1 parser over them.
    ///
    /// Returns [`SessionResult::Close`] when no backend socket is attached,
    /// when the socket read reports an error, or when a parsing error occurs
    /// after part of the response was already consumed. In every other case
    /// it returns [`SessionResult::Continue`], possibly after replacing the
    /// response with a default answer (507 when the buffer is full outside of
    /// the main phase, 502 on a parsing error before anything was consumed).
    pub fn backend_readable(&mut self, metrics: &mut SessionMetrics) -> SessionResult {
        trace!("{} ============== backend_readable", log_context!(self));
        // A failed reset is only reported, the read proceeds regardless.
        if !self.container_backend_timeout.reset() {
            error!(
                "{} Could not reset back timeout {:?}",
                log_context!(self),
                self.configured_backend_timeout
            );
            self.print_state(self.protocol_string());
        }

        // Only a backend answer may receive bytes; with a default answer
        // pending, reading from the backend is disabled.
        let response_stream = match &mut self.response_stream {
            ResponseStream::BackendAnswer(response_stream) => response_stream,
            _ => {
                error!(
                    "{} Sending default answer, should not read from backend socket",
                    log_context!(self),
                );

                self.backend_readiness.interest.remove(Ready::READABLE);
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
                return SessionResult::Continue;
            }
        };

        let backend_socket = if let Some(backend_socket) = &mut self.backend_socket {
            backend_socket
        } else {
            self.log_request_error(metrics, "back socket not found, closing session");
            return SessionResult::Close;
        };

        // No room left in the buffer: either let the frontend drain it (main
        // phase) or give up with a 507 default answer.
        if response_stream.storage.is_full() {
            self.backend_readiness.interest.remove(Ready::READABLE);
            if response_stream.is_main_phase() {
                self.frontend_readiness.interest.insert(Ready::WRITABLE);
            } else {
                // server has filled its buffer and we can't empty it
                let capacity = response_stream.storage.capacity();
                let phase = response_stream.parsing_phase.marker();
                let message = diagnostic_413_507(response_stream.parsing_phase);
                self.set_answer(DefaultAnswer::Answer507 {
                    capacity,
                    phase,
                    message,
                });
            }
            return SessionResult::Continue;
        }

        let (size, socket_state) = backend_socket.socket_read(response_stream.storage.space());
        debug!(
            "{} Read {} bytes",
            log_context!(self, Some(response_stream.parsing_phase)),
            size
        );

        if size > 0 {
            response_stream.storage.fill(size);
            count!(names::backend::BACK_BYTES_IN, size as i64);
            metrics.backend_bin += size;
            // if self.kawa_response.storage.is_full() {
            //     self.backend_readiness.interest.remove(Ready::READABLE);
            // }

            // In case the response starts while the request is not tagged as "terminated",
            // we place the timeout responsibility on the backend.
            // This can happen when:
            // - the request is malformed and doesn't have length information
            // - kawa fails to detect a properly terminated request (e.g. a GET request with no body and no length)
            // - the response can start before the end of the request (e.g. stream processing like compression)
            self.container_frontend_timeout.cancel();
        } else {
            self.backend_readiness.event.remove(Ready::READABLE);
        }

        // TODO: close delimited and backend_hup should be handled better
        match socket_state {
            SocketResult::Error => {
                backend_socket.read_error();
                self.log_request_error(
                    metrics,
                    &format!(
                        "back socket {socket_state:?}, closing session.  Readiness: {:?} -> {:?}, read {size} bytes",
                        self.frontend_readiness,
                        self.backend_readiness,
                    ),
                );
                return SessionResult::Close;
            }
            SocketResult::WouldBlock | SocketResult::Closed => {
                self.backend_readiness.event.remove(Ready::READABLE);
            }
            SocketResult::Continue => {}
        }

        trace!(
            "{} ============== backend_readable_parse",
            log_context!(self, Some(response_stream.parsing_phase))
        );
        kawa::h1::parse(response_stream, &mut self.context);
        // kawa::debug_kawa(&self.response_stream);

        if let kawa::ParsingPhase::Error { marker, kind } = response_stream.parsing_phase {
            incr!(names::http::BACKEND_PARSE_ERRORS);
            warn!(
                "{} Parsing response error in {:?}: {}",
                log_context!(self, Some(response_stream.parsing_phase)),
                marker,
                match kind {
                    kawa::ParsingErrorKind::Consuming { index } => {
                        parser::view(
                            response_stream.storage.used(),
                            16,
                            &[
                                response_stream.storage.start,
                                response_stream.storage.head,
                                index as usize,
                                response_stream.storage.end,
                            ],
                        )
                    }
                    kawa::ParsingErrorKind::Processing { message } => message.to_owned(),
                }
            );
            // Once part of the response has been consumed, a default answer
            // is no longer attempted: the session is closed instead.
            if response_stream.consumed {
                return SessionResult::Close;
            } else {
                let (message, successfully_parsed, partially_parsed, invalid) =
                    diagnostic_400_502(marker, kind, response_stream);
                self.set_answer(DefaultAnswer::Answer502 {
                    message,
                    phase: marker,
                    successfully_parsed,
                    partially_parsed,
                    invalid,
                });
                return SessionResult::Continue;
            }
        }

        if response_stream.is_main_phase() {
            self.frontend_readiness.interest.insert(Ready::WRITABLE);
        }
        // Response fully parsed: record when the backend finished and stop
        // reading from it.
        if response_stream.is_terminated() {
            metrics.backend_stop();
            self.backend_stop = Some(Instant::now());
            self.backend_readiness.interest.remove(Ready::READABLE);
        }
        SessionResult::Continue
    }
956}
957
958impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> Http<Front, L> {
959    fn log_endpoint(&self) -> EndpointRecord<'_> {
960        EndpointRecord::Http {
961            method: self.context.method.as_deref(),
962            authority: self.context.authority.as_deref(),
963            path: self.context.path.as_deref(),
964            reason: self.context.reason.as_deref(),
965            status: self.context.status,
966        }
967    }
968
969    /// Returns the kawa `ParsingPhase` of the response side, if a backend answer
970    /// has been accepted. Used by `log_context!` to surface H1 wedged-response
971    /// states without poking inside the `ResponseStream` enum at every log site.
972    fn response_parsing_phase(&self) -> Option<kawa::ParsingPhase> {
973        match &self.response_stream {
974            ResponseStream::BackendAnswer(inner) => Some(inner.parsing_phase),
975            _ => None,
976        }
977    }
978
979    pub fn get_session_address(&self) -> Option<SocketAddr> {
980        self.context
981            .session_address
982            .or_else(|| self.frontend_socket.socket_ref().peer_addr().ok())
983    }
984
985    pub fn get_backend_address(&self) -> Option<SocketAddr> {
986        self.backend
987            .as_ref()
988            .map(|backend| backend.borrow().address)
989            .or_else(|| {
990                self.backend_socket
991                    .as_ref()
992                    .and_then(|backend| backend.peer_addr().ok())
993            })
994    }
995
996    // The protocol name used in the access logs
997    fn protocol_string(&self) -> &'static str {
998        match self.context.protocol {
999            Protocol::HTTP => "HTTP",
1000            Protocol::HTTPS => match self.frontend_socket.protocol() {
1001                TransportProtocol::Ssl2 => "HTTPS-SSL2",
1002                TransportProtocol::Ssl3 => "HTTPS-SSL3",
1003                TransportProtocol::Tls1_0 => "HTTPS-TLS1.0",
1004                TransportProtocol::Tls1_1 => "HTTPS-TLS1.1",
1005                TransportProtocol::Tls1_2 => "HTTPS-TLS1.2",
1006                TransportProtocol::Tls1_3 => "HTTPS-TLS1.3",
1007                _ => unreachable!(),
1008            },
1009            _ => unreachable!(),
1010        }
1011    }
1012
1013    /// Format the context of the websocket into a loggable String
1014    pub fn websocket_context(&self) -> WebSocketContext {
1015        WebSocketContext::Http {
1016            method: self.context.method.clone(),
1017            authority: self.context.authority.clone(),
1018            path: self.context.path.clone(),
1019            reason: self.context.reason.clone(),
1020            status: self.context.status,
1021        }
1022    }
1023
1024    pub fn log_request(&self, metrics: &SessionMetrics, error: bool, message: Option<&str>) {
1025        let listener = self.listener.borrow();
1026        let tags = self.context.authority.as_ref().and_then(|host| {
1027            let hostname = match host.split_once(':') {
1028                None => host,
1029                Some((hostname, _)) => hostname,
1030            };
1031            listener.get_tags(hostname)
1032        });
1033
1034        let context = self.context.log_context();
1035        metrics.register_end_of_session(&context);
1036
1037        log_access! {
1038            error,
1039            on_failure: { incr!(names::access_logs::UNSENT) },
1040            message,
1041            context,
1042            session_address: self.get_session_address(),
1043            backend_address: self.get_backend_address(),
1044            protocol: self.protocol_string(),
1045            endpoint: self.log_endpoint(),
1046            tags,
1047            client_rtt: socket_rtt(self.front_socket()),
1048            server_rtt: self.backend_socket.as_ref().and_then(socket_rtt),
1049            service_time: metrics.service_time(),
1050            response_time: metrics.backend_response_time(),
1051            request_time: metrics.request_time(),
1052            bytes_in: metrics.bin,
1053            bytes_out: metrics.bout,
1054            user_agent: self.context.user_agent.as_deref(),
1055            x_request_id: self.context.x_request_id.as_deref(),
1056            tls_version: self.context.tls_version,
1057            tls_cipher: self.context.tls_cipher,
1058            tls_sni: self.context.tls_server_name.as_deref(),
1059            tls_alpn: self.context.tls_alpn,
1060            xff_chain: self.context.xff_chain.as_deref(),
1061            #[cfg(feature = "opentelemetry")]
1062            otel: self.context.otel.as_ref(),
1063            #[cfg(not(feature = "opentelemetry"))]
1064            otel: None,
1065        };
1066    }
1067
1068    pub fn log_request_success(&self, metrics: &SessionMetrics) {
1069        save_http_status_metric(self.context.status, self.context.log_context());
1070        self.log_request(metrics, false, None);
1071    }
1072
1073    pub fn log_default_answer_success(&self, metrics: &SessionMetrics) {
1074        // Surface the timeout discriminator (set by `timeout()` before
1075        // `set_answer`) on the access-log `message` field. Non-timeout
1076        // default-answer paths leave `access_log_message` as `None` and
1077        // emit `message: None` exactly as before.
1078        self.log_request(metrics, false, self.context.access_log_message);
1079    }
1080    pub fn log_request_error(&self, metrics: &mut SessionMetrics, message: &str) {
1081        // Labelled with `(cluster_id, backend_id)` so per-cluster dashboards
1082        // can attribute error rates. The labels are dropped centrally by
1083        // `filter_labels_for_detail` (`metrics/mod.rs`) when
1084        // `metrics.detail` is `process` / `frontend` / `cluster`, so default
1085        // setups still see the same proxy-wide counter — no double-counting,
1086        // no surprise cardinality.
1087        incr!(
1088            "http.errors",
1089            self.context.cluster_id.as_deref(),
1090            self.context.backend_id.as_deref()
1091        );
1092        error!(
1093            "{} Could not process request properly got: {}",
1094            log_context!(self),
1095            message
1096        );
1097        self.print_state(self.protocol_string());
1098        self.log_request(metrics, true, Some(message));
1099    }
1100
    /// Replaces the response stream with a rendered default answer and
    /// switches the readiness interests so that only the frontend is written
    /// to.
    ///
    /// When a default answer is already in place, an error is logged and the
    /// per-status counter is NOT incremented, but the answer is still
    /// rendered and overwrites the previous one.
    pub fn set_answer(&mut self, answer: DefaultAnswer) {
        // Status requested by the caller; only used for the log line below.
        let status = u16::from(&answer);
        if let ResponseStream::DefaultAnswer(old_status, ..) = self.response_stream {
            error!(
                "already set the default answer to {}, trying to set to {}",
                old_status, status
            );
        } else {
            // One counter per default-answer kind; most carry the
            // `(cluster_id, backend_id)` labels, 400 and 404 do not.
            match answer {
                DefaultAnswer::Answer301 { .. } => incr!(
                    "http.301.redirection",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer302 { .. } => incr!(
                    "http.302.redirection",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer308 { .. } => incr!(
                    "http.308.redirection",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer400 { .. } => incr!(names::http::ERR_400),
                DefaultAnswer::Answer401 { .. } => incr!(
                    "http.401.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer404 { .. } => incr!(names::http::ERR_404),
                DefaultAnswer::Answer408 { .. } => incr!(
                    "http.408.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer413 { .. } => incr!(
                    "http.413.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer421 { .. } => incr!(
                    "http.421.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer429 { .. } => incr!(
                    "connections.rejected_per_cluster_ip",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer502 { .. } => incr!(
                    "http.502.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer503 { .. } => incr!(
                    "http.503.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer504 { .. } => incr!(
                    "http.504.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
                DefaultAnswer::Answer507 { .. } => incr!(
                    "http.507.errors",
                    self.context.cluster_id.as_deref(),
                    self.context.backend_id.as_deref()
                ),
            };
        }

        let (resolved_status, keep_alive, mut kawa) = self.answers.borrow().get(
            answer,
            self.context.id.to_string(),
            self.context.cluster_id.as_deref(),
            self.context.backend_id.as_deref(),
            self.get_route(),
        );
        kawa.prepare(&mut kawa::h1::BlockConverter);
        // The template's resolved status may differ from the requested
        // code when a fallback template is selected (e.g. unknown status).
        let status = resolved_status;
        self.context.status = Some(status);
        self.context.reason = None;
        // Honour the rendered template's `Connection` header. Built-in
        // error templates carry `Connection: close`, but operators can
        // ship a custom template that keeps the frontend connection alive.
        if !keep_alive {
            self.context.keep_alive_frontend = false;
        }
        self.response_stream = ResponseStream::DefaultAnswer(status, kawa);
        // Only write to the frontend from now on; the backend is merely
        // watched for hang-up and errors.
        self.frontend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_readiness.interest = Ready::HUP | Ready::ERROR;
    }
1198
1199    pub fn test_backend_socket(&self) -> bool {
1200        match self.backend_socket {
1201            Some(ref s) => {
1202                let mut tmp = [0u8; 1];
1203                let res = s.peek(&mut tmp[..]);
1204
1205                match res {
1206                    // if the socket is half open, it will report 0 bytes read (EOF)
1207                    Ok(0) => false,
1208                    Ok(_) => true,
1209                    Err(e) => matches!(e.kind(), std::io::ErrorKind::WouldBlock),
1210                }
1211            }
1212            None => false,
1213        }
1214    }
1215
1216    pub fn is_valid_backend_socket(&self) -> bool {
1217        // if socket was last used in the last second, test it
1218        match self.backend_stop.as_ref() {
1219            Some(stop_instant) => {
1220                let now = Instant::now();
1221                let dur = now - *stop_instant;
1222                if dur > Duration::from_secs(1) {
1223                    return self.test_backend_socket();
1224                }
1225            }
1226            None => return self.test_backend_socket(),
1227        }
1228
1229        true
1230    }
1231
    /// Attaches `socket` as the backend socket and stores the (optional)
    /// `backend` it belongs to, replacing any previous values.
    pub fn set_backend_socket(&mut self, socket: TcpStream, backend: Option<Rc<RefCell<Backend>>>) {
        self.backend_socket = Some(socket);
        self.backend = backend;
    }
1236
    /// Stores the cluster id in the HTTP context, where metrics labels and
    /// default answers read it.
    pub fn set_cluster_id(&mut self, cluster_id: String) {
        self.context.cluster_id = Some(cluster_id);
    }
1240
    /// Stores the backend id in the HTTP context, where metrics labels and
    /// default answers read it.
    pub fn set_backend_id(&mut self, backend_id: String) {
        self.context.backend_id = Some(backend_id);
    }
1244
    /// Records the token of the backend connection; `set_backend_timeout`
    /// and `close_backend` rely on it being set.
    pub fn set_backend_token(&mut self, token: Token) {
        self.backend_token = Some(token);
    }
1248
    /// Forgets the backend token without touching the socket or the session
    /// manager.
    pub fn clear_backend_token(&mut self) {
        self.backend_token = None;
    }
1252
1253    pub fn set_backend_timeout(&mut self, dur: Duration) {
1254        if let Some(token) = self.backend_token.as_ref() {
1255            self.container_backend_timeout.set_duration(dur);
1256            self.container_backend_timeout.set(*token);
1257        }
1258    }
1259
    /// Returns the TCP stream underlying the frontend socket handler.
    pub fn front_socket(&self) -> &TcpStream {
        self.frontend_socket.socket_ref()
    }
1263
1264    /// WARNING: this function removes the backend entry in the session manager
1265    /// IF the backend_token is set, so that entry can be reused for new backend.
1266    /// I don't think this is a good idea, but it is a quick fix
1267    fn close_backend(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
1268        self.container_backend_timeout.cancel();
1269        debug!(
1270            "{}\tPROXY [{}->{}] CLOSED BACKEND",
1271            log_context!(self),
1272            self.frontend_token.0,
1273            self.backend_token
1274                .map(|t| format!("{}", t.0))
1275                .unwrap_or_else(|| "-".to_string())
1276        );
1277
1278        let proxy = proxy.borrow();
1279        if let Some(socket) = &mut self.backend_socket.take() {
1280            if let Err(e) = proxy.deregister_socket(socket) {
1281                error!(
1282                    "{} Error deregistering back socket({:?}): {:?}",
1283                    log_context!(self),
1284                    socket,
1285                    e
1286                );
1287            }
1288            // SAFETY (TLS-truncation invariant): `Shutdown::Both` is permitted
1289            // here because this is the *backend* socket of an H1 session — Sōzu
1290            // currently speaks plaintext H1 to backends, so there is no
1291            // outbound TLS write buffer that the kernel could discard with a
1292            // RST. If backend-TLS lands later (#1218), this MUST move to
1293            // `Shutdown::Write` (or `socket.send_close_notify()`) to avoid the
1294            // canonical truncation anti-pattern documented in CLAUDE.md.
1295            if let Err(e) = socket.shutdown(Shutdown::Both) {
1296                if e.kind() != ErrorKind::NotConnected {
1297                    error!(
1298                        "{} Error shutting down back socket({:?}): {:?}",
1299                        log_context!(self),
1300                        socket,
1301                        e
1302                    );
1303                }
1304            }
1305        }
1306
1307        if let Some(token) = self.backend_token.take() {
1308            proxy.remove_session(token);
1309
1310            if self.backend_connection_status != BackendConnectionStatus::NotConnected {
1311                self.backend_readiness.event = Ready::EMPTY;
1312            }
1313
1314            if self.backend_connection_status == BackendConnectionStatus::Connected {
1315                gauge_add!(names::backend::CONNECTIONS, -1);
1316                gauge_add!(
1317                    names::backend::CONNECTIONS_PER_BACKEND,
1318                    -1,
1319                    self.context.cluster_id.as_deref(),
1320                    metrics.backend_id.as_deref()
1321                );
1322            }
1323
1324            self.set_backend_connected(BackendConnectionStatus::NotConnected, metrics);
1325
1326            if let Some(backend) = self.backend.take() {
1327                backend.borrow_mut().dec_connections();
1328            }
1329        }
1330    }
1331
1332    /// Check the number of connection attempts against authorized connection retries
1333    fn check_circuit_breaker(&mut self) -> Result<(), BackendConnectionError> {
1334        if self.connection_attempts >= CONN_RETRIES {
1335            incr!(
1336                "backend.connect.retries_exhausted",
1337                self.context.cluster_id.as_deref(),
1338                self.context.backend_id.as_deref()
1339            );
1340            warn!(
1341                "{} Max connection attempt reached ({})",
1342                log_context!(self),
1343                self.connection_attempts,
1344            );
1345
1346            self.set_answer(DefaultAnswer::Answer503 {
1347                message: format!(
1348                    "Max connection attempt reached: {}",
1349                    self.connection_attempts
1350                ),
1351            });
1352            return Err(BackendConnectionError::MaxConnectionRetries(None));
1353        }
1354        Ok(())
1355    }
1356
1357    fn check_backend_connection(&mut self, metrics: &mut SessionMetrics) -> bool {
1358        let is_valid_backend_socket = self.is_valid_backend_socket();
1359
1360        if !is_valid_backend_socket {
1361            return false;
1362        }
1363
1364        //matched on keepalive
1365        metrics.backend_id = self.backend.as_ref().map(|i| i.borrow().backend_id.clone());
1366
1367        metrics.backend_start();
1368        metrics.backend_connected();
1369        if let Some(b) = self.backend.as_mut() {
1370            b.borrow_mut().active_requests += 1;
1371        }
1372        true
1373    }
1374
1375    // -> host, path, method
1376    pub fn extract_route(&self) -> Result<(&str, &str, &Method), RetrieveClusterError> {
1377        let given_method = self
1378            .context
1379            .method
1380            .as_ref()
1381            .ok_or(RetrieveClusterError::NoMethod)?;
1382        let given_authority = self
1383            .context
1384            .authority
1385            .as_deref()
1386            .ok_or(RetrieveClusterError::NoHost)?;
1387        let given_path = self
1388            .context
1389            .path
1390            .as_deref()
1391            .ok_or(RetrieveClusterError::NoPath)?;
1392
1393        Ok((given_authority, given_path, given_method))
1394    }
1395
1396    pub fn get_route(&self) -> String {
1397        if let Some(method) = &self.context.method {
1398            if let Some(authority) = &self.context.authority {
1399                if let Some(path) = &self.context.path {
1400                    return format!("{method} {authority}{path}");
1401                }
1402                return format!("{method} {authority}");
1403            }
1404            return format!("{method}");
1405        }
1406        String::new()
1407    }
1408
    /// Resolves the cluster that should serve the current request.
    ///
    /// On every failure path a default answer is set on the session before
    /// the error is returned:
    /// - route cannot be extracted from the context: 400;
    /// - listener lookup fails on a malformed authority: 400;
    /// - listener lookup finds no cluster: 404;
    /// - the route has no `cluster_id`: 401, `UnauthorizedRoute`;
    /// - HTTP listener and the cluster asks for an HTTPS redirect: 301,
    ///   also reported as `UnauthorizedRoute`.
    fn cluster_id_from_request(
        &mut self,
        proxy: Rc<RefCell<dyn L7Proxy>>,
    ) -> Result<String, RetrieveClusterError> {
        let (host, uri, method) = match self.extract_route() {
            Ok(tuple) => tuple,
            Err(cluster_error) => {
                self.set_answer(DefaultAnswer::Answer400 {
                    message: "Could not extract the route after connection started, this should not happen.".into(),
                    phase: self.request_stream.parsing_phase.marker(),
                    successfully_parsed: "null".into(),
                    partially_parsed: "null".into(),
                    invalid: "null".into(),
                });
                return Err(cluster_error);
            }
        };

        let route_result = self
            .listener
            .borrow()
            .frontend_from_request(host, uri, method);

        let route = match route_result {
            Ok(route) => route,
            Err(frontend_error) => {
                // RFC 9110 §15.5.1: a malformed authority is a 400. A
                // syntactically valid authority that simply has no matching
                // frontend stays on the historical 404 path.
                match &frontend_error {
                    FrontendFromRequestError::HostParse { .. }
                    | FrontendFromRequestError::InvalidCharsAfterHost(_) => {
                        self.set_answer(DefaultAnswer::Answer400 {
                            message: frontend_error.to_string(),
                            phase: self.request_stream.parsing_phase.marker(),
                            successfully_parsed: "null".into(),
                            partially_parsed: "null".into(),
                            invalid: "null".into(),
                        });
                    }
                    FrontendFromRequestError::NoClusterFound(_) => {
                        self.set_answer(DefaultAnswer::Answer404 {});
                    }
                }
                return Err(RetrieveClusterError::RetrieveFrontend(frontend_error));
            }
        };

        // `RouteResult` already carries the full routing decision, but the
        // mux integration that consumes every field is not in place yet.
        // Until it lands, this legacy kawa_h1 path collapses `RouteResult`
        // down to its `cluster_id` and treats an absent cluster as the
        // historical `Route::Deny`.
        let cluster_id = match route.cluster_id {
            Some(cluster_id) => cluster_id,
            None => {
                self.set_answer(DefaultAnswer::Answer401 {
                    www_authenticate: None,
                });
                return Err(RetrieveClusterError::UnauthorizedRoute);
            }
        };

        // Redirect only applies on a plain HTTP listener, and only when the
        // cluster is known and has `https_redirect` enabled.
        let frontend_should_redirect_https = matches!(proxy.borrow().kind(), ListenerType::Http)
            && proxy
                .borrow()
                .clusters()
                .get(&cluster_id)
                .map(|cluster| cluster.https_redirect)
                .unwrap_or(false);

        if frontend_should_redirect_https {
            // NOTE(review): `host` is the request authority as received, so
            // an explicit port in it is copied into the `https://` location;
            // confirm that this is intended.
            self.set_answer(DefaultAnswer::Answer301 {
                location: format!("https://{host}{uri}"),
            });
            return Err(RetrieveClusterError::UnauthorizedRoute);
        }

        Ok(cluster_id)
    }
1489
1490    pub fn backend_from_request(
1491        &mut self,
1492        cluster_id: &str,
1493        frontend_should_stick: bool,
1494        proxy: Rc<RefCell<dyn L7Proxy>>,
1495        metrics: &mut SessionMetrics,
1496    ) -> Result<TcpStream, BackendConnectionError> {
1497        let (backend, conn) = self
1498            .get_backend_for_sticky_session(
1499                frontend_should_stick,
1500                self.context.sticky_session_found.as_deref(),
1501                cluster_id,
1502                proxy,
1503            )
1504            .map_err(|backend_error| {
1505                // some backend errors are actually retryable
1506                // TODO: maybe retry or return a different default answer
1507                self.set_answer(DefaultAnswer::Answer503 {
1508                    message: backend_error.to_string(),
1509                });
1510                BackendConnectionError::Backend(backend_error)
1511            })?;
1512
1513        if frontend_should_stick {
1514            // update sticky name in case it changed I guess?
1515            self.context.sticky_name = self.listener.borrow().get_sticky_name().to_string();
1516
1517            self.context.sticky_session = Some(
1518                backend
1519                    .borrow()
1520                    .sticky_id
1521                    .clone()
1522                    .unwrap_or_else(|| backend.borrow().backend_id.clone()),
1523            );
1524        }
1525
1526        metrics.backend_id = Some(backend.borrow().backend_id.clone());
1527        metrics.backend_start();
1528        self.set_backend_id(backend.borrow().backend_id.clone());
1529
1530        self.backend = Some(backend);
1531        Ok(conn)
1532    }
1533
1534    fn get_backend_for_sticky_session(
1535        &self,
1536        frontend_should_stick: bool,
1537        sticky_session: Option<&str>,
1538        cluster_id: &str,
1539        proxy: Rc<RefCell<dyn L7Proxy>>,
1540    ) -> Result<(Rc<RefCell<Backend>>, TcpStream), BackendError> {
1541        match (frontend_should_stick, sticky_session) {
1542            (true, Some(sticky_session)) => proxy
1543                .borrow()
1544                .backends()
1545                .borrow_mut()
1546                .backend_from_sticky_session(cluster_id, sticky_session),
1547            _ => proxy
1548                .borrow()
1549                .backends()
1550                .borrow_mut()
1551                .backend_from_cluster_id(cluster_id),
1552        }
1553    }
1554
    /// Resolves the cluster for the current request and makes sure the session
    /// has a backend connection for it.
    ///
    /// Returns:
    /// - `BackendConnectAction::Reuse` when the session is already connected to
    ///   a backend of the same cluster and that connection passes
    ///   `check_backend_connection`;
    /// - `BackendConnectAction::Replace` when a new socket is registered under
    ///   the backend token the session held on entry;
    /// - `BackendConnectAction::New` when a new token had to be obtained from
    ///   the proxy via `add_session`.
    ///
    /// # Errors
    ///
    /// Propagates the errors of `check_circuit_breaker`, of
    /// `cluster_id_from_request` (wrapped in
    /// `BackendConnectionError::RetrieveClusterError`) and of
    /// `backend_from_request`.
    fn connect_to_backend(
        &mut self,
        session_rc: Rc<RefCell<dyn ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> Result<BackendConnectAction, BackendConnectionError> {
        // Snapshots taken before anything below mutates the session: the
        // `take()` calls further down clear `self.backend_token`, and this copy
        // is what decides between `Replace` and `New` at the end.
        let old_cluster_id = self.context.cluster_id.clone();
        let old_backend_token = self.backend_token;

        self.check_circuit_breaker()?;

        let cluster_id = self
            .cluster_id_from_request(proxy.clone())
            .map_err(BackendConnectionError::RetrieveClusterError)?;

        trace!(
            "{} Connect_to_backend: {:?} {:?} {:?}",
            log_context!(self),
            self.context.cluster_id,
            cluster_id,
            self.backend_connection_status
        );
        // check if we can reuse the backend connection
        if (self.context.cluster_id.as_ref()) == Some(&cluster_id)
            && self.backend_connection_status == BackendConnectionStatus::Connected
        {
            // the backend we hold must still be known to the backend map for
            // this cluster
            let has_backend = self
                .backend
                .as_ref()
                .map(|backend| {
                    let backend = backend.borrow();
                    proxy
                        .borrow()
                        .backends()
                        .borrow()
                        .has_backend(&cluster_id, &backend)
                })
                .unwrap_or(false);

            if has_backend && self.check_backend_connection(metrics) {
                return Ok(BackendConnectAction::Reuse);
            } else if self.backend_token.take().is_some() {
                // the token is taken out of the session before closing the
                // backend; `old_backend_token` still holds a copy of it
                self.close_backend(proxy.clone(), metrics);
            }
        }

        //replacing with a connection to another cluster
        if old_cluster_id.is_some()
            && old_cluster_id.as_ref() != Some(&cluster_id)
            && self.backend_token.take().is_some()
        {
            self.close_backend(proxy.clone(), metrics);
        }

        self.context.cluster_id = Some(cluster_id.clone());

        // an unknown cluster is treated as non-sticky
        let frontend_should_stick = proxy
            .borrow()
            .clusters()
            .get(&cluster_id)
            .map(|cluster| cluster.sticky_session)
            .unwrap_or(false);

        let mut socket =
            self.backend_from_request(&cluster_id, frontend_should_stick, proxy.clone(), metrics)?;
        // failing to set TCP_NODELAY is logged but does not abort the connection
        if let Err(e) = socket.set_nodelay(true) {
            error!(
                "{} Error setting nodelay on backend socket({:?}): {:?}",
                log_context!(self),
                socket,
                e
            );
        }

        // the connection is in progress: wait for writability (or a failure)
        // and restart the connection clock
        self.backend_readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
        self.backend_connection_status = BackendConnectionStatus::Connecting(Instant::now());

        match old_backend_token {
            // the session already had a backend token on entry: reuse it for
            // the new socket
            Some(backend_token) => {
                self.set_backend_token(backend_token);
                // a registration failure is only logged
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::Replace)
            }
            // no previous token: ask the proxy for a new one bound to this
            // session
            None => {
                let backend_token = proxy.borrow().add_session(session_rc);

                // a registration failure is only logged
                if let Err(e) = proxy.borrow().register_socket(
                    &mut socket,
                    backend_token,
                    Interest::READABLE | Interest::WRITABLE,
                ) {
                    error!(
                        "{} Error registering back socket({:?}): {:?}",
                        log_context!(self),
                        socket,
                        e
                    );
                }

                self.set_backend_socket(socket, self.backend.clone());
                self.set_backend_token(backend_token);
                self.set_backend_timeout(self.configured_connect_timeout);

                Ok(BackendConnectAction::New)
            }
        }
    }
1677
1678    fn set_backend_connected(
1679        &mut self,
1680        connected: BackendConnectionStatus,
1681        metrics: &mut SessionMetrics,
1682    ) {
1683        let last = self.backend_connection_status;
1684        self.backend_connection_status = connected;
1685
1686        if connected == BackendConnectionStatus::Connected {
1687            gauge_add!(names::backend::CONNECTIONS, 1);
1688            gauge_add!(
1689                names::backend::CONNECTIONS_PER_BACKEND,
1690                1,
1691                self.context.cluster_id.as_deref(),
1692                metrics.backend_id.as_deref()
1693            );
1694
1695            // the back timeout was of connect_timeout duration before,
1696            // now that we're connected, move to backend_timeout duration
1697            self.set_backend_timeout(self.configured_backend_timeout);
1698            // if we are not waiting for the backend response, its timeout is concelled
1699            // it should be set when the request has been entirely transmitted
1700            if !self.backend_readiness.interest.is_readable() {
1701                self.container_backend_timeout.cancel();
1702            }
1703
1704            if let Some(backend) = &self.backend {
1705                let mut backend = backend.borrow_mut();
1706
1707                if backend.retry_policy.is_down() {
1708                    incr!(
1709                        "backend.up",
1710                        self.context.cluster_id.as_deref(),
1711                        metrics.backend_id.as_deref()
1712                    );
1713                    gauge!(
1714                        names::backend::AVAILABLE,
1715                        1,
1716                        self.context.cluster_id.as_deref(),
1717                        metrics.backend_id.as_deref()
1718                    );
1719
1720                    info!(
1721                        "{} backend server {} at {} is up",
1722                        log_context!(self),
1723                        backend.backend_id,
1724                        backend.address
1725                    );
1726
1727                    push_event(Event {
1728                        kind: EventKind::BackendUp as i32,
1729                        backend_id: Some(backend.backend_id.to_owned()),
1730                        address: Some(backend.address.into()),
1731                        cluster_id: None,
1732                        metric_detail: None,
1733                    });
1734                }
1735
1736                if let BackendConnectionStatus::Connecting(start) = last {
1737                    backend.set_connection_time(Instant::now() - start);
1738                }
1739
1740                //successful connection, reset failure counter
1741                backend.failures = 0;
1742                backend.active_requests += 1;
1743                backend.retry_policy.succeed();
1744            }
1745        }
1746    }
1747
1748    fn fail_backend_connection(&mut self, metrics: &SessionMetrics) {
1749        if let Some(backend) = &self.backend {
1750            let mut backend = backend.borrow_mut();
1751            backend.failures += 1;
1752
1753            let already_unavailable = backend.retry_policy.is_down();
1754            backend.retry_policy.fail();
1755            incr!(
1756                "backend.connections.error",
1757                self.context.cluster_id.as_deref(),
1758                metrics.backend_id.as_deref()
1759            );
1760
1761            if !already_unavailable && backend.retry_policy.is_down() {
1762                error!(
1763                    "{} backend server {} at {} is down",
1764                    log_context!(self),
1765                    backend.backend_id,
1766                    backend.address
1767                );
1768
1769                incr!(
1770                    "backend.down",
1771                    self.context.cluster_id.as_deref(),
1772                    metrics.backend_id.as_deref()
1773                );
1774                gauge!(
1775                    names::backend::AVAILABLE,
1776                    0,
1777                    self.context.cluster_id.as_deref(),
1778                    metrics.backend_id.as_deref()
1779                );
1780
1781                push_event(Event {
1782                    kind: EventKind::BackendDown as i32,
1783                    backend_id: Some(backend.backend_id.to_owned()),
1784                    address: Some(backend.address.into()),
1785                    cluster_id: None,
1786                    metric_detail: None,
1787                });
1788            }
1789        }
1790    }
1791
1792    pub fn backend_hup(&mut self, _metrics: &mut SessionMetrics) -> StateResult {
1793        let response_stream = match &mut self.response_stream {
1794            ResponseStream::BackendAnswer(response_stream) => response_stream,
1795            _ => return StateResult::CloseBackend,
1796        };
1797
1798        // there might still data we can read on the socket
1799        if self.backend_readiness.event.is_readable()
1800            && self.backend_readiness.interest.is_readable()
1801        {
1802            return StateResult::Continue;
1803        }
1804
1805        // the backend finished to answer we can close
1806        if response_stream.is_terminated() {
1807            return StateResult::CloseBackend;
1808        }
1809        match (
1810            self.request_stream.is_initial(),
1811            response_stream.is_initial(),
1812        ) {
1813            // backend stopped before response is finished,
1814            // or maybe it was malformed in the first place (no Content-Length)
1815            (_, false) => {
1816                error!(
1817                    "{} Backend closed before session is over",
1818                    log_context!(self, Some(response_stream.parsing_phase)),
1819                );
1820
1821                trace!(
1822                    "{} Backend hang-up, setting the parsing phase of the response stream to terminated, this also takes care of responses that lack length information.",
1823                    log_context!(self, Some(response_stream.parsing_phase))
1824                );
1825
1826                response_stream.parsing_phase = kawa::ParsingPhase::Terminated;
1827
1828                // writable() will be called again and finish the session properly
1829                // for this reason, writable must not short cut
1830                self.frontend_readiness.interest.insert(Ready::WRITABLE);
1831                StateResult::Continue
1832            }
1833            // probably backend hup between keep alive request, change backend
1834            (true, true) => {
1835                trace!(
1836                    "{} Backend hanged up in between requests",
1837                    log_context!(self)
1838                );
1839                StateResult::CloseBackend
1840            }
1841            // the frontend already transmitted data so we can't redirect
1842            (false, true) => {
1843                error!(
1844                    "{}  Frontend transmitted data but the back closed",
1845                    log_context!(self)
1846                );
1847
1848                self.set_answer(DefaultAnswer::Answer503 {
1849                    message: "Backend closed after consuming part of the request".into(),
1850                });
1851
1852                self.backend_readiness.interest = Ready::EMPTY;
1853                StateResult::Continue
1854            }
1855        }
1856    }
1857
    /// The main session loop, processing all events triggered by mio since last time
    /// and proxying http traffic. The main flow can be summed up by:
    ///
    /// - if the backend is connecting and has an event:
    ///   - if the backend hung up and the socket test fails, try to connect again
    ///   - else, set it as connected
    /// - if the frontend hung up, close the session
    /// - while the front or the back has an event it is interested in
    ///   (at most `MAX_LOOP_ITERATIONS` rounds):
    ///   - read request on front
    ///   - write request to back
    ///   - read response on back
    ///   - write response to front
    ///   - handle frontend errors and backend hang-ups/errors
    ///
    /// Returns `SessionResult::Close` when the iteration cap is reached, since
    /// that is treated as a probable infinite loop.
    fn ready_inner(
        &mut self,
        session: Rc<RefCell<dyn crate::ProxySession>>,
        proxy: Rc<RefCell<dyn L7Proxy>>,
        metrics: &mut SessionMetrics,
    ) -> SessionResult {
        let mut counter = 0;

        // Step 1: resolve a pending backend connection attempt.
        if self.backend_connection_status.is_connecting()
            && !self.backend_readiness.event.is_empty()
        {
            if self.backend_readiness.event.is_hup() && !self.test_backend_socket() {
                //retry connecting the backend
                warn!(
                    "{} Error connecting to backend, trying again, attempt {}",
                    log_context!(self),
                    self.connection_attempts
                );

                self.connection_attempts += 1;
                self.fail_backend_connection(metrics);

                self.backend_connection_status =
                    BackendConnectionStatus::Connecting(Instant::now());

                // trigger a backend reconnection
                self.close_backend(proxy.clone(), metrics);

                let connection_result =
                    self.connect_to_backend(session.clone(), proxy.clone(), metrics);
                if let Err(err) = &connection_result {
                    match err {
                        // Already logged at warn! + metered at check_circuit_breaker;
                        // avoid double-emission.
                        BackendConnectionError::MaxConnectionRetries(_) => trace!(
                            "{} Error connecting to backend: {}",
                            log_context!(self),
                            err
                        ),
                        _ => warn!(
                            "{} Error connecting to backend: {}",
                            log_context!(self),
                            err
                        ),
                    }
                }

                if let Some(session_result) = handle_connection_result(connection_result) {
                    return session_result;
                }
            } else {
                // the connection attempt succeeded
                metrics.backend_connected();
                self.connection_attempts = 0;
                self.set_backend_connected(BackendConnectionStatus::Connected, metrics);
                // we might get an early response from the backend, so we want to look
                // at readable events
                self.backend_readiness.interest.insert(Ready::READABLE);
            }
        }

        // Step 2: a frontend hang-up always ends the session; it is only
        // reported as a request error when a request was in progress.
        if self.frontend_readiness.event.is_hup() {
            if !self.request_stream.is_initial() {
                self.log_request_error(metrics, "Client disconnected abruptly");
            }
            return SessionResult::Close;
        }

        // Step 3: pump data between both sockets until no interesting event is
        // left or the iteration cap is reached.
        while counter < MAX_LOOP_ITERATIONS {
            let frontend_interest = self.frontend_readiness.filter_interest();
            let backend_interest = self.backend_readiness.filter_interest();

            trace!(
                "{} Frontend interest({:?}) and backend interest({:?})",
                log_context!(self),
                frontend_interest,
                backend_interest,
            );

            // nothing left to do on either side
            if frontend_interest.is_empty() && backend_interest.is_empty() {
                break;
            }

            // the backend hung up and we want to write to the frontend, but the
            // frontend is not writable yet: stop here and wait for the next event
            if self.backend_readiness.event.is_hup()
                && self.frontend_readiness.interest.is_writable()
                && !self.frontend_readiness.event.is_writable()
            {
                break;
            }

            // read request on front
            if frontend_interest.is_readable() {
                let state_result = self.readable(metrics);
                trace!(
                    "{} frontend_readable: {:?}",
                    log_context!(self),
                    state_result
                );

                match state_result {
                    StateResult::Continue => {}
                    StateResult::ConnectBackend => {
                        let connection_result =
                            self.connect_to_backend(session.clone(), proxy.clone(), metrics);
                        if let Err(err) = &connection_result {
                            match err {
                                // Already logged at warn! + metered at check_circuit_breaker;
                                // avoid double-emission.
                                BackendConnectionError::MaxConnectionRetries(_) => trace!(
                                    "{} Error connecting to backend: {}",
                                    log_context!(self),
                                    err
                                ),
                                _ => warn!(
                                    "{} Error connecting to backend: {}",
                                    log_context!(self),
                                    err
                                ),
                            }
                        }

                        if let Some(session_result) = handle_connection_result(connection_result) {
                            return session_result;
                        }
                    }
                    StateResult::CloseBackend => unreachable!(),
                    StateResult::CloseSession => return SessionResult::Close,
                    StateResult::Upgrade => return SessionResult::Upgrade,
                }
            }

            // write request to back
            if backend_interest.is_writable() {
                let session_result = self.backend_writable(metrics);
                trace!(
                    "{} backend_writable: {:?}",
                    log_context!(self),
                    session_result
                );
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            // read response on back
            if backend_interest.is_readable() {
                let session_result = self.backend_readable(metrics);
                trace!(
                    "{} backend_readable: {:?}",
                    log_context!(self),
                    session_result
                );
                if session_result != SessionResult::Continue {
                    return session_result;
                }
            }

            // write response to front
            if frontend_interest.is_writable() {
                let state_result = self.writable(metrics);
                trace!(
                    "{} frontend_writable: {:?}",
                    log_context!(self),
                    state_result
                );
                match state_result {
                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                    StateResult::CloseSession => return SessionResult::Close,
                    StateResult::Upgrade => return SessionResult::Upgrade,
                    StateResult::Continue => {}
                    StateResult::ConnectBackend => unreachable!(),
                }
            }

            if frontend_interest.is_error() {
                error!(
                    "{} frontend socket error, disconnecting",
                    log_context!(self)
                );

                return SessionResult::Close;
            }

            // backend hang-ups and errors are both routed through backend_hup
            if backend_interest.is_hup() || backend_interest.is_error() {
                let state_result = self.backend_hup(metrics);

                trace!("{} backend_hup: {:?}", log_context!(self), state_result);
                match state_result {
                    StateResult::Continue => {}
                    StateResult::CloseBackend => self.close_backend(proxy.clone(), metrics),
                    StateResult::CloseSession => return SessionResult::Close,
                    StateResult::ConnectBackend | StateResult::Upgrade => unreachable!(),
                }
            }

            counter += 1;
        }

        // Step 4: the loop only reaches the cap when events never drain;
        // count it, dump the session state and close.
        if counter >= MAX_LOOP_ITERATIONS {
            error!(
                "{}\tHandling session went through {} iterations, there's a probable infinite loop bug, closing the connection",
                log_context!(self),
                MAX_LOOP_ITERATIONS
            );

            incr!(names::http::INFINITE_LOOP_ERROR);
            self.print_state(self.protocol_string());

            return SessionResult::Close;
        }

        SessionResult::Continue
    }
2077
2078    pub fn timeout_status(&self) -> TimeoutStatus {
2079        if self.request_stream.is_main_phase() {
2080            match &self.response_stream {
2081                ResponseStream::BackendAnswer(kawa) if kawa.is_initial() => {
2082                    TimeoutStatus::WaitingForResponse
2083                }
2084                _ => TimeoutStatus::Response,
2085            }
2086        } else if self.keepalive_count > 0 {
2087            TimeoutStatus::WaitingForNewRequest
2088        } else {
2089            TimeoutStatus::Request
2090        }
2091    }
2092}
2093
2094impl<Front: SocketHandler, L: ListenerHandler + L7ListenerHandler> SessionState for Http<Front, L> {
2095    fn ready(
2096        &mut self,
2097        session: Rc<RefCell<dyn crate::ProxySession>>,
2098        proxy: Rc<RefCell<dyn L7Proxy>>,
2099        metrics: &mut SessionMetrics,
2100    ) -> SessionResult {
2101        let session_result = self.ready_inner(session, proxy, metrics);
2102        if session_result == SessionResult::Upgrade {
2103            let response_storage = match &mut self.response_stream {
2104                ResponseStream::BackendAnswer(response_stream) => &mut response_stream.storage,
2105                _ => return SessionResult::Close,
2106            };
2107
2108            // sync the underlying Checkout buffers, if they contain remaining data
2109            // it will be processed once upgraded to websocket
2110            self.request_stream.storage.buffer.sync(
2111                self.request_stream.storage.end,
2112                self.request_stream.storage.head,
2113            );
2114            response_storage
2115                .buffer
2116                .sync(response_storage.end, response_storage.head);
2117        }
2118        session_result
2119    }
2120
2121    fn update_readiness(&mut self, token: Token, events: Ready) {
2122        if self.frontend_token == token {
2123            self.frontend_readiness.event |= events;
2124        } else if self.backend_token == Some(token) {
2125            self.backend_readiness.event |= events;
2126        }
2127    }
2128
2129    fn close(&mut self, proxy: Rc<RefCell<dyn L7Proxy>>, metrics: &mut SessionMetrics) {
2130        self.close_backend(proxy, metrics);
2131        self.frontend_socket.socket_close();
2132        let _ = self.frontend_socket.socket_write_vectored(&[]);
2133
2134        //if the state was initial, the connection was already reset
2135        if !self.request_stream.is_initial() {
2136            gauge_add!(names::http::ACTIVE_REQUESTS, -1);
2137
2138            if let Some(b) = self.backend.as_mut() {
2139                let mut backend = b.borrow_mut();
2140                backend.active_requests = backend.active_requests.saturating_sub(1);
2141            }
2142        }
2143    }
2144
2145    fn timeout(&mut self, token: Token, metrics: &mut SessionMetrics) -> StateResult {
2146        //info!("got timeout for token: {:?}", token);
2147        if self.frontend_token == token {
2148            self.container_frontend_timeout.triggered();
2149            return match self.timeout_status() {
2150                // we do not have a complete answer
2151                TimeoutStatus::Request => {
2152                    self.context.access_log_message = Some("client_timeout");
2153                    self.set_answer(DefaultAnswer::Answer408 {
2154                        duration: self.container_frontend_timeout.to_string(),
2155                    });
2156                    self.writable(metrics)
2157                }
2158                // we have a complete answer but the response did not start
2159                TimeoutStatus::WaitingForResponse => {
2160                    // this case is ambiguous, as it is the frontend timeout that triggers while we were waiting for response
2161                    // the timeout responsibility should have switched before
2162                    self.context.access_log_message = Some("client_timeout_during_response");
2163                    self.set_answer(DefaultAnswer::Answer504 {
2164                        duration: self.container_backend_timeout.to_string(),
2165                    });
2166                    self.writable(metrics)
2167                }
2168                // we have a complete answer and the start of a response, but the request was not tagged as terminated
2169                // for now we place responsibility of timeout on the backend in those cases, so we ignore this
2170                TimeoutStatus::Response => StateResult::Continue,
2171                // timeout in keep-alive, simply close the connection
2172                TimeoutStatus::WaitingForNewRequest => StateResult::CloseSession,
2173            };
2174        }
2175
2176        if self.backend_token == Some(token) {
2177            //info!("backend timeout triggered for token {:?}", token);
2178            self.container_backend_timeout.triggered();
2179            return match self.timeout_status() {
2180                TimeoutStatus::Request => {
2181                    error!(
2182                        "{} got backend timeout while waiting for a request, this should not happen",
2183                        log_context!(self)
2184                    );
2185                    // Operator-visible cause is the same as the regular
2186                    // backend-timeout arm — backend silent before we
2187                    // could send the request. The error! above keeps the
2188                    // internal-invariant signal for sozu maintainers.
2189                    self.context.access_log_message = Some("backend_timeout");
2190                    self.set_answer(DefaultAnswer::Answer504 {
2191                        duration: self.container_backend_timeout.to_string(),
2192                    });
2193                    self.writable(metrics)
2194                }
2195                TimeoutStatus::WaitingForResponse => {
2196                    self.context.access_log_message = Some("backend_timeout");
2197                    self.set_answer(DefaultAnswer::Answer504 {
2198                        duration: self.container_backend_timeout.to_string(),
2199                    });
2200                    self.writable(metrics)
2201                }
2202                TimeoutStatus::Response => {
2203                    error!(
2204                        "backend {:?} timeout while receiving response (cluster {:?})",
2205                        self.context.backend_id, self.context.cluster_id
2206                    );
2207                    self.context.access_log_message = Some("backend_response_timeout");
2208                    StateResult::CloseSession
2209                }
2210                // in keep-alive, we place responsibility of timeout on the frontend, so we ignore this
2211                TimeoutStatus::WaitingForNewRequest => StateResult::Continue,
2212            };
2213        }
2214
2215        error!("{} Got timeout for an invalid token", log_context!(self));
2216        StateResult::CloseSession
2217    }
2218
2219    fn cancel_timeouts(&mut self) {
2220        self.container_backend_timeout.cancel();
2221        self.container_frontend_timeout.cancel();
2222    }
2223
2224    fn print_state(&self, context: &str) {
2225        error!(
2226            "\
2227{} {} Session(Kawa)
2228\tFrontend:
2229\t\ttoken: {:?}\treadiness: {:?}\tstate: {:?}
2230\tBackend:
2231\t\ttoken: {:?}\treadiness: {:?}",
2232            log_context!(self),
2233            context,
2234            self.frontend_token,
2235            self.frontend_readiness,
2236            self.request_stream.parsing_phase,
2237            self.backend_token,
2238            self.backend_readiness,
2239            // self.response_stream.parsing_phase
2240        );
2241    }
2242
2243    fn shutting_down(&mut self) -> SessionIsToBeClosed {
2244        if self.request_stream.is_initial() && self.request_stream.storage.is_empty()
2245        // && self.response_stream.storage.is_empty()
2246        {
2247            true
2248        } else {
2249            self.context.closing = true;
2250            false
2251        }
2252    }
2253}
2254
2255fn handle_connection_result(
2256    connection_result: Result<BackendConnectAction, BackendConnectionError>,
2257) -> Option<SessionResult> {
2258    match connection_result {
2259        // reuse connection or send a default answer, we can continue
2260        Ok(BackendConnectAction::Reuse) => None,
2261        Ok(BackendConnectAction::New) | Ok(BackendConnectAction::Replace) => {
2262            // we must wait for an event
2263            Some(SessionResult::Continue)
2264        }
2265        Err(_) => {
2266            // All BackendConnectionError already set a default answer
2267            // the session must continue to serve it
2268            // - NotFound: not used for http (only tcp)
2269            // - RetrieveClusterError: 301/400/401/404,
2270            // - MaxConnectionRetries: 503,
2271            // - Backend: 503,
2272            // - MaxSessionsMemory: not checked in connect_to_backend (TODO: check it?)
2273            None
2274        }
2275    }
2276}
2277
2278/// Save the HTTP status code of the backend response.
2279///
2280/// Emits two counters per response with a known status:
2281///
2282/// 1. The bucket counter `http.status.{1xx,…,5xx,other}` (always),
2283/// 2. A per-code counter `http.status.{200,301,…}` when the code is on the
2284///    short-list maintained by [`crate::metrics::http_status_code_metric_name`].
2285///
2286/// Status-less responses still emit `http.status.none` upstream of this
2287/// function (see `generate_access_log` for the H2 path); the H1 entry point
2288/// only carries `Some(status)` so the `else` branch is unnecessary here.
2289fn save_http_status_metric(status: Option<u16>, context: LogContext) {
2290    if let Some(status) = status {
2291        match status {
2292            100..=199 => {
2293                incr!(
2294                    names::http::STATUS_1XX,
2295                    context.cluster_id,
2296                    context.backend_id
2297                );
2298            }
2299            200..=299 => {
2300                incr!(
2301                    names::http::STATUS_2XX,
2302                    context.cluster_id,
2303                    context.backend_id
2304                );
2305            }
2306            300..=399 => {
2307                incr!(
2308                    names::http::STATUS_3XX,
2309                    context.cluster_id,
2310                    context.backend_id
2311                );
2312            }
2313            400..=499 => {
2314                incr!(
2315                    names::http::STATUS_4XX,
2316                    context.cluster_id,
2317                    context.backend_id
2318                );
2319            }
2320            500..=599 => {
2321                incr!(
2322                    names::http::STATUS_5XX,
2323                    context.cluster_id,
2324                    context.backend_id
2325                );
2326            }
2327            _ => {
2328                // http responses with other codes (protocol error)
2329                incr!(names::http::STATUS_OTHER);
2330            }
2331        }
2332
2333        if let Some(per_code) = crate::metrics::http_status_code_metric_name(status) {
2334            incr!(per_code, context.cluster_id, context.backend_id);
2335        }
2336    }
2337}