1use std::io::IoSlice;
9
10use rusty_ulid::Ulid;
11use sozu_command::{logging::ansi_palette, ready::Ready};
12
13use crate::metrics::names;
14use crate::{
15 L7ListenerHandler, ListenerHandler, Readiness,
16 protocol::mux::{
17 BackendStatus, Context, DebugEvent, Endpoint, GlobalStreamId, MuxResult, Position,
18 StreamState, forcefully_terminate_answer,
19 parser::H2Error,
20 remove_backend_stream, set_default_answer,
21 shared::{EndStreamAction, drain_tls_close_notify, end_stream_decision},
22 update_readiness_after_read, update_readiness_after_write,
23 },
24 socket::{SocketHandler, SocketResult, stats::socket_rtt},
25 timer::TimeoutContainer,
26};
27
28macro_rules! log_context {
44 ($self:expr) => {{
45 let (open, reset, grey, gray, white) = ansi_palette();
46 format!(
47 "[{ulid} - - -]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
48 open = open,
49 reset = reset,
50 grey = grey,
51 gray = gray,
52 white = white,
53 ulid = $self.session_ulid,
54 peer = $self.socket.socket_ref().peer_addr().ok(),
55 position = $self.position,
56 stream = $self.stream,
57 requests = $self.requests,
58 parked = $self.parked_on_buffer_pressure,
59 close_notify = $self.close_notify_sent,
60 readiness = $self.readiness,
61 )
62 }};
63}
64
65#[allow(unused_macros)]
69macro_rules! log_context_stream {
70 ($self:expr, $http_context:expr) => {{
71 let (open, reset, grey, gray, white) = ansi_palette();
72 format!(
73 "[{ulid} {req} {cluster} {backend}]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
74 open = open,
75 reset = reset,
76 grey = grey,
77 gray = gray,
78 white = white,
79 ulid = $self.session_ulid,
80 req = $http_context.id,
81 cluster = $http_context.cluster_id.as_deref().unwrap_or("-"),
82 backend = $http_context.backend_id.as_deref().unwrap_or("-"),
83 peer = $self.socket.socket_ref().peer_addr().ok(),
84 position = $self.position,
85 stream = $self.stream,
86 requests = $self.requests,
87 parked = $self.parked_on_buffer_pressure,
88 close_notify = $self.close_notify_sent,
89 readiness = $self.readiness,
90 )
91 }};
92}
93
94macro_rules! log_module_context {
97 () => {{
98 let (open, reset, _, _, _) = ansi_palette();
99 format!("{open}MUX-H1{reset}\t >>>", open = open, reset = reset)
100 }};
101}
102
103pub struct ConnectionH1<Front: SocketHandler> {
110 pub position: Position,
111 pub readiness: Readiness,
112 pub requests: usize,
113 pub socket: Front,
114 pub stream: Option<GlobalStreamId>,
117 pub timeout_container: TimeoutContainer,
118 pub parked_on_buffer_pressure: bool,
123 pub close_notify_sent: bool,
125 pub session_ulid: Ulid,
129}
130
131impl<Front: SocketHandler> std::fmt::Debug for ConnectionH1<Front> {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("ConnectionH1")
134 .field("position", &self.position)
135 .field("readiness", &self.readiness)
136 .field("socket", &self.socket.socket_ref())
137 .field("stream", &self.stream)
138 .finish()
139 }
140}
141
142impl<Front: SocketHandler> ConnectionH1<Front> {
143 fn defer_close_for_tls_flush(&mut self, reason: &'static str) -> MuxResult {
144 if self.initiate_close_notify() {
145 trace!(
146 "{} H1 writable delaying close after {}: stream={:?}, close_notify_sent={}, wants_write={}, readiness={:?}",
147 log_context!(self),
148 reason,
149 self.stream,
150 self.close_notify_sent,
151 self.socket.socket_wants_write(),
152 self.readiness
153 );
154 MuxResult::Continue
155 } else {
156 MuxResult::CloseSession
157 }
158 }
159
160 fn terminate_close_delimited(kawa: &mut super::GenericHttpStream, stream_id: GlobalStreamId) {
169 debug_assert!(
173 !kawa.is_terminated(),
174 "terminate_close_delimited must not run on an already-terminated kawa"
175 );
176 if kawa.body_size == kawa::BodySize::Chunked {
177 warn!(
178 "{} H1 backend EOF mid-chunked response on stream {}: emitting RST_STREAM",
179 log_module_context!(),
180 stream_id
181 );
182 incr!(names::h1::BACKEND_EOF_BEFORE_MESSAGE_COMPLETE);
183 kawa.parsing_phase
184 .error(kawa::ParsingErrorKind::Processing {
185 message: "INTERNAL_ERROR",
186 });
187 debug_assert!(
190 kawa.is_error(),
191 "truncated chunked response must end in the Error phase"
192 );
193 return;
194 }
195 debug!(
196 "{} H1 close-delimited EOF on stream {}: terminating body",
197 log_module_context!(),
198 stream_id
199 );
200 kawa.push_block(kawa::Block::Flags(kawa::Flags {
201 end_body: true,
202 end_chunk: false,
203 end_header: false,
204 end_stream: true,
205 }));
206 kawa.parsing_phase = kawa::ParsingPhase::Terminated;
207 debug_assert!(
210 kawa.is_terminated(),
211 "close-delimited body must end in the Terminated phase"
212 );
213 }
214
215 pub fn readable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
216 where
217 E: Endpoint,
218 L: ListenerHandler + L7ListenerHandler,
219 {
220 trace!(
221 "{} ======= MUX H1 READABLE {:?}",
222 log_context!(self),
223 self.position
224 );
225 let Some(stream_id) = self.stream else {
226 error!(
227 "{} readable() called on H1 connection with no active stream",
228 log_context!(self)
229 );
230 return MuxResult::Continue;
231 };
232 self.timeout_container.reset();
233 let answers_rc = context.listener.borrow().get_answers().clone();
234 let stream = &mut context.streams[stream_id];
235 if stream.metrics.start.is_none() {
236 stream.metrics.mark_request_start();
237 }
238 let parts = stream.split(&self.position);
239 let kawa = parts.rbuffer;
240
241 if kawa.storage.available_space() == 0 {
248 self.readiness.event.remove(Ready::READABLE);
249 self.parked_on_buffer_pressure = true;
250 debug_assert!(
253 self.parked_on_buffer_pressure && !self.readiness.event.is_readable(),
254 "parking on buffer pressure must clear the READABLE event"
255 );
256 return MuxResult::Continue;
257 }
258
259 self.parked_on_buffer_pressure = false;
260 let space_before = kawa.storage.available_space();
263 debug_assert!(
264 space_before > 0,
265 "socket_read must target a buffer with free space"
266 );
267 let (size, status) = self.socket.socket_read(kawa.storage.space());
268 debug_assert!(
272 size <= space_before,
273 "socket_read returned more bytes than the buffer could hold"
274 );
275 context.debug.push(DebugEvent::StreamEvent(0, size));
276 kawa.storage.fill(size);
277 debug_assert_eq!(
278 kawa.storage.available_space(),
279 space_before - size,
280 "fill must consume exactly `size` bytes of free space"
281 );
282 self.position.count_bytes_in_counter(size);
283 self.position.count_bytes_in(parts.metrics, size);
284 if update_readiness_after_read(size, status, &mut self.readiness) {
285 if status == SocketResult::Closed
293 && self.position.is_client()
294 && kawa.is_main_phase()
295 && !kawa.is_terminated()
296 && !parts.context.keep_alive_backend
297 {
298 Self::terminate_close_delimited(kawa, stream_id);
299 self.timeout_container.cancel();
300 self.readiness.interest.remove(Ready::READABLE);
301 if let StreamState::Linked(token) = stream.state {
302 let peer = endpoint.readiness_mut(token);
306 peer.arm_writable();
307 }
308 }
309 return MuxResult::Continue;
310 }
311
312 let was_main_phase = kawa.is_main_phase();
313 kawa::h1::parse(kawa, parts.context);
314 if kawa.is_error() {
315 match self.position {
316 Position::Client(..) => {
317 incr!(names::http::BACKEND_PARSE_ERRORS);
318 let StreamState::Linked(token) = stream.state else {
319 error!(
320 "{} client stream in error is not in Linked state",
321 log_context!(self)
322 );
323 return MuxResult::CloseSession;
324 };
325 let global_stream_id = stream_id;
326 self.end_stream(global_stream_id, context);
327 endpoint.end_stream(token, global_stream_id, context);
328 }
329 Position::Server => {
330 incr!(names::http::FRONTEND_PARSE_ERRORS);
331 let answers = answers_rc.borrow();
332 set_default_answer(stream, &mut self.readiness, 400, &answers);
333 }
334 }
335 return MuxResult::Continue;
336 }
337 let is_keep_alive_backend = parts.context.keep_alive_backend;
340 let is_body_phase_after_parse = kawa.is_main_phase();
341
342 let is_1xx_backend = if self.position.is_client() {
348 if let kawa::StatusLine::Response { code, .. } = &kawa.detached.status_line {
349 if (100..200).contains(code) {
350 debug!(
351 "{} H1 backend: received {} informational response",
352 log_context!(self),
353 code
354 );
355 for block in &mut kawa.blocks {
356 if let kawa::Block::Flags(flags) = block {
357 flags.end_stream = false;
358 flags.end_body = false;
359 }
360 }
361 true
362 } else {
363 false
364 }
365 } else {
366 false
367 }
368 } else {
369 false
370 };
371 if kawa.is_terminated() && !is_1xx_backend {
372 self.timeout_container.cancel();
373 self.readiness.interest.remove(Ready::READABLE);
374 }
375 if kawa.is_main_phase() {
376 if !was_main_phase && self.position.is_server() {
377 if parts.context.method.is_none()
378 || parts.context.authority.is_none()
379 || parts.context.path.is_none()
380 {
381 if let kawa::StatusLine::Request {
382 version: kawa::Version::V10,
383 ..
384 } = kawa.detached.status_line
385 {
386 error!(
387 "{} Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
388 log_context!(self),
389 parts.context.session_address,
390 parts.context.method,
391 parts.context.authority,
392 parts.context.path
393 );
394 } else {
395 error!("{} Unexpected malformed request", log_context!(self));
396 kawa::debug_kawa(kawa);
397 }
398 let answers = answers_rc.borrow();
399 set_default_answer(stream, &mut self.readiness, 400, &answers);
400 return MuxResult::Continue;
401 }
402 let requests_before = self.requests;
408 let links_before = context.pending_links.len();
409 self.requests += 1;
410 debug_assert_eq!(
411 self.requests,
412 requests_before + 1,
413 "server keep-alive request counter must advance by exactly one"
414 );
415 trace!("{} REQUESTS: {}", log_context!(self), self.requests);
416 incr!(names::http::REQUESTS);
417 gauge_add!(names::http::ACTIVE_REQUESTS, 1);
418 parts.metrics.service_start();
419 stream.request_counted = true;
421 stream.state = StreamState::Link;
422 context.pending_links.push_back(stream_id);
423 debug_assert!(
426 stream.request_counted,
427 "request_counted must be set when the active-requests gauge is incremented"
428 );
429 debug_assert_eq!(
430 stream.state,
431 StreamState::Link,
432 "a first-seen request must transition the stream to Link"
433 );
434 debug_assert_eq!(
435 context.pending_links.len(),
436 links_before + 1,
437 "a first-seen request must enqueue exactly one pending link"
438 );
439 }
440 if let StreamState::Linked(token) = stream.state {
441 let peer = endpoint.readiness_mut(token);
445 peer.arm_writable();
446 }
447 };
448 if is_1xx_backend {
452 if let StreamState::Linked(token) = stream.state {
453 let peer = endpoint.readiness_mut(token);
454 peer.arm_writable();
455 }
456 }
457
458 if status == SocketResult::Closed
463 && self.position.is_client()
464 && is_body_phase_after_parse
465 && !is_keep_alive_backend
466 && !context.streams[stream_id].back.is_terminated()
467 {
468 let kawa = &mut context.streams[stream_id].back;
469 Self::terminate_close_delimited(kawa, stream_id);
470 self.timeout_container.cancel();
471 self.readiness.interest.remove(Ready::READABLE);
472 }
473
474 MuxResult::Continue
475 }
476
477 pub fn writable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
478 where
479 E: Endpoint,
480 L: ListenerHandler + L7ListenerHandler,
481 {
482 trace!(
483 "{} ======= MUX H1 WRITABLE {:?}",
484 log_context!(self),
485 self.position
486 );
487 let Some(stream_id) = self.stream else {
488 if self.socket.socket_wants_write() {
489 let (size, status) = self.socket.socket_write_vectored(&[]);
490 let _ = update_readiness_after_write(size, status, &mut self.readiness);
491 if self.socket.socket_wants_write() {
492 self.readiness.signal_pending_write();
493 }
494 }
495 return MuxResult::Continue;
496 };
497 self.timeout_container.reset();
498 let stream = &mut context.streams[stream_id];
499 let parts = stream.split(&self.position);
500 let kawa = parts.wbuffer;
501 if matches!(self.position, Position::Server) && !parts.context.headers_response.is_empty() {
516 let edits = std::mem::take(&mut parts.context.headers_response);
517 super::shared::apply_response_header_edits(kawa, &edits);
518 }
519 kawa.prepare(&mut kawa::h1::BlockConverter);
520 let mut io_slices = Vec::new();
521 for block in kawa.out.iter() {
522 match block {
523 kawa::OutBlock::Delimiter => break,
524 kawa::OutBlock::Store(store) => {
525 io_slices.push(IoSlice::new(store.data(kawa.storage.buffer())));
526 }
527 }
528 }
529 let can_finalize_server_close = matches!(self.position, Position::Server)
530 && kawa.is_terminated()
531 && kawa.is_completed();
532 if io_slices.is_empty() && !self.socket.socket_wants_write() && !can_finalize_server_close {
533 self.readiness.interest.remove(Ready::WRITABLE);
534 return MuxResult::Continue;
535 }
536 let tls_only_flush = io_slices.is_empty();
537 let queued: usize = io_slices.iter().map(|s| s.len()).sum();
540 let (size, status) = self.socket.socket_write_vectored(&io_slices);
541 debug_assert!(
542 size <= queued,
543 "socket_write_vectored reported more bytes written than were queued"
544 );
545 context.debug.push(DebugEvent::StreamEvent(1, size));
546 kawa.consume(size);
547 self.position.count_bytes_out_counter(size);
548 self.position.count_bytes_out(parts.metrics, size);
549 let should_yield = update_readiness_after_write(size, status, &mut self.readiness);
550 if self.socket.socket_wants_write() {
551 self.readiness.signal_pending_write();
552 debug_assert!(
556 self.readiness.event.is_writable(),
557 "signal_pending_write must leave a WRITABLE event queued"
558 );
559 return MuxResult::Continue;
560 }
561 if !tls_only_flush && should_yield {
562 return MuxResult::Continue;
563 }
564
565 if kawa.is_terminated() && kawa.is_completed() {
566 match self.position {
567 Position::Client(..) => self.readiness.interest.insert(Ready::READABLE),
568 Position::Server => {
569 if stream.context.closing {
570 return self.defer_close_for_tls_flush("closing-context");
571 }
572 let kawa = &mut stream.back;
573 match kawa.detached.status_line {
574 kawa::StatusLine::Response { code: 101, .. } => {
575 debug!("{} ============== HANDLE UPGRADE!", log_context!(self));
576 stream.metrics.backend_stop();
577 let client_rtt = socket_rtt(self.socket.socket_ref());
578 let server_rtt = stream
579 .linked_token()
580 .and_then(|t| endpoint.socket(t))
581 .and_then(socket_rtt);
582 stream.generate_access_log(
583 false,
584 Some("H1::Upgrade"),
585 context.listener.clone(),
586 client_rtt,
587 server_rtt,
588 );
589 return MuxResult::Upgrade;
590 }
591 kawa::StatusLine::Response { code: 100, .. } => {
592 debug!("{} ============== HANDLE CONTINUE!", log_context!(self));
593 self.timeout_container.reset();
598 self.readiness.interest.insert(Ready::READABLE);
599 kawa.clear();
600 stream.metrics.backend_stop();
601 if let StreamState::Linked(token) = stream.state {
602 endpoint
603 .readiness_mut(token)
604 .interest
605 .insert(Ready::READABLE);
606 }
607 return MuxResult::Continue;
608 }
609 kawa::StatusLine::Response { code: 103, .. } => {
610 debug!("{} ============== HANDLE EARLY HINT!", log_context!(self));
611 if let StreamState::Linked(token) = stream.state {
615 endpoint
617 .readiness_mut(token)
618 .interest
619 .insert(Ready::READABLE);
620 kawa.clear();
621 stream.metrics.backend_stop();
622 return MuxResult::Continue;
623 } else {
624 stream.metrics.backend_stop();
625 let client_rtt = socket_rtt(self.socket.socket_ref());
626 let server_rtt = stream
627 .linked_token()
628 .and_then(|t| endpoint.socket(t))
629 .and_then(socket_rtt);
630 stream.generate_access_log(
631 false,
632 Some("H1::EarlyHint"),
633 context.listener.clone(),
634 client_rtt,
635 server_rtt,
636 );
637 return self.defer_close_for_tls_flush("early-hint");
638 }
639 }
640 _ => {}
641 }
642 incr!(names::http::E2E_HTTP11);
643 stream.metrics.backend_stop();
644 let client_rtt = socket_rtt(self.socket.socket_ref());
645 let server_rtt = stream
646 .linked_token()
647 .and_then(|t| endpoint.socket(t))
648 .and_then(socket_rtt);
649 stream.generate_access_log(
650 false,
651 Some("H1::Complete"),
652 context.listener.clone(),
653 client_rtt,
654 server_rtt,
655 );
656 stream.metrics.reset();
657 let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
658 if let StreamState::Linked(token) = old_state {
659 remove_backend_stream(&mut context.backend_streams, token, stream_id);
660 }
661 if stream.context.keep_alive_frontend {
662 self.timeout_container.reset();
663 if let StreamState::Linked(token) = old_state {
664 endpoint.end_stream(token, stream_id, context);
665 }
666 self.readiness.interest.insert(Ready::READABLE);
667 let stream = &mut context.streams[stream_id];
668 stream.context.reset();
669 stream.back.clear();
670 stream.back.storage.clear();
671 stream.front.clear();
672 stream.attempts = 0;
674 stream.state = StreamState::Idle;
677 debug_assert_eq!(
682 stream.state,
683 StreamState::Idle,
684 "keep-alive reset must return the stream to Idle"
685 );
686 debug_assert_eq!(stream.attempts, 0, "keep-alive reset must zero attempts");
687 debug_assert!(
688 !stream.request_counted,
689 "keep-alive reset must leave no counted request (active-requests leak)"
690 );
691 debug_assert!(
692 stream.back.storage.is_empty(),
693 "keep-alive reset must drain the response storage"
694 );
695 if !stream.front.storage.is_empty() {
701 kawa::h1::parse(&mut stream.front, &mut stream.context);
702 let is_error = stream.front.is_error();
703 let is_main = stream.front.is_main_phase();
704 let malformed = is_main
705 && (stream.context.method.is_none()
706 || stream.context.authority.is_none()
707 || stream.context.path.is_none());
708 if is_error || malformed {
709 let answers_rc = context.listener.borrow().get_answers().clone();
710 let answers = answers_rc.borrow();
711 set_default_answer(stream, &mut self.readiness, 400, &answers);
712 } else if is_main {
713 self.requests += 1;
714 incr!(names::http::REQUESTS);
715 gauge_add!(names::http::ACTIVE_REQUESTS, 1);
716 stream.metrics.service_start();
717 stream.request_counted = true;
718 stream.state = StreamState::Link;
719 context.pending_links.push_back(stream_id);
720 }
721 }
723 } else {
724 return self.defer_close_for_tls_flush("response-complete");
725 }
726 }
727 }
728 }
729 MuxResult::Continue
730 }
731
732 pub fn force_disconnect(&mut self) -> MuxResult {
733 match &mut self.position {
734 Position::Client(_, _, status) => {
735 *status = BackendStatus::Disconnecting;
736 self.readiness.event = Ready::HUP;
737 debug!(
738 "{} H1 force_disconnect client: stream={:?}, wants_write={}, readiness={:?}",
739 log_context!(self),
740 self.stream,
741 self.socket.socket_wants_write(),
742 self.readiness
743 );
744 MuxResult::Continue
745 }
746 Position::Server => {
747 if self.socket.socket_wants_write() {
748 debug!(
749 "{} H1 force_disconnect delaying close: stream={:?}, wants_write=true, readiness={:?}",
750 log_context!(self),
751 self.stream,
752 self.readiness
753 );
754 self.readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
755 self.readiness.signal_pending_write();
756 MuxResult::Continue
757 } else {
758 debug!(
759 "{} H1 force_disconnect closing session: stream={:?}, wants_write=false, readiness={:?}",
760 log_context!(self),
761 self.stream,
762 self.readiness
763 );
764 MuxResult::CloseSession
765 }
766 }
767 }
768 }
769
770 pub fn has_pending_write(&self) -> bool {
771 self.socket.socket_wants_write()
772 }
773
774 pub fn initiate_close_notify(&mut self) -> bool {
775 if !self.position.is_server() {
776 return false;
777 }
778 debug_assert!(
781 self.position.is_server(),
782 "initiate_close_notify past the guard must be server-side"
783 );
784 if !self.close_notify_sent {
785 trace!("{} H1 initiating CLOSE_NOTIFY", log_context!(self));
786 self.socket.socket_close();
787 self.close_notify_sent = true;
788 }
789 debug_assert!(
792 self.close_notify_sent,
793 "close_notify_sent must be set once initiate_close_notify has run"
794 );
795 if self.socket.socket_wants_write() {
796 self.readiness.arm_writable();
797 debug_assert!(
800 self.readiness.interest.is_writable() && self.readiness.event.is_writable(),
801 "arm_writable must set both WRITABLE interest and event"
802 );
803 true
804 } else {
805 false
806 }
807 }
808
809 pub fn close<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E)
810 where
811 E: Endpoint,
812 L: ListenerHandler + L7ListenerHandler,
813 {
814 match self.position {
815 Position::Client(_, _, BackendStatus::KeepAlive)
816 | Position::Client(_, _, BackendStatus::Disconnecting) => {
817 trace!("{} close detached client ConnectionH1", log_context!(self));
818 return;
819 }
820 Position::Client(_, _, BackendStatus::Connecting(_))
821 | Position::Client(_, _, BackendStatus::Connected) => {
822 debug!(
823 "{} BACKEND CLOSING FOR: {:?} {:?}",
824 log_context!(self),
825 self.position,
826 self.stream
827 );
828 }
829 Position::Server => {
830 let tls_pending_before = self.socket.socket_wants_write();
831 let (tls_pending_after, drain_rounds) =
832 drain_tls_close_notify(&mut self.socket, &mut self.close_notify_sent);
833 if tls_pending_after {
834 error!(
835 "{} H1 TLS buffer NOT fully drained on close: pending_before={}, pending_after={}, drain_rounds={}, stream={:?}, close_notify_sent={}, readiness={:?}",
836 log_context!(self),
837 tls_pending_before,
838 tls_pending_after,
839 drain_rounds,
840 self.stream,
841 self.close_notify_sent,
842 self.readiness
843 );
844 }
845 return;
846 }
847 }
848 let Some(stream_id) = self.stream else {
849 trace!(
850 "{} closing detached H1 client with no active stream",
851 log_context!(self)
852 );
853 return;
854 };
855 let StreamState::Linked(token) = context.streams[stream_id].state else {
857 trace!(
858 "{} closing detached H1 client in state {:?} on stream {}",
859 log_context!(self),
860 context.streams[stream_id].state,
861 stream_id
862 );
863 return;
864 };
865 endpoint.end_stream(token, stream_id, context)
866 }
867
868 pub fn end_stream<L>(&mut self, stream: GlobalStreamId, context: &mut Context<L>)
869 where
870 L: ListenerHandler + L7ListenerHandler,
871 {
872 if self.stream != Some(stream) {
873 error!(
874 "{} end_stream called with stream {} but expected {:?}",
875 log_context!(self),
876 stream,
877 self.stream
878 );
879 return;
880 }
881 debug_assert_eq!(
884 self.stream,
885 Some(stream),
886 "end_stream past the guard must target the active stream"
887 );
888 context.unlink_stream(stream);
889 #[cfg(debug_assertions)]
894 if let StreamState::Linked(token) = context.streams[stream].state {
895 debug_assert!(
896 context
897 .backend_streams
898 .get(&token)
899 .is_none_or(|ids| !ids.contains(&stream)),
900 "unlink_stream must evict the stream from the backend reverse index"
901 );
902 }
903 let answers_rc = context.listener.borrow().get_answers().clone();
904 let stream_id = stream;
905 let stream = &mut context.streams[stream_id];
906 let stream_context = &mut stream.context;
907 trace!(
908 "{} end H1 stream {:?}: {:#?}",
909 log_context!(self),
910 self.stream,
911 stream_context
912 );
913 match &mut self.position {
914 Position::Client(_, _, BackendStatus::Connecting(_)) => {
915 self.stream = None;
916 if stream.state != StreamState::Recycle {
917 stream.state = StreamState::Unlinked;
918 }
919 debug_assert!(
923 self.stream.is_none(),
924 "client end_stream must detach the stream"
925 );
926 debug_assert!(
927 !matches!(stream.state, StreamState::Linked(_)),
928 "detached stream must not remain Linked"
929 );
930 self.readiness.interest.remove(Ready::ALL);
931 self.force_disconnect();
932 }
933 Position::Client(_, _, status @ BackendStatus::Connected) => {
934 self.stream = None;
935 if stream.state != StreamState::Recycle {
936 stream.state = StreamState::Unlinked;
937 }
938 debug_assert!(
939 self.stream.is_none(),
940 "client end_stream must detach the stream"
941 );
942 debug_assert!(
943 !matches!(stream.state, StreamState::Linked(_)),
944 "detached stream must not remain Linked"
945 );
946 self.readiness.interest.remove(Ready::ALL);
947 if stream_context.keep_alive_backend && stream.back.is_terminated() {
951 *status = BackendStatus::KeepAlive;
952 } else {
953 self.force_disconnect();
954 }
955 }
956 Position::Client(_, _, BackendStatus::KeepAlive)
957 | Position::Client(_, _, BackendStatus::Disconnecting) => {
958 error!(
959 "{} end_stream called on KeepAlive or Disconnecting H1 client",
960 log_context!(self)
961 );
962 }
963 Position::Server => match end_stream_decision(stream) {
964 EndStreamAction::ForwardTerminated => {
965 debug!("{} CLOSING H1 TERMINATED STREAM", log_context!(self));
966 stream.state = StreamState::Unlinked;
967 self.readiness.interest.insert(Ready::WRITABLE);
968 self.readiness.signal_pending_write();
971 }
972 EndStreamAction::CloseDelimited => {
973 debug!("{} CLOSE DELIMITED", log_context!(self));
974 stream.state = StreamState::Unlinked;
975 self.readiness.arm_writable();
976 }
977 EndStreamAction::ForwardUnterminated => {
978 debug!("{} CLOSING H1 UNTERMINATED STREAM", log_context!(self));
979 forcefully_terminate_answer(
980 stream,
981 &mut self.readiness,
982 H2Error::InternalError,
983 );
984 }
985 EndStreamAction::SendDefault(status) => {
986 let answers = answers_rc.borrow();
987 set_default_answer(stream, &mut self.readiness, status, &answers);
988 }
989 EndStreamAction::Reconnect => {
990 debug!("{} H1 RECONNECT", log_context!(self));
991 stream.state = StreamState::Link;
992 context.pending_links.push_back(stream_id);
993 }
994 },
995 }
996 }
997
998 pub fn start_stream<L>(&mut self, stream: GlobalStreamId, _context: &mut Context<L>) -> bool
999 where
1000 L: ListenerHandler + L7ListenerHandler,
1001 {
1002 trace!(
1003 "{} start H1 stream {} {:?}",
1004 log_context!(self),
1005 stream,
1006 self.readiness
1007 );
1008 self.readiness.interest.insert(Ready::ALL);
1009 self.stream = Some(stream);
1010 debug_assert_eq!(
1011 self.stream,
1012 Some(stream),
1013 "start_stream must pin the connection's active stream"
1014 );
1015 match &mut self.position {
1016 Position::Client(_, _, status @ BackendStatus::KeepAlive) => {
1017 *status = BackendStatus::Connected;
1018 debug_assert!(
1021 matches!(
1022 self.position,
1023 Position::Client(_, _, BackendStatus::Connected)
1024 ),
1025 "a reused keep-alive client must become Connected on start_stream"
1026 );
1027 }
1028 Position::Client(_, _, BackendStatus::Disconnecting) => {
1029 error!(
1030 "{} start_stream called on Disconnecting H1 client",
1031 log_context!(self)
1032 );
1033 return false;
1034 }
1035 Position::Client(_, _, _) => {}
1036 Position::Server => {
1037 error!(
1038 "{} start_stream must not be called on H1 server connection",
1039 log_context!(self)
1040 );
1041 return false;
1042 }
1043 }
1044 true
1045 }
1046}