Skip to main content

specter/transport/h2/
driver.rs

1//! HTTP/2 connection driver - background task that reads frames and routes them to streams.
2//!
3//! The driver owns the raw H2Connection and continuously reads frames from the socket,
4//! routing them to the appropriate stream channels. This allows multiple requests
5//! to be multiplexed without blocking each other.
6
7use bytes::{Bytes, BytesMut};
8use http::{Method, Uri};
9use std::collections::{HashMap, VecDeque};
10use std::fmt;
11use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::task::{Poll, Wake, Waker};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tokio::sync::oneshot;
17use tokio::sync::Notify;
18use tracing;
19
/// Value delivered on a streaming request's headers channel: the response
/// status code paired with the response header name/value pairs, or the
/// error that occurred before they could be delivered.
pub type StreamingHeadersResult = Result<(u16, Vec<(String, String)>)>;
21
22use crate::error::{Error, Result};
23use crate::headers::Headers;
24use crate::request::{RequestBody, RequestBodyStream};
25use crate::transport::h2::body::{H2BodyDataPush, H2BodyPush, H2BodyShared};
26use crate::transport::h2::connection::{
27    ControlAction, H2Connection as RawH2Connection, StreamResponse,
28};
29use crate::transport::h2::frame::{flags, ErrorCode, FrameHeader, FrameType};
30use crate::transport::h2::tunnel::{H2Tunnel, H2TunnelCredit, H2TunnelEvent, H2TunnelOutbound};
31use crate::transport::h2::H2TransportConfig;
32
/// Number of frames the single-stream fast path reads between polls of the
/// command queue, the inline-registration queue and the body-closed flag.
const FAST_PATH_COMMAND_CHECK_INTERVAL: usize = 8;
/// Number of DATA frames the fast path may push while the shared body queue
/// already holds more than one chunk before it yields to the scheduler.
const FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET: usize = 2;
35
36/// Command sent from handle to driver
37pub enum DriverCommand {
38    /// Send a request and get response via oneshot
39    /// Driver allocates stream_id
40    SendRequest {
41        method: http::Method,
42        uri: http::Uri,
43        headers: Headers,
44        body: Option<bytes::Bytes>,
45        response_tx: oneshot::Sender<Result<StreamResponse>>,
46    },
47    /// Send a request with a streaming body
48    SendStreamingRequest {
49        method: Method,
50        uri: Uri,
51        headers: Headers,
52        body: RequestBody,
53        body_shared: Arc<H2BodyShared>,
54        headers_tx: oneshot::Sender<StreamingHeadersResult>,
55    },
56    /// Open an RFC 8441 WebSocket tunnel on a pooled HTTP/2 stream.
57    OpenWebSocketTunnel {
58        uri: Uri,
59        headers: Vec<(String, String)>,
60        response_tx: oneshot::Sender<Result<H2Tunnel>>,
61    },
62    /// Queue outbound DATA for an open RFC 8441 tunnel.
63    SendTunnelData {
64        stream_id: u32,
65        outbound: H2TunnelOutbound,
66    },
67}
68
69impl fmt::Debug for DriverCommand {
70    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
71        match self {
72            DriverCommand::SendRequest {
73                method,
74                uri,
75                headers,
76                body,
77                ..
78            } => f
79                .debug_struct("SendRequest")
80                .field("method", method)
81                .field("uri", uri)
82                .field("headers_len", &headers.len())
83                .field("body_len", &body.as_ref().map(Bytes::len))
84                .finish(),
85            DriverCommand::SendStreamingRequest {
86                method,
87                uri,
88                headers,
89                body,
90                ..
91            } => f
92                .debug_struct("SendStreamingRequest")
93                .field("method", method)
94                .field("uri", uri)
95                .field("headers_len", &headers.len())
96                .field("body", body)
97                .finish(),
98            DriverCommand::OpenWebSocketTunnel { uri, headers, .. } => f
99                .debug_struct("OpenWebSocketTunnel")
100                .field("uri", uri)
101                .field("headers_len", &headers.len())
102                .finish(),
103            DriverCommand::SendTunnelData {
104                stream_id,
105                outbound,
106            } => f
107                .debug_struct("SendTunnelData")
108                .field("stream_id", stream_id)
109                .field("outbound", outbound)
110                .finish(),
111        }
112    }
113}
114
/// Inline-registered streaming stream sent from `H2Handle` to the driver.
///
/// The handle has already written HEADERS via the shared write half and
/// allocated `stream_id`. The driver only needs to register the response
/// routing channels and the seed `recv_window`.
pub struct InlineRegistration {
    /// Stream id already allocated by the handle; used as the key under which
    /// the driver stores the stream state.
    pub stream_id: u32,
    /// Channel on which the streaming response headers are reported.
    pub headers_tx: oneshot::Sender<StreamingHeadersResult>,
    /// Response-body state shared with the public body poller.
    pub body_shared: Arc<H2BodyShared>,
    /// Initial value for the driver-side per-stream receive window.
    pub recv_window: i32,
}
126
127struct NotifyWake(Arc<Notify>);
128
129impl Wake for NotifyWake {
130    fn wake(self: Arc<Self>) {
131        self.0.notify_one();
132    }
133
134    fn wake_by_ref(self: &Arc<Self>) {
135        self.0.notify_one();
136    }
137}
138
139struct DriverStreamingRequestBody {
140    stream: RequestBodyStream,
141    content_length: Option<u64>,
142    current_chunk: Option<Bytes>,
143    current_offset: usize,
144    sent: u64,
145    finished: bool,
146    end_stream_sent: bool,
147}
148
149impl DriverStreamingRequestBody {
150    fn new(stream: RequestBodyStream, content_length: Option<u64>) -> Self {
151        Self {
152            stream,
153            content_length,
154            current_chunk: None,
155            current_offset: 0,
156            sent: 0,
157            finished: false,
158            end_stream_sent: false,
159        }
160    }
161}
162
163/// Per-stream state tracked by driver
164struct DriverStreamState {
165    /// Oneshot sender for response completion
166    response_tx: Option<oneshot::Sender<Result<StreamResponse>>>,
167    /// Oneshot sender for streaming response headers
168    streaming_headers_tx: Option<oneshot::Sender<StreamingHeadersResult>>,
169    /// Streaming response body state shared with the public Body poller.
170    streaming_body: Option<Arc<H2BodyShared>>,
171    /// Accumulated response status
172    status: Option<u16>,
173    /// Accumulated response headers
174    headers: Vec<(String, String)>,
175    /// Accumulated response body
176    body: BytesMut,
177    /// Pending request body to be sent (flow control buffer)
178    pending_body: Bytes,
179    /// Offset of pending body already sent
180    body_offset: usize,
181    /// Streaming request body producer and partially sent chunk.
182    request_stream: Option<DriverStreamingRequestBody>,
183    /// Streaming response chunks waiting for downstream receiver capacity.
184    pending_streaming_body: VecDeque<Result<Bytes>>,
185    /// Whether END_STREAM arrived while streaming body chunks are still pending.
186    pending_streaming_end: bool,
187    /// Driver-owned per-stream inbound flow-control window. Mirrors the value
188    /// the connection's `Stream::recv_window` would have tracked, so the DATA
189    /// hot path only touches `self.streams` for inbound flow accounting.
190    recv_window: i32,
191    /// Stream-level receive credit released by the public body consumer but
192    /// not yet advertised with WINDOW_UPDATE. Batching this avoids one
193    /// WINDOW_UPDATE per DATA chunk while preserving backpressure.
194    pending_recv_window_update: usize,
195    /// Marks streams registered via the inline shared-writer fast path so
196    /// the driver knows to decrement the inline-active counter on stream
197    /// teardown.
198    inline: bool,
199}
200
201impl DriverStreamState {
202    fn new(
203        response_tx: oneshot::Sender<Result<StreamResponse>>,
204        pending_body: Bytes,
205        recv_window: i32,
206    ) -> Self {
207        Self {
208            response_tx: Some(response_tx),
209            streaming_headers_tx: None,
210            streaming_body: None,
211            status: None,
212            headers: Vec::new(),
213            body: BytesMut::new(),
214            pending_body,
215            body_offset: 0,
216            request_stream: None,
217            pending_streaming_body: VecDeque::new(),
218            pending_streaming_end: false,
219            recv_window,
220            pending_recv_window_update: 0,
221            inline: false,
222        }
223    }
224
225    fn streaming(
226        headers_tx: oneshot::Sender<StreamingHeadersResult>,
227        body_shared: Arc<H2BodyShared>,
228        pending_body: Bytes,
229        request_stream: Option<DriverStreamingRequestBody>,
230        recv_window: i32,
231    ) -> Self {
232        Self {
233            response_tx: None,
234            streaming_headers_tx: Some(headers_tx),
235            streaming_body: Some(body_shared),
236            status: None,
237            headers: Vec::new(),
238            body: BytesMut::new(),
239            pending_body,
240            body_offset: 0,
241            request_stream,
242            pending_streaming_body: VecDeque::new(),
243            pending_streaming_end: false,
244            recv_window,
245            pending_recv_window_update: 0,
246            inline: false,
247        }
248    }
249
250    fn streaming_inline(
251        headers_tx: oneshot::Sender<StreamingHeadersResult>,
252        body_shared: Arc<H2BodyShared>,
253        recv_window: i32,
254    ) -> Self {
255        let mut state = Self::streaming(headers_tx, body_shared, Bytes::new(), None, recv_window);
256        state.inline = true;
257        state
258    }
259}
260
/// Per-tunnel state the driver keeps for an open RFC 8441 tunnel.
struct DriverTunnelState {
    /// Bounded channel on which inbound tunnel events are delivered.
    inbound_tx: mpsc::Sender<Result<H2TunnelEvent>>,
    /// Inbound events that could not be delivered yet because `inbound_tx`
    /// was full; kept in arrival order.
    pending_inbound: VecDeque<Result<H2TunnelEvent>>,
    /// Outbound items queued for this tunnel.
    // NOTE(review): drained outside this definition — confirm against the flush path.
    pending_outbound: VecDeque<H2TunnelOutbound>,
    /// Per-tunnel receive window.
    // NOTE(review): presumably mirrors `DriverStreamState::recv_window` — confirm.
    recv_window: i32,
    /// Receive credit not yet advertised.
    // NOTE(review): presumably batched like the per-stream counterpart — confirm.
    pending_recv_window_update: usize,
    /// Credit state shared with the tunnel side.
    credit: Arc<H2TunnelCredit>,
}
269
/// Result of trying to hand an inbound event to a tunnel's receiver.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum TunnelInboundStatus {
    /// The event was delivered or queued; the tunnel stays open.
    Open,
    /// The receiver channel was full; the event was put back at the front of
    /// the pending queue.
    Blocked,
    /// The receiver side of the channel is gone; the event was dropped.
    Closed,
    /// An `EndStream` event was delivered successfully.
    // NOTE(review): presumably tells the caller to remove the tunnel — confirm.
    Remove,
}
277
278impl DriverTunnelState {
279    fn push_inbound(&mut self, event: H2TunnelEvent) -> TunnelInboundStatus {
280        let item = Ok(event);
281        if !self.pending_inbound.is_empty() {
282            self.pending_inbound.push_back(item);
283            return TunnelInboundStatus::Open;
284        }
285
286        match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
287            TunnelInboundStatus::Blocked => TunnelInboundStatus::Open,
288            status => status,
289        }
290    }
291
292    fn flush_inbound(&mut self) -> TunnelInboundStatus {
293        while let Some(item) = self.pending_inbound.pop_front() {
294            match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
295                TunnelInboundStatus::Open => {}
296                TunnelInboundStatus::Blocked => return TunnelInboundStatus::Open,
297                status => return status,
298            }
299        }
300        TunnelInboundStatus::Open
301    }
302
303    fn try_send_inbound(
304        inbound_tx: &mpsc::Sender<Result<H2TunnelEvent>>,
305        pending_inbound: &mut VecDeque<Result<H2TunnelEvent>>,
306        item: Result<H2TunnelEvent>,
307    ) -> TunnelInboundStatus {
308        let remove_after_send = matches!(item, Ok(H2TunnelEvent::EndStream));
309        match inbound_tx.try_send(item) {
310            Ok(()) if remove_after_send => TunnelInboundStatus::Remove,
311            Ok(()) => TunnelInboundStatus::Open,
312            Err(mpsc::error::TrySendError::Full(item)) => {
313                pending_inbound.push_front(item);
314                TunnelInboundStatus::Blocked
315            }
316            Err(mpsc::error::TrySendError::Closed(_)) => TunnelInboundStatus::Closed,
317        }
318    }
319}
320
321/// HTTP/2 connection driver that runs in a background task
322pub struct H2Driver<S> {
323    /// Channel for receiving commands from handles
324    command_rx: mpsc::Receiver<DriverCommand>,
325    /// Sender back into the driver command queue, used by tunnel outbound forwarders.
326    command_tx: mpsc::Sender<DriverCommand>,
327    /// Raw H2 connection (owned by driver)
328    connection: RawH2Connection<S>,
329    /// Per-stream state for routing responses
330    streams: HashMap<u32, DriverStreamState>,
331    /// Per-stream state for open RFC 8441 tunnels.
332    tunnels: HashMap<u32, DriverTunnelState>,
333    /// Queue for pending requests when max streams reached
334    pending_requests: std::collections::VecDeque<DriverCommand>,
335    /// Shared flag set when GOAWAY frame is received
336    goaway_received: Arc<AtomicBool>,
337    /// Runtime keepalive and flow-control tuning.
338    config: H2TransportConfig,
339    /// Outstanding keepalive ping payload and send time.
340    pending_ping: Option<([u8; 8], Instant)>,
341    /// Channel for inline-registered streaming streams (HEADERS already
342    /// written by the caller). Decoupled from `command_rx` so the inline
343    /// caller never awaits the bounded mpsc command hop.
344    inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
345    /// Counter incremented by the inline caller before HEADERS write and
346    /// decremented by the driver when the stream is removed. Mirrors the
347    /// value visible to `H2Handle::try_send_streaming_inline`.
348    inline_active: Arc<AtomicUsize>,
349    /// Toggle that disables future inline streaming when an RFC 8441 tunnel
350    /// or other ineligible state is in effect.
351    inline_eligible: Arc<AtomicBool>,
352    /// Woken when public H2 bodies consume a slot/drop or request-body
353    /// producers become ready after returning Pending.
354    body_progress_notify: Arc<Notify>,
355    request_body_waker: Waker,
356    /// Incremented when the driver sleeps 1 ms while streaming body work is
357    /// pending (body queue full or request-body producer not ready).
358    backpressure_stall_count: Arc<AtomicU64>,
359}
360
361impl<S> H2Driver<S>
362where
363    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
364{
365    /// Create a new driver from an established connection
366    pub fn new(
367        connection: RawH2Connection<S>,
368        command_tx: mpsc::Sender<DriverCommand>,
369        command_rx: mpsc::Receiver<DriverCommand>,
370        goaway_received: Arc<AtomicBool>,
371        config: H2TransportConfig,
372        backpressure_stall_count: Arc<AtomicU64>,
373    ) -> Self {
374        let (_, inline_register_rx) = mpsc::unbounded_channel();
375        let body_progress_notify = Arc::new(Notify::new());
376        Self::new_with_inline(
377            connection,
378            command_tx,
379            command_rx,
380            goaway_received,
381            config.normalized(),
382            inline_register_rx,
383            Arc::new(AtomicUsize::new(0)),
384            Arc::new(AtomicBool::new(false)),
385            body_progress_notify,
386            backpressure_stall_count,
387        )
388    }
389
390    /// Create a new driver wired to an inline-registration channel and the
391    /// shared `inline_active` / `inline_eligible` counters that the matching
392    /// `H2Handle` sees.
393    #[allow(clippy::too_many_arguments)]
394    pub fn new_with_inline(
395        connection: RawH2Connection<S>,
396        command_tx: mpsc::Sender<DriverCommand>,
397        command_rx: mpsc::Receiver<DriverCommand>,
398        goaway_received: Arc<AtomicBool>,
399        config: H2TransportConfig,
400        inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
401        inline_active: Arc<AtomicUsize>,
402        inline_eligible: Arc<AtomicBool>,
403        body_progress_notify: Arc<Notify>,
404        backpressure_stall_count: Arc<AtomicU64>,
405    ) -> Self {
406        let request_body_waker = Waker::from(Arc::new(NotifyWake(body_progress_notify.clone())));
407        Self {
408            command_rx,
409            command_tx,
410            connection,
411            streams: HashMap::new(),
412            tunnels: HashMap::new(),
413            pending_requests: std::collections::VecDeque::new(),
414            goaway_received,
415            config: config.normalized(),
416            pending_ping: None,
417            inline_register_rx,
418            inline_active,
419            inline_eligible,
420            body_progress_notify,
421            request_body_waker,
422            backpressure_stall_count,
423        }
424    }
425
426    /// Run the driver loop - processes commands and reads frames
427    pub async fn drive(mut self) -> Result<()> {
428        loop {
429            self.drain_inline_registrations();
430
431            // Processing pending requests if slots available
432            self.process_pending_requests().await?;
433
434            // Try to flush any pending data (flow control)
435            self.flush_pending_data().await?;
436            self.flush_tunnel_data().await?;
437            self.flush_tunnel_inbound().await?;
438            self.apply_released_body_credits().await?;
439            self.apply_released_tunnel_credits().await?;
440            self.flush_pending_streaming_bodies().await?;
441            self.check_keepalive_timeout()?;
442            self.refresh_inline_eligibility();
443
444            if let Some(stream_id) = self.single_stream_fast_path_target() {
445                if self
446                    .run_single_stream_streaming_fast_path(stream_id)
447                    .await?
448                {
449                    continue;
450                }
451            }
452
453            let keepalive_delay = self.keepalive_delay();
454            let retry_streaming_backpressure =
455                self.has_pending_streaming_body() || self.has_request_body_work();
456
457            tokio::select! {
458                biased;
459                // Handle incoming commands (send requests)
460                command = self.command_rx.recv() => {
461                    match command {
462                        Some(cmd) => {
463                             match cmd {
464                                DriverCommand::SendRequest { .. } => {
465                                    self.handle_send_request(cmd).await?;
466                                }
467                                DriverCommand::SendStreamingRequest { .. } => {
468                                    self.handle_send_streaming_request(cmd).await?;
469                                }
470                                DriverCommand::OpenWebSocketTunnel { uri, headers, response_tx } => {
471                                    self.handle_open_websocket_tunnel(uri, headers, response_tx).await?;
472                                 }
473                                DriverCommand::SendTunnelData { stream_id, outbound } => {
474                                    self.queue_tunnel_outbound(stream_id, outbound).await?;
475                                }
476                             }
477                        }
478                        None => {
479                            // Channel closed - driver should shutdown
480                            break;
481                        }
482                    }
483                }
484
485                // Drain freshly registered inline streams.
486                inline = self.inline_register_rx.recv(), if !self.inline_register_rx.is_closed() => {
487                    if let Some(reg) = inline {
488                        self.register_inline_stream(reg);
489                    }
490                }
491
492                // Handle incoming frames
493                read_res = self.connection.read_next_frame() => {
494                    match read_res {
495                        Ok((header, payload)) => {
496                            if let Err(e) = self.handle_frame(header, payload).await {
497                                tracing::error!("H2Driver frame error: {:?}", e);
498                                // Protocol errors are fatal and require connection termination.
499                                // The connection state may be inconsistent after this error.
500                                return Err(e);
501                            }
502                        }
503                        Err(e) => {
504                             // Connection error
505                            tracing::error!("H2Driver read error: {:?}", e);
506                            return Err(e);
507                        }
508                    }
509                }
510
511                _ = async {
512                    if let Some(delay) = keepalive_delay {
513                        tokio::time::sleep(delay).await;
514                    } else {
515                        std::future::pending::<()>().await;
516                    }
517                } => {
518                    let ping = self.connection.send_ping().await?;
519                    self.pending_ping = Some((ping, Instant::now()));
520                }
521
522                _ = async {
523                    if retry_streaming_backpressure {
524                        tokio::time::sleep(Duration::from_millis(1)).await;
525                    } else {
526                        std::future::pending::<()>().await;
527                    }
528                } => {
529                    if retry_streaming_backpressure {
530                        self.backpressure_stall_count
531                            .fetch_add(1, Ordering::Relaxed);
532                        tracing::debug!("H2 driver streaming backpressure stall");
533                    }
534                }
535
536                _ = self.body_progress_notify.notified() => {}
537            }
538        }
539        Ok(())
540    }
541
542    /// Drain freshly arrived inline registrations into `self.streams` so the
543    /// next frame routed to one of them finds the entry. Called at the top of
544    /// the main loop and again at the top of frame handling to close the
545    /// race where the response HEADERS arrive before the registration
546    /// notice is observed.
547    fn drain_inline_registrations(&mut self) {
548        while let Ok(reg) = self.inline_register_rx.try_recv() {
549            self.register_inline_stream(reg);
550        }
551    }
552
553    fn register_inline_stream(&mut self, reg: InlineRegistration) {
554        self.streams.insert(
555            reg.stream_id,
556            DriverStreamState::streaming_inline(reg.headers_tx, reg.body_shared, reg.recv_window),
557        );
558    }
559
560    /// Clear or restore the inline-eligibility flag based on whether the
561    /// driver currently allows sequential inline streams. RFC 8441 tunnels
562    /// and GOAWAY block inline writes; other driver-managed streams may
563    /// coexist with at most one inline stream because the shared write
564    /// half preserves stream-id ordering and the driver routes by id.
565    fn refresh_inline_eligibility(&self) {
566        let eligible = self.tunnels.is_empty() && !self.goaway_received.load(Ordering::Relaxed);
567        self.inline_eligible.store(eligible, Ordering::Relaxed);
568    }
569
570    /// Decrement the inline-active counter when an inline-registered stream
571    /// is removed. The handle observes this counter going back to zero
572    /// before allowing the next sequential inline stream.
573    fn note_stream_removed(state: &DriverStreamState, inline_active: &Arc<AtomicUsize>) {
574        if state.inline {
575            inline_active.fetch_sub(1, Ordering::AcqRel);
576        }
577    }
578
579    fn store_stream_recv_window(&mut self, stream_id: u32, recv_window: i32) {
580        if let Some(stream) = self.streams.get_mut(&stream_id) {
581            stream.recv_window = recv_window;
582        }
583    }
584
585    /// Returns the stream ID eligible for the single-stream streaming fast
586    /// path, or `None` when the regular multiplexed driver loop must run.
587    ///
588    /// Fast-path conditions: exactly one active stream, that stream is a
589    /// streaming response with no pending request body left to send, no
590    /// queued multiplexed work, no RFC 8441 tunnels open, no outstanding
591    /// keepalive ping, no streaming backpressure currently buffered, no
592    /// pending inline registration waiting to be drained, and the
593    /// `command_rx` queue is empty so we will not delay another command.
594    fn single_stream_fast_path_target(&self) -> Option<u32> {
595        if self.streams.len() != 1
596            || !self.tunnels.is_empty()
597            || !self.pending_requests.is_empty()
598            || self.pending_ping.is_some()
599            || !self.command_rx.is_empty()
600            || !self.inline_register_rx.is_empty()
601        {
602            return None;
603        }
604
605        let (stream_id, stream) = self.streams.iter().next()?;
606        if stream.streaming_body.is_none()
607            || !stream.pending_streaming_body.is_empty()
608            || stream.body_offset < stream.pending_body.len()
609            || stream.request_stream.is_some()
610        {
611            return None;
612        }
613
614        Some(*stream_id)
615    }
616
    /// Tight inner loop that bypasses the multiplexing dispatch when only one
    /// streaming response is active. Reads frames directly and forwards DATA
    /// payloads to the single owner channel without iterating the streams
    /// HashMap or polling the command queue. Returns `Ok(true)` when it
    /// processed at least one frame and the regular loop should continue,
    /// `Ok(false)` when the conditions changed before any frame was processed
    /// (so the caller falls through to the regular `select!`).
    ///
    /// The per-stream receive window is tracked in a local copy and written
    /// back to the stream state before every exit that leaves the stream
    /// registered, and before any frame is delegated to `handle_frame`.
    ///
    /// NOTE(review): the frame read below is awaited on its own, so a command
    /// or inline registration that arrives while the connection is quiet is
    /// only noticed at the next periodic check, i.e. after
    /// `FAST_PATH_COMMAND_CHECK_INTERVAL` further frames have been read.
    ///
    /// # Errors
    /// Propagates frame read errors, `handle_frame` errors, padded-payload
    /// parse errors and connection WINDOW_UPDATE send errors.
    async fn run_single_stream_streaming_fast_path(&mut self, stream_id: u32) -> Result<bool> {
        // Hoist the shared body slot once so per-frame DATA delivery does
        // not re-enter the streams HashMap in the unbackpressured case.
        let (body_shared, mut recv_window) = match self.streams.get(&stream_id) {
            Some(stream) => match stream.streaming_body.as_ref() {
                Some(shared) => (shared.clone(), stream.recv_window),
                None => return Ok(false),
            },
            None => return Ok(false),
        };

        let mut processed_any = false;
        let mut frames_since_queue_check = 0usize;
        let mut queued_data_frames_since_yield = 0usize;

        loop {
            // Periodic housekeeping: every FAST_PATH_COMMAND_CHECK_INTERVAL
            // frames, look for work that requires leaving the fast path.
            if frames_since_queue_check >= FAST_PATH_COMMAND_CHECK_INTERVAL {
                frames_since_queue_check = 0;
                match self.command_rx.try_recv() {
                    Ok(cmd) => {
                        // Park the command for the regular loop, which
                        // processes `pending_requests` at its top.
                        self.store_stream_recv_window(stream_id, recv_window);
                        self.pending_requests.push_back(cmd);
                        return Ok(true);
                    }
                    Err(mpsc::error::TryRecvError::Empty) => {}
                    Err(mpsc::error::TryRecvError::Disconnected) => {
                        self.store_stream_recv_window(stream_id, recv_window);
                        return Ok(processed_any);
                    }
                }

                // A second stream has been registered inline: the
                // single-stream precondition no longer holds.
                if let Ok(reg) = self.inline_register_rx.try_recv() {
                    self.store_stream_recv_window(stream_id, recv_window);
                    self.register_inline_stream(reg);
                    return Ok(true);
                }

                // The body's shared state reports closed: cancel the stream.
                if body_shared.is_closed() {
                    self.cancel_stream_for_dropped_receiver(stream_id).await;
                    return Ok(true);
                }
            }

            let (header, payload) = match self.connection.read_next_frame().await {
                Ok(frame) => frame,
                Err(e) => {
                    tracing::error!("H2Driver read error (fast path): {:?}", e);
                    return Err(e);
                }
            };
            processed_any = true;
            frames_since_queue_check += 1;

            if header.stream_id == stream_id && header.frame_type == FrameType::Data {
                let end_stream = (header.flags & flags::END_STREAM) != 0;
                // Unpadded payloads are forwarded untouched; padded ones go
                // through the connection's DATA payload parser.
                let data = if (header.flags & flags::PADDED) == 0 {
                    payload
                } else {
                    self.connection
                        .parse_inbound_data_payload(stream_id, header.flags, payload)?
                };
                let data_len = data.len();
                let mut should_yield_for_body_queue = false;
                // Connection-level flow control: send a WINDOW_UPDATE
                // whenever the connection reports an increment is due.
                if let Some(increment) = self
                    .connection
                    .apply_conn_inbound_flow_control_delta(data_len)
                {
                    self.connection
                        .send_connection_window_update(increment)
                        .await?;
                }
                // Stream-level window is tracked in the local copy only.
                if data_len > 0 {
                    recv_window -= data_len as i32;
                }
                let push = body_shared.push_data(data, end_stream);
                match push {
                    H2BodyDataPush::Accepted { queued_len } => {
                        // More than one queued chunk means the consumer is
                        // behind; after FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET
                        // such frames in a row, yield so it can run.
                        if queued_len > 1 {
                            queued_data_frames_since_yield += 1;
                            should_yield_for_body_queue = queued_data_frames_since_yield
                                >= FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET;
                            if should_yield_for_body_queue {
                                queued_data_frames_since_yield = 0;
                            }
                        } else {
                            queued_data_frames_since_yield = 0;
                        }
                    }
                    H2BodyDataPush::Full(data) => {
                        // Body queue is full: buffer the chunk (and a pending
                        // END_STREAM) on the stream state and leave the fast
                        // path so the regular loop handles the backpressure.
                        if let Some(stream) = self.streams.get_mut(&stream_id) {
                            stream.recv_window = recv_window;
                            stream.pending_streaming_body.push_back(Ok(data));
                            if end_stream {
                                stream.pending_streaming_end = true;
                            }
                        }
                        return Ok(true);
                    }
                    H2BodyDataPush::Closed => {
                        self.cancel_stream_for_dropped_receiver(stream_id).await;
                        return Ok(true);
                    }
                }
                if should_yield_for_body_queue {
                    tokio::task::yield_now().await;
                    if body_shared.is_closed() {
                        self.cancel_stream_for_dropped_receiver(stream_id).await;
                        return Ok(true);
                    }
                }

                if end_stream {
                    // NOTE(review): an empty END_STREAM frame finishes the
                    // body explicitly; presumably `push_data` already marks
                    // completion for non-empty final chunks — confirm.
                    if data_len == 0 {
                        body_shared.finish();
                    }
                    tokio::task::yield_now().await;
                    self.complete_stream(stream_id);
                    return Ok(true);
                }
            } else {
                // Any other frame takes the regular handler. Persist the
                // local window first so the handler sees current state.
                self.store_stream_recv_window(stream_id, recv_window);
                if let Err(e) = self.handle_frame(header, payload).await {
                    tracing::error!("H2Driver frame error (fast path): {:?}", e);
                    return Err(e);
                }
                // Stay in the fast path only while this stream still qualifies.
                if self.single_stream_fast_path_target() != Some(stream_id) {
                    return Ok(true);
                }
            }
        }
    }
755
756    fn check_keepalive_timeout(&self) -> Result<()> {
757        if let Some((_, sent_at)) = self.pending_ping {
758            if sent_at.elapsed() >= self.config.keep_alive_timeout {
759                return Err(Error::HttpProtocol(
760                    "HTTP/2 keepalive ping timed out".into(),
761                ));
762            }
763        }
764        Ok(())
765    }
766
767    fn keepalive_delay(&self) -> Option<Duration> {
768        let interval = self.config.keep_alive_interval?;
769        if self.pending_ping.is_some() {
770            return None;
771        }
772        if !self.config.keep_alive_while_idle && self.active_stream_count() == 0 {
773            return None;
774        }
775        Some(interval)
776    }
777
778    fn has_pending_streaming_body(&self) -> bool {
779        self.streams.values().any(|stream| {
780            stream.streaming_body.is_some()
781                && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
782        })
783    }
784
785    fn has_request_body_work(&self) -> bool {
786        self.streams
787            .values()
788            .any(|stream| stream.request_stream.is_some())
789    }
790
791    /// Handle SendRequest command
792    async fn handle_send_request(&mut self, cmd: DriverCommand) -> Result<()> {
793        if !self.has_available_stream_slot() {
794            // Queue request
795            self.pending_requests.push_back(cmd);
796        } else {
797            // Send immediately
798            self.send_request_internal(cmd).await?;
799        }
800        Ok(())
801    }
802
803    async fn handle_send_streaming_request(&mut self, cmd: DriverCommand) -> Result<()> {
804        if !self.has_available_stream_slot() {
805            self.pending_requests.push_back(cmd);
806        } else {
807            self.send_streaming_request_internal(cmd).await?;
808        }
809        Ok(())
810    }
811
812    /// Process pending requests if slots available
813    async fn process_pending_requests(&mut self) -> Result<()> {
814        while self.has_available_stream_slot() {
815            if let Some(cmd) = self.pending_requests.pop_front() {
816                match cmd {
817                    DriverCommand::SendRequest { .. } => {
818                        self.send_request_internal(cmd).await?;
819                    }
820                    DriverCommand::OpenWebSocketTunnel {
821                        uri,
822                        headers,
823                        response_tx,
824                    } => {
825                        self.open_websocket_tunnel_internal(uri, headers, response_tx)
826                            .await?;
827                    }
828                    DriverCommand::SendStreamingRequest { .. } => {
829                        self.send_streaming_request_internal(cmd).await?;
830                    }
831                    DriverCommand::SendTunnelData {
832                        stream_id,
833                        outbound,
834                    } => {
835                        self.queue_tunnel_outbound(stream_id, outbound).await?;
836                    }
837                }
838            } else {
839                break;
840            }
841        }
842        Ok(())
843    }
844
845    fn active_stream_count(&self) -> usize {
846        self.streams.len() + self.tunnels.len()
847    }
848
849    fn has_available_stream_slot(&self) -> bool {
850        let max_streams = self.config.effective_max_concurrent_streams(
851            self.connection.peer_settings().max_concurrent_streams,
852        );
853        self.active_stream_count() < max_streams
854    }
855
856    /// Internal helper to send request
857    async fn send_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
858        if let DriverCommand::SendRequest {
859            method,
860            uri,
861            headers,
862            body,
863            response_tx,
864        } = cmd
865        {
866            let body_bytes = body.unwrap_or_default();
867            let has_body = !body_bytes.is_empty();
868            let end_stream = !has_body;
869
870            let initial_recv_window = self.connection.local_initial_window_size() as i32;
871            match self
872                .connection
873                .send_headers_raw(&method, &uri, &headers, end_stream)
874                .await
875            {
876                Ok(stream_id) => {
877                    self.streams.insert(
878                        stream_id,
879                        DriverStreamState::new(response_tx, body_bytes, initial_recv_window),
880                    );
881
882                    self.flush_pending_data().await?;
883                }
884                Err(e) => {
885                    if response_tx.send(Err(e)).is_err() {
886                        tracing::debug!("Response channel closed while sending error");
887                    }
888                }
889            }
890        }
891        Ok(())
892    }
893
894    async fn send_streaming_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
895        if let DriverCommand::SendStreamingRequest {
896            method,
897            uri,
898            headers,
899            body,
900            body_shared,
901            headers_tx,
902        } = cmd
903        {
904            let (body_bytes, request_stream, end_stream) = match body {
905                RequestBody::Empty => (Bytes::new(), None, true),
906                RequestBody::Bytes(bytes) => {
907                    let end_stream = bytes.is_empty();
908                    (bytes, None, end_stream)
909                }
910                RequestBody::Text(text) => {
911                    let bytes = Bytes::from(text.into_bytes());
912                    let end_stream = bytes.is_empty();
913                    (bytes, None, end_stream)
914                }
915                RequestBody::Json(bytes) => {
916                    let bytes = Bytes::from(bytes);
917                    let end_stream = bytes.is_empty();
918                    (bytes, None, end_stream)
919                }
920                RequestBody::Form(text) => {
921                    let bytes = Bytes::from(text.into_bytes());
922                    let end_stream = bytes.is_empty();
923                    (bytes, None, end_stream)
924                }
925                RequestBody::Stream {
926                    stream,
927                    content_length,
928                } => (
929                    Bytes::new(),
930                    Some(DriverStreamingRequestBody::new(stream, content_length)),
931                    false,
932                ),
933            };
934
935            let initial_recv_window = self.connection.local_initial_window_size() as i32;
936            match self
937                .connection
938                .send_headers_raw(&method, &uri, &headers, end_stream)
939                .await
940            {
941                Ok(stream_id) => {
942                    self.streams.insert(
943                        stream_id,
944                        DriverStreamState::streaming(
945                            headers_tx,
946                            body_shared,
947                            body_bytes,
948                            request_stream,
949                            initial_recv_window,
950                        ),
951                    );
952                    self.flush_pending_data().await?;
953                }
954                Err(error) => {
955                    let _ = headers_tx.send(Err(error));
956                }
957            }
958        }
959        Ok(())
960    }
961
962    async fn handle_open_websocket_tunnel(
963        &mut self,
964        uri: Uri,
965        headers: Vec<(String, String)>,
966        response_tx: oneshot::Sender<Result<H2Tunnel>>,
967    ) -> Result<()> {
968        if !self.has_available_stream_slot() {
969            self.pending_requests
970                .push_back(DriverCommand::OpenWebSocketTunnel {
971                    uri,
972                    headers,
973                    response_tx,
974                });
975            return Ok(());
976        }
977
978        self.open_websocket_tunnel_internal(uri, headers, response_tx)
979            .await
980    }
981
982    async fn open_websocket_tunnel_internal(
983        &mut self,
984        uri: Uri,
985        headers: Vec<(String, String)>,
986        response_tx: oneshot::Sender<Result<H2Tunnel>>,
987    ) -> Result<()> {
988        let headers = Headers::from(headers);
989        match self
990            .connection
991            .open_extended_connect_websocket_with_end_stream(&uri, &headers)
992            .await
993        {
994            Ok((stream_id, end_stream)) => {
995                let (outbound_tx, outbound_rx) = mpsc::channel(32);
996                let (inbound_tx, inbound_rx) = mpsc::channel(32);
997                let initial_window_size = self.connection.local_initial_window_size();
998                let initial_recv_window = initial_window_size as i32;
999                let credit =
1000                    H2TunnelCredit::new(self.body_progress_notify.clone(), initial_window_size);
1001                if end_stream {
1002                    let _ = inbound_tx.send(Ok(H2TunnelEvent::EndStream)).await;
1003                    self.connection.remove_stream(stream_id);
1004                } else {
1005                    let command_tx = self.command_tx.clone();
1006                    tokio::spawn(async move {
1007                        let mut outbound_rx = outbound_rx;
1008                        while let Some(outbound) = outbound_rx.recv().await {
1009                            if command_tx
1010                                .send(DriverCommand::SendTunnelData {
1011                                    stream_id,
1012                                    outbound,
1013                                })
1014                                .await
1015                                .is_err()
1016                            {
1017                                break;
1018                            }
1019                        }
1020                    });
1021                    self.tunnels.insert(
1022                        stream_id,
1023                        DriverTunnelState {
1024                            inbound_tx,
1025                            pending_inbound: VecDeque::new(),
1026                            pending_outbound: VecDeque::new(),
1027                            recv_window: initial_recv_window,
1028                            pending_recv_window_update: 0,
1029                            credit: credit.clone(),
1030                        },
1031                    );
1032                }
1033
1034                if response_tx
1035                    .send(Ok(H2Tunnel::new_with_credit(
1036                        outbound_tx,
1037                        inbound_rx,
1038                        credit,
1039                    )))
1040                    .is_err()
1041                {
1042                    tracing::debug!("Tunnel response channel closed after open");
1043                    self.tunnels.remove(&stream_id);
1044                }
1045            }
1046            Err(e) => {
1047                if response_tx.send(Err(e)).is_err() {
1048                    tracing::debug!("Tunnel response channel closed while sending open error");
1049                }
1050            }
1051        }
1052        Ok(())
1053    }
1054
1055    async fn queue_tunnel_outbound(
1056        &mut self,
1057        stream_id: u32,
1058        outbound: H2TunnelOutbound,
1059    ) -> Result<()> {
1060        if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1061            tunnel.pending_outbound.push_back(outbound);
1062            self.flush_tunnel_data().await?;
1063        }
1064
1065        Ok(())
1066    }
1067
1068    /// Iterate all active streams and try to send pending body data
1069    async fn flush_pending_data(&mut self) -> Result<()> {
1070        if !self.streams.values().any(|stream| {
1071            stream.body_offset < stream.pending_body.len() || stream.request_stream.is_some()
1072        }) {
1073            return Ok(());
1074        }
1075
1076        // Collect IDs to avoid borrow conflict
1077        let stream_ids: Vec<u32> = self.streams.keys().cloned().collect();
1078
1079        for stream_id in stream_ids {
1080            // Keep sending chunks for this stream until blocked or done
1081            loop {
1082                // Check if we have data to send
1083                let (has_data, offset) = if let Some(stream) = self.streams.get(&stream_id) {
1084                    (
1085                        stream.body_offset < stream.pending_body.len(),
1086                        stream.body_offset,
1087                    )
1088                } else {
1089                    (false, 0)
1090                };
1091
1092                if !has_data {
1093                    break;
1094                }
1095
1096                // Prepare arguments for send_data
1097                // We clone the Bytes handle which is cheap
1098                let pending_body = {
1099                    let s = self.streams.get(&stream_id).unwrap();
1100                    s.pending_body.clone()
1101                };
1102
1103                let remaining = &pending_body[offset..];
1104                let is_last_chunk = true;
1105
1106                // send_data returns bytes sent. If 0, it means blocked.
1107                let sent = self
1108                    .connection
1109                    .send_data(stream_id, remaining, is_last_chunk)
1110                    .await?;
1111
1112                if sent > 0 {
1113                    if let Some(stream) = self.streams.get_mut(&stream_id) {
1114                        stream.body_offset += sent;
1115                    }
1116                    // Loop again to send next chunk
1117                } else {
1118                    // Blocked by flow control
1119                    break;
1120                }
1121            }
1122
1123            self.flush_streaming_request_body(stream_id).await?;
1124        }
1125        Ok(())
1126    }
1127
    /// Pump the streaming request body of `stream_id` into DATA writes.
    ///
    /// Each outer iteration:
    /// 1. returns if the stream has no `request_stream`;
    /// 2. reads the available send window and returns if it is not positive;
    /// 3. fills a batch of at most `min(window, max_data_frame_size)` bytes from
    ///    the body's buffered chunk and by polling the body stream with
    ///    `request_body_waker`;
    /// 4. writes the batch via `send_data`, with end-of-stream set when the body
    ///    stream finished while this batch was being filled.
    ///
    /// A body-stream error or a Content-Length mismatch fails the stream through
    /// `fail_stream` and returns `Ok(())`.
    ///
    /// # Errors
    /// Propagates errors from `available_send_window` and `send_data`, and returns
    /// [`Error::HttpProtocol`] when a non-zero write is shorter than the batch.
    async fn flush_streaming_request_body(&mut self, stream_id: u32) -> Result<()> {
        loop {
            // Nothing to do once the stream is gone or its request body was fully sent.
            if self
                .streams
                .get(&stream_id)
                .and_then(|stream| stream.request_stream.as_ref())
                .is_none()
            {
                return Ok(());
            }

            // Preflight the send window so the batch is never larger than what can be written.
            let available = self.connection.available_send_window(stream_id).await?;
            if available <= 0 {
                return Ok(());
            }

            let batch_limit = (available as usize).min(self.connection.max_data_frame_size());
            if batch_limit == 0 {
                return Ok(());
            }

            let mut batch = BytesMut::with_capacity(batch_limit);
            let mut end_stream = false;
            let mut remove_request_stream_after_send = false;

            while batch.len() < batch_limit {
                // Snapshot the partially consumed chunk (if any) and the read offset into it.
                let current = self
                    .streams
                    .get(&stream_id)
                    .and_then(|stream| stream.request_stream.as_ref())
                    .and_then(|body| {
                        body.current_chunk
                            .as_ref()
                            .map(|chunk| (chunk.clone(), body.current_offset))
                    });

                if let Some((chunk, offset)) = current {
                    // Copy as much of the chunk as still fits and advance the bookkeeping.
                    let remaining = &chunk[offset..];
                    let take = remaining.len().min(batch_limit - batch.len());
                    batch.extend_from_slice(&remaining[..take]);

                    let stream = self.streams.get_mut(&stream_id).expect("stream exists");
                    let body = stream
                        .request_stream
                        .as_mut()
                        .expect("request stream exists");
                    body.current_offset += take;
                    body.sent += take as u64;
                    if body.current_offset >= chunk.len() {
                        body.current_chunk = None;
                        body.current_offset = 0;
                    }
                    continue;
                }

                // No buffered chunk: poll the body stream, unless it already reported its end.
                let poll_result = {
                    let stream = self.streams.get_mut(&stream_id).expect("stream exists");
                    let body = stream
                        .request_stream
                        .as_mut()
                        .expect("request stream exists");
                    if body.finished {
                        Poll::Ready(None)
                    } else {
                        let mut cx = std::task::Context::from_waker(&self.request_body_waker);
                        body.stream.as_mut().poll_next(&mut cx)
                    }
                };

                match poll_result {
                    // Nothing more right now: send whatever has been gathered so far.
                    Poll::Pending => break,
                    Poll::Ready(Some(Ok(chunk))) => {
                        // Empty chunks carry no data; just poll again.
                        if chunk.is_empty() {
                            continue;
                        }
                        let stream = self.streams.get_mut(&stream_id).expect("stream exists");
                        let body = stream
                            .request_stream
                            .as_mut()
                            .expect("request stream exists");
                        body.current_chunk = Some(chunk);
                        body.current_offset = 0;
                    }
                    Poll::Ready(Some(Err(error))) => {
                        self.fail_stream(
                            stream_id,
                            format!("request body stream error: {}", error),
                        )
                        .await;
                        return Ok(());
                    }
                    Poll::Ready(None) => {
                        // Body stream ended: a declared Content-Length must equal the
                        // number of bytes consumed from the body.
                        let (valid_len, sent, expected, already_sent_end) = {
                            let stream = self.streams.get_mut(&stream_id).expect("stream exists");
                            let body = stream
                                .request_stream
                                .as_mut()
                                .expect("request stream exists");
                            body.finished = true;
                            (
                                body.content_length
                                    .map(|expected| expected == body.sent)
                                    .unwrap_or(true),
                                body.sent,
                                body.content_length,
                                body.end_stream_sent,
                            )
                        };
                        if !valid_len {
                            self.fail_stream(
                                stream_id,
                                format!(
                                    "sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
                                    sent,
                                    expected.unwrap_or_default()
                                ),
                            )
                            .await;
                            return Ok(());
                        }
                        if already_sent_end {
                            return Ok(());
                        }
                        end_stream = true;
                        remove_request_stream_after_send = true;
                        break;
                    }
                }
            }

            if batch.is_empty() {
                if end_stream {
                    // The body ended with no data left: close our side with an empty DATA write.
                    let sent = self.connection.send_data(stream_id, &[], true).await?;
                    debug_assert_eq!(sent, 0);
                    if let Some(stream) = self.streams.get_mut(&stream_id) {
                        if let Some(body) = stream.request_stream.as_mut() {
                            body.end_stream_sent = true;
                        }
                        // The body state, including the flag just set, is dropped here.
                        stream.request_stream = None;
                    }
                }
                return Ok(());
            }

            let sent = self
                .connection
                .send_data(stream_id, &batch, end_stream)
                .await?;
            // NOTE(review): at this point `batch` has already been drained from the body
            // and counted in `body.sent`; returning here appears to drop those bytes.
            // Presumably unreachable because the window was preflighted above — confirm
            // against `send_data`.
            if sent == 0 {
                return Ok(());
            }
            if sent != batch.len() {
                return Err(Error::HttpProtocol(
                    "short DATA write after preflighted flow-control window".into(),
                ));
            }

            if remove_request_stream_after_send {
                if let Some(stream) = self.streams.get_mut(&stream_id) {
                    if let Some(body) = stream.request_stream.as_mut() {
                        body.end_stream_sent = true;
                    }
                    // The body state, including the flag just set, is dropped here.
                    stream.request_stream = None;
                }
                return Ok(());
            }
        }
    }
1296
1297    async fn flush_tunnel_data(&mut self) -> Result<()> {
1298        if self.tunnels.is_empty() {
1299            return Ok(());
1300        }
1301        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1302
1303        for stream_id in stream_ids {
1304            loop {
1305                let outbound = match self
1306                    .tunnels
1307                    .get_mut(&stream_id)
1308                    .and_then(|tunnel| tunnel.pending_outbound.pop_front())
1309                {
1310                    Some(outbound) => outbound,
1311                    None => break,
1312                };
1313
1314                let sent = self
1315                    .connection
1316                    .send_data(stream_id, &outbound.bytes, outbound.end_stream)
1317                    .await?;
1318
1319                if outbound.bytes.is_empty() {
1320                    continue;
1321                }
1322
1323                if sent == 0 {
1324                    if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1325                        tunnel.pending_outbound.push_front(outbound);
1326                    }
1327                    break;
1328                }
1329
1330                if sent < outbound.bytes.len() {
1331                    if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1332                        tunnel.pending_outbound.push_front(H2TunnelOutbound {
1333                            bytes: outbound.bytes.slice(sent..),
1334                            end_stream: outbound.end_stream,
1335                        });
1336                    }
1337                    break;
1338                }
1339            }
1340        }
1341
1342        Ok(())
1343    }
1344
1345    async fn flush_tunnel_inbound(&mut self) -> Result<()> {
1346        if self.tunnels.is_empty() {
1347            return Ok(());
1348        }
1349
1350        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1351        for stream_id in stream_ids {
1352            let status = self
1353                .tunnels
1354                .get_mut(&stream_id)
1355                .map(DriverTunnelState::flush_inbound)
1356                .unwrap_or(TunnelInboundStatus::Open);
1357            self.apply_tunnel_inbound_status(stream_id, status).await?;
1358        }
1359
1360        Ok(())
1361    }
1362
1363    async fn apply_tunnel_inbound_status(
1364        &mut self,
1365        stream_id: u32,
1366        status: TunnelInboundStatus,
1367    ) -> Result<()> {
1368        match status {
1369            TunnelInboundStatus::Open | TunnelInboundStatus::Blocked => {}
1370            TunnelInboundStatus::Remove => {
1371                self.tunnels.remove(&stream_id);
1372                self.connection.remove_stream(stream_id);
1373                self.process_pending_requests().await?;
1374            }
1375            TunnelInboundStatus::Closed => {
1376                self.tunnels.remove(&stream_id);
1377                self.connection.remove_stream(stream_id);
1378                if let Err(e) = self
1379                    .connection
1380                    .send_rst_stream(stream_id, ErrorCode::Cancel)
1381                    .await
1382                {
1383                    tracing::warn!("Failed to send RST_STREAM for dropped tunnel: {:?}", e);
1384                }
1385                self.process_pending_requests().await?;
1386            }
1387        }
1388        Ok(())
1389    }
1390
1391    async fn apply_released_body_credits(&mut self) -> Result<()> {
1392        let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1393        for stream_id in stream_ids {
1394            let (released, closed) = self
1395                .streams
1396                .get(&stream_id)
1397                .and_then(|stream| stream.streaming_body.as_ref())
1398                .map(|body| (body.take_released_recv_bytes(), body.is_closed()))
1399                .unwrap_or((0, false));
1400
1401            if closed {
1402                self.cancel_stream_for_dropped_receiver(stream_id).await;
1403                continue;
1404            }
1405
1406            if released == 0 {
1407                continue;
1408            }
1409
1410            let refresh_threshold = self.connection.flow_control_refresh_threshold();
1411            let window_update_step = self.connection.stream_window_update_step();
1412            let increment = self.streams.get_mut(&stream_id).and_then(|stream| {
1413                stream.pending_recv_window_update =
1414                    stream.pending_recv_window_update.saturating_add(released);
1415                let should_update = stream.pending_recv_window_update >= window_update_step
1416                    || stream.recv_window < refresh_threshold;
1417                if should_update {
1418                    let increment = stream.pending_recv_window_update.min(u32::MAX as usize);
1419                    stream.pending_recv_window_update -= increment;
1420                    stream.recv_window = stream.recv_window.saturating_add(increment as i32);
1421                    Some(increment as u32)
1422                } else {
1423                    None
1424                }
1425            });
1426
1427            if let Some(increment) = increment {
1428                self.connection
1429                    .send_stream_window_update(stream_id, increment)
1430                    .await?;
1431            }
1432        }
1433
1434        Ok(())
1435    }
1436
1437    async fn apply_released_tunnel_credits(&mut self) -> Result<()> {
1438        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1439        for stream_id in stream_ids {
1440            let (released, closed) = self
1441                .tunnels
1442                .get(&stream_id)
1443                .map(|tunnel| {
1444                    (
1445                        tunnel.credit.take_released_recv_bytes(),
1446                        tunnel.inbound_tx.is_closed(),
1447                    )
1448                })
1449                .unwrap_or((0, false));
1450
1451            if closed {
1452                self.apply_tunnel_inbound_status(stream_id, TunnelInboundStatus::Closed)
1453                    .await?;
1454                continue;
1455            }
1456
1457            if released == 0 {
1458                continue;
1459            }
1460
1461            let refresh_threshold = self.connection.flow_control_refresh_threshold();
1462            let window_update_step = self.connection.stream_window_update_step();
1463            let increment = self.tunnels.get_mut(&stream_id).and_then(|tunnel| {
1464                tunnel.pending_recv_window_update =
1465                    tunnel.pending_recv_window_update.saturating_add(released);
1466                let should_update = tunnel.pending_recv_window_update >= window_update_step
1467                    || tunnel.recv_window < refresh_threshold;
1468                if should_update {
1469                    let increment = tunnel.pending_recv_window_update.min(u32::MAX as usize);
1470                    tunnel.pending_recv_window_update -= increment;
1471                    tunnel.recv_window = tunnel.recv_window.saturating_add(increment as i32);
1472                    Some(increment as u32)
1473                } else {
1474                    None
1475                }
1476            });
1477
1478            if let Some(increment) = increment {
1479                self.connection
1480                    .send_stream_window_update(stream_id, increment)
1481                    .await?;
1482            }
1483        }
1484
1485        Ok(())
1486    }
1487
1488    async fn fail_stream(&mut self, stream_id: u32, message: String) {
1489        self.connection.remove_stream(stream_id);
1490        if let Some(mut stream) = self.streams.remove(&stream_id) {
1491            Self::note_stream_removed(&stream, &self.inline_active);
1492            if let Some(tx) = stream.response_tx.take() {
1493                let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1494            }
1495            if let Some(tx) = stream.streaming_headers_tx.take() {
1496                let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1497            }
1498            if let Some(body) = stream.streaming_body.take() {
1499                let _ = body.fail(Error::HttpProtocol(message));
1500            }
1501        }
1502    }
1503
1504    async fn cancel_stream_for_dropped_receiver(&mut self, stream_id: u32) {
1505        if let Some(state) = self.streams.remove(&stream_id) {
1506            Self::note_stream_removed(&state, &self.inline_active);
1507        }
1508        self.connection.remove_stream(stream_id);
1509        if let Err(e) = self
1510            .connection
1511            .send_rst_stream(stream_id, ErrorCode::Cancel)
1512            .await
1513        {
1514            tracing::warn!("Failed to send RST_STREAM for dropped receiver: {:?}", e);
1515        }
1516    }
1517
1518    async fn flush_pending_streaming_bodies(&mut self) -> Result<()> {
1519        if !self.has_pending_streaming_body() {
1520            return Ok(());
1521        }
1522        let stream_ids: Vec<u32> = self
1523            .streams
1524            .iter()
1525            .filter_map(|(stream_id, stream)| {
1526                if stream.streaming_body.is_some()
1527                    && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
1528                {
1529                    Some(*stream_id)
1530                } else {
1531                    None
1532                }
1533            })
1534            .collect();
1535
1536        for stream_id in stream_ids {
1537            let mut should_cancel = false;
1538            let mut should_complete = false;
1539
1540            if let Some(stream) = self.streams.get_mut(&stream_id) {
1541                let Some(body) = stream.streaming_body.as_ref() else {
1542                    continue;
1543                };
1544
1545                if body.is_closed() {
1546                    should_cancel = true;
1547                } else {
1548                    while let Some(item) = stream.pending_streaming_body.pop_front() {
1549                        match body.push(item) {
1550                            H2BodyPush::Accepted => {}
1551                            H2BodyPush::Full(item) => {
1552                                stream.pending_streaming_body.push_front(item);
1553                                break;
1554                            }
1555                            H2BodyPush::Closed => {
1556                                should_cancel = true;
1557                                break;
1558                            }
1559                        }
1560                    }
1561
1562                    should_complete =
1563                        stream.pending_streaming_end && stream.pending_streaming_body.is_empty();
1564                }
1565            }
1566
1567            if should_cancel {
1568                self.cancel_stream_for_dropped_receiver(stream_id).await;
1569            } else if should_complete {
1570                if let Some(body) = self
1571                    .streams
1572                    .get(&stream_id)
1573                    .and_then(|stream| stream.streaming_body.as_ref())
1574                {
1575                    body.finish();
1576                }
1577                self.complete_stream(stream_id);
1578            }
1579        }
1580
1581        Ok(())
1582    }
1583
1584    /// Handle a single frame
1585    async fn handle_frame(&mut self, header: FrameHeader, mut payload: Bytes) -> Result<()> {
1586        // Drain any inline registrations so a freshly registered stream is
1587        // visible before we try to route this frame to it.
1588        self.drain_inline_registrations();
1589
1590        // Check if receiver has been dropped (is_closed) for this stream before frame processing.
1591        // If dropped, send RST_STREAM(CANCEL) and evict.
1592        if header.stream_id != 0 {
1593            if let Some(stream) = self.streams.get(&header.stream_id) {
1594                if let Some(ref body) = stream.streaming_body {
1595                    if body.is_closed() {
1596                        let stream_id = header.stream_id;
1597                        self.cancel_stream_for_dropped_receiver(stream_id).await;
1598                        return Ok(());
1599                    }
1600                }
1601            }
1602        }
1603
1604        // 1. Check control frames that modify connection state
1605        if matches!(
1606            header.frame_type,
1607            FrameType::Settings
1608                | FrameType::WindowUpdate
1609                | FrameType::Ping
1610                | FrameType::GoAway
1611                | FrameType::RstStream
1612                | FrameType::PushPromise
1613        ) {
1614            match self
1615                .connection
1616                .handle_control_frame(&header, payload.clone())
1617                .await?
1618            {
1619                ControlAction::RstStream(sid, code) => {
1620                    if let Some(tunnel) = self.tunnels.remove(&sid) {
1621                        let _ = tunnel
1622                            .inbound_tx
1623                            .send(Ok(H2TunnelEvent::Reset(format!("{:?}", code))))
1624                            .await;
1625                    }
1626                    // Notify stream of reset
1627                    self.fail_stream(sid, format!("Stream reset by peer: {:?}", code))
1628                        .await;
1629                    // Stream slot freed, try to process pending
1630                    self.process_pending_requests().await?;
1631                    return Ok(());
1632                }
1633                ControlAction::GoAway(last_sid) => {
1634                    self.goaway_received
1635                        .store(true, std::sync::atomic::Ordering::Relaxed);
1636                    let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1637                    for sid in stream_ids {
1638                        if let Some(stream) = self.streams.get(&sid) {
1639                            if stream.streaming_body.is_some()
1640                                && stream
1641                                    .streaming_body
1642                                    .as_ref()
1643                                    .is_some_and(|body| body.is_slot_available())
1644                            {
1645                                if let Some(body) = stream.streaming_body.as_ref() {
1646                                    body.finish();
1647                                }
1648                            }
1649                        }
1650                    }
1651                    let tunnel_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1652                    for sid in tunnel_ids {
1653                        if sid > last_sid {
1654                            if let Some(tunnel) = self.tunnels.remove(&sid) {
1655                                let _ = tunnel
1656                                    .inbound_tx
1657                                    .send(Ok(H2TunnelEvent::GoAway {
1658                                        last_stream_id: last_sid,
1659                                    }))
1660                                    .await;
1661                            }
1662                        }
1663                    }
1664                    // Close all streams > last_sid
1665                    let sids: Vec<u32> = self.streams.keys().cloned().collect();
1666                    for sid in sids {
1667                        if sid > last_sid {
1668                            self.fail_stream(sid, "GOAWAY received".into()).await;
1669                        }
1670                    }
1671                    // Driver continues processing existing streams until they complete.
1672                    // A future enhancement could implement immediate shutdown on GOAWAY.
1673                    return Ok(());
1674                }
1675                ControlAction::RefusePush(_stream_id, promised_id) => {
1676                    // Send RST_STREAM for the promised stream
1677                    // RFC 9113 8.4: RST_STREAM with REFUSED_STREAM
1678                    if let Err(e) = self
1679                        .connection
1680                        .send_rst_stream(promised_id, ErrorCode::RefusedStream)
1681                        .await
1682                    {
1683                        tracing::warn!(
1684                            "Failed to send RST_STREAM for refused push promise: {:?}",
1685                            e
1686                        );
1687                    }
1688                }
1689                ControlAction::PingAck(data) => {
1690                    if self
1691                        .pending_ping
1692                        .is_some_and(|(pending_data, _)| pending_data == data)
1693                    {
1694                        self.pending_ping = None;
1695                    }
1696                    return Ok(());
1697                }
1698                ControlAction::None => {
1699                    // Continue to specific processing
1700                }
1701            }
1702        }
1703
1704        // 2. Data / Headers routing
1705        match header.frame_type {
1706            FrameType::Headers => {
1707                let stream_id = header.stream_id;
1708
1709                // Handle CONTINUATION frames if needed (END_HEADERS flag not set).
1710                // CONTINUATION frames are collected in the loop below; this branch handles
1711                // the initial HEADERS frame that starts a header block.
1712                if (header.flags & flags::END_HEADERS) == 0 {
1713                    // Loop to read CONTINUATION frames
1714                    // This inner loop blocks the driver select! loop, which is expected
1715                    // per RFC 9113 Section 6.2 (CONTINUATION frames must be processed sequentially).
1716                    let mut block = BytesMut::from(payload);
1717                    loop {
1718                        let (next_header, next_payload) = self.connection.read_next_frame().await?;
1719                        if next_header.frame_type != FrameType::Continuation {
1720                            return Err(Error::HttpProtocol("Expected CONTINUATION frame".into()));
1721                        }
1722                        if next_header.stream_id != stream_id {
1723                            return Err(Error::HttpProtocol(
1724                                "CONTINUATION frame stream ID mismatch".into(),
1725                            ));
1726                        }
1727                        block.extend_from_slice(&next_payload);
1728                        if (next_header.flags & flags::END_HEADERS) != 0 {
1729                            break;
1730                        }
1731                    }
1732                    payload = block.freeze();
1733                }
1734
1735                let decoded = self.connection.decode_header_block(payload)?;
1736
1737                // Parse pseudo-headers
1738                let mut status = 0u16;
1739                let mut regular_headers = Vec::new();
1740
1741                for (name, value) in decoded {
1742                    if name == ":status" {
1743                        status = value.parse().unwrap_or(0);
1744                    } else if !name.starts_with(':') {
1745                        regular_headers.push((name, value));
1746                    }
1747                }
1748
1749                if let Some(stream) = self.streams.get_mut(&stream_id) {
1750                    if status >= 200 {
1751                        let header_end_stream = (header.flags & flags::END_STREAM) != 0
1752                            || self.goaway_received.load(Ordering::Relaxed);
1753                        if let Some(tx) = stream.streaming_headers_tx.take() {
1754                            let _ = tx.send(Ok((status, regular_headers)));
1755                            if header_end_stream {
1756                                if let Some(body) = stream.streaming_body.as_ref() {
1757                                    body.finish();
1758                                }
1759                            }
1760                        } else {
1761                            stream.status = Some(status);
1762                            stream.headers = regular_headers;
1763                        }
1764                    } else if status > 0 {
1765                        // 1xx informational status
1766                        tracing::debug!("H2Driver: Ignoring informational status {}", status);
1767                    } else {
1768                        // status == 0, likely trailers HEADERS frame (no :status)
1769                        tracing::debug!("H2Driver: Received trailers for stream {}", stream_id);
1770                        if (header.flags & flags::END_STREAM) != 0 {
1771                            if let Some(body) = stream.streaming_body.as_ref() {
1772                                body.finish();
1773                            }
1774                        }
1775                    }
1776
1777                    if (header.flags & flags::END_STREAM) != 0 {
1778                        self.complete_stream(stream_id);
1779                    }
1780                }
1781            }
1782            FrameType::Data => {
1783                let stream_id = header.stream_id;
1784                let end_stream = (header.flags & flags::END_STREAM) != 0;
1785
1786                let data =
1787                    self.connection
1788                        .parse_inbound_data_payload(stream_id, header.flags, payload)?;
1789                let data_len = data.len();
1790                if let Some(increment) = self
1791                    .connection
1792                    .apply_conn_inbound_flow_control_delta(data_len)
1793                {
1794                    self.connection
1795                        .send_connection_window_update(increment)
1796                        .await?;
1797                }
1798
1799                if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1800                    if data_len > 0 {
1801                        tunnel.recv_window -= data_len as i32;
1802                    }
1803
1804                    let mut status = TunnelInboundStatus::Open;
1805                    if !data.is_empty() {
1806                        status = tunnel.push_inbound(H2TunnelEvent::Data(data));
1807                    }
1808                    if status == TunnelInboundStatus::Open && end_stream {
1809                        status = tunnel.push_inbound(H2TunnelEvent::EndStream);
1810                    }
1811                    self.apply_tunnel_inbound_status(stream_id, status).await?;
1812                    return Ok(());
1813                }
1814
1815                let mut should_cancel = false;
1816                let mut should_complete = false;
1817                let mut should_finish_body = false;
1818                let mut should_yield_before_complete = false;
1819
1820                if let Some(stream) = self.streams.get_mut(&stream_id) {
1821                    if data_len > 0 {
1822                        stream.recv_window -= data_len as i32;
1823                    }
1824
1825                    if let Some(body) = stream.streaming_body.as_ref() {
1826                        if !data.is_empty() {
1827                            if stream.pending_streaming_body.is_empty() {
1828                                let push = body.push_data(data, end_stream);
1829                                match push {
1830                                    H2BodyDataPush::Accepted { .. } => {}
1831                                    H2BodyDataPush::Full(data) => {
1832                                        stream.pending_streaming_body.push_back(Ok(data));
1833                                    }
1834                                    H2BodyDataPush::Closed => {
1835                                        should_cancel = true;
1836                                    }
1837                                }
1838                            } else {
1839                                stream.pending_streaming_body.push_back(Ok(data));
1840                            }
1841                        }
1842
1843                        if end_stream {
1844                            if stream.pending_streaming_body.is_empty() {
1845                                should_complete = true;
1846                                should_finish_body = data_len == 0;
1847                                should_yield_before_complete = true;
1848                            } else {
1849                                stream.pending_streaming_end = true;
1850                            }
1851                        }
1852                    } else {
1853                        stream.body.extend_from_slice(&data);
1854                        should_complete = end_stream;
1855                    }
1856                }
1857
1858                if should_cancel {
1859                    self.cancel_stream_for_dropped_receiver(stream_id).await;
1860                    return Ok(());
1861                }
1862
1863                if should_complete {
1864                    if should_finish_body {
1865                        if let Some(body) = self
1866                            .streams
1867                            .get(&stream_id)
1868                            .and_then(|stream| stream.streaming_body.as_ref())
1869                        {
1870                            body.finish();
1871                        }
1872                    }
1873                    if should_yield_before_complete {
1874                        tokio::task::yield_now().await;
1875                    }
1876                    self.complete_stream(stream_id);
1877                }
1878            }
1879            FrameType::WindowUpdate => {
1880                // Window update received and processed by handle_control_frame,
1881                // which updates the connection/stream window in self.connection.
1882                // Flush any pending data that was previously blocked by flow control.
1883                self.flush_pending_data().await?;
1884                self.flush_tunnel_data().await?;
1885            }
1886            _ => {} // Other frames handled by handle_control_frame (or ignored)
1887        }
1888
1889        Ok(())
1890    }
1891
1892    /// Complete a stream: build response and send
1893    fn complete_stream(&mut self, stream_id: u32) {
1894        if let Some(mut stream) = self.streams.remove(&stream_id) {
1895            if !stream.inline {
1896                self.connection.remove_stream(stream_id);
1897            }
1898            Self::note_stream_removed(&stream, &self.inline_active);
1899            if let Some(tx) = stream.response_tx.take() {
1900                // If no status was received, this is a protocol violation
1901                // Return an error rather than defaulting to 200
1902                let response = match stream.status {
1903                    Some(status) => Ok(StreamResponse {
1904                        status,
1905                        headers: stream.headers,
1906                        body: stream.body.freeze(),
1907                    }),
1908                    None => Err(Error::HttpProtocol(format!(
1909                        "Stream {} completed without status code",
1910                        stream_id
1911                    ))),
1912                };
1913                if tx.send(response).is_err() {
1914                    tracing::debug!("Response channel closed while completing stream");
1915                }
1916            }
1917        }
1918        // Stream slot is now available. The main loop will call process_pending_requests
1919        // to process any queued requests waiting for available stream slots.
1920    }
1921}