1use std::{io::IoSlice, time::Instant};
9
10use rusty_ulid::Ulid;
11use sozu_command::{logging::ansi_palette, ready::Ready};
12
13use crate::metrics::names;
14use crate::{
15 L7ListenerHandler, ListenerHandler, Readiness,
16 protocol::mux::{
17 BackendStatus, Context, DebugEvent, Endpoint, GlobalStreamId, MuxResult, Position,
18 StreamState, forcefully_terminate_answer,
19 parser::H2Error,
20 remove_backend_stream, set_default_answer,
21 shared::{EndStreamAction, drain_tls_close_notify, end_stream_decision},
22 update_readiness_after_read, update_readiness_after_write,
23 },
24 socket::{SocketHandler, SocketResult, stats::socket_rtt},
25 timer::TimeoutContainer,
26};
27
/// Builds the colored log prefix for a whole H1 connection.
///
/// The prefix carries the session ULID (the request/cluster/backend slots are
/// rendered as `-`), the peer address, the connection position, the attached
/// stream, the request counter, the buffer-pressure and close-notify flags,
/// and the current readiness.
macro_rules! log_context {
    ($self:expr) => {{
        // The five palette escapes are picked up by name from the format string.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} - - -]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            peer = $self.socket.socket_ref().peer_addr().ok(),
            position = $self.position,
            stream = $self.stream,
            requests = $self.requests,
            parked = $self.parked_on_buffer_pressure,
            close_notify = $self.close_notify_sent,
            readiness = $self.readiness,
        )
    }};
}
64
/// Same prefix as `log_context!`, but with the request id, cluster id and
/// backend id of `$http_context` filled into the leading bracket. Missing
/// cluster or backend ids are rendered as `-`.
// No call site in the visible part of this file uses this macro, hence the
// `unused_macros` allowance.
#[allow(unused_macros)]
macro_rules! log_context_stream {
    ($self:expr, $http_context:expr) => {{
        // The five palette escapes are picked up by name from the format string.
        let (open, reset, grey, gray, white) = ansi_palette();
        format!(
            "[{ulid} {req} {cluster} {backend}]\t{open}MUX-H1{reset}\t{grey}Session{reset}({gray}peer{reset}={white}{peer:?}{reset}, {gray}position{reset}={white}{position:?}{reset}, {gray}stream{reset}={white}{stream:?}{reset}, {gray}requests{reset}={white}{requests}{reset}, {gray}parked{reset}={white}{parked}{reset}, {gray}close_notify{reset}={white}{close_notify}{reset}, {gray}readiness{reset}={white}{readiness}{reset})\t >>>",
            ulid = $self.session_ulid,
            req = $http_context.id,
            cluster = $http_context.cluster_id.as_deref().unwrap_or("-"),
            backend = $http_context.backend_id.as_deref().unwrap_or("-"),
            peer = $self.socket.socket_ref().peer_addr().ok(),
            position = $self.position,
            stream = $self.stream,
            requests = $self.requests,
            parked = $self.parked_on_buffer_pressure,
            close_notify = $self.close_notify_sent,
            readiness = $self.readiness,
        )
    }};
}
93
/// Builds the short, connection-less log prefix (`MUX-H1` tag only) for code
/// paths that have no `self` to describe.
macro_rules! log_module_context {
    () => {{
        // Only the first two palette entries are needed here.
        let (open, reset, ..) = ansi_palette();
        format!("{open}MUX-H1{reset}\t >>>")
    }};
}
102
/// State of one HTTP/1 connection handled by the mux, on either side of the
/// proxy (see [`Position`]).
pub struct ConnectionH1<Front: SocketHandler> {
    /// Which side of the proxy this connection sits on; for `Position::Client`
    /// it also carries the `BackendStatus` updated by `start_stream`,
    /// `end_stream` and `force_disconnect`.
    pub position: Position,
    /// Event/interest bookkeeping updated by `readable` and `writable`.
    pub readiness: Readiness,
    /// Number of requests counted on this connection (incremented on the
    /// server side each time a request is accepted for linking).
    pub requests: usize,
    /// Underlying socket used for all reads and writes.
    pub socket: Front,
    /// Stream currently attached to this connection, if any. Set by
    /// `start_stream`, cleared by `end_stream` on client connections.
    pub stream: Option<GlobalStreamId>,
    /// Timeout that is reset on activity and cancelled once a message has
    /// been fully received.
    pub timeout_container: TimeoutContainer,
    /// Set by `readable` when the receive buffer had no free space and the
    /// read was skipped; cleared on the next read that has room.
    pub parked_on_buffer_pressure: bool,
    /// Set once `initiate_close_notify` has called `socket_close` on the socket.
    pub close_notify_sent: bool,
    /// Session identifier printed at the start of every log prefix.
    pub session_ulid: Ulid,
}
130
131impl<Front: SocketHandler> std::fmt::Debug for ConnectionH1<Front> {
132 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
133 f.debug_struct("ConnectionH1")
134 .field("position", &self.position)
135 .field("readiness", &self.readiness)
136 .field("socket", &self.socket.socket_ref())
137 .field("stream", &self.stream)
138 .finish()
139 }
140}
141
142impl<Front: SocketHandler> ConnectionH1<Front> {
143 fn defer_close_for_tls_flush(&mut self, reason: &'static str) -> MuxResult {
144 if self.initiate_close_notify() {
145 trace!(
146 "{} H1 writable delaying close after {}: stream={:?}, close_notify_sent={}, wants_write={}, readiness={:?}",
147 log_context!(self),
148 reason,
149 self.stream,
150 self.close_notify_sent,
151 self.socket.socket_wants_write(),
152 self.readiness
153 );
154 MuxResult::Continue
155 } else {
156 MuxResult::CloseSession
157 }
158 }
159
160 fn terminate_close_delimited(kawa: &mut super::GenericHttpStream, stream_id: GlobalStreamId) {
169 if kawa.body_size == kawa::BodySize::Chunked {
170 warn!(
171 "{} H1 backend EOF mid-chunked response on stream {}: emitting RST_STREAM",
172 log_module_context!(),
173 stream_id
174 );
175 incr!(names::h1::BACKEND_EOF_BEFORE_MESSAGE_COMPLETE);
176 kawa.parsing_phase
177 .error(kawa::ParsingErrorKind::Processing {
178 message: "INTERNAL_ERROR",
179 });
180 return;
181 }
182 debug!(
183 "{} H1 close-delimited EOF on stream {}: terminating body",
184 log_module_context!(),
185 stream_id
186 );
187 kawa.push_block(kawa::Block::Flags(kawa::Flags {
188 end_body: true,
189 end_chunk: false,
190 end_header: false,
191 end_stream: true,
192 }));
193 kawa.parsing_phase = kawa::ParsingPhase::Terminated;
194 }
195
/// Handles a readable event: reads from the socket into the stream's read
/// buffer, runs the HTTP/1 parser on it and updates readiness and stream
/// state accordingly.
///
/// `context` holds the streams and listener; `endpoint` gives access to the
/// peer connection linked to the stream. Every visible path returns
/// `MuxResult::Continue` except a backend parse error on a stream that is not
/// `Linked`, which returns `MuxResult::CloseSession`.
pub fn readable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
where
    E: Endpoint,
    L: ListenerHandler + L7ListenerHandler,
{
    trace!(
        "{} ======= MUX H1 READABLE {:?}",
        log_context!(self),
        self.position
    );
    // Without an attached stream there is no buffer to read into.
    let Some(stream_id) = self.stream else {
        error!(
            "{} readable() called on H1 connection with no active stream",
            log_context!(self)
        );
        return MuxResult::Continue;
    };
    self.timeout_container.reset();
    let answers_rc = context.listener.borrow().get_answers().clone();
    let stream = &mut context.streams[stream_id];
    // Stamp the start time on the first read seen for this stream.
    if stream.metrics.start.is_none() {
        stream.metrics.start = Some(Instant::now());
    }
    let parts = stream.split(&self.position);
    let kawa = parts.rbuffer;

    // Receive buffer is full: skip the read, drop the READABLE event and
    // record that this connection is parked on buffer pressure.
    if kawa.storage.available_space() == 0 {
        self.readiness.event.remove(Ready::READABLE);
        self.parked_on_buffer_pressure = true;
        return MuxResult::Continue;
    }

    self.parked_on_buffer_pressure = false;
    let (size, status) = self.socket.socket_read(kawa.storage.space());
    // First field is 0 here and 1 in `writable` — presumably a read/write tag.
    context.debug.push(DebugEvent::StreamEvent(0, size));
    kawa.storage.fill(size);
    self.position.count_bytes_in_counter(size);
    self.position.count_bytes_in(parts.metrics, size);
    // When the helper asks to stop, nothing is parsed in this call.
    if update_readiness_after_read(size, status, &mut self.readiness) {
        // Backend closed while a non-keep-alive response body was still in
        // progress: treat the close as the end of the message.
        if status == SocketResult::Closed
            && self.position.is_client()
            && kawa.is_main_phase()
            && !kawa.is_terminated()
            && !parts.context.keep_alive_backend
        {
            Self::terminate_close_delimited(kawa, stream_id);
            self.timeout_container.cancel();
            self.readiness.interest.remove(Ready::READABLE);
            // Wake the linked peer so it can forward the terminated message.
            if let StreamState::Linked(token) = stream.state {
                let peer = endpoint.readiness_mut(token);
                peer.arm_writable();
            }
        }
        return MuxResult::Continue;
    }

    // Remember the phase before parsing to detect the moment the headers
    // have just been completed.
    let was_main_phase = kawa.is_main_phase();
    kawa::h1::parse(kawa, parts.context);
    if kawa.is_error() {
        match self.position {
            // Invalid data from the backend: end the stream on both sides.
            Position::Client(..) => {
                incr!(names::http::BACKEND_PARSE_ERRORS);
                let StreamState::Linked(token) = stream.state else {
                    error!(
                        "{} client stream in error is not in Linked state",
                        log_context!(self)
                    );
                    return MuxResult::CloseSession;
                };
                let global_stream_id = stream_id;
                self.end_stream(global_stream_id, context);
                endpoint.end_stream(token, global_stream_id, context);
            }
            // Invalid data from the client: answer with a default 400.
            Position::Server => {
                incr!(names::http::FRONTEND_PARSE_ERRORS);
                let answers = answers_rc.borrow();
                set_default_answer(stream, &mut self.readiness, 400, &answers);
            }
        }
        return MuxResult::Continue;
    }
    // Snapshot values needed after `stream` is re-borrowed further down.
    let is_keep_alive_backend = parts.context.keep_alive_backend;
    let is_body_phase_after_parse = kawa.is_main_phase();

    // True when this is a backend connection whose response status is 1xx.
    let is_1xx_backend = if self.position.is_client() {
        if let kawa::StatusLine::Response { code, .. } = &kawa.detached.status_line {
            if (100..200).contains(code) {
                debug!(
                    "{} H1 backend: received {} informational response",
                    log_context!(self),
                    code
                );
                // Clear the end markers on every Flags block — presumably so
                // the informational response is not taken as the final one.
                for block in &mut kawa.blocks {
                    if let kawa::Block::Flags(flags) = block {
                        flags.end_stream = false;
                        flags.end_body = false;
                    }
                }
                true
            } else {
                false
            }
        } else {
            false
        }
    } else {
        false
    };
    // A fully received (non-1xx) message needs no more reads and no timeout.
    if kawa.is_terminated() && !is_1xx_backend {
        self.timeout_container.cancel();
        self.readiness.interest.remove(Ready::READABLE);
    }
    if kawa.is_main_phase() {
        // Headers of a client request have just been completed.
        if !was_main_phase && self.position.is_server() {
            // A request without method, authority or path cannot be routed:
            // log it and answer with a default 400.
            if parts.context.method.is_none()
                || parts.context.authority.is_none()
                || parts.context.path.is_none()
            {
                if let kawa::StatusLine::Request {
                    version: kawa::Version::V10,
                    ..
                } = kawa.detached.status_line
                {
                    error!(
                        "{} Unexpected malformed request: HTTP/1.0 from {:?} with {:?} {:?} {:?}",
                        log_context!(self),
                        parts.context.session_address,
                        parts.context.method,
                        parts.context.authority,
                        parts.context.path
                    );
                } else {
                    error!("{} Unexpected malformed request", log_context!(self));
                    kawa::debug_kawa(kawa);
                }
                let answers = answers_rc.borrow();
                set_default_answer(stream, &mut self.readiness, 400, &answers);
                return MuxResult::Continue;
            }
            // Count the request and queue the stream for linking.
            self.requests += 1;
            trace!("{} REQUESTS: {}", log_context!(self), self.requests);
            incr!(names::http::REQUESTS);
            gauge_add!(names::http::ACTIVE_REQUESTS, 1);
            parts.metrics.service_start();
            stream.request_counted = true;
            stream.state = StreamState::Link;
            context.pending_links.push_back(stream_id);
        }
        // Data is available for the linked peer: arm its writable side.
        if let StreamState::Linked(token) = stream.state {
            let peer = endpoint.readiness_mut(token);
            peer.arm_writable();
        }
    };
    // A 1xx response also arms the peer, whatever the parser phase is.
    if is_1xx_backend {
        if let StreamState::Linked(token) = stream.state {
            let peer = endpoint.readiness_mut(token);
            peer.arm_writable();
        }
    }

    // Same close-delimited handling as above, for the case where the read
    // that reported the close also delivered bytes that were just parsed.
    if status == SocketResult::Closed
        && self.position.is_client()
        && is_body_phase_after_parse
        && !is_keep_alive_backend
        && !context.streams[stream_id].back.is_terminated()
    {
        let kawa = &mut context.streams[stream_id].back;
        Self::terminate_close_delimited(kawa, stream_id);
        self.timeout_container.cancel();
        self.readiness.interest.remove(Ready::READABLE);
    }

    MuxResult::Continue
}
404
/// Handles a writable event: converts the stream's write buffer to HTTP/1
/// bytes, writes them to the socket and, once a message has been written
/// completely, performs the end-of-message bookkeeping.
///
/// Returns `MuxResult::Upgrade` after a 101 response has been fully sent to
/// the client, the result of `defer_close_for_tls_flush` when the frontend
/// connection has to be closed, and `MuxResult::Continue` otherwise.
pub fn writable<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E) -> MuxResult
where
    E: Endpoint,
    L: ListenerHandler + L7ListenerHandler,
{
    trace!(
        "{} ======= MUX H1 WRITABLE {:?}",
        log_context!(self),
        self.position
    );
    // No stream attached: only flush whatever the socket layer still holds.
    let Some(stream_id) = self.stream else {
        if self.socket.socket_wants_write() {
            let (size, status) = self.socket.socket_write_vectored(&[]);
            let _ = update_readiness_after_write(size, status, &mut self.readiness);
            if self.socket.socket_wants_write() {
                self.readiness.signal_pending_write();
            }
        }
        return MuxResult::Continue;
    };
    self.timeout_container.reset();
    let stream = &mut context.streams[stream_id];
    let parts = stream.split(&self.position);
    let kawa = parts.wbuffer;
    // On the frontend side, apply queued response header edits once; taking
    // the list leaves it empty for later calls.
    if matches!(self.position, Position::Server) && !parts.context.headers_response.is_empty() {
        let edits = std::mem::take(&mut parts.context.headers_response);
        super::shared::apply_response_header_edits(kawa, &edits);
    }
    kawa.prepare(&mut kawa::h1::BlockConverter);
    // Collect the output slices up to the first delimiter.
    let mut io_slices = Vec::new();
    for block in kawa.out.iter() {
        match block {
            kawa::OutBlock::Delimiter => break,
            kawa::OutBlock::Store(store) => {
                io_slices.push(IoSlice::new(store.data(kawa.storage.buffer())));
            }
        }
    }
    // A frontend response that is already fully written must still reach
    // the completion handling below, even with nothing left to send.
    let can_finalize_server_close = matches!(self.position, Position::Server)
        && kawa.is_terminated()
        && kawa.is_completed();
    // Nothing to write at any level: stop asking for writable events.
    if io_slices.is_empty() && !self.socket.socket_wants_write() && !can_finalize_server_close {
        self.readiness.interest.remove(Ready::WRITABLE);
        return MuxResult::Continue;
    }
    // True when this call writes no application bytes and only gives the
    // socket layer a chance to flush.
    let tls_only_flush = io_slices.is_empty();
    let (size, status) = self.socket.socket_write_vectored(&io_slices);
    // First field is 1 here and 0 in `readable` — presumably a read/write tag.
    context.debug.push(DebugEvent::StreamEvent(1, size));
    kawa.consume(size);
    self.position.count_bytes_out_counter(size);
    self.position.count_bytes_out(parts.metrics, size);
    let should_yield = update_readiness_after_write(size, status, &mut self.readiness);
    // The socket layer still holds unsent data: come back later.
    if self.socket.socket_wants_write() {
        self.readiness.signal_pending_write();
        return MuxResult::Continue;
    }
    // The yield request is ignored for a flush-only call so that completion
    // handling can still run.
    if !tls_only_flush && should_yield {
        return MuxResult::Continue;
    }

    // The whole message has been parsed and written out.
    if kawa.is_terminated() && kawa.is_completed() {
        match self.position {
            // Request fully sent to the backend: start reading the response.
            Position::Client(..) => self.readiness.interest.insert(Ready::READABLE),
            Position::Server => {
                if stream.context.closing {
                    return self.defer_close_for_tls_flush("closing-context");
                }
                let kawa = &mut stream.back;
                match kawa.detached.status_line {
                    // 101: log the exchange and hand the session over for upgrade.
                    kawa::StatusLine::Response { code: 101, .. } => {
                        debug!("{} ============== HANDLE UPGRADE!", log_context!(self));
                        stream.metrics.backend_stop();
                        let client_rtt = socket_rtt(self.socket.socket_ref());
                        let server_rtt = stream
                            .linked_token()
                            .and_then(|t| endpoint.socket(t))
                            .and_then(socket_rtt);
                        stream.generate_access_log(
                            false,
                            Some("H1::Upgrade"),
                            context.listener.clone(),
                            client_rtt,
                            server_rtt,
                        );
                        return MuxResult::Upgrade;
                    }
                    // 100: clear the interim response and resume reading on
                    // both the frontend and the linked backend.
                    kawa::StatusLine::Response { code: 100, .. } => {
                        debug!("{} ============== HANDLE CONTINUE!", log_context!(self));
                        self.timeout_container.reset();
                        self.readiness.interest.insert(Ready::READABLE);
                        kawa.clear();
                        stream.metrics.backend_stop();
                        if let StreamState::Linked(token) = stream.state {
                            endpoint
                                .readiness_mut(token)
                                .interest
                                .insert(Ready::READABLE);
                        }
                        return MuxResult::Continue;
                    }
                    // 103: with a linked backend, clear the hint and keep
                    // reading from it; without one, log and close.
                    kawa::StatusLine::Response { code: 103, .. } => {
                        debug!("{} ============== HANDLE EARLY HINT!", log_context!(self));
                        if let StreamState::Linked(token) = stream.state {
                            endpoint
                                .readiness_mut(token)
                                .interest
                                .insert(Ready::READABLE);
                            kawa.clear();
                            stream.metrics.backend_stop();
                            return MuxResult::Continue;
                        } else {
                            stream.metrics.backend_stop();
                            let client_rtt = socket_rtt(self.socket.socket_ref());
                            let server_rtt = stream
                                .linked_token()
                                .and_then(|t| endpoint.socket(t))
                                .and_then(socket_rtt);
                            stream.generate_access_log(
                                false,
                                Some("H1::EarlyHint"),
                                context.listener.clone(),
                                client_rtt,
                                server_rtt,
                            );
                            return self.defer_close_for_tls_flush("early-hint");
                        }
                    }
                    _ => {}
                }
                // Regular final response: emit the access log and detach
                // the stream from its backend.
                incr!(names::http::E2E_HTTP11);
                stream.metrics.backend_stop();
                let client_rtt = socket_rtt(self.socket.socket_ref());
                let server_rtt = stream
                    .linked_token()
                    .and_then(|t| endpoint.socket(t))
                    .and_then(socket_rtt);
                stream.generate_access_log(
                    false,
                    Some("H1::Complete"),
                    context.listener.clone(),
                    client_rtt,
                    server_rtt,
                );
                stream.metrics.reset();
                let old_state = std::mem::replace(&mut stream.state, StreamState::Unlinked);
                if let StreamState::Linked(token) = old_state {
                    remove_backend_stream(&mut context.backend_streams, token, stream_id);
                }
                if stream.context.keep_alive_frontend {
                    // Keep-alive: recycle the stream for the next request.
                    self.timeout_container.reset();
                    if let StreamState::Linked(token) = old_state {
                        endpoint.end_stream(token, stream_id, context);
                    }
                    self.readiness.interest.insert(Ready::READABLE);
                    // Re-borrow: `context` was handed to `end_stream` above.
                    let stream = &mut context.streams[stream_id];
                    stream.context.reset();
                    stream.back.clear();
                    stream.back.storage.clear();
                    stream.front.clear();
                    stream.attempts = 0;
                    stream.state = StreamState::Idle;
                    // Bytes are already buffered on the front side
                    // (presumably a pipelined request): parse them now
                    // instead of waiting for another readable event.
                    if !stream.front.storage.is_empty() {
                        kawa::h1::parse(&mut stream.front, &mut stream.context);
                        let is_error = stream.front.is_error();
                        let is_main = stream.front.is_main_phase();
                        let malformed = is_main
                            && (stream.context.method.is_none()
                                || stream.context.authority.is_none()
                                || stream.context.path.is_none());
                        if is_error || malformed {
                            let answers_rc = context.listener.borrow().get_answers().clone();
                            let answers = answers_rc.borrow();
                            set_default_answer(stream, &mut self.readiness, 400, &answers);
                        } else if is_main {
                            // Same accounting as a request parsed in `readable`.
                            self.requests += 1;
                            incr!(names::http::REQUESTS);
                            gauge_add!(names::http::ACTIVE_REQUESTS, 1);
                            stream.metrics.service_start();
                            stream.request_counted = true;
                            stream.state = StreamState::Link;
                            context.pending_links.push_back(stream_id);
                        }
                    }
                } else {
                    // No keep-alive: close once pending output is flushed.
                    return self.defer_close_for_tls_flush("response-complete");
                }
            }
        }
    }
    MuxResult::Continue
}
627
628 pub fn force_disconnect(&mut self) -> MuxResult {
629 match &mut self.position {
630 Position::Client(_, _, status) => {
631 *status = BackendStatus::Disconnecting;
632 self.readiness.event = Ready::HUP;
633 debug!(
634 "{} H1 force_disconnect client: stream={:?}, wants_write={}, readiness={:?}",
635 log_context!(self),
636 self.stream,
637 self.socket.socket_wants_write(),
638 self.readiness
639 );
640 MuxResult::Continue
641 }
642 Position::Server => {
643 if self.socket.socket_wants_write() {
644 debug!(
645 "{} H1 force_disconnect delaying close: stream={:?}, wants_write=true, readiness={:?}",
646 log_context!(self),
647 self.stream,
648 self.readiness
649 );
650 self.readiness.interest = Ready::WRITABLE | Ready::HUP | Ready::ERROR;
651 self.readiness.signal_pending_write();
652 MuxResult::Continue
653 } else {
654 debug!(
655 "{} H1 force_disconnect closing session: stream={:?}, wants_write=false, readiness={:?}",
656 log_context!(self),
657 self.stream,
658 self.readiness
659 );
660 MuxResult::CloseSession
661 }
662 }
663 }
664 }
665
666 pub fn has_pending_write(&self) -> bool {
667 self.socket.socket_wants_write()
668 }
669
670 pub fn initiate_close_notify(&mut self) -> bool {
671 if !self.position.is_server() {
672 return false;
673 }
674 if !self.close_notify_sent {
675 trace!("{} H1 initiating CLOSE_NOTIFY", log_context!(self));
676 self.socket.socket_close();
677 self.close_notify_sent = true;
678 }
679 if self.socket.socket_wants_write() {
680 self.readiness.arm_writable();
681 true
682 } else {
683 false
684 }
685 }
686
687 pub fn close<E, L>(&mut self, context: &mut Context<L>, mut endpoint: E)
688 where
689 E: Endpoint,
690 L: ListenerHandler + L7ListenerHandler,
691 {
692 match self.position {
693 Position::Client(_, _, BackendStatus::KeepAlive)
694 | Position::Client(_, _, BackendStatus::Disconnecting) => {
695 trace!("{} close detached client ConnectionH1", log_context!(self));
696 return;
697 }
698 Position::Client(_, _, BackendStatus::Connecting(_))
699 | Position::Client(_, _, BackendStatus::Connected) => {
700 debug!(
701 "{} BACKEND CLOSING FOR: {:?} {:?}",
702 log_context!(self),
703 self.position,
704 self.stream
705 );
706 }
707 Position::Server => {
708 let tls_pending_before = self.socket.socket_wants_write();
709 let (tls_pending_after, drain_rounds) =
710 drain_tls_close_notify(&mut self.socket, &mut self.close_notify_sent);
711 if tls_pending_after {
712 error!(
713 "{} H1 TLS buffer NOT fully drained on close: pending_before={}, pending_after={}, drain_rounds={}, stream={:?}, close_notify_sent={}, readiness={:?}",
714 log_context!(self),
715 tls_pending_before,
716 tls_pending_after,
717 drain_rounds,
718 self.stream,
719 self.close_notify_sent,
720 self.readiness
721 );
722 }
723 return;
724 }
725 }
726 let Some(stream_id) = self.stream else {
727 trace!(
728 "{} closing detached H1 client with no active stream",
729 log_context!(self)
730 );
731 return;
732 };
733 let StreamState::Linked(token) = context.streams[stream_id].state else {
735 trace!(
736 "{} closing detached H1 client in state {:?} on stream {}",
737 log_context!(self),
738 context.streams[stream_id].state,
739 stream_id
740 );
741 return;
742 };
743 endpoint.end_stream(token, stream_id, context)
744 }
745
/// Detaches `stream` from this connection after the other side ended it.
///
/// The call is rejected (logged, no state change) when `stream` is not the
/// stream currently attached. Backend connections drop the stream and either
/// go back to `KeepAlive` or are force-disconnected. Frontend connections
/// act on the `EndStreamAction` computed by `end_stream_decision`.
pub fn end_stream<L>(&mut self, stream: GlobalStreamId, context: &mut Context<L>)
where
    L: ListenerHandler + L7ListenerHandler,
{
    if self.stream != Some(stream) {
        error!(
            "{} end_stream called with stream {} but expected {:?}",
            log_context!(self),
            stream,
            self.stream
        );
        return;
    }
    context.unlink_stream(stream);
    let answers_rc = context.listener.borrow().get_answers().clone();
    // Keep the id under its own name; `stream` is shadowed by the stream
    // object on the next line.
    let stream_id = stream;
    let stream = &mut context.streams[stream_id];
    let stream_context = &mut stream.context;
    trace!(
        "{} end H1 stream {:?}: {:#?}",
        log_context!(self),
        self.stream,
        stream_context
    );
    match &mut self.position {
        // Backend never finished connecting: drop the stream and disconnect.
        Position::Client(_, _, BackendStatus::Connecting(_)) => {
            self.stream = None;
            // A stream already marked for recycling keeps that state.
            if stream.state != StreamState::Recycle {
                stream.state = StreamState::Unlinked;
            }
            self.readiness.interest.remove(Ready::ALL);
            self.force_disconnect();
        }
        Position::Client(_, _, status @ BackendStatus::Connected) => {
            self.stream = None;
            if stream.state != StreamState::Recycle {
                stream.state = StreamState::Unlinked;
            }
            self.readiness.interest.remove(Ready::ALL);
            // The backend connection is only kept for reuse when keep-alive
            // is set and its response was fully received.
            if stream_context.keep_alive_backend && stream.back.is_terminated() {
                *status = BackendStatus::KeepAlive;
            } else {
                self.force_disconnect();
            }
        }
        // These states have no stream to end; reaching here is unexpected.
        Position::Client(_, _, BackendStatus::KeepAlive)
        | Position::Client(_, _, BackendStatus::Disconnecting) => {
            error!(
                "{} end_stream called on KeepAlive or Disconnecting H1 client",
                log_context!(self)
            );
        }
        Position::Server => match end_stream_decision(stream) {
            // Response already complete: keep writing it to the client.
            EndStreamAction::ForwardTerminated => {
                debug!("{} CLOSING H1 TERMINATED STREAM", log_context!(self));
                stream.state = StreamState::Unlinked;
                self.readiness.interest.insert(Ready::WRITABLE);
                self.readiness.signal_pending_write();
            }
            EndStreamAction::CloseDelimited => {
                debug!("{} CLOSE DELIMITED", log_context!(self));
                stream.state = StreamState::Unlinked;
                self.readiness.arm_writable();
            }
            // Response was cut short: terminate the answer forcefully.
            EndStreamAction::ForwardUnterminated => {
                debug!("{} CLOSING H1 UNTERMINATED STREAM", log_context!(self));
                forcefully_terminate_answer(
                    stream,
                    &mut self.readiness,
                    H2Error::InternalError,
                );
            }
            // Replace the response with the default answer for `status`.
            EndStreamAction::SendDefault(status) => {
                let answers = answers_rc.borrow();
                set_default_answer(stream, &mut self.readiness, status, &answers);
            }
            // Queue the stream to be linked to a backend again.
            EndStreamAction::Reconnect => {
                debug!("{} H1 RECONNECT", log_context!(self));
                stream.state = StreamState::Link;
                context.pending_links.push_back(stream_id);
            }
        },
    }
}
835
836 pub fn start_stream<L>(&mut self, stream: GlobalStreamId, _context: &mut Context<L>) -> bool
837 where
838 L: ListenerHandler + L7ListenerHandler,
839 {
840 trace!(
841 "{} start H1 stream {} {:?}",
842 log_context!(self),
843 stream,
844 self.readiness
845 );
846 self.readiness.interest.insert(Ready::ALL);
847 self.stream = Some(stream);
848 match &mut self.position {
849 Position::Client(_, _, status @ BackendStatus::KeepAlive) => {
850 *status = BackendStatus::Connected;
851 }
852 Position::Client(_, _, BackendStatus::Disconnecting) => {
853 error!(
854 "{} start_stream called on Disconnecting H1 client",
855 log_context!(self)
856 );
857 return false;
858 }
859 Position::Client(_, _, _) => {}
860 Position::Server => {
861 error!(
862 "{} start_stream must not be called on H1 server connection",
863 log_context!(self)
864 );
865 return false;
866 }
867 }
868 true
869 }
870}