Skip to main content

sozu_lib/protocol/mux/
h1.rs

1//! H1 mux connection wrapper.
2//!
3//! Hosts the single active H1 stream (`stream: GlobalStreamId`) and wires
4//! Kawa-owned H1 parsing + serialization into the shared mux `Context` so the
5//! same routing / shutdown / readiness machinery applies across H1 and H2
6//! connections. Long-form lifecycle: `lib/src/protocol/mux/LIFECYCLE.md`.
7
8use std::io::IoSlice;
9
10use rusty_ulid::Ulid;
11use sozu_command::{logging::ansi_palette, ready::Ready};
12
13use crate::metrics::names;
14use crate::{
15    L7ListenerHandler, ListenerHandler, Readiness,
16    protocol::mux::{
17        BackendStatus, Context, DebugEvent, Endpoint, GlobalStreamId, MuxResult, Position,
18        StreamState, forcefully_terminate_answer,
19        parser::H2Error,
20        remove_backend_stream, set_default_answer,
21        shared::{EndStreamAction, drain_tls_close_notify, end_stream_decision},
22        update_readiness_after_read, update_readiness_after_write,
23    },
24    socket::{SocketHandler, SocketResult, stats::socket_rtt},
25    timer::TimeoutContainer,
26};
27
28/// Prefix applied to every [`ConnectionH1`] log line. Matches the RUSTLS
29/// log-context convention (`MUX-H1\tSession(...)\t >>>`). When the logger is
30/// in colored mode the label is bold bright-white (uniform across every
31/// protocol) and the session detail is rendered in light grey.
32///
33/// Fields included in the session block (chosen to surface the most common
34/// H1 troubleshooting axes — keep-alive churn, stream pinning, buffer-pressure
35/// stall and graceful TLS shutdown):
36/// - `peer` — peer address (or `None` if the socket is gone)
37/// - `position` — `Server` / `Client(...)` orientation
38/// - `stream` — currently active [`GlobalStreamId`] (or `none`)
39/// - `requests` — request count served on this connection (keep-alive)
40/// - `parked` — set when the kawa buffer is full and `READABLE` is suspended
41/// - `close_notify` — TLS `close_notify` send state
42/// - `readiness` — connection-level mio readiness snapshot
43macro_rules! log_context {
44    ($self:expr) => {{
45        let (open, reset, grey, gray, white) = ansi_palette();
46        format!(
47            "[{ulid} - - -]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
48            open = open,
49            reset = reset,
50            grey = grey,
51            gray = gray,
52            white = white,
53            ulid = $self.session_ulid,
54            peer = $self.socket.socket_ref().peer_addr().ok(),
55            position = $self.position,
56            stream = $self.stream,
57            requests = $self.requests,
58            parked = $self.parked_on_buffer_pressure,
59            close_notify = $self.close_notify_sent,
60            readiness = $self.readiness,
61        )
62    }};
63}
64
65/// Per-stream variant of [`log_context!`] used when a `HttpContext` is in
66/// scope. Fills the `request_id` slot of the bracket so the log line can be
67/// grepped by the specific request that triggered it.
68#[allow(unused_macros)]
69macro_rules! log_context_stream {
70    ($self:expr, $http_context:expr) => {{
71        let (open, reset, grey, gray, white) = ansi_palette();
72        format!(
73            "[{ulid} {req} {cluster} {backend}]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
74            open = open,
75            reset = reset,
76            grey = grey,
77            gray = gray,
78            white = white,
79            ulid = $self.session_ulid,
80            req = $http_context.id,
81            cluster = $http_context.cluster_id.as_deref().unwrap_or("-"),
82            backend = $http_context.backend_id.as_deref().unwrap_or("-"),
83            peer = $self.socket.socket_ref().peer_addr().ok(),
84            position = $self.position,
85            stream = $self.stream,
86            requests = $self.requests,
87            parked = $self.parked_on_buffer_pressure,
88            close_notify = $self.close_notify_sent,
89            readiness = $self.readiness,
90        )
91    }};
92}
93
94/// Module-level prefix for logs without a [`ConnectionH1`] in scope. Honours
95/// the colored flag.
96macro_rules! log_module_context {
97    () => {{
98        let (open, reset, _, _, _) = ansi_palette();
99        format!("{open}MUX-H1{reset}\t >>>", open = open, reset = reset)
100    }};
101}
102
103/// HTTP/1.1 connection handler within the mux layer.
104///
105/// Manages a single HTTP/1.1 connection (either frontend or backend),
106/// handling request/response forwarding through kawa buffers. Supports
107/// keep-alive, chunked transfer encoding, close-delimited responses,
108/// and upgrade (e.g., WebSocket).
109pub struct ConnectionH1<Front: SocketHandler> {
110    pub position: Position,
111    pub readiness: Readiness,
112    pub requests: usize,
113    pub socket: Front,
114    /// Active stream index, or `None` when the connection has no assigned stream
115    /// (initial client state before `start_stream`, or after `end_stream` detaches).
116    pub stream: Option<GlobalStreamId>,
117    pub timeout_container: TimeoutContainer,
118    /// Set when `readable` exits early because the kawa buffer was full.
119    /// Edge-triggered epoll will not re-fire READABLE for data already in the
120    /// kernel socket buffer, so the cross-readiness mechanism must re-arm it
121    /// via `try_resume_reading` once the peer drains the buffer.
122    pub parked_on_buffer_pressure: bool,
123    /// True once we've asked rustls to emit TLS close_notify for this frontend.
124    pub close_notify_sent: bool,
125    /// Connection/session ULID propagated from the parent [`Mux`]. Used to
126    /// stamp the session slot of the `[session req cluster backend]` log
127    /// prefix emitted by the local `log_context!` macro.
128    pub session_ulid: Ulid,
129}
130
131impl<Front: SocketHandler> std::fmt::Debug for ConnectionH1<Front> {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("ConnectionH1")
134            .field("position", &self.position)
135            .field("readiness", &self.readiness)
136            .field("socket", &self.socket.socket_ref())
137            .field("stream", &self.stream)
138            .finish()
139    }
140}
141
142impl<Front: SocketHandler> ConnectionH1<Front> {
143    fn defer_close_for_tls_flush(&mut self, reason: &'static str) -> MuxResult {
144        if self.initiate_close_notify() {
145            trace!(
146                "{} H1 writable delaying close after {}: stream={:?}, close_notify_sent={}, wants_write={}, readiness={:?}",
147                log_context!(self),
148                reason,
149                self.stream,
150                self.close_notify_sent,
151                self.socket.socket_wants_write(),
152                self.readiness
153            );
154            MuxResult::Continue
155        } else {
156            MuxResult::CloseSession
157        }
158    }
159
160    /// Terminate a close-delimited kawa body by pushing END_STREAM flags.
161    /// Called when the backend closes the connection to signal end-of-body
162    /// (no Content-Length, no chunked encoding).
163    ///
164    /// Chunked responses that TCP-close before the terminating `0\r\n\r\n`
165    /// are demoted to `ParsingPhase::Error` so the H2 converter emits
166    /// RST_STREAM(InternalError) rather than a silent END_STREAM with a
167    /// truncated body — RFC 9112 §7.1 requires the zero-chunk terminator.
168    fn terminate_close_delimited(kawa: &mut super::GenericHttpStream, stream_id: GlobalStreamId) {
169        // Pre: we only synthesize an end-of-body for a response still in its
170        // body phase. A kawa already Terminated/Error must not be re-terminated
171        // (it would double-push an END_STREAM flag onto the converter).
172        debug_assert!(
173            !kawa.is_terminated(),
174            "terminate_close_delimited must not run on an already-terminated kawa"
175        );
176        if kawa.body_size == kawa::BodySize::Chunked {
177            warn!(
178                "{} H1 backend EOF mid-chunked response on stream {}: emitting RST_STREAM",
179                log_module_context!(),
180                stream_id
181            );
182            incr!(names::h1::BACKEND_EOF_BEFORE_MESSAGE_COMPLETE);
183            kawa.parsing_phase
184                .error(kawa::ParsingErrorKind::Processing {
185                    message: "INTERNAL_ERROR",
186                });
187            // Post: a truncated chunked body is demoted to Error so the
188            // converter emits RST_STREAM, never a silent END_STREAM.
189            debug_assert!(
190                kawa.is_error(),
191                "truncated chunked response must end in the Error phase"
192            );
193            return;
194        }
195        debug!(
196            "{} H1 close-delimited EOF on stream {}: terminating body",
197            log_module_context!(),
198            stream_id
199        );
200        kawa.push_block(kawa::Block::Flags(kawa::Flags {
201            end_body: true,
202            end_chunk: false,
203            end_header: false,
204            end_stream: true,
205        }));
206        kawa.parsing_phase = kawa::ParsingPhase::Terminated;
207        // Post: a close-delimited body is now Terminated so the converter
208        // emits DATA with END_STREAM on the last frame.
209        debug_assert!(
210            kawa.is_terminated(),
211            "close-delimited body must end in the Terminated phase"
212        );
213    }
214
215    pub fn readable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
216    where
217        E: Endpoint,
218        L: ListenerHandler + L7ListenerHandler,
219    {
220        trace!(
221            "{} ======= MUX H1 READABLE {:?}",
222            log_context!(self),
223            self.position
224        );
225        let Some(stream_id) = self.stream else {
226            error!(
227                "{} readable() called on H1 connection with no active stream",
228                log_context!(self)
229            );
230            return MuxResult::Continue;
231        };
232        self.timeout_container.reset();
233        let answers_rc = context.listener.borrow().get_answers().clone();
234        let stream = &mut context.streams[stream_id];
235        if stream.metrics.start.is_none() {
236            stream.metrics.mark_request_start();
237        }
238        let parts = stream.split(&self.position);
239        let kawa = parts.rbuffer;
240
241        // If the buffer has no space, don't attempt a read — socket_read with
242        // an empty buffer returns (0, Continue) which is indistinguishable from
243        // a real EOF. Remove READABLE from the event so the inner loop doesn't
244        // spin; `try_resume_reading` will re-arm it once the peer drains the
245        // buffer (edge-triggered epoll won't re-fire for data already in the
246        // kernel socket buffer).
247        if kawa.storage.available_space() == 0 {
248            self.readiness.event.remove(Ready::READABLE);
249            self.parked_on_buffer_pressure = true;
250            // Pair the park flag with the cleared READABLE event: only
251            // `try_resume_reading` may re-arm it once the peer drains space.
252            debug_assert!(
253                self.parked_on_buffer_pressure && !self.readiness.event.is_readable(),
254                "parking on buffer pressure must clear the READABLE event"
255            );
256            return MuxResult::Continue;
257        }
258
259        self.parked_on_buffer_pressure = false;
260        // Pre: we never ask the socket to read into a full buffer (that path
261        // returned above) — `socket_read` requires a non-empty target slice.
262        let space_before = kawa.storage.available_space();
263        debug_assert!(
264            space_before > 0,
265            "socket_read must target a buffer with free space"
266        );
267        let (size, status) = self.socket.socket_read(kawa.storage.space());
268        // A read cannot deliver more bytes than the buffer had room for.
269        // `debug_assert!` only — `size` is socket-derived, never trusted to
270        // panic, but a violation here is a SocketHandler contract bug.
271        debug_assert!(
272            size <= space_before,
273            "socket_read returned more bytes than the buffer could hold"
274        );
275        context.debug.push(DebugEvent::StreamEvent(0, size));
276        kawa.storage.fill(size);
277        debug_assert_eq!(
278            kawa.storage.available_space(),
279            space_before - size,
280            "fill must consume exactly `size` bytes of free space"
281        );
282        self.position.count_bytes_in_counter(size);
283        self.position.count_bytes_in(parts.metrics, size);
284        if update_readiness_after_read(size, status, &mut self.readiness) {
285            // size=0: the socket returned EOF (Closed) or WouldBlock.
286            // For a close-delimited backend response (no Content-Length, no
287            // chunked), a graceful EOF IS the end-of-body signal. Terminate
288            // the kawa so the H2 converter emits DATA with END_STREAM.
289            // SocketResult::Error (ECONNRESET etc.) is NOT treated as a valid
290            // close-delimiter — transport errors should produce 502, not a
291            // truncated response.
292            if status == SocketResult::Closed
293                && self.position.is_client()
294                && kawa.is_main_phase()
295                && !kawa.is_terminated()
296                && !parts.context.keep_alive_backend
297            {
298                Self::terminate_close_delimited(kawa, stream_id);
299                self.timeout_container.cancel();
300                self.readiness.interest.remove(Ready::READABLE);
301                if let StreamState::Linked(token) = stream.state {
302                    // Signal pending write alongside the WRITABLE interest flip:
303                    // edge-triggered epoll won't re-fire for bytes we just queued
304                    // onto the peer — the synthetic event is the only wake path.
305                    let peer = endpoint.readiness_mut(token);
306                    peer.arm_writable();
307                }
308            }
309            return MuxResult::Continue;
310        }
311
312        let was_main_phase = kawa.is_main_phase();
313        kawa::h1::parse(kawa, parts.context);
314        if kawa.is_error() {
315            match self.position {
316                Position::Client(..) => {
317                    incr!(names::http::BACKEND_PARSE_ERRORS);
318                    let StreamState::Linked(token) = stream.state else {
319                        error!(
320                            "{} client stream in error is not in Linked state",
321                            log_context!(self)
322                        );
323                        return MuxResult::CloseSession;
324                    };
325                    let global_stream_id = stream_id;
326                    self.end_stream(global_stream_id, context);
327                    endpoint.end_stream(token, global_stream_id, context);
328                }
329                Position::Server => {
330                    incr!(names::http::FRONTEND_PARSE_ERRORS);
331                    let answers = answers_rc.borrow();
332                    set_default_answer(stream, &mut self.readiness, 400, &answers);
333                }
334            }
335            return MuxResult::Continue;
336        }
337        // Capture borrow-sensitive values after parsing but before the 1xx block
338        // accesses stream.state (which ends the split borrow from `parts`).
339        let is_keep_alive_backend = parts.context.keep_alive_backend;
340        let is_body_phase_after_parse = kawa.is_main_phase();
341
342        // 1xx informational responses (100 Continue, 103 Early Hints): the H1
343        // parser treats them as complete (Terminated + end_stream=true), but for
344        // H2 frontends they must be forwarded WITHOUT END_STREAM so the real
345        // response can follow on the same stream. Also keep READABLE interest
346        // so the backend can send the final response.
347        let is_1xx_backend = if self.position.is_client() {
348            if let kawa::StatusLine::Response { code, .. } = &kawa.detached.status_line {
349                if (100..200).contains(code) {
350                    debug!(
351                        "{} H1 backend: received {} informational response",
352                        log_context!(self),
353                        code
354                    );
355                    for block in &mut kawa.blocks {
356                        if let kawa::Block::Flags(flags) = block {
357                            flags.end_stream = false;
358                            flags.end_body = false;
359                        }
360                    }
361                    true
362                } else {
363                    false
364                }
365            } else {
366                false
367            }
368        } else {
369            false
370        };
371        if kawa.is_terminated() && !is_1xx_backend {
372            self.timeout_container.cancel();
373            self.readiness.interest.remove(Ready::READABLE);
374        }
375        if kawa.is_main_phase() {
376            if !was_main_phase && self.position.is_server() {
377                if parts.context.method.is_none()
378                    || parts.context.authority.is_none()
379                    || parts.context.path.is_none()
380                {
381                    if let kawa::StatusLine::Request {
382                        version: kawa::Version::V10,
383                        ..
384                    } = kawa.detached.status_line
385                    {
386                        error!(
387                            "{} Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
388                            log_context!(self),
389                            parts.context.session_address,
390                            parts.context.method,
391                            parts.context.authority,
392                            parts.context.path
393                        );
394                    } else {
395                        error!("{} Unexpected malformed request", log_context!(self));
396                        kawa::debug_kawa(kawa);
397                    }
398                    let answers = answers_rc.borrow();
399                    set_default_answer(stream, &mut self.readiness, 400, &answers);
400                    return MuxResult::Continue;
401                }
402                // First-seen request on this (server) connection: the keep-alive
403                // request counter advances by exactly one and the matching
404                // `http.active_requests` gauge `+1` is paired with flipping
405                // `request_counted` true (the `generate_access_log` `-1` is
406                // gated on that flag, so they must stay balanced).
407                let requests_before = self.requests;
408                let links_before = context.pending_links.len();
409                self.requests += 1;
410                debug_assert_eq!(
411                    self.requests,
412                    requests_before + 1,
413                    "server keep-alive request counter must advance by exactly one"
414                );
415                trace!("{} REQUESTS: {}", log_context!(self), self.requests);
416                incr!(names::http::REQUESTS);
417                gauge_add!(names::http::ACTIVE_REQUESTS, 1);
418                parts.metrics.service_start();
419                // Set request_counted after the last use of `parts` to satisfy the borrow checker
420                stream.request_counted = true;
421                stream.state = StreamState::Link;
422                context.pending_links.push_back(stream_id);
423                // Post: the stream is queued for backend linking exactly once
424                // and is now in the Link state the ready loop expects.
425                debug_assert!(
426                    stream.request_counted,
427                    "request_counted must be set when the active-requests gauge is incremented"
428                );
429                debug_assert_eq!(
430                    stream.state,
431                    StreamState::Link,
432                    "a first-seen request must transition the stream to Link"
433                );
434                debug_assert_eq!(
435                    context.pending_links.len(),
436                    links_before + 1,
437                    "a first-seen request must enqueue exactly one pending link"
438                );
439            }
440            if let StreamState::Linked(token) = stream.state {
441                // Signal pending write alongside the WRITABLE interest flip: the
442                // bytes we just parsed live in sozu's buffers, not the kernel,
443                // so edge-triggered epoll won't re-fire on its own.
444                let peer = endpoint.readiness_mut(token);
445                peer.arm_writable();
446            }
447        };
448        // 1xx informational: the 100 response skips main_phase (goes straight to
449        // Terminated), so the normal "set endpoint writable" above never fires.
450        // Trigger the frontend to write the 1xx response after all borrows end.
451        if is_1xx_backend {
452            if let StreamState::Linked(token) = stream.state {
453                let peer = endpoint.readiness_mut(token);
454                peer.arm_writable();
455            }
456        }
457
458        // Close-delimited response: socket_read returned (size > 0, Closed) —
459        // the last data chunk arrived together with the EOF in a single read.
460        // After parsing the data above, terminate the kawa now so the H2
461        // converter emits END_STREAM on the last DATA frame.
462        if status == SocketResult::Closed
463            && self.position.is_client()
464            && is_body_phase_after_parse
465            && !is_keep_alive_backend
466            && !context.streams[stream_id].back.is_terminated()
467        {
468            let kawa = &mut context.streams[stream_id].back;
469            Self::terminate_close_delimited(kawa, stream_id);
470            self.timeout_container.cancel();
471            self.readiness.interest.remove(Ready::READABLE);
472        }
473
474        MuxResult::Continue
475    }
476
477    pub fn writable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
478    where
479        E: Endpoint,
480        L: ListenerHandler + L7ListenerHandler,
481    {
482        trace!(
483            "{} ======= MUX H1 WRITABLE {:?}",
484            log_context!(self),
485            self.position
486        );
487        let Some(stream_id) = self.stream else {
488            if self.socket.socket_wants_write() {
489                let (size, status) = self.socket.socket_write_vectored(&[]);
490                let _ = update_readiness_after_write(size, status, &mut self.readiness);
491                if self.socket.socket_wants_write() {
492                    self.readiness.signal_pending_write();
493                }
494            }
495            return MuxResult::Continue;
496        };
497        self.timeout_container.reset();
498        let stream = &mut context.streams[stream_id];
499        let parts = stream.split(&self.position);
500        let kawa = parts.wbuffer;
501        // Apply per-frontend response-side header edits stashed by the
502        // routing layer at request time. Only the Server-position pass
503        // touches the response back-kawa; the Client-position pass
504        // (writing the request to the backend) has already had its
505        // edits applied in `Router::route_from_request`.
506        //
507        // Drained via `mem::take` so the injection runs exactly once
508        // per response. H1 keep-alive can re-enter this writable path
509        // for the same stream when the backend response spans more
510        // than one TCP read; without the take, the second pass would
511        // re-insert the same headers (typically as duplicate STS lines
512        // on the wire — RFC 6797 §6.1 expects a single header). On H2
513        // the same multi-prepare-cycle pattern surfaces as a
514        // `H2BlockConverter::finalize` "out buffer not empty" leak.
515        if matches!(self.position, Position::Server) && !parts.context.headers_response.is_empty() {
516            let edits = std::mem::take(&mut parts.context.headers_response);
517            super::shared::apply_response_header_edits(kawa, &edits);
518        }
519        kawa.prepare(&mut kawa::h1::BlockConverter);
520        let mut io_slices = Vec::new();
521        for block in kawa.out.iter() {
522            match block {
523                kawa::OutBlock::Delimiter => break,
524                kawa::OutBlock::Store(store) => {
525                    io_slices.push(IoSlice::new(store.data(kawa.storage.buffer())));
526                }
527            }
528        }
529        let can_finalize_server_close = matches!(self.position, Position::Server)
530            && kawa.is_terminated()
531            && kawa.is_completed();
532        if io_slices.is_empty() && !self.socket.socket_wants_write() && !can_finalize_server_close {
533            self.readiness.interest.remove(Ready::WRITABLE);
534            return MuxResult::Continue;
535        }
536        let tls_only_flush = io_slices.is_empty();
537        // Total bytes we offered the socket across the gathered slices; a
538        // vectored write can never report more consumed than we handed it.
539        let queued: usize = io_slices.iter().map(|s| s.len()).sum();
540        let (size, status) = self.socket.socket_write_vectored(&io_slices);
541        debug_assert!(
542            size <= queued,
543            "socket_write_vectored reported more bytes written than were queued"
544        );
545        context.debug.push(DebugEvent::StreamEvent(1, size));
546        kawa.consume(size);
547        self.position.count_bytes_out_counter(size);
548        self.position.count_bytes_out(parts.metrics, size);
549        let should_yield = update_readiness_after_write(size, status, &mut self.readiness);
550        if self.socket.socket_wants_write() {
551            self.readiness.signal_pending_write();
552            // Pair the queued-write signal with the socket's own report: we
553            // only synthesize a WRITABLE event when the socket still has bytes
554            // buffered (edge-triggered epoll won't re-fire on its own).
555            debug_assert!(
556                self.readiness.event.is_writable(),
557                "signal_pending_write must leave a WRITABLE event queued"
558            );
559            return MuxResult::Continue;
560        }
561        if !tls_only_flush && should_yield {
562            return MuxResult::Continue;
563        }
564
565        if kawa.is_terminated() && kawa.is_completed() {
566            match self.position {
567                Position::Client(..) => self.readiness.interest.insert(Ready::READABLE),
568                Position::Server => {
569                    if stream.context.closing {
570                        return self.defer_close_for_tls_flush("closing-context");
571                    }
572                    let kawa = &mut stream.back;
573                    match kawa.detached.status_line {
574                        kawa::StatusLine::Response { code: 101, .. } => {
575                            debug!("{} ============== HANDLE UPGRADE!", log_context!(self));
576                            stream.metrics.backend_stop();
577                            let client_rtt = socket_rtt(self.socket.socket_ref());
578                            let server_rtt = stream
579                                .linked_token()
580                                .and_then(|t| endpoint.socket(t))
581                                .and_then(socket_rtt);
582                            stream.generate_access_log(
583                                false,
584                                Some("H1::Upgrade"),
585                                context.listener.clone(),
586                                client_rtt,
587                                server_rtt,
588                            );
589                            return MuxResult::Upgrade;
590                        }
591                        kawa::StatusLine::Response { code: 100, .. } => {
592                            debug!("{} ============== HANDLE CONTINUE!", log_context!(self));
593                            // After a 100 Continue, we expect the client to continue
594                            // with its request body. Do NOT call generate_access_log
595                            // here — the final response will emit the access log.
596                            // Calling it here would double-decrement http.active_requests.
597                            self.timeout_container.reset();
598                            self.readiness.interest.insert(Ready::READABLE);
599                            kawa.clear();
600                            stream.metrics.backend_stop();
601                            if let StreamState::Linked(token) = stream.state {
602                                endpoint
603                                    .readiness_mut(token)
604                                    .interest
605                                    .insert(Ready::READABLE);
606                            }
607                            return MuxResult::Continue;
608                        }
609                        kawa::StatusLine::Response { code: 103, .. } => {
610                            debug!("{} ============== HANDLE EARLY HINT!", log_context!(self));
611                            // Do NOT call generate_access_log for 103 Early Hints.
612                            // The final response will emit the access log.
613                            // Calling it here would double-decrement http.active_requests.
614                            if let StreamState::Linked(token) = stream.state {
615                                // after a 103 early hints, we expect the backend to send its response
616                                endpoint
617                                    .readiness_mut(token)
618                                    .interest
619                                    .insert(Ready::READABLE);
620                                kawa.clear();
621                                stream.metrics.backend_stop();
622                                return MuxResult::Continue;
623                            } else {
624                                stream.metrics.backend_stop();
625                                let client_rtt = socket_rtt(self.socket.socket_ref());
626                                let server_rtt = stream
627                                    .linked_token()
628                                    .and_then(|t| endpoint.socket(t))
629                                    .and_then(socket_rtt);
630                                stream.generate_access_log(
631                                    false,
632                                    Some("H1::EarlyHint"),
633                                    context.listener.clone(),
634                                    client_rtt,
635                                    server_rtt,
636                                );
637                                return self.defer_close_for_tls_flush("early-hint");
638                            }
639                        }
640                        _ => {}
641                    }
642                    incr!(names::http::E2E_HTTP11);
643                    stream.metrics.backend_stop();
644                    let client_rtt = socket_rtt(self.socket.socket_ref());
645                    let server_rtt = stream
646                        .linked_token()
647                        .and_then(|t| endpoint.socket(t))
648                        .and_then(socket_rtt);
649                    stream.generate_access_log(
650                        false,
651                        Some("H1::Complete"),
652                        context.listener.clone(),
653                        client_rtt,
654                        server_rtt,
655                    );
656                    stream.metrics.reset();
657                    let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
658                    if let StreamState::Linked(token) = old_state {
659                        remove_backend_stream(&mut context.backend_streams, token, stream_id);
660                    }
661                    if stream.context.keep_alive_frontend {
662                        self.timeout_container.reset();
663                        if let StreamState::Linked(token) = old_state {
664                            endpoint.end_stream(token, stream_id, context);
665                        }
666                        self.readiness.interest.insert(Ready::READABLE);
667                        let stream = &mut context.streams[stream_id];
668                        stream.context.reset();
669                        stream.back.clear();
670                        stream.back.storage.clear();
671                        stream.front.clear();
672                        // do not stream.front.storage.clear() because of H1 pipelining
673                        stream.attempts = 0;
674                        // Transition back to Idle so buffered pipelined requests
675                        // trigger a phase transition on the next readable() call.
676                        stream.state = StreamState::Idle;
677                        // Post: the keep-alive reset leaves a clean, closed slot
678                        // ready for the next pipelined request. `request_counted`
679                        // was already cleared by `generate_access_log` above, so
680                        // the slot carries no pending active-requests charge.
681                        debug_assert_eq!(
682                            stream.state,
683                            StreamState::Idle,
684                            "keep-alive reset must return the stream to Idle"
685                        );
686                        debug_assert_eq!(stream.attempts, 0, "keep-alive reset must zero attempts");
687                        debug_assert!(
688                            !stream.request_counted,
689                            "keep-alive reset must leave no counted request (active-requests leak)"
690                        );
691                        debug_assert!(
692                            stream.back.storage.is_empty(),
693                            "keep-alive reset must drain the response storage"
694                        );
695                        // HTTP/1.1 pipelining: if there's still data in the frontend
696                        // storage (pipelined requests already read from the socket),
697                        // parse it now. We can't rely on a new READABLE event because
698                        // the socket buffer may be empty — all requests were already
699                        // read into kawa storage in the first socket_read.
700                        if !stream.front.storage.is_empty() {
701                            kawa::h1::parse(&mut stream.front, &mut stream.context);
702                            let is_error = stream.front.is_error();
703                            let is_main = stream.front.is_main_phase();
704                            let malformed = is_main
705                                && (stream.context.method.is_none()
706                                    || stream.context.authority.is_none()
707                                    || stream.context.path.is_none());
708                            if is_error || malformed {
709                                let answers_rc = context.listener.borrow().get_answers().clone();
710                                let answers = answers_rc.borrow();
711                                set_default_answer(stream, &mut self.readiness, 400, &answers);
712                            } else if is_main {
713                                self.requests += 1;
714                                incr!(names::http::REQUESTS);
715                                gauge_add!(names::http::ACTIVE_REQUESTS, 1);
716                                stream.metrics.service_start();
717                                stream.request_counted = true;
718                                stream.state = StreamState::Link;
719                                context.pending_links.push_back(stream_id);
720                            }
721                            // else: incomplete parse, wait for more data via READABLE
722                        }
723                    } else {
724                        return self.defer_close_for_tls_flush("response-complete");
725                    }
726                }
727            }
728        }
729        MuxResult::Continue
730    }
731
732    pub fn force_disconnect(&mut self) -> MuxResult {
733        match &mut self.position {
734            Position::Client(_, _, status) => {
735                *status = BackendStatus::Disconnecting;
736                self.readiness.event = Ready::HUP;
737                debug!(
738                    "{} H1 force_disconnect client: stream={:?}, wants_write={}, readiness={:?}",
739                    log_context!(self),
740                    self.stream,
741                    self.socket.socket_wants_write(),
742                    self.readiness
743                );
744                MuxResult::Continue
745            }
746            Position::Server => {
747                if self.socket.socket_wants_write() {
748                    debug!(
749                        "{} H1 force_disconnect delaying close: stream={:?}, wants_write=true, readiness={:?}",
750                        log_context!(self),
751                        self.stream,
752                        self.readiness
753                    );
754                    self.readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
755                    self.readiness.signal_pending_write();
756                    MuxResult::Continue
757                } else {
758                    debug!(
759                        "{} H1 force_disconnect closing session: stream={:?}, wants_write=false, readiness={:?}",
760                        log_context!(self),
761                        self.stream,
762                        self.readiness
763                    );
764                    MuxResult::CloseSession
765                }
766            }
767        }
768    }
769
770    pub fn has_pending_write(&self) -> bool {
771        self.socket.socket_wants_write()
772    }
773
774    pub fn initiate_close_notify(&mut self) -> bool {
775        if !self.position.is_server() {
776            return false;
777        }
778        // Past the guard we are always server-side; close_notify is a
779        // frontend-only TLS concern.
780        debug_assert!(
781            self.position.is_server(),
782            "initiate_close_notify past the guard must be server-side"
783        );
784        if !self.close_notify_sent {
785            trace!("{} H1 initiating CLOSE_NOTIFY", log_context!(self));
786            self.socket.socket_close();
787            self.close_notify_sent = true;
788        }
789        // `close_notify` is monotone: once requested it stays sent for the
790        // connection's lifetime (a second send would corrupt the TLS stream).
791        debug_assert!(
792            self.close_notify_sent,
793            "close_notify_sent must be set once initiate_close_notify has run"
794        );
795        if self.socket.socket_wants_write() {
796            self.readiness.arm_writable();
797            // arm_writable pairs interest + event so the deferred TLS flush is
798            // actually scheduled under edge-triggered epoll.
799            debug_assert!(
800                self.readiness.interest.is_writable() && self.readiness.event.is_writable(),
801                "arm_writable must set both WRITABLE interest and event"
802            );
803            true
804        } else {
805            false
806        }
807    }
808
809    pub fn close<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E)
810    where
811        E: Endpoint,
812        L: ListenerHandler + L7ListenerHandler,
813    {
814        match self.position {
815            Position::Client(_, _, BackendStatus::KeepAlive)
816            | Position::Client(_, _, BackendStatus::Disconnecting) => {
817                trace!("{} close detached client ConnectionH1", log_context!(self));
818                return;
819            }
820            Position::Client(_, _, BackendStatus::Connecting(_))
821            | Position::Client(_, _, BackendStatus::Connected) => {
822                debug!(
823                    "{} BACKEND CLOSING FOR: {:?} {:?}",
824                    log_context!(self),
825                    self.position,
826                    self.stream
827                );
828            }
829            Position::Server => {
830                let tls_pending_before = self.socket.socket_wants_write();
831                let (tls_pending_after, drain_rounds) =
832                    drain_tls_close_notify(&mut self.socket, &mut self.close_notify_sent);
833                if tls_pending_after {
834                    error!(
835                        "{} H1 TLS buffer NOT fully drained on close: pending_before={}, pending_after={}, drain_rounds={}, stream={:?}, close_notify_sent={}, readiness={:?}",
836                        log_context!(self),
837                        tls_pending_before,
838                        tls_pending_after,
839                        drain_rounds,
840                        self.stream,
841                        self.close_notify_sent,
842                        self.readiness
843                    );
844                }
845                return;
846            }
847        }
848        let Some(stream_id) = self.stream else {
849            trace!(
850                "{} closing detached H1 client with no active stream",
851                log_context!(self)
852            );
853            return;
854        };
855        // reconnection is handled by the server
856        let StreamState::Linked(token) = context.streams[stream_id].state else {
857            trace!(
858                "{} closing detached H1 client in state {:?} on stream {}",
859                log_context!(self),
860                context.streams[stream_id].state,
861                stream_id
862            );
863            return;
864        };
865        endpoint.end_stream(token, stream_id, context)
866    }
867
868    pub fn end_stream<L>(&mut self, stream: GlobalStreamId, context: &mut Context<L>)
869    where
870        L: ListenerHandler + L7ListenerHandler,
871    {
872        if self.stream != Some(stream) {
873            error!(
874                "{} end_stream called with stream {} but expected {:?}",
875                log_context!(self),
876                stream,
877                self.stream
878            );
879            return;
880        }
881        // Reached only on the matched-stream path: the connection's active
882        // stream is exactly the one being ended.
883        debug_assert_eq!(
884            self.stream,
885            Some(stream),
886            "end_stream past the guard must target the active stream"
887        );
888        context.unlink_stream(stream);
889        // Post: whatever backend token this stream was Linked to no longer
890        // lists it in the reverse index — `unlink_stream` is the single
891        // eviction point, so a subsequent end/close cannot double-remove it.
892        // (The `state` field is still `Linked` here; the arms below retire it.)
893        #[cfg(debug_assertions)]
894        if let StreamState::Linked(token) = context.streams[stream].state {
895            debug_assert!(
896                context
897                    .backend_streams
898                    .get(&token)
899                    .is_none_or(|ids| !ids.contains(&stream)),
900                "unlink_stream must evict the stream from the backend reverse index"
901            );
902        }
903        let answers_rc = context.listener.borrow().get_answers().clone();
904        let stream_id = stream;
905        let stream = &mut context.streams[stream_id];
906        let stream_context = &mut stream.context;
907        trace!(
908            "{} end H1 stream {:?}: {:#?}",
909            log_context!(self),
910            self.stream,
911            stream_context
912        );
913        match &mut self.position {
914            Position::Client(_, _, BackendStatus::Connecting(_)) => {
915                self.stream = None;
916                if stream.state != StreamState::Recycle {
917                    stream.state = StreamState::Unlinked;
918                }
919                // Post: the client connection detaches from its stream and the
920                // slot is retired (Unlinked) unless already marked for reuse —
921                // never left dangling in an open/Linked state.
922                debug_assert!(
923                    self.stream.is_none(),
924                    "client end_stream must detach the stream"
925                );
926                debug_assert!(
927                    !matches!(stream.state, StreamState::Linked(_)),
928                    "detached stream must not remain Linked"
929                );
930                self.readiness.interest.remove(Ready::ALL);
931                self.force_disconnect();
932            }
933            Position::Client(_, _, status @ BackendStatus::Connected) => {
934                self.stream = None;
935                if stream.state != StreamState::Recycle {
936                    stream.state = StreamState::Unlinked;
937                }
938                debug_assert!(
939                    self.stream.is_none(),
940                    "client end_stream must detach the stream"
941                );
942                debug_assert!(
943                    !matches!(stream.state, StreamState::Linked(_)),
944                    "detached stream must not remain Linked"
945                );
946                self.readiness.interest.remove(Ready::ALL);
947                // keep alive should probably be used only if the http context is fully reset
948                // in case end_stream occurs due to an error the connection state is probably
949                // unrecoverable and should be terminated
950                if stream_context.keep_alive_backend && stream.back.is_terminated() {
951                    *status = BackendStatus::KeepAlive;
952                } else {
953                    self.force_disconnect();
954                }
955            }
956            Position::Client(_, _, BackendStatus::KeepAlive)
957            | Position::Client(_, _, BackendStatus::Disconnecting) => {
958                error!(
959                    "{} end_stream called on KeepAlive or Disconnecting H1 client",
960                    log_context!(self)
961                );
962            }
963            Position::Server => match end_stream_decision(stream) {
964                EndStreamAction::ForwardTerminated => {
965                    debug!("{} CLOSING H1 TERMINATED STREAM", log_context!(self));
966                    stream.state = StreamState::Unlinked;
967                    self.readiness.interest.insert(Ready::WRITABLE);
968                    // End-of-stream was already queued into kawa by the parser;
969                    // no fresh WRITABLE event will arrive from the kernel.
970                    self.readiness.signal_pending_write();
971                }
972                EndStreamAction::CloseDelimited => {
973                    debug!("{} CLOSE DELIMITED", log_context!(self));
974                    stream.state = StreamState::Unlinked;
975                    self.readiness.arm_writable();
976                }
977                EndStreamAction::ForwardUnterminated => {
978                    debug!("{} CLOSING H1 UNTERMINATED STREAM", log_context!(self));
979                    forcefully_terminate_answer(
980                        stream,
981                        &mut self.readiness,
982                        H2Error::InternalError,
983                    );
984                }
985                EndStreamAction::SendDefault(status) => {
986                    let answers = answers_rc.borrow();
987                    set_default_answer(stream, &mut self.readiness, status, &answers);
988                }
989                EndStreamAction::Reconnect => {
990                    debug!("{} H1 RECONNECT", log_context!(self));
991                    stream.state = StreamState::Link;
992                    context.pending_links.push_back(stream_id);
993                }
994            },
995        }
996    }
997
998    pub fn start_stream<L>(&mut self, stream: GlobalStreamId, _context: &mut Context<L>) -> bool
999    where
1000        L: ListenerHandler + L7ListenerHandler,
1001    {
1002        trace!(
1003            "{} start H1 stream {} {:?}",
1004            log_context!(self),
1005            stream,
1006            self.readiness
1007        );
1008        self.readiness.interest.insert(Ready::ALL);
1009        self.stream = Some(stream);
1010        debug_assert_eq!(
1011            self.stream,
1012            Some(stream),
1013            "start_stream must pin the connection's active stream"
1014        );
1015        match &mut self.position {
1016            Position::Client(_, _, status @ BackendStatus::KeepAlive) => {
1017                *status = BackendStatus::Connected;
1018                // A keep-alive client transitions to Connected when it picks up
1019                // a new stream; it must not stay parked in KeepAlive.
1020                debug_assert!(
1021                    matches!(
1022                        self.position,
1023                        Position::Client(_, _, BackendStatus::Connected)
1024                    ),
1025                    "a reused keep-alive client must become Connected on start_stream"
1026                );
1027            }
1028            Position::Client(_, _, BackendStatus::Disconnecting) => {
1029                error!(
1030                    "{} start_stream called on Disconnecting H1 client",
1031                    log_context!(self)
1032                );
1033                return false;
1034            }
1035            Position::Client(_, _, _) => {}
1036            Position::Server => {
1037                error!(
1038                    "{} start_stream must not be called on H1 server connection",
1039                    log_context!(self)
1040                );
1041                return false;
1042            }
1043        }
1044        true
1045    }
1046}