1use bytes::{Bytes, BytesMut};
8use http::{Method, Uri};
9use std::collections::{HashMap, VecDeque};
10use std::fmt;
11use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::task::{Poll, Wake, Waker};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tokio::sync::oneshot;
17use tokio::sync::Notify;
18use tracing;
19
20pub type StreamingHeadersResult = Result<(u16, Vec<(String, String)>)>;
21
22use crate::error::{Error, Result};
23use crate::headers::Headers;
24use crate::request::{RequestBody, RequestBodyStream};
25use crate::transport::h2::body::{H2BodyDataPush, H2BodyPush, H2BodyShared, TrailerSender};
26use crate::transport::h2::connection::{
27 ControlAction, H2Connection as RawH2Connection, StreamResponse,
28};
29use crate::transport::h2::frame::{flags, ErrorCode, FrameHeader, FrameType};
30use crate::transport::h2::tunnel::{H2Tunnel, H2TunnelCredit, H2TunnelEvent, H2TunnelOutbound};
31use crate::transport::h2::H2TransportConfig;
32
33const FAST_PATH_COMMAND_CHECK_INTERVAL: usize = 8;
34const FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET: usize = 2;
35
36pub enum DriverCommand {
38 SendRequest {
41 method: http::Method,
42 uri: http::Uri,
43 headers: Headers,
44 body: Option<bytes::Bytes>,
45 response_tx: oneshot::Sender<Result<StreamResponse>>,
46 },
47 SendStreamingRequest {
49 method: Method,
50 uri: Uri,
51 headers: Headers,
52 body: RequestBody,
53 body_shared: Arc<H2BodyShared>,
54 headers_tx: oneshot::Sender<StreamingHeadersResult>,
55 trailers_tx: Option<TrailerSender>,
59 },
60 OpenWebSocketTunnel {
62 uri: Uri,
63 headers: Vec<(String, String)>,
64 response_tx: oneshot::Sender<Result<H2Tunnel>>,
65 },
66 SendTunnelData {
68 stream_id: u32,
69 outbound: H2TunnelOutbound,
70 },
71}
72
73impl fmt::Debug for DriverCommand {
74 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75 match self {
76 DriverCommand::SendRequest {
77 method,
78 uri,
79 headers,
80 body,
81 ..
82 } => f
83 .debug_struct("SendRequest")
84 .field("method", method)
85 .field("uri", uri)
86 .field("headers_len", &headers.len())
87 .field("body_len", &body.as_ref().map(Bytes::len))
88 .finish(),
89 DriverCommand::SendStreamingRequest {
90 method,
91 uri,
92 headers,
93 body,
94 ..
95 } => f
96 .debug_struct("SendStreamingRequest")
97 .field("method", method)
98 .field("uri", uri)
99 .field("headers_len", &headers.len())
100 .field("body", body)
101 .finish(),
102 DriverCommand::OpenWebSocketTunnel { uri, headers, .. } => f
103 .debug_struct("OpenWebSocketTunnel")
104 .field("uri", uri)
105 .field("headers_len", &headers.len())
106 .finish(),
107 DriverCommand::SendTunnelData {
108 stream_id,
109 outbound,
110 } => f
111 .debug_struct("SendTunnelData")
112 .field("stream_id", stream_id)
113 .field("outbound", outbound)
114 .finish(),
115 }
116 }
117}
118
119pub struct InlineRegistration {
125 pub stream_id: u32,
126 pub headers_tx: oneshot::Sender<StreamingHeadersResult>,
127 pub body_shared: Arc<H2BodyShared>,
128 pub recv_window: i32,
129 pub trailers_tx: Option<TrailerSender>,
133}
134
135struct NotifyWake(Arc<Notify>);
136
137impl Wake for NotifyWake {
138 fn wake(self: Arc<Self>) {
139 self.0.notify_one();
140 }
141
142 fn wake_by_ref(self: &Arc<Self>) {
143 self.0.notify_one();
144 }
145}
146
147struct DriverStreamingRequestBody {
148 stream: RequestBodyStream,
149 content_length: Option<u64>,
150 current_chunk: Option<Bytes>,
151 current_offset: usize,
152 sent: u64,
153 finished: bool,
154 end_stream_sent: bool,
155}
156
157impl DriverStreamingRequestBody {
158 fn new(stream: RequestBodyStream, content_length: Option<u64>) -> Self {
159 Self {
160 stream,
161 content_length,
162 current_chunk: None,
163 current_offset: 0,
164 sent: 0,
165 finished: false,
166 end_stream_sent: false,
167 }
168 }
169}
170
171struct DriverStreamState {
173 response_tx: Option<oneshot::Sender<Result<StreamResponse>>>,
175 streaming_headers_tx: Option<oneshot::Sender<StreamingHeadersResult>>,
177 trailers_tx: Option<TrailerSender>,
182 streaming_body: Option<Arc<H2BodyShared>>,
184 status: Option<u16>,
186 headers: Vec<(String, String)>,
188 body: BytesMut,
190 pending_body: Bytes,
192 body_offset: usize,
194 request_stream: Option<DriverStreamingRequestBody>,
196 pending_streaming_body: VecDeque<Result<Bytes>>,
198 pending_streaming_end: bool,
200 recv_window: i32,
204 pending_recv_window_update: usize,
208 inline: bool,
212}
213
214impl DriverStreamState {
215 fn new(
216 response_tx: oneshot::Sender<Result<StreamResponse>>,
217 pending_body: Bytes,
218 recv_window: i32,
219 ) -> Self {
220 Self {
221 response_tx: Some(response_tx),
222 streaming_headers_tx: None,
223 trailers_tx: None,
224 streaming_body: None,
225 status: None,
226 headers: Vec::new(),
227 body: BytesMut::new(),
228 pending_body,
229 body_offset: 0,
230 request_stream: None,
231 pending_streaming_body: VecDeque::new(),
232 pending_streaming_end: false,
233 recv_window,
234 pending_recv_window_update: 0,
235 inline: false,
236 }
237 }
238
239 fn streaming(
240 headers_tx: oneshot::Sender<StreamingHeadersResult>,
241 trailers_tx: Option<TrailerSender>,
242 body_shared: Arc<H2BodyShared>,
243 pending_body: Bytes,
244 request_stream: Option<DriverStreamingRequestBody>,
245 recv_window: i32,
246 ) -> Self {
247 Self {
248 response_tx: None,
249 streaming_headers_tx: Some(headers_tx),
250 trailers_tx,
251 streaming_body: Some(body_shared),
252 status: None,
253 headers: Vec::new(),
254 body: BytesMut::new(),
255 pending_body,
256 body_offset: 0,
257 request_stream,
258 pending_streaming_body: VecDeque::new(),
259 pending_streaming_end: false,
260 recv_window,
261 pending_recv_window_update: 0,
262 inline: false,
263 }
264 }
265
266 fn streaming_inline(
267 headers_tx: oneshot::Sender<StreamingHeadersResult>,
268 trailers_tx: Option<TrailerSender>,
269 body_shared: Arc<H2BodyShared>,
270 recv_window: i32,
271 ) -> Self {
272 let mut state = Self::streaming(
273 headers_tx,
274 trailers_tx,
275 body_shared,
276 Bytes::new(),
277 None,
278 recv_window,
279 );
280 state.inline = true;
281 state
282 }
283}
284
285struct DriverTunnelState {
286 inbound_tx: mpsc::Sender<Result<H2TunnelEvent>>,
287 pending_inbound: VecDeque<Result<H2TunnelEvent>>,
288 pending_outbound: VecDeque<H2TunnelOutbound>,
289 recv_window: i32,
290 pending_recv_window_update: usize,
291 credit: Arc<H2TunnelCredit>,
292}
293
294#[derive(Clone, Copy, Debug, Eq, PartialEq)]
295enum TunnelInboundStatus {
296 Open,
297 Blocked,
298 Closed,
299 Remove,
300}
301
302impl DriverTunnelState {
303 fn push_inbound(&mut self, event: H2TunnelEvent) -> TunnelInboundStatus {
304 let item = Ok(event);
305 if !self.pending_inbound.is_empty() {
306 self.pending_inbound.push_back(item);
307 return TunnelInboundStatus::Open;
308 }
309
310 match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
311 TunnelInboundStatus::Blocked => TunnelInboundStatus::Open,
312 status => status,
313 }
314 }
315
316 fn flush_inbound(&mut self) -> TunnelInboundStatus {
317 while let Some(item) = self.pending_inbound.pop_front() {
318 match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
319 TunnelInboundStatus::Open => {}
320 TunnelInboundStatus::Blocked => return TunnelInboundStatus::Open,
321 status => return status,
322 }
323 }
324 TunnelInboundStatus::Open
325 }
326
327 fn try_send_inbound(
328 inbound_tx: &mpsc::Sender<Result<H2TunnelEvent>>,
329 pending_inbound: &mut VecDeque<Result<H2TunnelEvent>>,
330 item: Result<H2TunnelEvent>,
331 ) -> TunnelInboundStatus {
332 let remove_after_send = matches!(item, Ok(H2TunnelEvent::EndStream));
333 match inbound_tx.try_send(item) {
334 Ok(()) if remove_after_send => TunnelInboundStatus::Remove,
335 Ok(()) => TunnelInboundStatus::Open,
336 Err(mpsc::error::TrySendError::Full(item)) => {
337 pending_inbound.push_front(item);
338 TunnelInboundStatus::Blocked
339 }
340 Err(mpsc::error::TrySendError::Closed(_)) => TunnelInboundStatus::Closed,
341 }
342 }
343}
344
345pub struct H2Driver<S> {
347 command_rx: mpsc::Receiver<DriverCommand>,
349 command_tx: mpsc::Sender<DriverCommand>,
351 connection: RawH2Connection<S>,
353 streams: HashMap<u32, DriverStreamState>,
355 tunnels: HashMap<u32, DriverTunnelState>,
357 pending_requests: std::collections::VecDeque<DriverCommand>,
359 goaway_received: Arc<AtomicBool>,
361 config: H2TransportConfig,
363 pending_ping: Option<([u8; 8], Instant)>,
365 inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
369 inline_active: Arc<AtomicUsize>,
373 inline_eligible: Arc<AtomicBool>,
376 body_progress_notify: Arc<Notify>,
379 request_body_waker: Waker,
380 backpressure_stall_count: Arc<AtomicU64>,
383}
384
385impl<S> H2Driver<S>
386where
387 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
388{
389 pub fn new(
391 connection: RawH2Connection<S>,
392 command_tx: mpsc::Sender<DriverCommand>,
393 command_rx: mpsc::Receiver<DriverCommand>,
394 goaway_received: Arc<AtomicBool>,
395 config: H2TransportConfig,
396 backpressure_stall_count: Arc<AtomicU64>,
397 ) -> Self {
398 let (_, inline_register_rx) = mpsc::unbounded_channel();
399 let body_progress_notify = Arc::new(Notify::new());
400 Self::new_with_inline(
401 connection,
402 command_tx,
403 command_rx,
404 goaway_received,
405 config.normalized(),
406 inline_register_rx,
407 Arc::new(AtomicUsize::new(0)),
408 Arc::new(AtomicBool::new(false)),
409 body_progress_notify,
410 backpressure_stall_count,
411 )
412 }
413
414 #[allow(clippy::too_many_arguments)]
418 pub fn new_with_inline(
419 connection: RawH2Connection<S>,
420 command_tx: mpsc::Sender<DriverCommand>,
421 command_rx: mpsc::Receiver<DriverCommand>,
422 goaway_received: Arc<AtomicBool>,
423 config: H2TransportConfig,
424 inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
425 inline_active: Arc<AtomicUsize>,
426 inline_eligible: Arc<AtomicBool>,
427 body_progress_notify: Arc<Notify>,
428 backpressure_stall_count: Arc<AtomicU64>,
429 ) -> Self {
430 let request_body_waker = Waker::from(Arc::new(NotifyWake(body_progress_notify.clone())));
431 Self {
432 command_rx,
433 command_tx,
434 connection,
435 streams: HashMap::new(),
436 tunnels: HashMap::new(),
437 pending_requests: std::collections::VecDeque::new(),
438 goaway_received,
439 config: config.normalized(),
440 pending_ping: None,
441 inline_register_rx,
442 inline_active,
443 inline_eligible,
444 body_progress_notify,
445 request_body_waker,
446 backpressure_stall_count,
447 }
448 }
449
450 pub async fn drive(mut self) -> Result<()> {
452 loop {
453 self.drain_inline_registrations();
454
455 self.process_pending_requests().await?;
457
458 self.flush_pending_data().await?;
460 self.flush_tunnel_data().await?;
461 self.flush_tunnel_inbound().await?;
462 self.apply_released_body_credits().await?;
463 self.apply_released_tunnel_credits().await?;
464 self.flush_pending_streaming_bodies().await?;
465 self.check_keepalive_timeout()?;
466 self.refresh_inline_eligibility();
467
468 if let Some(stream_id) = self.single_stream_fast_path_target() {
469 if self
470 .run_single_stream_streaming_fast_path(stream_id)
471 .await?
472 {
473 continue;
474 }
475 }
476
477 let keepalive_delay = self.keepalive_delay();
478 let retry_streaming_backpressure =
479 self.has_pending_streaming_body() || self.has_request_body_work();
480
481 tokio::select! {
482 biased;
483 command = self.command_rx.recv() => {
485 match command {
486 Some(cmd) => {
487 match cmd {
488 DriverCommand::SendRequest { .. } => {
489 self.handle_send_request(cmd).await?;
490 }
491 DriverCommand::SendStreamingRequest { .. } => {
492 self.handle_send_streaming_request(cmd).await?;
493 }
494 DriverCommand::OpenWebSocketTunnel { uri, headers, response_tx } => {
495 self.handle_open_websocket_tunnel(uri, headers, response_tx).await?;
496 }
497 DriverCommand::SendTunnelData { stream_id, outbound } => {
498 self.queue_tunnel_outbound(stream_id, outbound).await?;
499 }
500 }
501 }
502 None => {
503 break;
505 }
506 }
507 }
508
509 inline = self.inline_register_rx.recv(), if !self.inline_register_rx.is_closed() => {
511 if let Some(reg) = inline {
512 self.register_inline_stream(reg);
513 }
514 }
515
516 read_res = self.connection.read_next_frame() => {
518 match read_res {
519 Ok((header, payload)) => {
520 if let Err(e) = self.handle_frame(header, payload).await {
521 tracing::error!("H2Driver frame error: {:?}", e);
522 return Err(e);
525 }
526 }
527 Err(e) => {
528 tracing::error!("H2Driver read error: {:?}", e);
530 return Err(e);
531 }
532 }
533 }
534
535 _ = async {
536 if let Some(delay) = keepalive_delay {
537 tokio::time::sleep(delay).await;
538 } else {
539 std::future::pending::<()>().await;
540 }
541 } => {
542 let ping = self.connection.send_ping().await?;
543 self.pending_ping = Some((ping, Instant::now()));
544 }
545
546 _ = async {
547 if retry_streaming_backpressure {
548 tokio::time::sleep(Duration::from_millis(1)).await;
549 } else {
550 std::future::pending::<()>().await;
551 }
552 } => {
553 if retry_streaming_backpressure {
554 self.backpressure_stall_count
555 .fetch_add(1, Ordering::Relaxed);
556 tracing::debug!("H2 driver streaming backpressure stall");
557 }
558 }
559
560 _ = self.body_progress_notify.notified() => {}
561 }
562 }
563 Ok(())
564 }
565
566 fn drain_inline_registrations(&mut self) {
572 while let Ok(reg) = self.inline_register_rx.try_recv() {
573 self.register_inline_stream(reg);
574 }
575 }
576
577 fn register_inline_stream(&mut self, reg: InlineRegistration) {
578 self.streams.insert(
579 reg.stream_id,
580 DriverStreamState::streaming_inline(
581 reg.headers_tx,
582 reg.trailers_tx,
583 reg.body_shared,
584 reg.recv_window,
585 ),
586 );
587 }
588
589 fn refresh_inline_eligibility(&self) {
595 let eligible = self.tunnels.is_empty() && !self.goaway_received.load(Ordering::Relaxed);
596 self.inline_eligible.store(eligible, Ordering::Relaxed);
597 }
598
599 fn note_stream_removed(state: &DriverStreamState, inline_active: &Arc<AtomicUsize>) {
603 if state.inline {
604 inline_active.fetch_sub(1, Ordering::AcqRel);
605 }
606 }
607
608 fn store_stream_recv_window(&mut self, stream_id: u32, recv_window: i32) {
609 if let Some(stream) = self.streams.get_mut(&stream_id) {
610 stream.recv_window = recv_window;
611 }
612 }
613
614 fn single_stream_fast_path_target(&self) -> Option<u32> {
624 if self.streams.len() != 1
625 || !self.tunnels.is_empty()
626 || !self.pending_requests.is_empty()
627 || self.pending_ping.is_some()
628 || !self.command_rx.is_empty()
629 || !self.inline_register_rx.is_empty()
630 {
631 return None;
632 }
633
634 let (stream_id, stream) = self.streams.iter().next()?;
635 if stream.streaming_body.is_none()
636 || !stream.pending_streaming_body.is_empty()
637 || stream.body_offset < stream.pending_body.len()
638 || stream.request_stream.is_some()
639 {
640 return None;
641 }
642
643 Some(*stream_id)
644 }
645
646 async fn run_single_stream_streaming_fast_path(&mut self, stream_id: u32) -> Result<bool> {
654 let (body_shared, mut recv_window) = match self.streams.get(&stream_id) {
657 Some(stream) => match stream.streaming_body.as_ref() {
658 Some(shared) => (shared.clone(), stream.recv_window),
659 None => return Ok(false),
660 },
661 None => return Ok(false),
662 };
663
664 let mut processed_any = false;
665 let mut frames_since_queue_check = 0usize;
666 let mut queued_data_frames_since_yield = 0usize;
667
668 loop {
669 if frames_since_queue_check >= FAST_PATH_COMMAND_CHECK_INTERVAL {
670 frames_since_queue_check = 0;
671 match self.command_rx.try_recv() {
672 Ok(cmd) => {
673 self.store_stream_recv_window(stream_id, recv_window);
674 self.pending_requests.push_back(cmd);
675 return Ok(true);
676 }
677 Err(mpsc::error::TryRecvError::Empty) => {}
678 Err(mpsc::error::TryRecvError::Disconnected) => {
679 self.store_stream_recv_window(stream_id, recv_window);
680 return Ok(processed_any);
681 }
682 }
683
684 if let Ok(reg) = self.inline_register_rx.try_recv() {
685 self.store_stream_recv_window(stream_id, recv_window);
686 self.register_inline_stream(reg);
687 return Ok(true);
688 }
689
690 if body_shared.is_closed() {
691 self.cancel_stream_for_dropped_receiver(stream_id).await;
692 return Ok(true);
693 }
694 }
695
696 let (header, payload) = match self.connection.read_next_frame().await {
697 Ok(frame) => frame,
698 Err(e) => {
699 tracing::error!("H2Driver read error (fast path): {:?}", e);
700 return Err(e);
701 }
702 };
703 processed_any = true;
704 frames_since_queue_check += 1;
705
706 if header.stream_id == stream_id && header.frame_type == FrameType::Data {
707 let end_stream = (header.flags & flags::END_STREAM) != 0;
708 let data = if (header.flags & flags::PADDED) == 0 {
709 payload
710 } else {
711 self.connection
712 .parse_inbound_data_payload(stream_id, header.flags, payload)?
713 };
714 let data_len = data.len();
715 let mut should_yield_for_body_queue = false;
716 if let Some(increment) = self
717 .connection
718 .apply_conn_inbound_flow_control_delta(data_len)
719 {
720 self.connection
721 .send_connection_window_update(increment)
722 .await?;
723 }
724 if data_len > 0 {
725 recv_window -= data_len as i32;
726 }
727 let push = body_shared.push_data(data, end_stream);
728 match push {
729 H2BodyDataPush::Accepted { queued_len } => {
730 if queued_len > 1 {
731 queued_data_frames_since_yield += 1;
732 should_yield_for_body_queue = queued_data_frames_since_yield
733 >= FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET;
734 if should_yield_for_body_queue {
735 queued_data_frames_since_yield = 0;
736 }
737 } else {
738 queued_data_frames_since_yield = 0;
739 }
740 }
741 H2BodyDataPush::Full(data) => {
742 if let Some(stream) = self.streams.get_mut(&stream_id) {
743 stream.recv_window = recv_window;
744 stream.pending_streaming_body.push_back(Ok(data));
745 if end_stream {
746 stream.pending_streaming_end = true;
747 }
748 }
749 return Ok(true);
750 }
751 H2BodyDataPush::Closed => {
752 self.cancel_stream_for_dropped_receiver(stream_id).await;
753 return Ok(true);
754 }
755 }
756 if should_yield_for_body_queue {
757 tokio::task::yield_now().await;
758 if body_shared.is_closed() {
759 self.cancel_stream_for_dropped_receiver(stream_id).await;
760 return Ok(true);
761 }
762 }
763
764 if end_stream {
765 if data_len == 0 {
766 body_shared.finish();
767 }
768 tokio::task::yield_now().await;
769 self.complete_stream(stream_id);
770 return Ok(true);
771 }
772 } else {
773 self.store_stream_recv_window(stream_id, recv_window);
774 if let Err(e) = self.handle_frame(header, payload).await {
775 tracing::error!("H2Driver frame error (fast path): {:?}", e);
776 return Err(e);
777 }
778 if self.single_stream_fast_path_target() != Some(stream_id) {
779 return Ok(true);
780 }
781 }
782 }
783 }
784
785 fn check_keepalive_timeout(&self) -> Result<()> {
786 if let Some((_, sent_at)) = self.pending_ping {
787 if sent_at.elapsed() >= self.config.keep_alive_timeout {
788 return Err(Error::HttpProtocol(
789 "HTTP/2 keepalive ping timed out".into(),
790 ));
791 }
792 }
793 Ok(())
794 }
795
796 fn keepalive_delay(&self) -> Option<Duration> {
797 let interval = self.config.keep_alive_interval?;
798 if self.pending_ping.is_some() {
799 return None;
800 }
801 if !self.config.keep_alive_while_idle && self.active_stream_count() == 0 {
802 return None;
803 }
804 Some(interval)
805 }
806
807 fn has_pending_streaming_body(&self) -> bool {
808 self.streams.values().any(|stream| {
809 stream.streaming_body.is_some()
810 && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
811 })
812 }
813
814 fn has_request_body_work(&self) -> bool {
815 self.streams
816 .values()
817 .any(|stream| stream.request_stream.is_some())
818 }
819
820 async fn handle_send_request(&mut self, cmd: DriverCommand) -> Result<()> {
822 if !self.has_available_stream_slot() {
823 self.pending_requests.push_back(cmd);
825 } else {
826 self.send_request_internal(cmd).await?;
828 }
829 Ok(())
830 }
831
832 async fn handle_send_streaming_request(&mut self, cmd: DriverCommand) -> Result<()> {
833 if !self.has_available_stream_slot() {
834 self.pending_requests.push_back(cmd);
835 } else {
836 self.send_streaming_request_internal(cmd).await?;
837 }
838 Ok(())
839 }
840
841 async fn process_pending_requests(&mut self) -> Result<()> {
843 while self.has_available_stream_slot() {
844 if let Some(cmd) = self.pending_requests.pop_front() {
845 match cmd {
846 DriverCommand::SendRequest { .. } => {
847 self.send_request_internal(cmd).await?;
848 }
849 DriverCommand::OpenWebSocketTunnel {
850 uri,
851 headers,
852 response_tx,
853 } => {
854 self.open_websocket_tunnel_internal(uri, headers, response_tx)
855 .await?;
856 }
857 DriverCommand::SendStreamingRequest { .. } => {
858 self.send_streaming_request_internal(cmd).await?;
859 }
860 DriverCommand::SendTunnelData {
861 stream_id,
862 outbound,
863 } => {
864 self.queue_tunnel_outbound(stream_id, outbound).await?;
865 }
866 }
867 } else {
868 break;
869 }
870 }
871 Ok(())
872 }
873
874 fn active_stream_count(&self) -> usize {
875 self.streams.len() + self.tunnels.len()
876 }
877
878 fn has_available_stream_slot(&self) -> bool {
879 let max_streams = self.config.effective_max_concurrent_streams(
880 self.connection.peer_settings().max_concurrent_streams,
881 );
882 self.active_stream_count() < max_streams
883 }
884
885 async fn send_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
887 if let DriverCommand::SendRequest {
888 method,
889 uri,
890 headers,
891 body,
892 response_tx,
893 } = cmd
894 {
895 let body_bytes = body.unwrap_or_default();
896 let has_body = !body_bytes.is_empty();
897 let end_stream = !has_body;
898
899 let initial_recv_window = self.connection.local_initial_window_size() as i32;
900 match self
901 .connection
902 .send_headers_raw(&method, &uri, &headers, end_stream)
903 .await
904 {
905 Ok(stream_id) => {
906 self.streams.insert(
907 stream_id,
908 DriverStreamState::new(response_tx, body_bytes, initial_recv_window),
909 );
910
911 self.flush_pending_data().await?;
912 }
913 Err(e) => {
914 if response_tx.send(Err(e)).is_err() {
915 tracing::debug!("Response channel closed while sending error");
916 }
917 }
918 }
919 }
920 Ok(())
921 }
922
923 async fn send_streaming_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
924 if let DriverCommand::SendStreamingRequest {
925 method,
926 uri,
927 headers,
928 body,
929 body_shared,
930 headers_tx,
931 trailers_tx,
932 } = cmd
933 {
934 let (body_bytes, request_stream, end_stream) = match body {
935 RequestBody::Empty => (Bytes::new(), None, true),
936 RequestBody::Bytes(bytes) => {
937 let end_stream = bytes.is_empty();
938 (bytes, None, end_stream)
939 }
940 RequestBody::Text(text) => {
941 let bytes = Bytes::from(text.into_bytes());
942 let end_stream = bytes.is_empty();
943 (bytes, None, end_stream)
944 }
945 RequestBody::Json(bytes) => {
946 let bytes = Bytes::from(bytes);
947 let end_stream = bytes.is_empty();
948 (bytes, None, end_stream)
949 }
950 RequestBody::Form(text) => {
951 let bytes = Bytes::from(text.into_bytes());
952 let end_stream = bytes.is_empty();
953 (bytes, None, end_stream)
954 }
955 RequestBody::Stream {
956 stream,
957 content_length,
958 } => (
959 Bytes::new(),
960 Some(DriverStreamingRequestBody::new(stream, content_length)),
961 false,
962 ),
963 };
964
965 let initial_recv_window = self.connection.local_initial_window_size() as i32;
966 match self
967 .connection
968 .send_headers_raw(&method, &uri, &headers, end_stream)
969 .await
970 {
971 Ok(stream_id) => {
972 self.streams.insert(
973 stream_id,
974 DriverStreamState::streaming(
975 headers_tx,
976 trailers_tx,
977 body_shared,
978 body_bytes,
979 request_stream,
980 initial_recv_window,
981 ),
982 );
983 self.flush_pending_data().await?;
984 }
985 Err(error) => {
986 let _ = headers_tx.send(Err(error));
987 }
988 }
989 }
990 Ok(())
991 }
992
993 async fn handle_open_websocket_tunnel(
994 &mut self,
995 uri: Uri,
996 headers: Vec<(String, String)>,
997 response_tx: oneshot::Sender<Result<H2Tunnel>>,
998 ) -> Result<()> {
999 if !self.has_available_stream_slot() {
1000 self.pending_requests
1001 .push_back(DriverCommand::OpenWebSocketTunnel {
1002 uri,
1003 headers,
1004 response_tx,
1005 });
1006 return Ok(());
1007 }
1008
1009 self.open_websocket_tunnel_internal(uri, headers, response_tx)
1010 .await
1011 }
1012
1013 async fn open_websocket_tunnel_internal(
1014 &mut self,
1015 uri: Uri,
1016 headers: Vec<(String, String)>,
1017 response_tx: oneshot::Sender<Result<H2Tunnel>>,
1018 ) -> Result<()> {
1019 let headers = Headers::from(headers);
1020 match self
1021 .connection
1022 .open_extended_connect_websocket_with_end_stream(&uri, &headers)
1023 .await
1024 {
1025 Ok((stream_id, end_stream)) => {
1026 let (outbound_tx, outbound_rx) = mpsc::channel(32);
1027 let (inbound_tx, inbound_rx) = mpsc::channel(32);
1028 let initial_window_size = self.connection.local_initial_window_size();
1029 let initial_recv_window = initial_window_size as i32;
1030 let credit =
1031 H2TunnelCredit::new(self.body_progress_notify.clone(), initial_window_size);
1032 if end_stream {
1033 let _ = inbound_tx.send(Ok(H2TunnelEvent::EndStream)).await;
1034 self.connection.remove_stream(stream_id);
1035 } else {
1036 let command_tx = self.command_tx.clone();
1037 tokio::spawn(async move {
1038 let mut outbound_rx = outbound_rx;
1039 while let Some(outbound) = outbound_rx.recv().await {
1040 if command_tx
1041 .send(DriverCommand::SendTunnelData {
1042 stream_id,
1043 outbound,
1044 })
1045 .await
1046 .is_err()
1047 {
1048 break;
1049 }
1050 }
1051 });
1052 self.tunnels.insert(
1053 stream_id,
1054 DriverTunnelState {
1055 inbound_tx,
1056 pending_inbound: VecDeque::new(),
1057 pending_outbound: VecDeque::new(),
1058 recv_window: initial_recv_window,
1059 pending_recv_window_update: 0,
1060 credit: credit.clone(),
1061 },
1062 );
1063 }
1064
1065 if response_tx
1066 .send(Ok(H2Tunnel::new_with_credit(
1067 outbound_tx,
1068 inbound_rx,
1069 credit,
1070 )))
1071 .is_err()
1072 {
1073 tracing::debug!("Tunnel response channel closed after open");
1074 self.tunnels.remove(&stream_id);
1075 }
1076 }
1077 Err(e) => {
1078 if response_tx.send(Err(e)).is_err() {
1079 tracing::debug!("Tunnel response channel closed while sending open error");
1080 }
1081 }
1082 }
1083 Ok(())
1084 }
1085
1086 async fn queue_tunnel_outbound(
1087 &mut self,
1088 stream_id: u32,
1089 outbound: H2TunnelOutbound,
1090 ) -> Result<()> {
1091 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1092 tunnel.pending_outbound.push_back(outbound);
1093 self.flush_tunnel_data().await?;
1094 }
1095
1096 Ok(())
1097 }
1098
1099 async fn flush_pending_data(&mut self) -> Result<()> {
1101 if !self.streams.values().any(|stream| {
1102 stream.body_offset < stream.pending_body.len() || stream.request_stream.is_some()
1103 }) {
1104 return Ok(());
1105 }
1106
1107 let stream_ids: Vec<u32> = self.streams.keys().cloned().collect();
1109
1110 for stream_id in stream_ids {
1111 loop {
1113 let (has_data, offset) = if let Some(stream) = self.streams.get(&stream_id) {
1115 (
1116 stream.body_offset < stream.pending_body.len(),
1117 stream.body_offset,
1118 )
1119 } else {
1120 (false, 0)
1121 };
1122
1123 if !has_data {
1124 break;
1125 }
1126
1127 let pending_body = {
1130 let s = self.streams.get(&stream_id).unwrap();
1131 s.pending_body.clone()
1132 };
1133
1134 let remaining = &pending_body[offset..];
1135 let is_last_chunk = true;
1136
1137 let sent = self
1139 .connection
1140 .send_data(stream_id, remaining, is_last_chunk)
1141 .await?;
1142
1143 if sent > 0 {
1144 if let Some(stream) = self.streams.get_mut(&stream_id) {
1145 stream.body_offset += sent;
1146 }
1147 } else {
1149 break;
1151 }
1152 }
1153
1154 self.flush_streaming_request_body(stream_id).await?;
1155 }
1156 Ok(())
1157 }
1158
1159 async fn flush_streaming_request_body(&mut self, stream_id: u32) -> Result<()> {
1160 loop {
1161 if self
1162 .streams
1163 .get(&stream_id)
1164 .and_then(|stream| stream.request_stream.as_ref())
1165 .is_none()
1166 {
1167 return Ok(());
1168 }
1169
1170 let available = self.connection.available_send_window(stream_id).await?;
1171 if available <= 0 {
1172 return Ok(());
1173 }
1174
1175 let batch_limit = (available as usize).min(self.connection.max_data_frame_size());
1176 if batch_limit == 0 {
1177 return Ok(());
1178 }
1179
1180 let mut batch = BytesMut::with_capacity(batch_limit);
1181 let mut end_stream = false;
1182 let mut remove_request_stream_after_send = false;
1183
1184 while batch.len() < batch_limit {
1185 let current = self
1186 .streams
1187 .get(&stream_id)
1188 .and_then(|stream| stream.request_stream.as_ref())
1189 .and_then(|body| {
1190 body.current_chunk
1191 .as_ref()
1192 .map(|chunk| (chunk.clone(), body.current_offset))
1193 });
1194
1195 if let Some((chunk, offset)) = current {
1196 let remaining = &chunk[offset..];
1197 let take = remaining.len().min(batch_limit - batch.len());
1198 batch.extend_from_slice(&remaining[..take]);
1199
1200 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1201 let body = stream
1202 .request_stream
1203 .as_mut()
1204 .expect("request stream exists");
1205 body.current_offset += take;
1206 body.sent += take as u64;
1207 if body.current_offset >= chunk.len() {
1208 body.current_chunk = None;
1209 body.current_offset = 0;
1210 }
1211 continue;
1212 }
1213
1214 let poll_result = {
1215 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1216 let body = stream
1217 .request_stream
1218 .as_mut()
1219 .expect("request stream exists");
1220 if body.finished {
1221 Poll::Ready(None)
1222 } else {
1223 let mut cx = std::task::Context::from_waker(&self.request_body_waker);
1224 body.stream.as_mut().poll_next(&mut cx)
1225 }
1226 };
1227
1228 match poll_result {
1229 Poll::Pending => break,
1230 Poll::Ready(Some(Ok(chunk))) => {
1231 if chunk.is_empty() {
1232 continue;
1233 }
1234 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1235 let body = stream
1236 .request_stream
1237 .as_mut()
1238 .expect("request stream exists");
1239 body.current_chunk = Some(chunk);
1240 body.current_offset = 0;
1241 }
1242 Poll::Ready(Some(Err(error))) => {
1243 self.fail_stream(
1244 stream_id,
1245 format!("request body stream error: {}", error),
1246 )
1247 .await;
1248 return Ok(());
1249 }
1250 Poll::Ready(None) => {
1251 let (valid_len, sent, expected, already_sent_end) = {
1252 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1253 let body = stream
1254 .request_stream
1255 .as_mut()
1256 .expect("request stream exists");
1257 body.finished = true;
1258 (
1259 body.content_length
1260 .map(|expected| expected == body.sent)
1261 .unwrap_or(true),
1262 body.sent,
1263 body.content_length,
1264 body.end_stream_sent,
1265 )
1266 };
1267 if !valid_len {
1268 self.fail_stream(
1269 stream_id,
1270 format!(
1271 "sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
1272 sent,
1273 expected.unwrap_or_default()
1274 ),
1275 )
1276 .await;
1277 return Ok(());
1278 }
1279 if already_sent_end {
1280 return Ok(());
1281 }
1282 end_stream = true;
1283 remove_request_stream_after_send = true;
1284 break;
1285 }
1286 }
1287 }
1288
1289 if batch.is_empty() {
1290 if end_stream {
1291 let sent = self.connection.send_data(stream_id, &[], true).await?;
1292 debug_assert_eq!(sent, 0);
1293 if let Some(stream) = self.streams.get_mut(&stream_id) {
1294 if let Some(body) = stream.request_stream.as_mut() {
1295 body.end_stream_sent = true;
1296 }
1297 stream.request_stream = None;
1298 }
1299 }
1300 return Ok(());
1301 }
1302
1303 let sent = self
1304 .connection
1305 .send_data(stream_id, &batch, end_stream)
1306 .await?;
1307 if sent == 0 {
1308 return Ok(());
1309 }
1310 if sent != batch.len() {
1311 return Err(Error::HttpProtocol(
1312 "short DATA write after preflighted flow-control window".into(),
1313 ));
1314 }
1315
1316 if remove_request_stream_after_send {
1317 if let Some(stream) = self.streams.get_mut(&stream_id) {
1318 if let Some(body) = stream.request_stream.as_mut() {
1319 body.end_stream_sent = true;
1320 }
1321 stream.request_stream = None;
1322 }
1323 return Ok(());
1324 }
1325 }
1326 }
1327
1328 async fn flush_tunnel_data(&mut self) -> Result<()> {
1329 if self.tunnels.is_empty() {
1330 return Ok(());
1331 }
1332 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1333
1334 for stream_id in stream_ids {
1335 loop {
1336 let outbound = match self
1337 .tunnels
1338 .get_mut(&stream_id)
1339 .and_then(|tunnel| tunnel.pending_outbound.pop_front())
1340 {
1341 Some(outbound) => outbound,
1342 None => break,
1343 };
1344
1345 let sent = self
1346 .connection
1347 .send_data(stream_id, &outbound.bytes, outbound.end_stream)
1348 .await?;
1349
1350 if outbound.bytes.is_empty() {
1351 continue;
1352 }
1353
1354 if sent == 0 {
1355 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1356 tunnel.pending_outbound.push_front(outbound);
1357 }
1358 break;
1359 }
1360
1361 if sent < outbound.bytes.len() {
1362 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1363 tunnel.pending_outbound.push_front(H2TunnelOutbound {
1364 bytes: outbound.bytes.slice(sent..),
1365 end_stream: outbound.end_stream,
1366 });
1367 }
1368 break;
1369 }
1370 }
1371 }
1372
1373 Ok(())
1374 }
1375
1376 async fn flush_tunnel_inbound(&mut self) -> Result<()> {
1377 if self.tunnels.is_empty() {
1378 return Ok(());
1379 }
1380
1381 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1382 for stream_id in stream_ids {
1383 let status = self
1384 .tunnels
1385 .get_mut(&stream_id)
1386 .map(DriverTunnelState::flush_inbound)
1387 .unwrap_or(TunnelInboundStatus::Open);
1388 self.apply_tunnel_inbound_status(stream_id, status).await?;
1389 }
1390
1391 Ok(())
1392 }
1393
1394 async fn apply_tunnel_inbound_status(
1395 &mut self,
1396 stream_id: u32,
1397 status: TunnelInboundStatus,
1398 ) -> Result<()> {
1399 match status {
1400 TunnelInboundStatus::Open | TunnelInboundStatus::Blocked => {}
1401 TunnelInboundStatus::Remove => {
1402 self.tunnels.remove(&stream_id);
1403 self.connection.remove_stream(stream_id);
1404 self.process_pending_requests().await?;
1405 }
1406 TunnelInboundStatus::Closed => {
1407 self.tunnels.remove(&stream_id);
1408 self.connection.remove_stream(stream_id);
1409 if let Err(e) = self
1410 .connection
1411 .send_rst_stream(stream_id, ErrorCode::Cancel)
1412 .await
1413 {
1414 tracing::warn!("Failed to send RST_STREAM for dropped tunnel: {:?}", e);
1415 }
1416 self.process_pending_requests().await?;
1417 }
1418 }
1419 Ok(())
1420 }
1421
1422 async fn apply_released_body_credits(&mut self) -> Result<()> {
1423 let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1424 for stream_id in stream_ids {
1425 let (released, closed) = self
1426 .streams
1427 .get(&stream_id)
1428 .and_then(|stream| stream.streaming_body.as_ref())
1429 .map(|body| (body.take_released_recv_bytes(), body.is_closed()))
1430 .unwrap_or((0, false));
1431
1432 if closed {
1433 self.cancel_stream_for_dropped_receiver(stream_id).await;
1434 continue;
1435 }
1436
1437 if released == 0 {
1438 continue;
1439 }
1440
1441 let refresh_threshold = self.connection.flow_control_refresh_threshold();
1442 let window_update_step = self.connection.stream_window_update_step();
1443 let increment = self.streams.get_mut(&stream_id).and_then(|stream| {
1444 stream.pending_recv_window_update =
1445 stream.pending_recv_window_update.saturating_add(released);
1446 let should_update = stream.pending_recv_window_update >= window_update_step
1447 || stream.recv_window < refresh_threshold;
1448 if should_update {
1449 let increment = stream.pending_recv_window_update.min(u32::MAX as usize);
1450 stream.pending_recv_window_update -= increment;
1451 stream.recv_window = stream.recv_window.saturating_add(increment as i32);
1452 Some(increment as u32)
1453 } else {
1454 None
1455 }
1456 });
1457
1458 if let Some(increment) = increment {
1459 self.connection
1460 .send_stream_window_update(stream_id, increment)
1461 .await?;
1462 }
1463 }
1464
1465 Ok(())
1466 }
1467
1468 async fn apply_released_tunnel_credits(&mut self) -> Result<()> {
1469 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1470 for stream_id in stream_ids {
1471 let (released, closed) = self
1472 .tunnels
1473 .get(&stream_id)
1474 .map(|tunnel| {
1475 (
1476 tunnel.credit.take_released_recv_bytes(),
1477 tunnel.inbound_tx.is_closed(),
1478 )
1479 })
1480 .unwrap_or((0, false));
1481
1482 if closed {
1483 self.apply_tunnel_inbound_status(stream_id, TunnelInboundStatus::Closed)
1484 .await?;
1485 continue;
1486 }
1487
1488 if released == 0 {
1489 continue;
1490 }
1491
1492 let refresh_threshold = self.connection.flow_control_refresh_threshold();
1493 let window_update_step = self.connection.stream_window_update_step();
1494 let increment = self.tunnels.get_mut(&stream_id).and_then(|tunnel| {
1495 tunnel.pending_recv_window_update =
1496 tunnel.pending_recv_window_update.saturating_add(released);
1497 let should_update = tunnel.pending_recv_window_update >= window_update_step
1498 || tunnel.recv_window < refresh_threshold;
1499 if should_update {
1500 let increment = tunnel.pending_recv_window_update.min(u32::MAX as usize);
1501 tunnel.pending_recv_window_update -= increment;
1502 tunnel.recv_window = tunnel.recv_window.saturating_add(increment as i32);
1503 Some(increment as u32)
1504 } else {
1505 None
1506 }
1507 });
1508
1509 if let Some(increment) = increment {
1510 self.connection
1511 .send_stream_window_update(stream_id, increment)
1512 .await?;
1513 }
1514 }
1515
1516 Ok(())
1517 }
1518
1519 async fn fail_stream(&mut self, stream_id: u32, message: String) {
1520 self.connection.remove_stream(stream_id);
1521 if let Some(mut stream) = self.streams.remove(&stream_id) {
1522 Self::note_stream_removed(&stream, &self.inline_active);
1523 if let Some(tx) = stream.response_tx.take() {
1524 let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1525 }
1526 if let Some(tx) = stream.streaming_headers_tx.take() {
1527 let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1528 }
1529 if let Some(tx) = stream.trailers_tx.take() {
1530 let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1531 }
1532 if let Some(body) = stream.streaming_body.take() {
1533 let _ = body.fail(Error::HttpProtocol(message));
1534 }
1535 }
1536 }
1537
1538 async fn cancel_stream_for_dropped_receiver(&mut self, stream_id: u32) {
1539 if let Some(state) = self.streams.remove(&stream_id) {
1540 Self::note_stream_removed(&state, &self.inline_active);
1541 }
1542 self.connection.remove_stream(stream_id);
1543 if let Err(e) = self
1544 .connection
1545 .send_rst_stream(stream_id, ErrorCode::Cancel)
1546 .await
1547 {
1548 tracing::warn!("Failed to send RST_STREAM for dropped receiver: {:?}", e);
1549 }
1550 }
1551
1552 async fn flush_pending_streaming_bodies(&mut self) -> Result<()> {
1553 if !self.has_pending_streaming_body() {
1554 return Ok(());
1555 }
1556 let stream_ids: Vec<u32> = self
1557 .streams
1558 .iter()
1559 .filter_map(|(stream_id, stream)| {
1560 if stream.streaming_body.is_some()
1561 && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
1562 {
1563 Some(*stream_id)
1564 } else {
1565 None
1566 }
1567 })
1568 .collect();
1569
1570 for stream_id in stream_ids {
1571 let mut should_cancel = false;
1572 let mut should_complete = false;
1573
1574 if let Some(stream) = self.streams.get_mut(&stream_id) {
1575 let Some(body) = stream.streaming_body.as_ref() else {
1576 continue;
1577 };
1578
1579 if body.is_closed() {
1580 should_cancel = true;
1581 } else {
1582 while let Some(item) = stream.pending_streaming_body.pop_front() {
1583 match body.push(item) {
1584 H2BodyPush::Accepted => {}
1585 H2BodyPush::Full(item) => {
1586 stream.pending_streaming_body.push_front(item);
1587 break;
1588 }
1589 H2BodyPush::Closed => {
1590 should_cancel = true;
1591 break;
1592 }
1593 }
1594 }
1595
1596 should_complete =
1597 stream.pending_streaming_end && stream.pending_streaming_body.is_empty();
1598 }
1599 }
1600
1601 if should_cancel {
1602 self.cancel_stream_for_dropped_receiver(stream_id).await;
1603 } else if should_complete {
1604 if let Some(body) = self
1605 .streams
1606 .get(&stream_id)
1607 .and_then(|stream| stream.streaming_body.as_ref())
1608 {
1609 body.finish();
1610 }
1611 self.complete_stream(stream_id);
1612 }
1613 }
1614
1615 Ok(())
1616 }
1617
1618 async fn handle_frame(&mut self, header: FrameHeader, mut payload: Bytes) -> Result<()> {
1620 self.drain_inline_registrations();
1623
1624 if header.stream_id != 0 {
1627 if let Some(stream) = self.streams.get(&header.stream_id) {
1628 if let Some(ref body) = stream.streaming_body {
1629 if body.is_closed() {
1630 let stream_id = header.stream_id;
1631 self.cancel_stream_for_dropped_receiver(stream_id).await;
1632 return Ok(());
1633 }
1634 }
1635 }
1636 }
1637
1638 if matches!(
1640 header.frame_type,
1641 FrameType::Settings
1642 | FrameType::WindowUpdate
1643 | FrameType::Ping
1644 | FrameType::GoAway
1645 | FrameType::RstStream
1646 | FrameType::PushPromise
1647 ) {
1648 match self
1649 .connection
1650 .handle_control_frame(&header, payload.clone())
1651 .await?
1652 {
1653 ControlAction::RstStream(sid, code) => {
1654 if let Some(tunnel) = self.tunnels.remove(&sid) {
1655 let _ = tunnel
1656 .inbound_tx
1657 .send(Ok(H2TunnelEvent::Reset(format!("{:?}", code))))
1658 .await;
1659 }
1660 self.fail_stream(sid, format!("Stream reset by peer: {:?}", code))
1662 .await;
1663 self.process_pending_requests().await?;
1665 return Ok(());
1666 }
1667 ControlAction::GoAway(last_sid) => {
1668 self.goaway_received
1669 .store(true, std::sync::atomic::Ordering::Relaxed);
1670 let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1671 for sid in stream_ids {
1672 if let Some(stream) = self.streams.get(&sid) {
1673 if stream.streaming_body.is_some()
1674 && stream
1675 .streaming_body
1676 .as_ref()
1677 .is_some_and(|body| body.is_slot_available())
1678 {
1679 if let Some(body) = stream.streaming_body.as_ref() {
1680 body.finish();
1681 }
1682 }
1683 }
1684 }
1685 let tunnel_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1686 for sid in tunnel_ids {
1687 if sid > last_sid {
1688 if let Some(tunnel) = self.tunnels.remove(&sid) {
1689 let _ = tunnel
1690 .inbound_tx
1691 .send(Ok(H2TunnelEvent::GoAway {
1692 last_stream_id: last_sid,
1693 }))
1694 .await;
1695 }
1696 }
1697 }
1698 let sids: Vec<u32> = self.streams.keys().cloned().collect();
1700 for sid in sids {
1701 if sid > last_sid {
1702 self.fail_stream(sid, "GOAWAY received".into()).await;
1703 }
1704 }
1705 return Ok(());
1708 }
1709 ControlAction::RefusePush(_stream_id, promised_id) => {
1710 if let Err(e) = self
1713 .connection
1714 .send_rst_stream(promised_id, ErrorCode::RefusedStream)
1715 .await
1716 {
1717 tracing::warn!(
1718 "Failed to send RST_STREAM for refused push promise: {:?}",
1719 e
1720 );
1721 }
1722 }
1723 ControlAction::PingAck(data) => {
1724 if self
1725 .pending_ping
1726 .is_some_and(|(pending_data, _)| pending_data == data)
1727 {
1728 self.pending_ping = None;
1729 }
1730 return Ok(());
1731 }
1732 ControlAction::None => {
1733 }
1735 }
1736 }
1737
1738 match header.frame_type {
1740 FrameType::Headers => {
1741 let stream_id = header.stream_id;
1742
1743 if (header.flags & flags::END_HEADERS) == 0 {
1747 let mut block = BytesMut::from(payload);
1751 loop {
1752 let (next_header, next_payload) = self.connection.read_next_frame().await?;
1753 if next_header.frame_type != FrameType::Continuation {
1754 return Err(Error::HttpProtocol("Expected CONTINUATION frame".into()));
1755 }
1756 if next_header.stream_id != stream_id {
1757 return Err(Error::HttpProtocol(
1758 "CONTINUATION frame stream ID mismatch".into(),
1759 ));
1760 }
1761 block.extend_from_slice(&next_payload);
1762 if (next_header.flags & flags::END_HEADERS) != 0 {
1763 break;
1764 }
1765 }
1766 payload = block.freeze();
1767 }
1768
1769 let decoded = self.connection.decode_header_block(payload)?;
1770
1771 let mut status = 0u16;
1773 let mut regular_headers = Vec::new();
1774
1775 for (name, value) in decoded {
1776 if name == ":status" {
1777 status = value.parse().unwrap_or(0);
1778 } else if !name.starts_with(':') {
1779 regular_headers.push((name, value));
1780 }
1781 }
1782
1783 if let Some(stream) = self.streams.get_mut(&stream_id) {
1784 if status >= 200 {
1785 let header_end_stream = (header.flags & flags::END_STREAM) != 0
1786 || self.goaway_received.load(Ordering::Relaxed);
1787 if let Some(tx) = stream.streaming_headers_tx.take() {
1788 let _ = tx.send(Ok((status, regular_headers)));
1789 if header_end_stream {
1790 if let Some(body) = stream.streaming_body.as_ref() {
1791 body.finish();
1792 }
1793 }
1794 } else {
1795 stream.status = Some(status);
1796 stream.headers = regular_headers;
1797 }
1798 } else if status > 0 {
1799 tracing::debug!("H2Driver: Ignoring informational status {}", status);
1801 } else {
1802 tracing::debug!("H2Driver: Received trailers for stream {}", stream_id);
1804 if let Some(tx) = stream.trailers_tx.take() {
1810 let _ = tx.send(Ok(regular_headers));
1811 }
1812 if (header.flags & flags::END_STREAM) != 0 {
1813 if let Some(body) = stream.streaming_body.as_ref() {
1814 body.finish();
1815 }
1816 }
1817 }
1818
1819 if (header.flags & flags::END_STREAM) != 0 {
1820 self.complete_stream(stream_id);
1821 }
1822 }
1823 }
1824 FrameType::Data => {
1825 let stream_id = header.stream_id;
1826 let end_stream = (header.flags & flags::END_STREAM) != 0;
1827
1828 let data =
1829 self.connection
1830 .parse_inbound_data_payload(stream_id, header.flags, payload)?;
1831 let data_len = data.len();
1832 if let Some(increment) = self
1833 .connection
1834 .apply_conn_inbound_flow_control_delta(data_len)
1835 {
1836 self.connection
1837 .send_connection_window_update(increment)
1838 .await?;
1839 }
1840
1841 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1842 if data_len > 0 {
1843 tunnel.recv_window -= data_len as i32;
1844 }
1845
1846 let mut status = TunnelInboundStatus::Open;
1847 if !data.is_empty() {
1848 status = tunnel.push_inbound(H2TunnelEvent::Data(data));
1849 }
1850 if status == TunnelInboundStatus::Open && end_stream {
1851 status = tunnel.push_inbound(H2TunnelEvent::EndStream);
1852 }
1853 self.apply_tunnel_inbound_status(stream_id, status).await?;
1854 return Ok(());
1855 }
1856
1857 let mut should_cancel = false;
1858 let mut should_complete = false;
1859 let mut should_finish_body = false;
1860 let mut should_yield_before_complete = false;
1861
1862 if let Some(stream) = self.streams.get_mut(&stream_id) {
1863 if data_len > 0 {
1864 stream.recv_window -= data_len as i32;
1865 }
1866
1867 if let Some(body) = stream.streaming_body.as_ref() {
1868 if !data.is_empty() {
1869 if stream.pending_streaming_body.is_empty() {
1870 let push = body.push_data(data, end_stream);
1871 match push {
1872 H2BodyDataPush::Accepted { .. } => {}
1873 H2BodyDataPush::Full(data) => {
1874 stream.pending_streaming_body.push_back(Ok(data));
1875 }
1876 H2BodyDataPush::Closed => {
1877 should_cancel = true;
1878 }
1879 }
1880 } else {
1881 stream.pending_streaming_body.push_back(Ok(data));
1882 }
1883 }
1884
1885 if end_stream {
1886 if stream.pending_streaming_body.is_empty() {
1887 should_complete = true;
1888 should_finish_body = data_len == 0;
1889 should_yield_before_complete = true;
1890 } else {
1891 stream.pending_streaming_end = true;
1892 }
1893 }
1894 } else {
1895 stream.body.extend_from_slice(&data);
1896 should_complete = end_stream;
1897 }
1898 }
1899
1900 if should_cancel {
1901 self.cancel_stream_for_dropped_receiver(stream_id).await;
1902 return Ok(());
1903 }
1904
1905 if should_complete {
1906 if should_finish_body {
1907 if let Some(body) = self
1908 .streams
1909 .get(&stream_id)
1910 .and_then(|stream| stream.streaming_body.as_ref())
1911 {
1912 body.finish();
1913 }
1914 }
1915 if should_yield_before_complete {
1916 tokio::task::yield_now().await;
1917 }
1918 self.complete_stream(stream_id);
1919 }
1920 }
1921 FrameType::WindowUpdate => {
1922 self.flush_pending_data().await?;
1926 self.flush_tunnel_data().await?;
1927 }
1928 _ => {} }
1930
1931 Ok(())
1932 }
1933
1934 fn complete_stream(&mut self, stream_id: u32) {
1936 if let Some(mut stream) = self.streams.remove(&stream_id) {
1937 if !stream.inline {
1938 self.connection.remove_stream(stream_id);
1939 }
1940 Self::note_stream_removed(&stream, &self.inline_active);
1941 if let Some(tx) = stream.response_tx.take() {
1942 let response = match stream.status {
1945 Some(status) => Ok(StreamResponse {
1946 status,
1947 headers: stream.headers,
1948 body: stream.body.freeze(),
1949 }),
1950 None => Err(Error::HttpProtocol(format!(
1951 "Stream {} completed without status code",
1952 stream_id
1953 ))),
1954 };
1955 if tx.send(response).is_err() {
1956 tracing::debug!("Response channel closed while completing stream");
1957 }
1958 }
1959 }
1960 }
1963}