Skip to main content

sozu_lib/protocol/mux/
h1.rs

1//! H1 mux connection wrapper.
2//!
3//! Hosts the single active H1 stream (`stream: GlobalStreamId`) and wires
4//! Kawa-owned H1 parsing + serialization into the shared mux `Context` so the
5//! same routing / shutdown / readiness machinery applies across H1 and H2
6//! connections. Long-form lifecycle: `lib/src/protocol/mux/LIFECYCLE.md`.
7
8use std::{io::IoSlice, time::Instant};
9
10use rusty_ulid::Ulid;
11use sozu_command::{logging::ansi_palette, ready::Ready};
12
13use crate::metrics::names;
14use crate::{
15    L7ListenerHandler, ListenerHandler, Readiness,
16    protocol::mux::{
17        BackendStatus, Context, DebugEvent, Endpoint, GlobalStreamId, MuxResult, Position,
18        StreamState, forcefully_terminate_answer,
19        parser::H2Error,
20        remove_backend_stream, set_default_answer,
21        shared::{EndStreamAction, drain_tls_close_notify, end_stream_decision},
22        update_readiness_after_read, update_readiness_after_write,
23    },
24    socket::{SocketHandler, SocketResult, stats::socket_rtt},
25    timer::TimeoutContainer,
26};
27
28/// Prefix applied to every [`ConnectionH1`] log line. Matches the RUSTLS
29/// log-context convention (`MUX-H1\tSession(...)\t >>>`). When the logger is
30/// in colored mode the label is bold bright-white (uniform across every
31/// protocol) and the session detail is rendered in light grey.
32///
33/// Fields included in the session block (chosen to surface the most common
34/// H1 troubleshooting axes — keep-alive churn, stream pinning, buffer-pressure
35/// stall and graceful TLS shutdown):
36/// - `peer` — peer address (or `None` if the socket is gone)
37/// - `position` — `Server` / `Client(...)` orientation
38/// - `stream` — currently active [`GlobalStreamId`] (or `none`)
39/// - `requests` — request count served on this connection (keep-alive)
40/// - `parked` — set when the kawa buffer is full and `READABLE` is suspended
41/// - `close_notify` — TLS `close_notify` send state
42/// - `readiness` — connection-level mio readiness snapshot
43macro_rules! log_context {
44    ($self:expr) => {{
45        let (open, reset, grey, gray, white) = ansi_palette();
46        format!(
47            "[{ulid} - - -]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
48            open = open,
49            reset = reset,
50            grey = grey,
51            gray = gray,
52            white = white,
53            ulid = $self.session_ulid,
54            peer = $self.socket.socket_ref().peer_addr().ok(),
55            position = $self.position,
56            stream = $self.stream,
57            requests = $self.requests,
58            parked = $self.parked_on_buffer_pressure,
59            close_notify = $self.close_notify_sent,
60            readiness = $self.readiness,
61        )
62    }};
63}
64
65/// Per-stream variant of [`log_context!`] used when a `HttpContext` is in
66/// scope. Fills the `request_id` slot of the bracket so the log line can be
67/// grepped by the specific request that triggered it.
68#[allow(unused_macros)]
69macro_rules! log_context_stream {
70    ($self:expr, $http_context:expr) => {{
71        let (open, reset, grey, gray, white) = ansi_palette();
72        format!(
73            "[{ulid} {req} {cluster} {backend}]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
74            open = open,
75            reset = reset,
76            grey = grey,
77            gray = gray,
78            white = white,
79            ulid = $self.session_ulid,
80            req = $http_context.id,
81            cluster = $http_context.cluster_id.as_deref().unwrap_or("-"),
82            backend = $http_context.backend_id.as_deref().unwrap_or("-"),
83            peer = $self.socket.socket_ref().peer_addr().ok(),
84            position = $self.position,
85            stream = $self.stream,
86            requests = $self.requests,
87            parked = $self.parked_on_buffer_pressure,
88            close_notify = $self.close_notify_sent,
89            readiness = $self.readiness,
90        )
91    }};
92}
93
94/// Module-level prefix for logs without a [`ConnectionH1`] in scope. Honours
95/// the colored flag.
96macro_rules! log_module_context {
97    () => {{
98        let (open, reset, _, _, _) = ansi_palette();
99        format!("{open}MUX-H1{reset}\t >>>", open = open, reset = reset)
100    }};
101}
102
103/// HTTP/1.1 connection handler within the mux layer.
104///
105/// Manages a single HTTP/1.1 connection (either frontend or backend),
106/// handling request/response forwarding through kawa buffers. Supports
107/// keep-alive, chunked transfer encoding, close-delimited responses,
108/// and upgrade (e.g., WebSocket).
109pub struct ConnectionH1<Front: SocketHandler> {
110    pub position: Position,
111    pub readiness: Readiness,
112    pub requests: usize,
113    pub socket: Front,
114    /// Active stream index, or `None` when the connection has no assigned stream
115    /// (initial client state before `start_stream`, or after `end_stream` detaches).
116    pub stream: Option<GlobalStreamId>,
117    pub timeout_container: TimeoutContainer,
118    /// Set when `readable` exits early because the kawa buffer was full.
119    /// Edge-triggered epoll will not re-fire READABLE for data already in the
120    /// kernel socket buffer, so the cross-readiness mechanism must re-arm it
121    /// via `try_resume_reading` once the peer drains the buffer.
122    pub parked_on_buffer_pressure: bool,
123    /// True once we've asked rustls to emit TLS close_notify for this frontend.
124    pub close_notify_sent: bool,
125    /// Connection/session ULID propagated from the parent [`Mux`]. Used to
126    /// stamp the session slot of the `[session req cluster backend]` log
127    /// prefix emitted by the local `log_context!` macro.
128    pub session_ulid: Ulid,
129}
130
131impl<Front: SocketHandler> std::fmt::Debug for ConnectionH1<Front> {
132    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133        f.debug_struct("ConnectionH1")
134            .field("position", &self.position)
135            .field("readiness", &self.readiness)
136            .field("socket", &self.socket.socket_ref())
137            .field("stream", &self.stream)
138            .finish()
139    }
140}
141
142impl<Front: SocketHandler> ConnectionH1<Front> {
143    fn defer_close_for_tls_flush(&mut self, reason: &'static str) -> MuxResult {
144        if self.initiate_close_notify() {
145            trace!(
146                "{} H1 writable delaying close after {}: stream={:?}, close_notify_sent={}, wants_write={}, readiness={:?}",
147                log_context!(self),
148                reason,
149                self.stream,
150                self.close_notify_sent,
151                self.socket.socket_wants_write(),
152                self.readiness
153            );
154            MuxResult::Continue
155        } else {
156            MuxResult::CloseSession
157        }
158    }
159
160    /// Terminate a close-delimited kawa body by pushing END_STREAM flags.
161    /// Called when the backend closes the connection to signal end-of-body
162    /// (no Content-Length, no chunked encoding).
163    ///
164    /// Chunked responses that TCP-close before the terminating `0\r\n\r\n`
165    /// are demoted to `ParsingPhase::Error` so the H2 converter emits
166    /// RST_STREAM(InternalError) rather than a silent END_STREAM with a
167    /// truncated body — RFC 9112 §7.1 requires the zero-chunk terminator.
168    fn terminate_close_delimited(kawa: &mut super::GenericHttpStream, stream_id: GlobalStreamId) {
169        if kawa.body_size == kawa::BodySize::Chunked {
170            warn!(
171                "{} H1 backend EOF mid-chunked response on stream {}: emitting RST_STREAM",
172                log_module_context!(),
173                stream_id
174            );
175            incr!(names::h1::BACKEND_EOF_BEFORE_MESSAGE_COMPLETE);
176            kawa.parsing_phase
177                .error(kawa::ParsingErrorKind::Processing {
178                    message: "INTERNAL_ERROR",
179                });
180            return;
181        }
182        debug!(
183            "{} H1 close-delimited EOF on stream {}: terminating body",
184            log_module_context!(),
185            stream_id
186        );
187        kawa.push_block(kawa::Block::Flags(kawa::Flags {
188            end_body: true,
189            end_chunk: false,
190            end_header: false,
191            end_stream: true,
192        }));
193        kawa.parsing_phase = kawa::ParsingPhase::Terminated;
194    }
195
196    pub fn readable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
197    where
198        E: Endpoint,
199        L: ListenerHandler + L7ListenerHandler,
200    {
201        trace!(
202            "{} ======= MUX H1 READABLE {:?}",
203            log_context!(self),
204            self.position
205        );
206        let Some(stream_id) = self.stream else {
207            error!(
208                "{} readable() called on H1 connection with no active stream",
209                log_context!(self)
210            );
211            return MuxResult::Continue;
212        };
213        self.timeout_container.reset();
214        let answers_rc = context.listener.borrow().get_answers().clone();
215        let stream = &mut context.streams[stream_id];
216        if stream.metrics.start.is_none() {
217            stream.metrics.start = Some(Instant::now());
218        }
219        let parts = stream.split(&self.position);
220        let kawa = parts.rbuffer;
221
222        // If the buffer has no space, don't attempt a read — socket_read with
223        // an empty buffer returns (0, Continue) which is indistinguishable from
224        // a real EOF. Remove READABLE from the event so the inner loop doesn't
225        // spin; `try_resume_reading` will re-arm it once the peer drains the
226        // buffer (edge-triggered epoll won't re-fire for data already in the
227        // kernel socket buffer).
228        if kawa.storage.available_space() == 0 {
229            self.readiness.event.remove(Ready::READABLE);
230            self.parked_on_buffer_pressure = true;
231            return MuxResult::Continue;
232        }
233
234        self.parked_on_buffer_pressure = false;
235        let (size, status) = self.socket.socket_read(kawa.storage.space());
236        context.debug.push(DebugEvent::StreamEvent(0, size));
237        kawa.storage.fill(size);
238        self.position.count_bytes_in_counter(size);
239        self.position.count_bytes_in(parts.metrics, size);
240        if update_readiness_after_read(size, status, &mut self.readiness) {
241            // size=0: the socket returned EOF (Closed) or WouldBlock.
242            // For a close-delimited backend response (no Content-Length, no
243            // chunked), a graceful EOF IS the end-of-body signal. Terminate
244            // the kawa so the H2 converter emits DATA with END_STREAM.
245            // SocketResult::Error (ECONNRESET etc.) is NOT treated as a valid
246            // close-delimiter — transport errors should produce 502, not a
247            // truncated response.
248            if status == SocketResult::Closed
249                && self.position.is_client()
250                && kawa.is_main_phase()
251                && !kawa.is_terminated()
252                && !parts.context.keep_alive_backend
253            {
254                Self::terminate_close_delimited(kawa, stream_id);
255                self.timeout_container.cancel();
256                self.readiness.interest.remove(Ready::READABLE);
257                if let StreamState::Linked(token) = stream.state {
258                    // Signal pending write alongside the WRITABLE interest flip:
259                    // edge-triggered epoll won't re-fire for bytes we just queued
260                    // onto the peer — the synthetic event is the only wake path.
261                    let peer = endpoint.readiness_mut(token);
262                    peer.arm_writable();
263                }
264            }
265            return MuxResult::Continue;
266        }
267
268        let was_main_phase = kawa.is_main_phase();
269        kawa::h1::parse(kawa, parts.context);
270        if kawa.is_error() {
271            match self.position {
272                Position::Client(..) => {
273                    incr!(names::http::BACKEND_PARSE_ERRORS);
274                    let StreamState::Linked(token) = stream.state else {
275                        error!(
276                            "{} client stream in error is not in Linked state",
277                            log_context!(self)
278                        );
279                        return MuxResult::CloseSession;
280                    };
281                    let global_stream_id = stream_id;
282                    self.end_stream(global_stream_id, context);
283                    endpoint.end_stream(token, global_stream_id, context);
284                }
285                Position::Server => {
286                    incr!(names::http::FRONTEND_PARSE_ERRORS);
287                    let answers = answers_rc.borrow();
288                    set_default_answer(stream, &mut self.readiness, 400, &answers);
289                }
290            }
291            return MuxResult::Continue;
292        }
293        // Capture borrow-sensitive values after parsing but before the 1xx block
294        // accesses stream.state (which ends the split borrow from `parts`).
295        let is_keep_alive_backend = parts.context.keep_alive_backend;
296        let is_body_phase_after_parse = kawa.is_main_phase();
297
298        // 1xx informational responses (100 Continue, 103 Early Hints): the H1
299        // parser treats them as complete (Terminated + end_stream=true), but for
300        // H2 frontends they must be forwarded WITHOUT END_STREAM so the real
301        // response can follow on the same stream. Also keep READABLE interest
302        // so the backend can send the final response.
303        let is_1xx_backend = if self.position.is_client() {
304            if let kawa::StatusLine::Response { code, .. } = &kawa.detached.status_line {
305                if (100..200).contains(code) {
306                    debug!(
307                        "{} H1 backend: received {} informational response",
308                        log_context!(self),
309                        code
310                    );
311                    for block in &mut kawa.blocks {
312                        if let kawa::Block::Flags(flags) = block {
313                            flags.end_stream = false;
314                            flags.end_body = false;
315                        }
316                    }
317                    true
318                } else {
319                    false
320                }
321            } else {
322                false
323            }
324        } else {
325            false
326        };
327        if kawa.is_terminated() && !is_1xx_backend {
328            self.timeout_container.cancel();
329            self.readiness.interest.remove(Ready::READABLE);
330        }
331        if kawa.is_main_phase() {
332            if !was_main_phase && self.position.is_server() {
333                if parts.context.method.is_none()
334                    || parts.context.authority.is_none()
335                    || parts.context.path.is_none()
336                {
337                    if let kawa::StatusLine::Request {
338                        version: kawa::Version::V10,
339                        ..
340                    } = kawa.detached.status_line
341                    {
342                        error!(
343                            "{} Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
344                            log_context!(self),
345                            parts.context.session_address,
346                            parts.context.method,
347                            parts.context.authority,
348                            parts.context.path
349                        );
350                    } else {
351                        error!("{} Unexpected malformed request", log_context!(self));
352                        kawa::debug_kawa(kawa);
353                    }
354                    let answers = answers_rc.borrow();
355                    set_default_answer(stream, &mut self.readiness, 400, &answers);
356                    return MuxResult::Continue;
357                }
358                self.requests += 1;
359                trace!("{} REQUESTS: {}", log_context!(self), self.requests);
360                incr!(names::http::REQUESTS);
361                gauge_add!(names::http::ACTIVE_REQUESTS, 1);
362                parts.metrics.service_start();
363                // Set request_counted after the last use of `parts` to satisfy the borrow checker
364                stream.request_counted = true;
365                stream.state = StreamState::Link;
366                context.pending_links.push_back(stream_id);
367            }
368            if let StreamState::Linked(token) = stream.state {
369                // Signal pending write alongside the WRITABLE interest flip: the
370                // bytes we just parsed live in sozu's buffers, not the kernel,
371                // so edge-triggered epoll won't re-fire on its own.
372                let peer = endpoint.readiness_mut(token);
373                peer.arm_writable();
374            }
375        };
376        // 1xx informational: the 100 response skips main_phase (goes straight to
377        // Terminated), so the normal "set endpoint writable" above never fires.
378        // Trigger the frontend to write the 1xx response after all borrows end.
379        if is_1xx_backend {
380            if let StreamState::Linked(token) = stream.state {
381                let peer = endpoint.readiness_mut(token);
382                peer.arm_writable();
383            }
384        }
385
386        // Close-delimited response: socket_read returned (size > 0, Closed) —
387        // the last data chunk arrived together with the EOF in a single read.
388        // After parsing the data above, terminate the kawa now so the H2
389        // converter emits END_STREAM on the last DATA frame.
390        if status == SocketResult::Closed
391            && self.position.is_client()
392            && is_body_phase_after_parse
393            && !is_keep_alive_backend
394            && !context.streams[stream_id].back.is_terminated()
395        {
396            let kawa = &mut context.streams[stream_id].back;
397            Self::terminate_close_delimited(kawa, stream_id);
398            self.timeout_container.cancel();
399            self.readiness.interest.remove(Ready::READABLE);
400        }
401
402        MuxResult::Continue
403    }
404
405    pub fn writable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
406    where
407        E: Endpoint,
408        L: ListenerHandler + L7ListenerHandler,
409    {
410        trace!(
411            "{} ======= MUX H1 WRITABLE {:?}",
412            log_context!(self),
413            self.position
414        );
415        let Some(stream_id) = self.stream else {
416            if self.socket.socket_wants_write() {
417                let (size, status) = self.socket.socket_write_vectored(&[]);
418                let _ = update_readiness_after_write(size, status, &mut self.readiness);
419                if self.socket.socket_wants_write() {
420                    self.readiness.signal_pending_write();
421                }
422            }
423            return MuxResult::Continue;
424        };
425        self.timeout_container.reset();
426        let stream = &mut context.streams[stream_id];
427        let parts = stream.split(&self.position);
428        let kawa = parts.wbuffer;
429        // Apply per-frontend response-side header edits stashed by the
430        // routing layer at request time. Only the Server-position pass
431        // touches the response back-kawa; the Client-position pass
432        // (writing the request to the backend) has already had its
433        // edits applied in `Router::route_from_request`.
434        //
435        // Drained via `mem::take` so the injection runs exactly once
436        // per response. H1 keep-alive can re-enter this writable path
437        // for the same stream when the backend response spans more
438        // than one TCP read; without the take, the second pass would
439        // re-insert the same headers (typically as duplicate STS lines
440        // on the wire — RFC 6797 §6.1 expects a single header). On H2
441        // the same multi-prepare-cycle pattern surfaces as a
442        // `H2BlockConverter::finalize` "out buffer not empty" leak.
443        if matches!(self.position, Position::Server) && !parts.context.headers_response.is_empty() {
444            let edits = std::mem::take(&mut parts.context.headers_response);
445            super::shared::apply_response_header_edits(kawa, &edits);
446        }
447        kawa.prepare(&mut kawa::h1::BlockConverter);
448        let mut io_slices = Vec::new();
449        for block in kawa.out.iter() {
450            match block {
451                kawa::OutBlock::Delimiter => break,
452                kawa::OutBlock::Store(store) => {
453                    io_slices.push(IoSlice::new(store.data(kawa.storage.buffer())));
454                }
455            }
456        }
457        let can_finalize_server_close = matches!(self.position, Position::Server)
458            && kawa.is_terminated()
459            && kawa.is_completed();
460        if io_slices.is_empty() && !self.socket.socket_wants_write() && !can_finalize_server_close {
461            self.readiness.interest.remove(Ready::WRITABLE);
462            return MuxResult::Continue;
463        }
464        let tls_only_flush = io_slices.is_empty();
465        let (size, status) = self.socket.socket_write_vectored(&io_slices);
466        context.debug.push(DebugEvent::StreamEvent(1, size));
467        kawa.consume(size);
468        self.position.count_bytes_out_counter(size);
469        self.position.count_bytes_out(parts.metrics, size);
470        let should_yield = update_readiness_after_write(size, status, &mut self.readiness);
471        if self.socket.socket_wants_write() {
472            self.readiness.signal_pending_write();
473            return MuxResult::Continue;
474        }
475        if !tls_only_flush && should_yield {
476            return MuxResult::Continue;
477        }
478
479        if kawa.is_terminated() && kawa.is_completed() {
480            match self.position {
481                Position::Client(..) => self.readiness.interest.insert(Ready::READABLE),
482                Position::Server => {
483                    if stream.context.closing {
484                        return self.defer_close_for_tls_flush("closing-context");
485                    }
486                    let kawa = &mut stream.back;
487                    match kawa.detached.status_line {
488                        kawa::StatusLine::Response { code: 101, .. } => {
489                            debug!("{} ============== HANDLE UPGRADE!", log_context!(self));
490                            stream.metrics.backend_stop();
491                            let client_rtt = socket_rtt(self.socket.socket_ref());
492                            let server_rtt = stream
493                                .linked_token()
494                                .and_then(|t| endpoint.socket(t))
495                                .and_then(socket_rtt);
496                            stream.generate_access_log(
497                                false,
498                                Some("H1::Upgrade"),
499                                context.listener.clone(),
500                                client_rtt,
501                                server_rtt,
502                            );
503                            return MuxResult::Upgrade;
504                        }
505                        kawa::StatusLine::Response { code: 100, .. } => {
506                            debug!("{} ============== HANDLE CONTINUE!", log_context!(self));
507                            // After a 100 Continue, we expect the client to continue
508                            // with its request body. Do NOT call generate_access_log
509                            // here — the final response will emit the access log.
510                            // Calling it here would double-decrement http.active_requests.
511                            self.timeout_container.reset();
512                            self.readiness.interest.insert(Ready::READABLE);
513                            kawa.clear();
514                            stream.metrics.backend_stop();
515                            if let StreamState::Linked(token) = stream.state {
516                                endpoint
517                                    .readiness_mut(token)
518                                    .interest
519                                    .insert(Ready::READABLE);
520                            }
521                            return MuxResult::Continue;
522                        }
523                        kawa::StatusLine::Response { code: 103, .. } => {
524                            debug!("{} ============== HANDLE EARLY HINT!", log_context!(self));
525                            // Do NOT call generate_access_log for 103 Early Hints.
526                            // The final response will emit the access log.
527                            // Calling it here would double-decrement http.active_requests.
528                            if let StreamState::Linked(token) = stream.state {
529                                // after a 103 early hints, we expect the backend to send its response
530                                endpoint
531                                    .readiness_mut(token)
532                                    .interest
533                                    .insert(Ready::READABLE);
534                                kawa.clear();
535                                stream.metrics.backend_stop();
536                                return MuxResult::Continue;
537                            } else {
538                                stream.metrics.backend_stop();
539                                let client_rtt = socket_rtt(self.socket.socket_ref());
540                                let server_rtt = stream
541                                    .linked_token()
542                                    .and_then(|t| endpoint.socket(t))
543                                    .and_then(socket_rtt);
544                                stream.generate_access_log(
545                                    false,
546                                    Some("H1::EarlyHint"),
547                                    context.listener.clone(),
548                                    client_rtt,
549                                    server_rtt,
550                                );
551                                return self.defer_close_for_tls_flush("early-hint");
552                            }
553                        }
554                        _ => {}
555                    }
556                    incr!(names::http::E2E_HTTP11);
557                    stream.metrics.backend_stop();
558                    let client_rtt = socket_rtt(self.socket.socket_ref());
559                    let server_rtt = stream
560                        .linked_token()
561                        .and_then(|t| endpoint.socket(t))
562                        .and_then(socket_rtt);
563                    stream.generate_access_log(
564                        false,
565                        Some("H1::Complete"),
566                        context.listener.clone(),
567                        client_rtt,
568                        server_rtt,
569                    );
570                    stream.metrics.reset();
571                    let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
572                    if let StreamState::Linked(token) = old_state {
573                        remove_backend_stream(&mut context.backend_streams, token, stream_id);
574                    }
575                    if stream.context.keep_alive_frontend {
576                        self.timeout_container.reset();
577                        if let StreamState::Linked(token) = old_state {
578                            endpoint.end_stream(token, stream_id, context);
579                        }
580                        self.readiness.interest.insert(Ready::READABLE);
581                        let stream = &mut context.streams[stream_id];
582                        stream.context.reset();
583                        stream.back.clear();
584                        stream.back.storage.clear();
585                        stream.front.clear();
586                        // do not stream.front.storage.clear() because of H1 pipelining
587                        stream.attempts = 0;
588                        // Transition back to Idle so buffered pipelined requests
589                        // trigger a phase transition on the next readable() call.
590                        stream.state = StreamState::Idle;
591                        // HTTP/1.1 pipelining: if there's still data in the frontend
592                        // storage (pipelined requests already read from the socket),
593                        // parse it now. We can't rely on a new READABLE event because
594                        // the socket buffer may be empty — all requests were already
595                        // read into kawa storage in the first socket_read.
596                        if !stream.front.storage.is_empty() {
597                            kawa::h1::parse(&mut stream.front, &mut stream.context);
598                            let is_error = stream.front.is_error();
599                            let is_main = stream.front.is_main_phase();
600                            let malformed = is_main
601                                && (stream.context.method.is_none()
602                                    || stream.context.authority.is_none()
603                                    || stream.context.path.is_none());
604                            if is_error || malformed {
605                                let answers_rc = context.listener.borrow().get_answers().clone();
606                                let answers = answers_rc.borrow();
607                                set_default_answer(stream, &mut self.readiness, 400, &answers);
608                            } else if is_main {
609                                self.requests += 1;
610                                incr!(names::http::REQUESTS);
611                                gauge_add!(names::http::ACTIVE_REQUESTS, 1);
612                                stream.metrics.service_start();
613                                stream.request_counted = true;
614                                stream.state = StreamState::Link;
615                                context.pending_links.push_back(stream_id);
616                            }
617                            // else: incomplete parse, wait for more data via READABLE
618                        }
619                    } else {
620                        return self.defer_close_for_tls_flush("response-complete");
621                    }
622                }
623            }
624        }
625        MuxResult::Continue
626    }
627
628    pub fn force_disconnect(&mut self) -> MuxResult {
629        match &mut self.position {
630            Position::Client(_, _, status) => {
631                *status = BackendStatus::Disconnecting;
632                self.readiness.event = Ready::HUP;
633                debug!(
634                    "{} H1 force_disconnect client: stream={:?}, wants_write={}, readiness={:?}",
635                    log_context!(self),
636                    self.stream,
637                    self.socket.socket_wants_write(),
638                    self.readiness
639                );
640                MuxResult::Continue
641            }
642            Position::Server => {
643                if self.socket.socket_wants_write() {
644                    debug!(
645                        "{} H1 force_disconnect delaying close: stream={:?}, wants_write=true, readiness={:?}",
646                        log_context!(self),
647                        self.stream,
648                        self.readiness
649                    );
650                    self.readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
651                    self.readiness.signal_pending_write();
652                    MuxResult::Continue
653                } else {
654                    debug!(
655                        "{} H1 force_disconnect closing session: stream={:?}, wants_write=false, readiness={:?}",
656                        log_context!(self),
657                        self.stream,
658                        self.readiness
659                    );
660                    MuxResult::CloseSession
661                }
662            }
663        }
664    }
665
666    pub fn has_pending_write(&self) -> bool {
667        self.socket.socket_wants_write()
668    }
669
670    pub fn initiate_close_notify(&mut self) -> bool {
671        if !self.position.is_server() {
672            return false;
673        }
674        if !self.close_notify_sent {
675            trace!("{} H1 initiating CLOSE_NOTIFY", log_context!(self));
676            self.socket.socket_close();
677            self.close_notify_sent = true;
678        }
679        if self.socket.socket_wants_write() {
680            self.readiness.arm_writable();
681            true
682        } else {
683            false
684        }
685    }
686
687    pub fn close<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E)
688    where
689        E: Endpoint,
690        L: ListenerHandler + L7ListenerHandler,
691    {
692        match self.position {
693            Position::Client(_, _, BackendStatus::KeepAlive)
694            | Position::Client(_, _, BackendStatus::Disconnecting) => {
695                trace!("{} close detached client ConnectionH1", log_context!(self));
696                return;
697            }
698            Position::Client(_, _, BackendStatus::Connecting(_))
699            | Position::Client(_, _, BackendStatus::Connected) => {
700                debug!(
701                    "{} BACKEND CLOSING FOR: {:?} {:?}",
702                    log_context!(self),
703                    self.position,
704                    self.stream
705                );
706            }
707            Position::Server => {
708                let tls_pending_before = self.socket.socket_wants_write();
709                let (tls_pending_after, drain_rounds) =
710                    drain_tls_close_notify(&mut self.socket, &mut self.close_notify_sent);
711                if tls_pending_after {
712                    error!(
713                        "{} H1 TLS buffer NOT fully drained on close: pending_before={}, pending_after={}, drain_rounds={}, stream={:?}, close_notify_sent={}, readiness={:?}",
714                        log_context!(self),
715                        tls_pending_before,
716                        tls_pending_after,
717                        drain_rounds,
718                        self.stream,
719                        self.close_notify_sent,
720                        self.readiness
721                    );
722                }
723                return;
724            }
725        }
726        let Some(stream_id) = self.stream else {
727            trace!(
728                "{} closing detached H1 client with no active stream",
729                log_context!(self)
730            );
731            return;
732        };
733        // reconnection is handled by the server
734        let StreamState::Linked(token) = context.streams[stream_id].state else {
735            trace!(
736                "{} closing detached H1 client in state {:?} on stream {}",
737                log_context!(self),
738                context.streams[stream_id].state,
739                stream_id
740            );
741            return;
742        };
743        endpoint.end_stream(token, stream_id, context)
744    }
745
746    pub fn end_stream<L>(&mut self, stream: GlobalStreamId, context: &mut Context<L>)
747    where
748        L: ListenerHandler + L7ListenerHandler,
749    {
750        if self.stream != Some(stream) {
751            error!(
752                "{} end_stream called with stream {} but expected {:?}",
753                log_context!(self),
754                stream,
755                self.stream
756            );
757            return;
758        }
759        context.unlink_stream(stream);
760        let answers_rc = context.listener.borrow().get_answers().clone();
761        let stream_id = stream;
762        let stream = &mut context.streams[stream_id];
763        let stream_context = &mut stream.context;
764        trace!(
765            "{} end H1 stream {:?}: {:#?}",
766            log_context!(self),
767            self.stream,
768            stream_context
769        );
770        match &mut self.position {
771            Position::Client(_, _, BackendStatus::Connecting(_)) => {
772                self.stream = None;
773                if stream.state != StreamState::Recycle {
774                    stream.state = StreamState::Unlinked;
775                }
776                self.readiness.interest.remove(Ready::ALL);
777                self.force_disconnect();
778            }
779            Position::Client(_, _, status @ BackendStatus::Connected) => {
780                self.stream = None;
781                if stream.state != StreamState::Recycle {
782                    stream.state = StreamState::Unlinked;
783                }
784                self.readiness.interest.remove(Ready::ALL);
785                // keep alive should probably be used only if the http context is fully reset
786                // in case end_stream occurs due to an error the connection state is probably
787                // unrecoverable and should be terminated
788                if stream_context.keep_alive_backend && stream.back.is_terminated() {
789                    *status = BackendStatus::KeepAlive;
790                } else {
791                    self.force_disconnect();
792                }
793            }
794            Position::Client(_, _, BackendStatus::KeepAlive)
795            | Position::Client(_, _, BackendStatus::Disconnecting) => {
796                error!(
797                    "{} end_stream called on KeepAlive or Disconnecting H1 client",
798                    log_context!(self)
799                );
800            }
801            Position::Server => match end_stream_decision(stream) {
802                EndStreamAction::ForwardTerminated => {
803                    debug!("{} CLOSING H1 TERMINATED STREAM", log_context!(self));
804                    stream.state = StreamState::Unlinked;
805                    self.readiness.interest.insert(Ready::WRITABLE);
806                    // End-of-stream was already queued into kawa by the parser;
807                    // no fresh WRITABLE event will arrive from the kernel.
808                    self.readiness.signal_pending_write();
809                }
810                EndStreamAction::CloseDelimited => {
811                    debug!("{} CLOSE DELIMITED", log_context!(self));
812                    stream.state = StreamState::Unlinked;
813                    self.readiness.arm_writable();
814                }
815                EndStreamAction::ForwardUnterminated => {
816                    debug!("{} CLOSING H1 UNTERMINATED STREAM", log_context!(self));
817                    forcefully_terminate_answer(
818                        stream,
819                        &mut self.readiness,
820                        H2Error::InternalError,
821                    );
822                }
823                EndStreamAction::SendDefault(status) => {
824                    let answers = answers_rc.borrow();
825                    set_default_answer(stream, &mut self.readiness, status, &answers);
826                }
827                EndStreamAction::Reconnect => {
828                    debug!("{} H1 RECONNECT", log_context!(self));
829                    stream.state = StreamState::Link;
830                    context.pending_links.push_back(stream_id);
831                }
832            },
833        }
834    }
835
836    pub fn start_stream<L>(&mut self, stream: GlobalStreamId, _context: &mut Context<L>) -> bool
837    where
838        L: ListenerHandler + L7ListenerHandler,
839    {
840        trace!(
841            "{} start H1 stream {} {:?}",
842            log_context!(self),
843            stream,
844            self.readiness
845        );
846        self.readiness.interest.insert(Ready::ALL);
847        self.stream = Some(stream);
848        match &mut self.position {
849            Position::Client(_, _, status @ BackendStatus::KeepAlive) => {
850                *status = BackendStatus::Connected;
851            }
852            Position::Client(_, _, BackendStatus::Disconnecting) => {
853                error!(
854                    "{} start_stream called on Disconnecting H1 client",
855                    log_context!(self)
856                );
857                return false;
858            }
859            Position::Client(_, _, _) => {}
860            Position::Server => {
861                error!(
862                    "{} start_stream must not be called on H1 server connection",
863                    log_context!(self)
864                );
865                return false;
866            }
867        }
868        true
869    }
870}