1use bytes::{Bytes, BytesMut};
8use http::{Method, Uri};
9use std::collections::{HashMap, VecDeque};
10use std::fmt;
11use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::task::{Poll, Wake, Waker};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tokio::sync::oneshot;
17use tokio::sync::Notify;
18use tracing;
19
20pub type StreamingHeadersResult = Result<(u16, Vec<(String, String)>)>;
21
22use crate::error::{Error, Result};
23use crate::headers::Headers;
24use crate::request::{RequestBody, RequestBodyStream};
25use crate::transport::h2::body::{H2BodyDataPush, H2BodyPush, H2BodyShared};
26use crate::transport::h2::connection::{
27 ControlAction, H2Connection as RawH2Connection, StreamResponse,
28};
29use crate::transport::h2::frame::{flags, ErrorCode, FrameHeader, FrameType};
30use crate::transport::h2::tunnel::{H2Tunnel, H2TunnelCredit, H2TunnelEvent, H2TunnelOutbound};
31use crate::transport::h2::H2TransportConfig;
32
33const FAST_PATH_COMMAND_CHECK_INTERVAL: usize = 8;
34const FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET: usize = 2;
35
36pub enum DriverCommand {
38 SendRequest {
41 method: http::Method,
42 uri: http::Uri,
43 headers: Headers,
44 body: Option<bytes::Bytes>,
45 response_tx: oneshot::Sender<Result<StreamResponse>>,
46 },
47 SendStreamingRequest {
49 method: Method,
50 uri: Uri,
51 headers: Headers,
52 body: RequestBody,
53 body_shared: Arc<H2BodyShared>,
54 headers_tx: oneshot::Sender<StreamingHeadersResult>,
55 },
56 OpenWebSocketTunnel {
58 uri: Uri,
59 headers: Vec<(String, String)>,
60 response_tx: oneshot::Sender<Result<H2Tunnel>>,
61 },
62 SendTunnelData {
64 stream_id: u32,
65 outbound: H2TunnelOutbound,
66 },
67}
68
69impl fmt::Debug for DriverCommand {
70 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71 match self {
72 DriverCommand::SendRequest {
73 method,
74 uri,
75 headers,
76 body,
77 ..
78 } => f
79 .debug_struct("SendRequest")
80 .field("method", method)
81 .field("uri", uri)
82 .field("headers_len", &headers.len())
83 .field("body_len", &body.as_ref().map(Bytes::len))
84 .finish(),
85 DriverCommand::SendStreamingRequest {
86 method,
87 uri,
88 headers,
89 body,
90 ..
91 } => f
92 .debug_struct("SendStreamingRequest")
93 .field("method", method)
94 .field("uri", uri)
95 .field("headers_len", &headers.len())
96 .field("body", body)
97 .finish(),
98 DriverCommand::OpenWebSocketTunnel { uri, headers, .. } => f
99 .debug_struct("OpenWebSocketTunnel")
100 .field("uri", uri)
101 .field("headers_len", &headers.len())
102 .finish(),
103 DriverCommand::SendTunnelData {
104 stream_id,
105 outbound,
106 } => f
107 .debug_struct("SendTunnelData")
108 .field("stream_id", stream_id)
109 .field("outbound", outbound)
110 .finish(),
111 }
112 }
113}
114
115pub struct InlineRegistration {
121 pub stream_id: u32,
122 pub headers_tx: oneshot::Sender<StreamingHeadersResult>,
123 pub body_shared: Arc<H2BodyShared>,
124 pub recv_window: i32,
125}
126
127struct NotifyWake(Arc<Notify>);
128
129impl Wake for NotifyWake {
130 fn wake(self: Arc<Self>) {
131 self.0.notify_one();
132 }
133
134 fn wake_by_ref(self: &Arc<Self>) {
135 self.0.notify_one();
136 }
137}
138
139struct DriverStreamingRequestBody {
140 stream: RequestBodyStream,
141 content_length: Option<u64>,
142 current_chunk: Option<Bytes>,
143 current_offset: usize,
144 sent: u64,
145 finished: bool,
146 end_stream_sent: bool,
147}
148
149impl DriverStreamingRequestBody {
150 fn new(stream: RequestBodyStream, content_length: Option<u64>) -> Self {
151 Self {
152 stream,
153 content_length,
154 current_chunk: None,
155 current_offset: 0,
156 sent: 0,
157 finished: false,
158 end_stream_sent: false,
159 }
160 }
161}
162
163struct DriverStreamState {
165 response_tx: Option<oneshot::Sender<Result<StreamResponse>>>,
167 streaming_headers_tx: Option<oneshot::Sender<StreamingHeadersResult>>,
169 streaming_body: Option<Arc<H2BodyShared>>,
171 status: Option<u16>,
173 headers: Vec<(String, String)>,
175 body: BytesMut,
177 pending_body: Bytes,
179 body_offset: usize,
181 request_stream: Option<DriverStreamingRequestBody>,
183 pending_streaming_body: VecDeque<Result<Bytes>>,
185 pending_streaming_end: bool,
187 recv_window: i32,
191 pending_recv_window_update: usize,
195 inline: bool,
199}
200
201impl DriverStreamState {
202 fn new(
203 response_tx: oneshot::Sender<Result<StreamResponse>>,
204 pending_body: Bytes,
205 recv_window: i32,
206 ) -> Self {
207 Self {
208 response_tx: Some(response_tx),
209 streaming_headers_tx: None,
210 streaming_body: None,
211 status: None,
212 headers: Vec::new(),
213 body: BytesMut::new(),
214 pending_body,
215 body_offset: 0,
216 request_stream: None,
217 pending_streaming_body: VecDeque::new(),
218 pending_streaming_end: false,
219 recv_window,
220 pending_recv_window_update: 0,
221 inline: false,
222 }
223 }
224
225 fn streaming(
226 headers_tx: oneshot::Sender<StreamingHeadersResult>,
227 body_shared: Arc<H2BodyShared>,
228 pending_body: Bytes,
229 request_stream: Option<DriverStreamingRequestBody>,
230 recv_window: i32,
231 ) -> Self {
232 Self {
233 response_tx: None,
234 streaming_headers_tx: Some(headers_tx),
235 streaming_body: Some(body_shared),
236 status: None,
237 headers: Vec::new(),
238 body: BytesMut::new(),
239 pending_body,
240 body_offset: 0,
241 request_stream,
242 pending_streaming_body: VecDeque::new(),
243 pending_streaming_end: false,
244 recv_window,
245 pending_recv_window_update: 0,
246 inline: false,
247 }
248 }
249
250 fn streaming_inline(
251 headers_tx: oneshot::Sender<StreamingHeadersResult>,
252 body_shared: Arc<H2BodyShared>,
253 recv_window: i32,
254 ) -> Self {
255 let mut state = Self::streaming(headers_tx, body_shared, Bytes::new(), None, recv_window);
256 state.inline = true;
257 state
258 }
259}
260
261struct DriverTunnelState {
262 inbound_tx: mpsc::Sender<Result<H2TunnelEvent>>,
263 pending_inbound: VecDeque<Result<H2TunnelEvent>>,
264 pending_outbound: VecDeque<H2TunnelOutbound>,
265 recv_window: i32,
266 pending_recv_window_update: usize,
267 credit: Arc<H2TunnelCredit>,
268}
269
270#[derive(Clone, Copy, Debug, Eq, PartialEq)]
271enum TunnelInboundStatus {
272 Open,
273 Blocked,
274 Closed,
275 Remove,
276}
277
278impl DriverTunnelState {
279 fn push_inbound(&mut self, event: H2TunnelEvent) -> TunnelInboundStatus {
280 let item = Ok(event);
281 if !self.pending_inbound.is_empty() {
282 self.pending_inbound.push_back(item);
283 return TunnelInboundStatus::Open;
284 }
285
286 match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
287 TunnelInboundStatus::Blocked => TunnelInboundStatus::Open,
288 status => status,
289 }
290 }
291
292 fn flush_inbound(&mut self) -> TunnelInboundStatus {
293 while let Some(item) = self.pending_inbound.pop_front() {
294 match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
295 TunnelInboundStatus::Open => {}
296 TunnelInboundStatus::Blocked => return TunnelInboundStatus::Open,
297 status => return status,
298 }
299 }
300 TunnelInboundStatus::Open
301 }
302
303 fn try_send_inbound(
304 inbound_tx: &mpsc::Sender<Result<H2TunnelEvent>>,
305 pending_inbound: &mut VecDeque<Result<H2TunnelEvent>>,
306 item: Result<H2TunnelEvent>,
307 ) -> TunnelInboundStatus {
308 let remove_after_send = matches!(item, Ok(H2TunnelEvent::EndStream));
309 match inbound_tx.try_send(item) {
310 Ok(()) if remove_after_send => TunnelInboundStatus::Remove,
311 Ok(()) => TunnelInboundStatus::Open,
312 Err(mpsc::error::TrySendError::Full(item)) => {
313 pending_inbound.push_front(item);
314 TunnelInboundStatus::Blocked
315 }
316 Err(mpsc::error::TrySendError::Closed(_)) => TunnelInboundStatus::Closed,
317 }
318 }
319}
320
321pub struct H2Driver<S> {
323 command_rx: mpsc::Receiver<DriverCommand>,
325 command_tx: mpsc::Sender<DriverCommand>,
327 connection: RawH2Connection<S>,
329 streams: HashMap<u32, DriverStreamState>,
331 tunnels: HashMap<u32, DriverTunnelState>,
333 pending_requests: std::collections::VecDeque<DriverCommand>,
335 goaway_received: Arc<AtomicBool>,
337 config: H2TransportConfig,
339 pending_ping: Option<([u8; 8], Instant)>,
341 inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
345 inline_active: Arc<AtomicUsize>,
349 inline_eligible: Arc<AtomicBool>,
352 body_progress_notify: Arc<Notify>,
355 request_body_waker: Waker,
356 backpressure_stall_count: Arc<AtomicU64>,
359}
360
361impl<S> H2Driver<S>
362where
363 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
364{
365 pub fn new(
367 connection: RawH2Connection<S>,
368 command_tx: mpsc::Sender<DriverCommand>,
369 command_rx: mpsc::Receiver<DriverCommand>,
370 goaway_received: Arc<AtomicBool>,
371 config: H2TransportConfig,
372 backpressure_stall_count: Arc<AtomicU64>,
373 ) -> Self {
374 let (_, inline_register_rx) = mpsc::unbounded_channel();
375 let body_progress_notify = Arc::new(Notify::new());
376 Self::new_with_inline(
377 connection,
378 command_tx,
379 command_rx,
380 goaway_received,
381 config.normalized(),
382 inline_register_rx,
383 Arc::new(AtomicUsize::new(0)),
384 Arc::new(AtomicBool::new(false)),
385 body_progress_notify,
386 backpressure_stall_count,
387 )
388 }
389
390 #[allow(clippy::too_many_arguments)]
394 pub fn new_with_inline(
395 connection: RawH2Connection<S>,
396 command_tx: mpsc::Sender<DriverCommand>,
397 command_rx: mpsc::Receiver<DriverCommand>,
398 goaway_received: Arc<AtomicBool>,
399 config: H2TransportConfig,
400 inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
401 inline_active: Arc<AtomicUsize>,
402 inline_eligible: Arc<AtomicBool>,
403 body_progress_notify: Arc<Notify>,
404 backpressure_stall_count: Arc<AtomicU64>,
405 ) -> Self {
406 let request_body_waker = Waker::from(Arc::new(NotifyWake(body_progress_notify.clone())));
407 Self {
408 command_rx,
409 command_tx,
410 connection,
411 streams: HashMap::new(),
412 tunnels: HashMap::new(),
413 pending_requests: std::collections::VecDeque::new(),
414 goaway_received,
415 config: config.normalized(),
416 pending_ping: None,
417 inline_register_rx,
418 inline_active,
419 inline_eligible,
420 body_progress_notify,
421 request_body_waker,
422 backpressure_stall_count,
423 }
424 }
425
426 pub async fn drive(mut self) -> Result<()> {
428 loop {
429 self.drain_inline_registrations();
430
431 self.process_pending_requests().await?;
433
434 self.flush_pending_data().await?;
436 self.flush_tunnel_data().await?;
437 self.flush_tunnel_inbound().await?;
438 self.apply_released_body_credits().await?;
439 self.apply_released_tunnel_credits().await?;
440 self.flush_pending_streaming_bodies().await?;
441 self.check_keepalive_timeout()?;
442 self.refresh_inline_eligibility();
443
444 if let Some(stream_id) = self.single_stream_fast_path_target() {
445 if self
446 .run_single_stream_streaming_fast_path(stream_id)
447 .await?
448 {
449 continue;
450 }
451 }
452
453 let keepalive_delay = self.keepalive_delay();
454 let retry_streaming_backpressure =
455 self.has_pending_streaming_body() || self.has_request_body_work();
456
457 tokio::select! {
458 biased;
459 command = self.command_rx.recv() => {
461 match command {
462 Some(cmd) => {
463 match cmd {
464 DriverCommand::SendRequest { .. } => {
465 self.handle_send_request(cmd).await?;
466 }
467 DriverCommand::SendStreamingRequest { .. } => {
468 self.handle_send_streaming_request(cmd).await?;
469 }
470 DriverCommand::OpenWebSocketTunnel { uri, headers, response_tx } => {
471 self.handle_open_websocket_tunnel(uri, headers, response_tx).await?;
472 }
473 DriverCommand::SendTunnelData { stream_id, outbound } => {
474 self.queue_tunnel_outbound(stream_id, outbound).await?;
475 }
476 }
477 }
478 None => {
479 break;
481 }
482 }
483 }
484
485 inline = self.inline_register_rx.recv(), if !self.inline_register_rx.is_closed() => {
487 if let Some(reg) = inline {
488 self.register_inline_stream(reg);
489 }
490 }
491
492 read_res = self.connection.read_next_frame() => {
494 match read_res {
495 Ok((header, payload)) => {
496 if let Err(e) = self.handle_frame(header, payload).await {
497 tracing::error!("H2Driver frame error: {:?}", e);
498 return Err(e);
501 }
502 }
503 Err(e) => {
504 tracing::error!("H2Driver read error: {:?}", e);
506 return Err(e);
507 }
508 }
509 }
510
511 _ = async {
512 if let Some(delay) = keepalive_delay {
513 tokio::time::sleep(delay).await;
514 } else {
515 std::future::pending::<()>().await;
516 }
517 } => {
518 let ping = self.connection.send_ping().await?;
519 self.pending_ping = Some((ping, Instant::now()));
520 }
521
522 _ = async {
523 if retry_streaming_backpressure {
524 tokio::time::sleep(Duration::from_millis(1)).await;
525 } else {
526 std::future::pending::<()>().await;
527 }
528 } => {
529 if retry_streaming_backpressure {
530 self.backpressure_stall_count
531 .fetch_add(1, Ordering::Relaxed);
532 tracing::debug!("H2 driver streaming backpressure stall");
533 }
534 }
535
536 _ = self.body_progress_notify.notified() => {}
537 }
538 }
539 Ok(())
540 }
541
542 fn drain_inline_registrations(&mut self) {
548 while let Ok(reg) = self.inline_register_rx.try_recv() {
549 self.register_inline_stream(reg);
550 }
551 }
552
553 fn register_inline_stream(&mut self, reg: InlineRegistration) {
554 self.streams.insert(
555 reg.stream_id,
556 DriverStreamState::streaming_inline(reg.headers_tx, reg.body_shared, reg.recv_window),
557 );
558 }
559
560 fn refresh_inline_eligibility(&self) {
566 let eligible = self.tunnels.is_empty() && !self.goaway_received.load(Ordering::Relaxed);
567 self.inline_eligible.store(eligible, Ordering::Relaxed);
568 }
569
570 fn note_stream_removed(state: &DriverStreamState, inline_active: &Arc<AtomicUsize>) {
574 if state.inline {
575 inline_active.fetch_sub(1, Ordering::AcqRel);
576 }
577 }
578
579 fn store_stream_recv_window(&mut self, stream_id: u32, recv_window: i32) {
580 if let Some(stream) = self.streams.get_mut(&stream_id) {
581 stream.recv_window = recv_window;
582 }
583 }
584
585 fn single_stream_fast_path_target(&self) -> Option<u32> {
595 if self.streams.len() != 1
596 || !self.tunnels.is_empty()
597 || !self.pending_requests.is_empty()
598 || self.pending_ping.is_some()
599 || !self.command_rx.is_empty()
600 || !self.inline_register_rx.is_empty()
601 {
602 return None;
603 }
604
605 let (stream_id, stream) = self.streams.iter().next()?;
606 if stream.streaming_body.is_none()
607 || !stream.pending_streaming_body.is_empty()
608 || stream.body_offset < stream.pending_body.len()
609 || stream.request_stream.is_some()
610 {
611 return None;
612 }
613
614 Some(*stream_id)
615 }
616
617 async fn run_single_stream_streaming_fast_path(&mut self, stream_id: u32) -> Result<bool> {
625 let (body_shared, mut recv_window) = match self.streams.get(&stream_id) {
628 Some(stream) => match stream.streaming_body.as_ref() {
629 Some(shared) => (shared.clone(), stream.recv_window),
630 None => return Ok(false),
631 },
632 None => return Ok(false),
633 };
634
635 let mut processed_any = false;
636 let mut frames_since_queue_check = 0usize;
637 let mut queued_data_frames_since_yield = 0usize;
638
639 loop {
640 if frames_since_queue_check >= FAST_PATH_COMMAND_CHECK_INTERVAL {
641 frames_since_queue_check = 0;
642 match self.command_rx.try_recv() {
643 Ok(cmd) => {
644 self.store_stream_recv_window(stream_id, recv_window);
645 self.pending_requests.push_back(cmd);
646 return Ok(true);
647 }
648 Err(mpsc::error::TryRecvError::Empty) => {}
649 Err(mpsc::error::TryRecvError::Disconnected) => {
650 self.store_stream_recv_window(stream_id, recv_window);
651 return Ok(processed_any);
652 }
653 }
654
655 if let Ok(reg) = self.inline_register_rx.try_recv() {
656 self.store_stream_recv_window(stream_id, recv_window);
657 self.register_inline_stream(reg);
658 return Ok(true);
659 }
660
661 if body_shared.is_closed() {
662 self.cancel_stream_for_dropped_receiver(stream_id).await;
663 return Ok(true);
664 }
665 }
666
667 let (header, payload) = match self.connection.read_next_frame().await {
668 Ok(frame) => frame,
669 Err(e) => {
670 tracing::error!("H2Driver read error (fast path): {:?}", e);
671 return Err(e);
672 }
673 };
674 processed_any = true;
675 frames_since_queue_check += 1;
676
677 if header.stream_id == stream_id && header.frame_type == FrameType::Data {
678 let end_stream = (header.flags & flags::END_STREAM) != 0;
679 let data = if (header.flags & flags::PADDED) == 0 {
680 payload
681 } else {
682 self.connection
683 .parse_inbound_data_payload(stream_id, header.flags, payload)?
684 };
685 let data_len = data.len();
686 let mut should_yield_for_body_queue = false;
687 if let Some(increment) = self
688 .connection
689 .apply_conn_inbound_flow_control_delta(data_len)
690 {
691 self.connection
692 .send_connection_window_update(increment)
693 .await?;
694 }
695 if data_len > 0 {
696 recv_window -= data_len as i32;
697 }
698 let push = body_shared.push_data(data, end_stream);
699 match push {
700 H2BodyDataPush::Accepted { queued_len } => {
701 if queued_len > 1 {
702 queued_data_frames_since_yield += 1;
703 should_yield_for_body_queue = queued_data_frames_since_yield
704 >= FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET;
705 if should_yield_for_body_queue {
706 queued_data_frames_since_yield = 0;
707 }
708 } else {
709 queued_data_frames_since_yield = 0;
710 }
711 }
712 H2BodyDataPush::Full(data) => {
713 if let Some(stream) = self.streams.get_mut(&stream_id) {
714 stream.recv_window = recv_window;
715 stream.pending_streaming_body.push_back(Ok(data));
716 if end_stream {
717 stream.pending_streaming_end = true;
718 }
719 }
720 return Ok(true);
721 }
722 H2BodyDataPush::Closed => {
723 self.cancel_stream_for_dropped_receiver(stream_id).await;
724 return Ok(true);
725 }
726 }
727 if should_yield_for_body_queue {
728 tokio::task::yield_now().await;
729 if body_shared.is_closed() {
730 self.cancel_stream_for_dropped_receiver(stream_id).await;
731 return Ok(true);
732 }
733 }
734
735 if end_stream {
736 if data_len == 0 {
737 body_shared.finish();
738 }
739 tokio::task::yield_now().await;
740 self.complete_stream(stream_id);
741 return Ok(true);
742 }
743 } else {
744 self.store_stream_recv_window(stream_id, recv_window);
745 if let Err(e) = self.handle_frame(header, payload).await {
746 tracing::error!("H2Driver frame error (fast path): {:?}", e);
747 return Err(e);
748 }
749 if self.single_stream_fast_path_target() != Some(stream_id) {
750 return Ok(true);
751 }
752 }
753 }
754 }
755
756 fn check_keepalive_timeout(&self) -> Result<()> {
757 if let Some((_, sent_at)) = self.pending_ping {
758 if sent_at.elapsed() >= self.config.keep_alive_timeout {
759 return Err(Error::HttpProtocol(
760 "HTTP/2 keepalive ping timed out".into(),
761 ));
762 }
763 }
764 Ok(())
765 }
766
767 fn keepalive_delay(&self) -> Option<Duration> {
768 let interval = self.config.keep_alive_interval?;
769 if self.pending_ping.is_some() {
770 return None;
771 }
772 if !self.config.keep_alive_while_idle && self.active_stream_count() == 0 {
773 return None;
774 }
775 Some(interval)
776 }
777
778 fn has_pending_streaming_body(&self) -> bool {
779 self.streams.values().any(|stream| {
780 stream.streaming_body.is_some()
781 && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
782 })
783 }
784
785 fn has_request_body_work(&self) -> bool {
786 self.streams
787 .values()
788 .any(|stream| stream.request_stream.is_some())
789 }
790
791 async fn handle_send_request(&mut self, cmd: DriverCommand) -> Result<()> {
793 if !self.has_available_stream_slot() {
794 self.pending_requests.push_back(cmd);
796 } else {
797 self.send_request_internal(cmd).await?;
799 }
800 Ok(())
801 }
802
803 async fn handle_send_streaming_request(&mut self, cmd: DriverCommand) -> Result<()> {
804 if !self.has_available_stream_slot() {
805 self.pending_requests.push_back(cmd);
806 } else {
807 self.send_streaming_request_internal(cmd).await?;
808 }
809 Ok(())
810 }
811
812 async fn process_pending_requests(&mut self) -> Result<()> {
814 while self.has_available_stream_slot() {
815 if let Some(cmd) = self.pending_requests.pop_front() {
816 match cmd {
817 DriverCommand::SendRequest { .. } => {
818 self.send_request_internal(cmd).await?;
819 }
820 DriverCommand::OpenWebSocketTunnel {
821 uri,
822 headers,
823 response_tx,
824 } => {
825 self.open_websocket_tunnel_internal(uri, headers, response_tx)
826 .await?;
827 }
828 DriverCommand::SendStreamingRequest { .. } => {
829 self.send_streaming_request_internal(cmd).await?;
830 }
831 DriverCommand::SendTunnelData {
832 stream_id,
833 outbound,
834 } => {
835 self.queue_tunnel_outbound(stream_id, outbound).await?;
836 }
837 }
838 } else {
839 break;
840 }
841 }
842 Ok(())
843 }
844
845 fn active_stream_count(&self) -> usize {
846 self.streams.len() + self.tunnels.len()
847 }
848
849 fn has_available_stream_slot(&self) -> bool {
850 let max_streams = self.config.effective_max_concurrent_streams(
851 self.connection.peer_settings().max_concurrent_streams,
852 );
853 self.active_stream_count() < max_streams
854 }
855
856 async fn send_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
858 if let DriverCommand::SendRequest {
859 method,
860 uri,
861 headers,
862 body,
863 response_tx,
864 } = cmd
865 {
866 let body_bytes = body.unwrap_or_default();
867 let has_body = !body_bytes.is_empty();
868 let end_stream = !has_body;
869
870 let initial_recv_window = self.connection.local_initial_window_size() as i32;
871 match self
872 .connection
873 .send_headers_raw(&method, &uri, &headers, end_stream)
874 .await
875 {
876 Ok(stream_id) => {
877 self.streams.insert(
878 stream_id,
879 DriverStreamState::new(response_tx, body_bytes, initial_recv_window),
880 );
881
882 self.flush_pending_data().await?;
883 }
884 Err(e) => {
885 if response_tx.send(Err(e)).is_err() {
886 tracing::debug!("Response channel closed while sending error");
887 }
888 }
889 }
890 }
891 Ok(())
892 }
893
894 async fn send_streaming_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
895 if let DriverCommand::SendStreamingRequest {
896 method,
897 uri,
898 headers,
899 body,
900 body_shared,
901 headers_tx,
902 } = cmd
903 {
904 let (body_bytes, request_stream, end_stream) = match body {
905 RequestBody::Empty => (Bytes::new(), None, true),
906 RequestBody::Bytes(bytes) => {
907 let end_stream = bytes.is_empty();
908 (bytes, None, end_stream)
909 }
910 RequestBody::Text(text) => {
911 let bytes = Bytes::from(text.into_bytes());
912 let end_stream = bytes.is_empty();
913 (bytes, None, end_stream)
914 }
915 RequestBody::Json(bytes) => {
916 let bytes = Bytes::from(bytes);
917 let end_stream = bytes.is_empty();
918 (bytes, None, end_stream)
919 }
920 RequestBody::Form(text) => {
921 let bytes = Bytes::from(text.into_bytes());
922 let end_stream = bytes.is_empty();
923 (bytes, None, end_stream)
924 }
925 RequestBody::Stream {
926 stream,
927 content_length,
928 } => (
929 Bytes::new(),
930 Some(DriverStreamingRequestBody::new(stream, content_length)),
931 false,
932 ),
933 };
934
935 let initial_recv_window = self.connection.local_initial_window_size() as i32;
936 match self
937 .connection
938 .send_headers_raw(&method, &uri, &headers, end_stream)
939 .await
940 {
941 Ok(stream_id) => {
942 self.streams.insert(
943 stream_id,
944 DriverStreamState::streaming(
945 headers_tx,
946 body_shared,
947 body_bytes,
948 request_stream,
949 initial_recv_window,
950 ),
951 );
952 self.flush_pending_data().await?;
953 }
954 Err(error) => {
955 let _ = headers_tx.send(Err(error));
956 }
957 }
958 }
959 Ok(())
960 }
961
962 async fn handle_open_websocket_tunnel(
963 &mut self,
964 uri: Uri,
965 headers: Vec<(String, String)>,
966 response_tx: oneshot::Sender<Result<H2Tunnel>>,
967 ) -> Result<()> {
968 if !self.has_available_stream_slot() {
969 self.pending_requests
970 .push_back(DriverCommand::OpenWebSocketTunnel {
971 uri,
972 headers,
973 response_tx,
974 });
975 return Ok(());
976 }
977
978 self.open_websocket_tunnel_internal(uri, headers, response_tx)
979 .await
980 }
981
982 async fn open_websocket_tunnel_internal(
983 &mut self,
984 uri: Uri,
985 headers: Vec<(String, String)>,
986 response_tx: oneshot::Sender<Result<H2Tunnel>>,
987 ) -> Result<()> {
988 let headers = Headers::from(headers);
989 match self
990 .connection
991 .open_extended_connect_websocket_with_end_stream(&uri, &headers)
992 .await
993 {
994 Ok((stream_id, end_stream)) => {
995 let (outbound_tx, outbound_rx) = mpsc::channel(32);
996 let (inbound_tx, inbound_rx) = mpsc::channel(32);
997 let initial_window_size = self.connection.local_initial_window_size();
998 let initial_recv_window = initial_window_size as i32;
999 let credit =
1000 H2TunnelCredit::new(self.body_progress_notify.clone(), initial_window_size);
1001 if end_stream {
1002 let _ = inbound_tx.send(Ok(H2TunnelEvent::EndStream)).await;
1003 self.connection.remove_stream(stream_id);
1004 } else {
1005 let command_tx = self.command_tx.clone();
1006 tokio::spawn(async move {
1007 let mut outbound_rx = outbound_rx;
1008 while let Some(outbound) = outbound_rx.recv().await {
1009 if command_tx
1010 .send(DriverCommand::SendTunnelData {
1011 stream_id,
1012 outbound,
1013 })
1014 .await
1015 .is_err()
1016 {
1017 break;
1018 }
1019 }
1020 });
1021 self.tunnels.insert(
1022 stream_id,
1023 DriverTunnelState {
1024 inbound_tx,
1025 pending_inbound: VecDeque::new(),
1026 pending_outbound: VecDeque::new(),
1027 recv_window: initial_recv_window,
1028 pending_recv_window_update: 0,
1029 credit: credit.clone(),
1030 },
1031 );
1032 }
1033
1034 if response_tx
1035 .send(Ok(H2Tunnel::new_with_credit(
1036 outbound_tx,
1037 inbound_rx,
1038 credit,
1039 )))
1040 .is_err()
1041 {
1042 tracing::debug!("Tunnel response channel closed after open");
1043 self.tunnels.remove(&stream_id);
1044 }
1045 }
1046 Err(e) => {
1047 if response_tx.send(Err(e)).is_err() {
1048 tracing::debug!("Tunnel response channel closed while sending open error");
1049 }
1050 }
1051 }
1052 Ok(())
1053 }
1054
1055 async fn queue_tunnel_outbound(
1056 &mut self,
1057 stream_id: u32,
1058 outbound: H2TunnelOutbound,
1059 ) -> Result<()> {
1060 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1061 tunnel.pending_outbound.push_back(outbound);
1062 self.flush_tunnel_data().await?;
1063 }
1064
1065 Ok(())
1066 }
1067
1068 async fn flush_pending_data(&mut self) -> Result<()> {
1070 if !self.streams.values().any(|stream| {
1071 stream.body_offset < stream.pending_body.len() || stream.request_stream.is_some()
1072 }) {
1073 return Ok(());
1074 }
1075
1076 let stream_ids: Vec<u32> = self.streams.keys().cloned().collect();
1078
1079 for stream_id in stream_ids {
1080 loop {
1082 let (has_data, offset) = if let Some(stream) = self.streams.get(&stream_id) {
1084 (
1085 stream.body_offset < stream.pending_body.len(),
1086 stream.body_offset,
1087 )
1088 } else {
1089 (false, 0)
1090 };
1091
1092 if !has_data {
1093 break;
1094 }
1095
1096 let pending_body = {
1099 let s = self.streams.get(&stream_id).unwrap();
1100 s.pending_body.clone()
1101 };
1102
1103 let remaining = &pending_body[offset..];
1104 let is_last_chunk = true;
1105
1106 let sent = self
1108 .connection
1109 .send_data(stream_id, remaining, is_last_chunk)
1110 .await?;
1111
1112 if sent > 0 {
1113 if let Some(stream) = self.streams.get_mut(&stream_id) {
1114 stream.body_offset += sent;
1115 }
1116 } else {
1118 break;
1120 }
1121 }
1122
1123 self.flush_streaming_request_body(stream_id).await?;
1124 }
1125 Ok(())
1126 }
1127
1128 async fn flush_streaming_request_body(&mut self, stream_id: u32) -> Result<()> {
1129 loop {
1130 if self
1131 .streams
1132 .get(&stream_id)
1133 .and_then(|stream| stream.request_stream.as_ref())
1134 .is_none()
1135 {
1136 return Ok(());
1137 }
1138
1139 let available = self.connection.available_send_window(stream_id).await?;
1140 if available <= 0 {
1141 return Ok(());
1142 }
1143
1144 let batch_limit = (available as usize).min(self.connection.max_data_frame_size());
1145 if batch_limit == 0 {
1146 return Ok(());
1147 }
1148
1149 let mut batch = BytesMut::with_capacity(batch_limit);
1150 let mut end_stream = false;
1151 let mut remove_request_stream_after_send = false;
1152
1153 while batch.len() < batch_limit {
1154 let current = self
1155 .streams
1156 .get(&stream_id)
1157 .and_then(|stream| stream.request_stream.as_ref())
1158 .and_then(|body| {
1159 body.current_chunk
1160 .as_ref()
1161 .map(|chunk| (chunk.clone(), body.current_offset))
1162 });
1163
1164 if let Some((chunk, offset)) = current {
1165 let remaining = &chunk[offset..];
1166 let take = remaining.len().min(batch_limit - batch.len());
1167 batch.extend_from_slice(&remaining[..take]);
1168
1169 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1170 let body = stream
1171 .request_stream
1172 .as_mut()
1173 .expect("request stream exists");
1174 body.current_offset += take;
1175 body.sent += take as u64;
1176 if body.current_offset >= chunk.len() {
1177 body.current_chunk = None;
1178 body.current_offset = 0;
1179 }
1180 continue;
1181 }
1182
1183 let poll_result = {
1184 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1185 let body = stream
1186 .request_stream
1187 .as_mut()
1188 .expect("request stream exists");
1189 if body.finished {
1190 Poll::Ready(None)
1191 } else {
1192 let mut cx = std::task::Context::from_waker(&self.request_body_waker);
1193 body.stream.as_mut().poll_next(&mut cx)
1194 }
1195 };
1196
1197 match poll_result {
1198 Poll::Pending => break,
1199 Poll::Ready(Some(Ok(chunk))) => {
1200 if chunk.is_empty() {
1201 continue;
1202 }
1203 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1204 let body = stream
1205 .request_stream
1206 .as_mut()
1207 .expect("request stream exists");
1208 body.current_chunk = Some(chunk);
1209 body.current_offset = 0;
1210 }
1211 Poll::Ready(Some(Err(error))) => {
1212 self.fail_stream(
1213 stream_id,
1214 format!("request body stream error: {}", error),
1215 )
1216 .await;
1217 return Ok(());
1218 }
1219 Poll::Ready(None) => {
1220 let (valid_len, sent, expected, already_sent_end) = {
1221 let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1222 let body = stream
1223 .request_stream
1224 .as_mut()
1225 .expect("request stream exists");
1226 body.finished = true;
1227 (
1228 body.content_length
1229 .map(|expected| expected == body.sent)
1230 .unwrap_or(true),
1231 body.sent,
1232 body.content_length,
1233 body.end_stream_sent,
1234 )
1235 };
1236 if !valid_len {
1237 self.fail_stream(
1238 stream_id,
1239 format!(
1240 "sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
1241 sent,
1242 expected.unwrap_or_default()
1243 ),
1244 )
1245 .await;
1246 return Ok(());
1247 }
1248 if already_sent_end {
1249 return Ok(());
1250 }
1251 end_stream = true;
1252 remove_request_stream_after_send = true;
1253 break;
1254 }
1255 }
1256 }
1257
1258 if batch.is_empty() {
1259 if end_stream {
1260 let sent = self.connection.send_data(stream_id, &[], true).await?;
1261 debug_assert_eq!(sent, 0);
1262 if let Some(stream) = self.streams.get_mut(&stream_id) {
1263 if let Some(body) = stream.request_stream.as_mut() {
1264 body.end_stream_sent = true;
1265 }
1266 stream.request_stream = None;
1267 }
1268 }
1269 return Ok(());
1270 }
1271
1272 let sent = self
1273 .connection
1274 .send_data(stream_id, &batch, end_stream)
1275 .await?;
1276 if sent == 0 {
1277 return Ok(());
1278 }
1279 if sent != batch.len() {
1280 return Err(Error::HttpProtocol(
1281 "short DATA write after preflighted flow-control window".into(),
1282 ));
1283 }
1284
1285 if remove_request_stream_after_send {
1286 if let Some(stream) = self.streams.get_mut(&stream_id) {
1287 if let Some(body) = stream.request_stream.as_mut() {
1288 body.end_stream_sent = true;
1289 }
1290 stream.request_stream = None;
1291 }
1292 return Ok(());
1293 }
1294 }
1295 }
1296
1297 async fn flush_tunnel_data(&mut self) -> Result<()> {
1298 if self.tunnels.is_empty() {
1299 return Ok(());
1300 }
1301 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1302
1303 for stream_id in stream_ids {
1304 loop {
1305 let outbound = match self
1306 .tunnels
1307 .get_mut(&stream_id)
1308 .and_then(|tunnel| tunnel.pending_outbound.pop_front())
1309 {
1310 Some(outbound) => outbound,
1311 None => break,
1312 };
1313
1314 let sent = self
1315 .connection
1316 .send_data(stream_id, &outbound.bytes, outbound.end_stream)
1317 .await?;
1318
1319 if outbound.bytes.is_empty() {
1320 continue;
1321 }
1322
1323 if sent == 0 {
1324 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1325 tunnel.pending_outbound.push_front(outbound);
1326 }
1327 break;
1328 }
1329
1330 if sent < outbound.bytes.len() {
1331 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1332 tunnel.pending_outbound.push_front(H2TunnelOutbound {
1333 bytes: outbound.bytes.slice(sent..),
1334 end_stream: outbound.end_stream,
1335 });
1336 }
1337 break;
1338 }
1339 }
1340 }
1341
1342 Ok(())
1343 }
1344
1345 async fn flush_tunnel_inbound(&mut self) -> Result<()> {
1346 if self.tunnels.is_empty() {
1347 return Ok(());
1348 }
1349
1350 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1351 for stream_id in stream_ids {
1352 let status = self
1353 .tunnels
1354 .get_mut(&stream_id)
1355 .map(DriverTunnelState::flush_inbound)
1356 .unwrap_or(TunnelInboundStatus::Open);
1357 self.apply_tunnel_inbound_status(stream_id, status).await?;
1358 }
1359
1360 Ok(())
1361 }
1362
1363 async fn apply_tunnel_inbound_status(
1364 &mut self,
1365 stream_id: u32,
1366 status: TunnelInboundStatus,
1367 ) -> Result<()> {
1368 match status {
1369 TunnelInboundStatus::Open | TunnelInboundStatus::Blocked => {}
1370 TunnelInboundStatus::Remove => {
1371 self.tunnels.remove(&stream_id);
1372 self.connection.remove_stream(stream_id);
1373 self.process_pending_requests().await?;
1374 }
1375 TunnelInboundStatus::Closed => {
1376 self.tunnels.remove(&stream_id);
1377 self.connection.remove_stream(stream_id);
1378 if let Err(e) = self
1379 .connection
1380 .send_rst_stream(stream_id, ErrorCode::Cancel)
1381 .await
1382 {
1383 tracing::warn!("Failed to send RST_STREAM for dropped tunnel: {:?}", e);
1384 }
1385 self.process_pending_requests().await?;
1386 }
1387 }
1388 Ok(())
1389 }
1390
1391 async fn apply_released_body_credits(&mut self) -> Result<()> {
1392 let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1393 for stream_id in stream_ids {
1394 let (released, closed) = self
1395 .streams
1396 .get(&stream_id)
1397 .and_then(|stream| stream.streaming_body.as_ref())
1398 .map(|body| (body.take_released_recv_bytes(), body.is_closed()))
1399 .unwrap_or((0, false));
1400
1401 if closed {
1402 self.cancel_stream_for_dropped_receiver(stream_id).await;
1403 continue;
1404 }
1405
1406 if released == 0 {
1407 continue;
1408 }
1409
1410 let refresh_threshold = self.connection.flow_control_refresh_threshold();
1411 let window_update_step = self.connection.stream_window_update_step();
1412 let increment = self.streams.get_mut(&stream_id).and_then(|stream| {
1413 stream.pending_recv_window_update =
1414 stream.pending_recv_window_update.saturating_add(released);
1415 let should_update = stream.pending_recv_window_update >= window_update_step
1416 || stream.recv_window < refresh_threshold;
1417 if should_update {
1418 let increment = stream.pending_recv_window_update.min(u32::MAX as usize);
1419 stream.pending_recv_window_update -= increment;
1420 stream.recv_window = stream.recv_window.saturating_add(increment as i32);
1421 Some(increment as u32)
1422 } else {
1423 None
1424 }
1425 });
1426
1427 if let Some(increment) = increment {
1428 self.connection
1429 .send_stream_window_update(stream_id, increment)
1430 .await?;
1431 }
1432 }
1433
1434 Ok(())
1435 }
1436
1437 async fn apply_released_tunnel_credits(&mut self) -> Result<()> {
1438 let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1439 for stream_id in stream_ids {
1440 let (released, closed) = self
1441 .tunnels
1442 .get(&stream_id)
1443 .map(|tunnel| {
1444 (
1445 tunnel.credit.take_released_recv_bytes(),
1446 tunnel.inbound_tx.is_closed(),
1447 )
1448 })
1449 .unwrap_or((0, false));
1450
1451 if closed {
1452 self.apply_tunnel_inbound_status(stream_id, TunnelInboundStatus::Closed)
1453 .await?;
1454 continue;
1455 }
1456
1457 if released == 0 {
1458 continue;
1459 }
1460
1461 let refresh_threshold = self.connection.flow_control_refresh_threshold();
1462 let window_update_step = self.connection.stream_window_update_step();
1463 let increment = self.tunnels.get_mut(&stream_id).and_then(|tunnel| {
1464 tunnel.pending_recv_window_update =
1465 tunnel.pending_recv_window_update.saturating_add(released);
1466 let should_update = tunnel.pending_recv_window_update >= window_update_step
1467 || tunnel.recv_window < refresh_threshold;
1468 if should_update {
1469 let increment = tunnel.pending_recv_window_update.min(u32::MAX as usize);
1470 tunnel.pending_recv_window_update -= increment;
1471 tunnel.recv_window = tunnel.recv_window.saturating_add(increment as i32);
1472 Some(increment as u32)
1473 } else {
1474 None
1475 }
1476 });
1477
1478 if let Some(increment) = increment {
1479 self.connection
1480 .send_stream_window_update(stream_id, increment)
1481 .await?;
1482 }
1483 }
1484
1485 Ok(())
1486 }
1487
1488 async fn fail_stream(&mut self, stream_id: u32, message: String) {
1489 self.connection.remove_stream(stream_id);
1490 if let Some(mut stream) = self.streams.remove(&stream_id) {
1491 Self::note_stream_removed(&stream, &self.inline_active);
1492 if let Some(tx) = stream.response_tx.take() {
1493 let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1494 }
1495 if let Some(tx) = stream.streaming_headers_tx.take() {
1496 let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1497 }
1498 if let Some(body) = stream.streaming_body.take() {
1499 let _ = body.fail(Error::HttpProtocol(message));
1500 }
1501 }
1502 }
1503
1504 async fn cancel_stream_for_dropped_receiver(&mut self, stream_id: u32) {
1505 if let Some(state) = self.streams.remove(&stream_id) {
1506 Self::note_stream_removed(&state, &self.inline_active);
1507 }
1508 self.connection.remove_stream(stream_id);
1509 if let Err(e) = self
1510 .connection
1511 .send_rst_stream(stream_id, ErrorCode::Cancel)
1512 .await
1513 {
1514 tracing::warn!("Failed to send RST_STREAM for dropped receiver: {:?}", e);
1515 }
1516 }
1517
1518 async fn flush_pending_streaming_bodies(&mut self) -> Result<()> {
1519 if !self.has_pending_streaming_body() {
1520 return Ok(());
1521 }
1522 let stream_ids: Vec<u32> = self
1523 .streams
1524 .iter()
1525 .filter_map(|(stream_id, stream)| {
1526 if stream.streaming_body.is_some()
1527 && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
1528 {
1529 Some(*stream_id)
1530 } else {
1531 None
1532 }
1533 })
1534 .collect();
1535
1536 for stream_id in stream_ids {
1537 let mut should_cancel = false;
1538 let mut should_complete = false;
1539
1540 if let Some(stream) = self.streams.get_mut(&stream_id) {
1541 let Some(body) = stream.streaming_body.as_ref() else {
1542 continue;
1543 };
1544
1545 if body.is_closed() {
1546 should_cancel = true;
1547 } else {
1548 while let Some(item) = stream.pending_streaming_body.pop_front() {
1549 match body.push(item) {
1550 H2BodyPush::Accepted => {}
1551 H2BodyPush::Full(item) => {
1552 stream.pending_streaming_body.push_front(item);
1553 break;
1554 }
1555 H2BodyPush::Closed => {
1556 should_cancel = true;
1557 break;
1558 }
1559 }
1560 }
1561
1562 should_complete =
1563 stream.pending_streaming_end && stream.pending_streaming_body.is_empty();
1564 }
1565 }
1566
1567 if should_cancel {
1568 self.cancel_stream_for_dropped_receiver(stream_id).await;
1569 } else if should_complete {
1570 if let Some(body) = self
1571 .streams
1572 .get(&stream_id)
1573 .and_then(|stream| stream.streaming_body.as_ref())
1574 {
1575 body.finish();
1576 }
1577 self.complete_stream(stream_id);
1578 }
1579 }
1580
1581 Ok(())
1582 }
1583
1584 async fn handle_frame(&mut self, header: FrameHeader, mut payload: Bytes) -> Result<()> {
1586 self.drain_inline_registrations();
1589
1590 if header.stream_id != 0 {
1593 if let Some(stream) = self.streams.get(&header.stream_id) {
1594 if let Some(ref body) = stream.streaming_body {
1595 if body.is_closed() {
1596 let stream_id = header.stream_id;
1597 self.cancel_stream_for_dropped_receiver(stream_id).await;
1598 return Ok(());
1599 }
1600 }
1601 }
1602 }
1603
1604 if matches!(
1606 header.frame_type,
1607 FrameType::Settings
1608 | FrameType::WindowUpdate
1609 | FrameType::Ping
1610 | FrameType::GoAway
1611 | FrameType::RstStream
1612 | FrameType::PushPromise
1613 ) {
1614 match self
1615 .connection
1616 .handle_control_frame(&header, payload.clone())
1617 .await?
1618 {
1619 ControlAction::RstStream(sid, code) => {
1620 if let Some(tunnel) = self.tunnels.remove(&sid) {
1621 let _ = tunnel
1622 .inbound_tx
1623 .send(Ok(H2TunnelEvent::Reset(format!("{:?}", code))))
1624 .await;
1625 }
1626 self.fail_stream(sid, format!("Stream reset by peer: {:?}", code))
1628 .await;
1629 self.process_pending_requests().await?;
1631 return Ok(());
1632 }
1633 ControlAction::GoAway(last_sid) => {
1634 self.goaway_received
1635 .store(true, std::sync::atomic::Ordering::Relaxed);
1636 let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1637 for sid in stream_ids {
1638 if let Some(stream) = self.streams.get(&sid) {
1639 if stream.streaming_body.is_some()
1640 && stream
1641 .streaming_body
1642 .as_ref()
1643 .is_some_and(|body| body.is_slot_available())
1644 {
1645 if let Some(body) = stream.streaming_body.as_ref() {
1646 body.finish();
1647 }
1648 }
1649 }
1650 }
1651 let tunnel_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1652 for sid in tunnel_ids {
1653 if sid > last_sid {
1654 if let Some(tunnel) = self.tunnels.remove(&sid) {
1655 let _ = tunnel
1656 .inbound_tx
1657 .send(Ok(H2TunnelEvent::GoAway {
1658 last_stream_id: last_sid,
1659 }))
1660 .await;
1661 }
1662 }
1663 }
1664 let sids: Vec<u32> = self.streams.keys().cloned().collect();
1666 for sid in sids {
1667 if sid > last_sid {
1668 self.fail_stream(sid, "GOAWAY received".into()).await;
1669 }
1670 }
1671 return Ok(());
1674 }
1675 ControlAction::RefusePush(_stream_id, promised_id) => {
1676 if let Err(e) = self
1679 .connection
1680 .send_rst_stream(promised_id, ErrorCode::RefusedStream)
1681 .await
1682 {
1683 tracing::warn!(
1684 "Failed to send RST_STREAM for refused push promise: {:?}",
1685 e
1686 );
1687 }
1688 }
1689 ControlAction::PingAck(data) => {
1690 if self
1691 .pending_ping
1692 .is_some_and(|(pending_data, _)| pending_data == data)
1693 {
1694 self.pending_ping = None;
1695 }
1696 return Ok(());
1697 }
1698 ControlAction::None => {
1699 }
1701 }
1702 }
1703
1704 match header.frame_type {
1706 FrameType::Headers => {
1707 let stream_id = header.stream_id;
1708
1709 if (header.flags & flags::END_HEADERS) == 0 {
1713 let mut block = BytesMut::from(payload);
1717 loop {
1718 let (next_header, next_payload) = self.connection.read_next_frame().await?;
1719 if next_header.frame_type != FrameType::Continuation {
1720 return Err(Error::HttpProtocol("Expected CONTINUATION frame".into()));
1721 }
1722 if next_header.stream_id != stream_id {
1723 return Err(Error::HttpProtocol(
1724 "CONTINUATION frame stream ID mismatch".into(),
1725 ));
1726 }
1727 block.extend_from_slice(&next_payload);
1728 if (next_header.flags & flags::END_HEADERS) != 0 {
1729 break;
1730 }
1731 }
1732 payload = block.freeze();
1733 }
1734
1735 let decoded = self.connection.decode_header_block(payload)?;
1736
1737 let mut status = 0u16;
1739 let mut regular_headers = Vec::new();
1740
1741 for (name, value) in decoded {
1742 if name == ":status" {
1743 status = value.parse().unwrap_or(0);
1744 } else if !name.starts_with(':') {
1745 regular_headers.push((name, value));
1746 }
1747 }
1748
1749 if let Some(stream) = self.streams.get_mut(&stream_id) {
1750 if status >= 200 {
1751 let header_end_stream = (header.flags & flags::END_STREAM) != 0
1752 || self.goaway_received.load(Ordering::Relaxed);
1753 if let Some(tx) = stream.streaming_headers_tx.take() {
1754 let _ = tx.send(Ok((status, regular_headers)));
1755 if header_end_stream {
1756 if let Some(body) = stream.streaming_body.as_ref() {
1757 body.finish();
1758 }
1759 }
1760 } else {
1761 stream.status = Some(status);
1762 stream.headers = regular_headers;
1763 }
1764 } else if status > 0 {
1765 tracing::debug!("H2Driver: Ignoring informational status {}", status);
1767 } else {
1768 tracing::debug!("H2Driver: Received trailers for stream {}", stream_id);
1770 if (header.flags & flags::END_STREAM) != 0 {
1771 if let Some(body) = stream.streaming_body.as_ref() {
1772 body.finish();
1773 }
1774 }
1775 }
1776
1777 if (header.flags & flags::END_STREAM) != 0 {
1778 self.complete_stream(stream_id);
1779 }
1780 }
1781 }
1782 FrameType::Data => {
1783 let stream_id = header.stream_id;
1784 let end_stream = (header.flags & flags::END_STREAM) != 0;
1785
1786 let data =
1787 self.connection
1788 .parse_inbound_data_payload(stream_id, header.flags, payload)?;
1789 let data_len = data.len();
1790 if let Some(increment) = self
1791 .connection
1792 .apply_conn_inbound_flow_control_delta(data_len)
1793 {
1794 self.connection
1795 .send_connection_window_update(increment)
1796 .await?;
1797 }
1798
1799 if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1800 if data_len > 0 {
1801 tunnel.recv_window -= data_len as i32;
1802 }
1803
1804 let mut status = TunnelInboundStatus::Open;
1805 if !data.is_empty() {
1806 status = tunnel.push_inbound(H2TunnelEvent::Data(data));
1807 }
1808 if status == TunnelInboundStatus::Open && end_stream {
1809 status = tunnel.push_inbound(H2TunnelEvent::EndStream);
1810 }
1811 self.apply_tunnel_inbound_status(stream_id, status).await?;
1812 return Ok(());
1813 }
1814
1815 let mut should_cancel = false;
1816 let mut should_complete = false;
1817 let mut should_finish_body = false;
1818 let mut should_yield_before_complete = false;
1819
1820 if let Some(stream) = self.streams.get_mut(&stream_id) {
1821 if data_len > 0 {
1822 stream.recv_window -= data_len as i32;
1823 }
1824
1825 if let Some(body) = stream.streaming_body.as_ref() {
1826 if !data.is_empty() {
1827 if stream.pending_streaming_body.is_empty() {
1828 let push = body.push_data(data, end_stream);
1829 match push {
1830 H2BodyDataPush::Accepted { .. } => {}
1831 H2BodyDataPush::Full(data) => {
1832 stream.pending_streaming_body.push_back(Ok(data));
1833 }
1834 H2BodyDataPush::Closed => {
1835 should_cancel = true;
1836 }
1837 }
1838 } else {
1839 stream.pending_streaming_body.push_back(Ok(data));
1840 }
1841 }
1842
1843 if end_stream {
1844 if stream.pending_streaming_body.is_empty() {
1845 should_complete = true;
1846 should_finish_body = data_len == 0;
1847 should_yield_before_complete = true;
1848 } else {
1849 stream.pending_streaming_end = true;
1850 }
1851 }
1852 } else {
1853 stream.body.extend_from_slice(&data);
1854 should_complete = end_stream;
1855 }
1856 }
1857
1858 if should_cancel {
1859 self.cancel_stream_for_dropped_receiver(stream_id).await;
1860 return Ok(());
1861 }
1862
1863 if should_complete {
1864 if should_finish_body {
1865 if let Some(body) = self
1866 .streams
1867 .get(&stream_id)
1868 .and_then(|stream| stream.streaming_body.as_ref())
1869 {
1870 body.finish();
1871 }
1872 }
1873 if should_yield_before_complete {
1874 tokio::task::yield_now().await;
1875 }
1876 self.complete_stream(stream_id);
1877 }
1878 }
1879 FrameType::WindowUpdate => {
1880 self.flush_pending_data().await?;
1884 self.flush_tunnel_data().await?;
1885 }
1886 _ => {} }
1888
1889 Ok(())
1890 }
1891
1892 fn complete_stream(&mut self, stream_id: u32) {
1894 if let Some(mut stream) = self.streams.remove(&stream_id) {
1895 if !stream.inline {
1896 self.connection.remove_stream(stream_id);
1897 }
1898 Self::note_stream_removed(&stream, &self.inline_active);
1899 if let Some(tx) = stream.response_tx.take() {
1900 let response = match stream.status {
1903 Some(status) => Ok(StreamResponse {
1904 status,
1905 headers: stream.headers,
1906 body: stream.body.freeze(),
1907 }),
1908 None => Err(Error::HttpProtocol(format!(
1909 "Stream {} completed without status code",
1910 stream_id
1911 ))),
1912 };
1913 if tx.send(response).is_err() {
1914 tracing::debug!("Response channel closed while completing stream");
1915 }
1916 }
1917 }
1918 }
1921}