Skip to main content

specter/transport/h2/
driver.rs

1//! HTTP/2 connection driver - background task that reads frames and routes them to streams.
2//!
3//! The driver owns the raw H2Connection and continuously reads frames from the socket,
4//! routing them to the appropriate stream channels. This allows multiple requests
5//! to be multiplexed without blocking each other.
6
7use bytes::{Bytes, BytesMut};
8use http::{Method, Uri};
9use std::collections::{HashMap, VecDeque};
10use std::fmt;
11use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use std::task::{Poll, Wake, Waker};
14use std::time::{Duration, Instant};
15use tokio::sync::mpsc;
16use tokio::sync::oneshot;
17use tokio::sync::Notify;
18use tracing;
19
20pub type StreamingHeadersResult = Result<(u16, Vec<(String, String)>)>;
21
22use crate::error::{Error, Result};
23use crate::headers::Headers;
24use crate::request::{RequestBody, RequestBodyStream};
25use crate::transport::h2::body::{H2BodyDataPush, H2BodyPush, H2BodyShared, TrailerSender};
26use crate::transport::h2::connection::{
27    ControlAction, H2Connection as RawH2Connection, StreamResponse,
28};
29use crate::transport::h2::frame::{flags, ErrorCode, FrameHeader, FrameType};
30use crate::transport::h2::tunnel::{H2Tunnel, H2TunnelCredit, H2TunnelEvent, H2TunnelOutbound};
31use crate::transport::h2::H2TransportConfig;
32
33const FAST_PATH_COMMAND_CHECK_INTERVAL: usize = 8;
34const FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET: usize = 2;
35
36/// Command sent from handle to driver
37pub enum DriverCommand {
38    /// Send a request and get response via oneshot
39    /// Driver allocates stream_id
40    SendRequest {
41        method: http::Method,
42        uri: http::Uri,
43        headers: Headers,
44        body: Option<bytes::Bytes>,
45        response_tx: oneshot::Sender<Result<StreamResponse>>,
46    },
47    /// Send a request with a streaming body
48    SendStreamingRequest {
49        method: Method,
50        uri: Uri,
51        headers: Headers,
52        body: RequestBody,
53        body_shared: Arc<H2BodyShared>,
54        headers_tx: oneshot::Sender<StreamingHeadersResult>,
55        /// Side-channel sender for HTTP/2 response trailers. `Some` only when
56        /// the caller requested trailers (request carried `te: trailers`);
57        /// `None` keeps the warm streaming path allocation-free.
58        trailers_tx: Option<TrailerSender>,
59    },
60    /// Open an RFC 8441 WebSocket tunnel on a pooled HTTP/2 stream.
61    OpenWebSocketTunnel {
62        uri: Uri,
63        headers: Vec<(String, String)>,
64        response_tx: oneshot::Sender<Result<H2Tunnel>>,
65    },
66    /// Queue outbound DATA for an open RFC 8441 tunnel.
67    SendTunnelData {
68        stream_id: u32,
69        outbound: H2TunnelOutbound,
70    },
71}
72
73impl fmt::Debug for DriverCommand {
74    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
75        match self {
76            DriverCommand::SendRequest {
77                method,
78                uri,
79                headers,
80                body,
81                ..
82            } => f
83                .debug_struct("SendRequest")
84                .field("method", method)
85                .field("uri", uri)
86                .field("headers_len", &headers.len())
87                .field("body_len", &body.as_ref().map(Bytes::len))
88                .finish(),
89            DriverCommand::SendStreamingRequest {
90                method,
91                uri,
92                headers,
93                body,
94                ..
95            } => f
96                .debug_struct("SendStreamingRequest")
97                .field("method", method)
98                .field("uri", uri)
99                .field("headers_len", &headers.len())
100                .field("body", body)
101                .finish(),
102            DriverCommand::OpenWebSocketTunnel { uri, headers, .. } => f
103                .debug_struct("OpenWebSocketTunnel")
104                .field("uri", uri)
105                .field("headers_len", &headers.len())
106                .finish(),
107            DriverCommand::SendTunnelData {
108                stream_id,
109                outbound,
110            } => f
111                .debug_struct("SendTunnelData")
112                .field("stream_id", stream_id)
113                .field("outbound", outbound)
114                .finish(),
115        }
116    }
117}
118
119/// Inline-registered streaming stream sent from `H2Handle` to the driver.
120///
121/// The handle has already written HEADERS via the shared write half and
122/// allocated `stream_id`. The driver only needs to register the response
123/// routing channels and the seed `recv_window`.
124pub struct InlineRegistration {
125    pub stream_id: u32,
126    pub headers_tx: oneshot::Sender<StreamingHeadersResult>,
127    pub body_shared: Arc<H2BodyShared>,
128    pub recv_window: i32,
129    /// Side-channel sender for HTTP/2 response trailers. `Some` only when the
130    /// inline caller requested trailers (`te: trailers`); `None` otherwise so
131    /// the warm inline path allocates no extra channel.
132    pub trailers_tx: Option<TrailerSender>,
133}
134
135struct NotifyWake(Arc<Notify>);
136
137impl Wake for NotifyWake {
138    fn wake(self: Arc<Self>) {
139        self.0.notify_one();
140    }
141
142    fn wake_by_ref(self: &Arc<Self>) {
143        self.0.notify_one();
144    }
145}
146
147struct DriverStreamingRequestBody {
148    stream: RequestBodyStream,
149    content_length: Option<u64>,
150    current_chunk: Option<Bytes>,
151    current_offset: usize,
152    sent: u64,
153    finished: bool,
154    end_stream_sent: bool,
155}
156
157impl DriverStreamingRequestBody {
158    fn new(stream: RequestBodyStream, content_length: Option<u64>) -> Self {
159        Self {
160            stream,
161            content_length,
162            current_chunk: None,
163            current_offset: 0,
164            sent: 0,
165            finished: false,
166            end_stream_sent: false,
167        }
168    }
169}
170
171/// Per-stream state tracked by driver
172struct DriverStreamState {
173    /// Oneshot sender for response completion
174    response_tx: Option<oneshot::Sender<Result<StreamResponse>>>,
175    /// Oneshot sender for streaming response headers
176    streaming_headers_tx: Option<oneshot::Sender<StreamingHeadersResult>>,
177    /// Side-channel sender for HTTP/2 response trailers. `Some` only when the
178    /// caller requested trailers (`te: trailers`). Sent on the trailers HEADERS
179    /// branch (`Ok`) or on reset via `fail_stream` (`Err`); dropped un-sent on a
180    /// clean trailer-less end, which the receiver maps to `Ok(None)`.
181    trailers_tx: Option<TrailerSender>,
182    /// Streaming response body state shared with the public Body poller.
183    streaming_body: Option<Arc<H2BodyShared>>,
184    /// Accumulated response status
185    status: Option<u16>,
186    /// Accumulated response headers
187    headers: Vec<(String, String)>,
188    /// Accumulated response body
189    body: BytesMut,
190    /// Pending request body to be sent (flow control buffer)
191    pending_body: Bytes,
192    /// Offset of pending body already sent
193    body_offset: usize,
194    /// Streaming request body producer and partially sent chunk.
195    request_stream: Option<DriverStreamingRequestBody>,
196    /// Streaming response chunks waiting for downstream receiver capacity.
197    pending_streaming_body: VecDeque<Result<Bytes>>,
198    /// Whether END_STREAM arrived while streaming body chunks are still pending.
199    pending_streaming_end: bool,
200    /// Driver-owned per-stream inbound flow-control window. Mirrors the value
201    /// the connection's `Stream::recv_window` would have tracked, so the DATA
202    /// hot path only touches `self.streams` for inbound flow accounting.
203    recv_window: i32,
204    /// Stream-level receive credit released by the public body consumer but
205    /// not yet advertised with WINDOW_UPDATE. Batching this avoids one
206    /// WINDOW_UPDATE per DATA chunk while preserving backpressure.
207    pending_recv_window_update: usize,
208    /// Marks streams registered via the inline shared-writer fast path so
209    /// the driver knows to decrement the inline-active counter on stream
210    /// teardown.
211    inline: bool,
212}
213
214impl DriverStreamState {
215    fn new(
216        response_tx: oneshot::Sender<Result<StreamResponse>>,
217        pending_body: Bytes,
218        recv_window: i32,
219    ) -> Self {
220        Self {
221            response_tx: Some(response_tx),
222            streaming_headers_tx: None,
223            trailers_tx: None,
224            streaming_body: None,
225            status: None,
226            headers: Vec::new(),
227            body: BytesMut::new(),
228            pending_body,
229            body_offset: 0,
230            request_stream: None,
231            pending_streaming_body: VecDeque::new(),
232            pending_streaming_end: false,
233            recv_window,
234            pending_recv_window_update: 0,
235            inline: false,
236        }
237    }
238
239    fn streaming(
240        headers_tx: oneshot::Sender<StreamingHeadersResult>,
241        trailers_tx: Option<TrailerSender>,
242        body_shared: Arc<H2BodyShared>,
243        pending_body: Bytes,
244        request_stream: Option<DriverStreamingRequestBody>,
245        recv_window: i32,
246    ) -> Self {
247        Self {
248            response_tx: None,
249            streaming_headers_tx: Some(headers_tx),
250            trailers_tx,
251            streaming_body: Some(body_shared),
252            status: None,
253            headers: Vec::new(),
254            body: BytesMut::new(),
255            pending_body,
256            body_offset: 0,
257            request_stream,
258            pending_streaming_body: VecDeque::new(),
259            pending_streaming_end: false,
260            recv_window,
261            pending_recv_window_update: 0,
262            inline: false,
263        }
264    }
265
266    fn streaming_inline(
267        headers_tx: oneshot::Sender<StreamingHeadersResult>,
268        trailers_tx: Option<TrailerSender>,
269        body_shared: Arc<H2BodyShared>,
270        recv_window: i32,
271    ) -> Self {
272        let mut state = Self::streaming(
273            headers_tx,
274            trailers_tx,
275            body_shared,
276            Bytes::new(),
277            None,
278            recv_window,
279        );
280        state.inline = true;
281        state
282    }
283}
284
285struct DriverTunnelState {
286    inbound_tx: mpsc::Sender<Result<H2TunnelEvent>>,
287    pending_inbound: VecDeque<Result<H2TunnelEvent>>,
288    pending_outbound: VecDeque<H2TunnelOutbound>,
289    recv_window: i32,
290    pending_recv_window_update: usize,
291    credit: Arc<H2TunnelCredit>,
292}
293
294#[derive(Clone, Copy, Debug, Eq, PartialEq)]
295enum TunnelInboundStatus {
296    Open,
297    Blocked,
298    Closed,
299    Remove,
300}
301
302impl DriverTunnelState {
303    fn push_inbound(&mut self, event: H2TunnelEvent) -> TunnelInboundStatus {
304        let item = Ok(event);
305        if !self.pending_inbound.is_empty() {
306            self.pending_inbound.push_back(item);
307            return TunnelInboundStatus::Open;
308        }
309
310        match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
311            TunnelInboundStatus::Blocked => TunnelInboundStatus::Open,
312            status => status,
313        }
314    }
315
316    fn flush_inbound(&mut self) -> TunnelInboundStatus {
317        while let Some(item) = self.pending_inbound.pop_front() {
318            match Self::try_send_inbound(&self.inbound_tx, &mut self.pending_inbound, item) {
319                TunnelInboundStatus::Open => {}
320                TunnelInboundStatus::Blocked => return TunnelInboundStatus::Open,
321                status => return status,
322            }
323        }
324        TunnelInboundStatus::Open
325    }
326
327    fn try_send_inbound(
328        inbound_tx: &mpsc::Sender<Result<H2TunnelEvent>>,
329        pending_inbound: &mut VecDeque<Result<H2TunnelEvent>>,
330        item: Result<H2TunnelEvent>,
331    ) -> TunnelInboundStatus {
332        let remove_after_send = matches!(item, Ok(H2TunnelEvent::EndStream));
333        match inbound_tx.try_send(item) {
334            Ok(()) if remove_after_send => TunnelInboundStatus::Remove,
335            Ok(()) => TunnelInboundStatus::Open,
336            Err(mpsc::error::TrySendError::Full(item)) => {
337                pending_inbound.push_front(item);
338                TunnelInboundStatus::Blocked
339            }
340            Err(mpsc::error::TrySendError::Closed(_)) => TunnelInboundStatus::Closed,
341        }
342    }
343}
344
345/// HTTP/2 connection driver that runs in a background task
346pub struct H2Driver<S> {
347    /// Channel for receiving commands from handles
348    command_rx: mpsc::Receiver<DriverCommand>,
349    /// Sender back into the driver command queue, used by tunnel outbound forwarders.
350    command_tx: mpsc::Sender<DriverCommand>,
351    /// Raw H2 connection (owned by driver)
352    connection: RawH2Connection<S>,
353    /// Per-stream state for routing responses
354    streams: HashMap<u32, DriverStreamState>,
355    /// Per-stream state for open RFC 8441 tunnels.
356    tunnels: HashMap<u32, DriverTunnelState>,
357    /// Queue for pending requests when max streams reached
358    pending_requests: std::collections::VecDeque<DriverCommand>,
359    /// Shared flag set when GOAWAY frame is received
360    goaway_received: Arc<AtomicBool>,
361    /// Runtime keepalive and flow-control tuning.
362    config: H2TransportConfig,
363    /// Outstanding keepalive ping payload and send time.
364    pending_ping: Option<([u8; 8], Instant)>,
365    /// Channel for inline-registered streaming streams (HEADERS already
366    /// written by the caller). Decoupled from `command_rx` so the inline
367    /// caller never awaits the bounded mpsc command hop.
368    inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
369    /// Counter incremented by the inline caller before HEADERS write and
370    /// decremented by the driver when the stream is removed. Mirrors the
371    /// value visible to `H2Handle::try_send_streaming_inline`.
372    inline_active: Arc<AtomicUsize>,
373    /// Toggle that disables future inline streaming when an RFC 8441 tunnel
374    /// or other ineligible state is in effect.
375    inline_eligible: Arc<AtomicBool>,
376    /// Woken when public H2 bodies consume a slot/drop or request-body
377    /// producers become ready after returning Pending.
378    body_progress_notify: Arc<Notify>,
379    request_body_waker: Waker,
380    /// Incremented when the driver sleeps 1 ms while streaming body work is
381    /// pending (body queue full or request-body producer not ready).
382    backpressure_stall_count: Arc<AtomicU64>,
383}
384
385impl<S> H2Driver<S>
386where
387    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send,
388{
389    /// Create a new driver from an established connection
390    pub fn new(
391        connection: RawH2Connection<S>,
392        command_tx: mpsc::Sender<DriverCommand>,
393        command_rx: mpsc::Receiver<DriverCommand>,
394        goaway_received: Arc<AtomicBool>,
395        config: H2TransportConfig,
396        backpressure_stall_count: Arc<AtomicU64>,
397    ) -> Self {
398        let (_, inline_register_rx) = mpsc::unbounded_channel();
399        let body_progress_notify = Arc::new(Notify::new());
400        Self::new_with_inline(
401            connection,
402            command_tx,
403            command_rx,
404            goaway_received,
405            config.normalized(),
406            inline_register_rx,
407            Arc::new(AtomicUsize::new(0)),
408            Arc::new(AtomicBool::new(false)),
409            body_progress_notify,
410            backpressure_stall_count,
411        )
412    }
413
414    /// Create a new driver wired to an inline-registration channel and the
415    /// shared `inline_active` / `inline_eligible` counters that the matching
416    /// `H2Handle` sees.
417    #[allow(clippy::too_many_arguments)]
418    pub fn new_with_inline(
419        connection: RawH2Connection<S>,
420        command_tx: mpsc::Sender<DriverCommand>,
421        command_rx: mpsc::Receiver<DriverCommand>,
422        goaway_received: Arc<AtomicBool>,
423        config: H2TransportConfig,
424        inline_register_rx: mpsc::UnboundedReceiver<InlineRegistration>,
425        inline_active: Arc<AtomicUsize>,
426        inline_eligible: Arc<AtomicBool>,
427        body_progress_notify: Arc<Notify>,
428        backpressure_stall_count: Arc<AtomicU64>,
429    ) -> Self {
430        let request_body_waker = Waker::from(Arc::new(NotifyWake(body_progress_notify.clone())));
431        Self {
432            command_rx,
433            command_tx,
434            connection,
435            streams: HashMap::new(),
436            tunnels: HashMap::new(),
437            pending_requests: std::collections::VecDeque::new(),
438            goaway_received,
439            config: config.normalized(),
440            pending_ping: None,
441            inline_register_rx,
442            inline_active,
443            inline_eligible,
444            body_progress_notify,
445            request_body_waker,
446            backpressure_stall_count,
447        }
448    }
449
450    /// Run the driver loop - processes commands and reads frames
451    pub async fn drive(mut self) -> Result<()> {
452        loop {
453            self.drain_inline_registrations();
454
455            // Processing pending requests if slots available
456            self.process_pending_requests().await?;
457
458            // Try to flush any pending data (flow control)
459            self.flush_pending_data().await?;
460            self.flush_tunnel_data().await?;
461            self.flush_tunnel_inbound().await?;
462            self.apply_released_body_credits().await?;
463            self.apply_released_tunnel_credits().await?;
464            self.flush_pending_streaming_bodies().await?;
465            self.check_keepalive_timeout()?;
466            self.refresh_inline_eligibility();
467
468            if let Some(stream_id) = self.single_stream_fast_path_target() {
469                if self
470                    .run_single_stream_streaming_fast_path(stream_id)
471                    .await?
472                {
473                    continue;
474                }
475            }
476
477            let keepalive_delay = self.keepalive_delay();
478            let retry_streaming_backpressure =
479                self.has_pending_streaming_body() || self.has_request_body_work();
480
481            tokio::select! {
482                biased;
483                // Handle incoming commands (send requests)
484                command = self.command_rx.recv() => {
485                    match command {
486                        Some(cmd) => {
487                             match cmd {
488                                DriverCommand::SendRequest { .. } => {
489                                    self.handle_send_request(cmd).await?;
490                                }
491                                DriverCommand::SendStreamingRequest { .. } => {
492                                    self.handle_send_streaming_request(cmd).await?;
493                                }
494                                DriverCommand::OpenWebSocketTunnel { uri, headers, response_tx } => {
495                                    self.handle_open_websocket_tunnel(uri, headers, response_tx).await?;
496                                 }
497                                DriverCommand::SendTunnelData { stream_id, outbound } => {
498                                    self.queue_tunnel_outbound(stream_id, outbound).await?;
499                                }
500                             }
501                        }
502                        None => {
503                            // Channel closed - driver should shutdown
504                            break;
505                        }
506                    }
507                }
508
509                // Drain freshly registered inline streams.
510                inline = self.inline_register_rx.recv(), if !self.inline_register_rx.is_closed() => {
511                    if let Some(reg) = inline {
512                        self.register_inline_stream(reg);
513                    }
514                }
515
516                // Handle incoming frames
517                read_res = self.connection.read_next_frame() => {
518                    match read_res {
519                        Ok((header, payload)) => {
520                            if let Err(e) = self.handle_frame(header, payload).await {
521                                tracing::error!("H2Driver frame error: {:?}", e);
522                                // Protocol errors are fatal and require connection termination.
523                                // The connection state may be inconsistent after this error.
524                                return Err(e);
525                            }
526                        }
527                        Err(e) => {
528                             // Connection error
529                            tracing::error!("H2Driver read error: {:?}", e);
530                            return Err(e);
531                        }
532                    }
533                }
534
535                _ = async {
536                    if let Some(delay) = keepalive_delay {
537                        tokio::time::sleep(delay).await;
538                    } else {
539                        std::future::pending::<()>().await;
540                    }
541                } => {
542                    let ping = self.connection.send_ping().await?;
543                    self.pending_ping = Some((ping, Instant::now()));
544                }
545
546                _ = async {
547                    if retry_streaming_backpressure {
548                        tokio::time::sleep(Duration::from_millis(1)).await;
549                    } else {
550                        std::future::pending::<()>().await;
551                    }
552                } => {
553                    if retry_streaming_backpressure {
554                        self.backpressure_stall_count
555                            .fetch_add(1, Ordering::Relaxed);
556                        tracing::debug!("H2 driver streaming backpressure stall");
557                    }
558                }
559
560                _ = self.body_progress_notify.notified() => {}
561            }
562        }
563        Ok(())
564    }
565
566    /// Drain freshly arrived inline registrations into `self.streams` so the
567    /// next frame routed to one of them finds the entry. Called at the top of
568    /// the main loop and again at the top of frame handling to close the
569    /// race where the response HEADERS arrive before the registration
570    /// notice is observed.
571    fn drain_inline_registrations(&mut self) {
572        while let Ok(reg) = self.inline_register_rx.try_recv() {
573            self.register_inline_stream(reg);
574        }
575    }
576
577    fn register_inline_stream(&mut self, reg: InlineRegistration) {
578        self.streams.insert(
579            reg.stream_id,
580            DriverStreamState::streaming_inline(
581                reg.headers_tx,
582                reg.trailers_tx,
583                reg.body_shared,
584                reg.recv_window,
585            ),
586        );
587    }
588
589    /// Clear or restore the inline-eligibility flag based on whether the
590    /// driver currently allows sequential inline streams. RFC 8441 tunnels
591    /// and GOAWAY block inline writes; other driver-managed streams may
592    /// coexist with at most one inline stream because the shared write
593    /// half preserves stream-id ordering and the driver routes by id.
594    fn refresh_inline_eligibility(&self) {
595        let eligible = self.tunnels.is_empty() && !self.goaway_received.load(Ordering::Relaxed);
596        self.inline_eligible.store(eligible, Ordering::Relaxed);
597    }
598
599    /// Decrement the inline-active counter when an inline-registered stream
600    /// is removed. The handle observes this counter going back to zero
601    /// before allowing the next sequential inline stream.
602    fn note_stream_removed(state: &DriverStreamState, inline_active: &Arc<AtomicUsize>) {
603        if state.inline {
604            inline_active.fetch_sub(1, Ordering::AcqRel);
605        }
606    }
607
608    fn store_stream_recv_window(&mut self, stream_id: u32, recv_window: i32) {
609        if let Some(stream) = self.streams.get_mut(&stream_id) {
610            stream.recv_window = recv_window;
611        }
612    }
613
614    /// Returns the stream ID eligible for the single-stream streaming fast
615    /// path, or `None` when the regular multiplexed driver loop must run.
616    ///
617    /// Fast-path conditions: exactly one active stream, that stream is a
618    /// streaming response with no pending request body left to send, no
619    /// queued multiplexed work, no RFC 8441 tunnels open, no outstanding
620    /// keepalive ping, no streaming backpressure currently buffered, no
621    /// pending inline registration waiting to be drained, and the
622    /// `command_rx` queue is empty so we will not delay another command.
623    fn single_stream_fast_path_target(&self) -> Option<u32> {
624        if self.streams.len() != 1
625            || !self.tunnels.is_empty()
626            || !self.pending_requests.is_empty()
627            || self.pending_ping.is_some()
628            || !self.command_rx.is_empty()
629            || !self.inline_register_rx.is_empty()
630        {
631            return None;
632        }
633
634        let (stream_id, stream) = self.streams.iter().next()?;
635        if stream.streaming_body.is_none()
636            || !stream.pending_streaming_body.is_empty()
637            || stream.body_offset < stream.pending_body.len()
638            || stream.request_stream.is_some()
639        {
640            return None;
641        }
642
643        Some(*stream_id)
644    }
645
646    /// Tight inner loop that bypasses the multiplexing dispatch when only one
647    /// streaming response is active. Reads frames directly and forwards DATA
648    /// payloads to the single owner channel without iterating the streams
649    /// HashMap or polling the command queue. Returns `Ok(true)` when it
650    /// processed at least one frame and the regular loop should continue,
651    /// `Ok(false)` when the conditions changed before any frame was processed
652    /// (so the caller falls through to the regular `select!`).
653    async fn run_single_stream_streaming_fast_path(&mut self, stream_id: u32) -> Result<bool> {
654        // Hoist the shared body slot once so per-frame DATA delivery does
655        // not re-enter the streams HashMap in the unbackpressured case.
656        let (body_shared, mut recv_window) = match self.streams.get(&stream_id) {
657            Some(stream) => match stream.streaming_body.as_ref() {
658                Some(shared) => (shared.clone(), stream.recv_window),
659                None => return Ok(false),
660            },
661            None => return Ok(false),
662        };
663
664        let mut processed_any = false;
665        let mut frames_since_queue_check = 0usize;
666        let mut queued_data_frames_since_yield = 0usize;
667
668        loop {
669            if frames_since_queue_check >= FAST_PATH_COMMAND_CHECK_INTERVAL {
670                frames_since_queue_check = 0;
671                match self.command_rx.try_recv() {
672                    Ok(cmd) => {
673                        self.store_stream_recv_window(stream_id, recv_window);
674                        self.pending_requests.push_back(cmd);
675                        return Ok(true);
676                    }
677                    Err(mpsc::error::TryRecvError::Empty) => {}
678                    Err(mpsc::error::TryRecvError::Disconnected) => {
679                        self.store_stream_recv_window(stream_id, recv_window);
680                        return Ok(processed_any);
681                    }
682                }
683
684                if let Ok(reg) = self.inline_register_rx.try_recv() {
685                    self.store_stream_recv_window(stream_id, recv_window);
686                    self.register_inline_stream(reg);
687                    return Ok(true);
688                }
689
690                if body_shared.is_closed() {
691                    self.cancel_stream_for_dropped_receiver(stream_id).await;
692                    return Ok(true);
693                }
694            }
695
696            let (header, payload) = match self.connection.read_next_frame().await {
697                Ok(frame) => frame,
698                Err(e) => {
699                    tracing::error!("H2Driver read error (fast path): {:?}", e);
700                    return Err(e);
701                }
702            };
703            processed_any = true;
704            frames_since_queue_check += 1;
705
706            if header.stream_id == stream_id && header.frame_type == FrameType::Data {
707                let end_stream = (header.flags & flags::END_STREAM) != 0;
708                let data = if (header.flags & flags::PADDED) == 0 {
709                    payload
710                } else {
711                    self.connection
712                        .parse_inbound_data_payload(stream_id, header.flags, payload)?
713                };
714                let data_len = data.len();
715                let mut should_yield_for_body_queue = false;
716                if let Some(increment) = self
717                    .connection
718                    .apply_conn_inbound_flow_control_delta(data_len)
719                {
720                    self.connection
721                        .send_connection_window_update(increment)
722                        .await?;
723                }
724                if data_len > 0 {
725                    recv_window -= data_len as i32;
726                }
727                let push = body_shared.push_data(data, end_stream);
728                match push {
729                    H2BodyDataPush::Accepted { queued_len } => {
730                        if queued_len > 1 {
731                            queued_data_frames_since_yield += 1;
732                            should_yield_for_body_queue = queued_data_frames_since_yield
733                                >= FAST_PATH_BODY_QUEUE_YIELD_FRAME_BUDGET;
734                            if should_yield_for_body_queue {
735                                queued_data_frames_since_yield = 0;
736                            }
737                        } else {
738                            queued_data_frames_since_yield = 0;
739                        }
740                    }
741                    H2BodyDataPush::Full(data) => {
742                        if let Some(stream) = self.streams.get_mut(&stream_id) {
743                            stream.recv_window = recv_window;
744                            stream.pending_streaming_body.push_back(Ok(data));
745                            if end_stream {
746                                stream.pending_streaming_end = true;
747                            }
748                        }
749                        return Ok(true);
750                    }
751                    H2BodyDataPush::Closed => {
752                        self.cancel_stream_for_dropped_receiver(stream_id).await;
753                        return Ok(true);
754                    }
755                }
756                if should_yield_for_body_queue {
757                    tokio::task::yield_now().await;
758                    if body_shared.is_closed() {
759                        self.cancel_stream_for_dropped_receiver(stream_id).await;
760                        return Ok(true);
761                    }
762                }
763
764                if end_stream {
765                    if data_len == 0 {
766                        body_shared.finish();
767                    }
768                    tokio::task::yield_now().await;
769                    self.complete_stream(stream_id);
770                    return Ok(true);
771                }
772            } else {
773                self.store_stream_recv_window(stream_id, recv_window);
774                if let Err(e) = self.handle_frame(header, payload).await {
775                    tracing::error!("H2Driver frame error (fast path): {:?}", e);
776                    return Err(e);
777                }
778                if self.single_stream_fast_path_target() != Some(stream_id) {
779                    return Ok(true);
780                }
781            }
782        }
783    }
784
785    fn check_keepalive_timeout(&self) -> Result<()> {
786        if let Some((_, sent_at)) = self.pending_ping {
787            if sent_at.elapsed() >= self.config.keep_alive_timeout {
788                return Err(Error::HttpProtocol(
789                    "HTTP/2 keepalive ping timed out".into(),
790                ));
791            }
792        }
793        Ok(())
794    }
795
796    fn keepalive_delay(&self) -> Option<Duration> {
797        let interval = self.config.keep_alive_interval?;
798        if self.pending_ping.is_some() {
799            return None;
800        }
801        if !self.config.keep_alive_while_idle && self.active_stream_count() == 0 {
802            return None;
803        }
804        Some(interval)
805    }
806
807    fn has_pending_streaming_body(&self) -> bool {
808        self.streams.values().any(|stream| {
809            stream.streaming_body.is_some()
810                && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
811        })
812    }
813
814    fn has_request_body_work(&self) -> bool {
815        self.streams
816            .values()
817            .any(|stream| stream.request_stream.is_some())
818    }
819
820    /// Handle SendRequest command
821    async fn handle_send_request(&mut self, cmd: DriverCommand) -> Result<()> {
822        if !self.has_available_stream_slot() {
823            // Queue request
824            self.pending_requests.push_back(cmd);
825        } else {
826            // Send immediately
827            self.send_request_internal(cmd).await?;
828        }
829        Ok(())
830    }
831
832    async fn handle_send_streaming_request(&mut self, cmd: DriverCommand) -> Result<()> {
833        if !self.has_available_stream_slot() {
834            self.pending_requests.push_back(cmd);
835        } else {
836            self.send_streaming_request_internal(cmd).await?;
837        }
838        Ok(())
839    }
840
841    /// Process pending requests if slots available
842    async fn process_pending_requests(&mut self) -> Result<()> {
843        while self.has_available_stream_slot() {
844            if let Some(cmd) = self.pending_requests.pop_front() {
845                match cmd {
846                    DriverCommand::SendRequest { .. } => {
847                        self.send_request_internal(cmd).await?;
848                    }
849                    DriverCommand::OpenWebSocketTunnel {
850                        uri,
851                        headers,
852                        response_tx,
853                    } => {
854                        self.open_websocket_tunnel_internal(uri, headers, response_tx)
855                            .await?;
856                    }
857                    DriverCommand::SendStreamingRequest { .. } => {
858                        self.send_streaming_request_internal(cmd).await?;
859                    }
860                    DriverCommand::SendTunnelData {
861                        stream_id,
862                        outbound,
863                    } => {
864                        self.queue_tunnel_outbound(stream_id, outbound).await?;
865                    }
866                }
867            } else {
868                break;
869            }
870        }
871        Ok(())
872    }
873
874    fn active_stream_count(&self) -> usize {
875        self.streams.len() + self.tunnels.len()
876    }
877
878    fn has_available_stream_slot(&self) -> bool {
879        let max_streams = self.config.effective_max_concurrent_streams(
880            self.connection.peer_settings().max_concurrent_streams,
881        );
882        self.active_stream_count() < max_streams
883    }
884
885    /// Internal helper to send request
886    async fn send_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
887        if let DriverCommand::SendRequest {
888            method,
889            uri,
890            headers,
891            body,
892            response_tx,
893        } = cmd
894        {
895            let body_bytes = body.unwrap_or_default();
896            let has_body = !body_bytes.is_empty();
897            let end_stream = !has_body;
898
899            let initial_recv_window = self.connection.local_initial_window_size() as i32;
900            match self
901                .connection
902                .send_headers_raw(&method, &uri, &headers, end_stream)
903                .await
904            {
905                Ok(stream_id) => {
906                    self.streams.insert(
907                        stream_id,
908                        DriverStreamState::new(response_tx, body_bytes, initial_recv_window),
909                    );
910
911                    self.flush_pending_data().await?;
912                }
913                Err(e) => {
914                    if response_tx.send(Err(e)).is_err() {
915                        tracing::debug!("Response channel closed while sending error");
916                    }
917                }
918            }
919        }
920        Ok(())
921    }
922
923    async fn send_streaming_request_internal(&mut self, cmd: DriverCommand) -> Result<()> {
924        if let DriverCommand::SendStreamingRequest {
925            method,
926            uri,
927            headers,
928            body,
929            body_shared,
930            headers_tx,
931            trailers_tx,
932        } = cmd
933        {
934            let (body_bytes, request_stream, end_stream) = match body {
935                RequestBody::Empty => (Bytes::new(), None, true),
936                RequestBody::Bytes(bytes) => {
937                    let end_stream = bytes.is_empty();
938                    (bytes, None, end_stream)
939                }
940                RequestBody::Text(text) => {
941                    let bytes = Bytes::from(text.into_bytes());
942                    let end_stream = bytes.is_empty();
943                    (bytes, None, end_stream)
944                }
945                RequestBody::Json(bytes) => {
946                    let bytes = Bytes::from(bytes);
947                    let end_stream = bytes.is_empty();
948                    (bytes, None, end_stream)
949                }
950                RequestBody::Form(text) => {
951                    let bytes = Bytes::from(text.into_bytes());
952                    let end_stream = bytes.is_empty();
953                    (bytes, None, end_stream)
954                }
955                RequestBody::Stream {
956                    stream,
957                    content_length,
958                } => (
959                    Bytes::new(),
960                    Some(DriverStreamingRequestBody::new(stream, content_length)),
961                    false,
962                ),
963            };
964
965            let initial_recv_window = self.connection.local_initial_window_size() as i32;
966            match self
967                .connection
968                .send_headers_raw(&method, &uri, &headers, end_stream)
969                .await
970            {
971                Ok(stream_id) => {
972                    self.streams.insert(
973                        stream_id,
974                        DriverStreamState::streaming(
975                            headers_tx,
976                            trailers_tx,
977                            body_shared,
978                            body_bytes,
979                            request_stream,
980                            initial_recv_window,
981                        ),
982                    );
983                    self.flush_pending_data().await?;
984                }
985                Err(error) => {
986                    let _ = headers_tx.send(Err(error));
987                }
988            }
989        }
990        Ok(())
991    }
992
993    async fn handle_open_websocket_tunnel(
994        &mut self,
995        uri: Uri,
996        headers: Vec<(String, String)>,
997        response_tx: oneshot::Sender<Result<H2Tunnel>>,
998    ) -> Result<()> {
999        if !self.has_available_stream_slot() {
1000            self.pending_requests
1001                .push_back(DriverCommand::OpenWebSocketTunnel {
1002                    uri,
1003                    headers,
1004                    response_tx,
1005                });
1006            return Ok(());
1007        }
1008
1009        self.open_websocket_tunnel_internal(uri, headers, response_tx)
1010            .await
1011    }
1012
1013    async fn open_websocket_tunnel_internal(
1014        &mut self,
1015        uri: Uri,
1016        headers: Vec<(String, String)>,
1017        response_tx: oneshot::Sender<Result<H2Tunnel>>,
1018    ) -> Result<()> {
1019        let headers = Headers::from(headers);
1020        match self
1021            .connection
1022            .open_extended_connect_websocket_with_end_stream(&uri, &headers)
1023            .await
1024        {
1025            Ok((stream_id, end_stream)) => {
1026                let (outbound_tx, outbound_rx) = mpsc::channel(32);
1027                let (inbound_tx, inbound_rx) = mpsc::channel(32);
1028                let initial_window_size = self.connection.local_initial_window_size();
1029                let initial_recv_window = initial_window_size as i32;
1030                let credit =
1031                    H2TunnelCredit::new(self.body_progress_notify.clone(), initial_window_size);
1032                if end_stream {
1033                    let _ = inbound_tx.send(Ok(H2TunnelEvent::EndStream)).await;
1034                    self.connection.remove_stream(stream_id);
1035                } else {
1036                    let command_tx = self.command_tx.clone();
1037                    tokio::spawn(async move {
1038                        let mut outbound_rx = outbound_rx;
1039                        while let Some(outbound) = outbound_rx.recv().await {
1040                            if command_tx
1041                                .send(DriverCommand::SendTunnelData {
1042                                    stream_id,
1043                                    outbound,
1044                                })
1045                                .await
1046                                .is_err()
1047                            {
1048                                break;
1049                            }
1050                        }
1051                    });
1052                    self.tunnels.insert(
1053                        stream_id,
1054                        DriverTunnelState {
1055                            inbound_tx,
1056                            pending_inbound: VecDeque::new(),
1057                            pending_outbound: VecDeque::new(),
1058                            recv_window: initial_recv_window,
1059                            pending_recv_window_update: 0,
1060                            credit: credit.clone(),
1061                        },
1062                    );
1063                }
1064
1065                if response_tx
1066                    .send(Ok(H2Tunnel::new_with_credit(
1067                        outbound_tx,
1068                        inbound_rx,
1069                        credit,
1070                    )))
1071                    .is_err()
1072                {
1073                    tracing::debug!("Tunnel response channel closed after open");
1074                    self.tunnels.remove(&stream_id);
1075                }
1076            }
1077            Err(e) => {
1078                if response_tx.send(Err(e)).is_err() {
1079                    tracing::debug!("Tunnel response channel closed while sending open error");
1080                }
1081            }
1082        }
1083        Ok(())
1084    }
1085
1086    async fn queue_tunnel_outbound(
1087        &mut self,
1088        stream_id: u32,
1089        outbound: H2TunnelOutbound,
1090    ) -> Result<()> {
1091        if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1092            tunnel.pending_outbound.push_back(outbound);
1093            self.flush_tunnel_data().await?;
1094        }
1095
1096        Ok(())
1097    }
1098
1099    /// Iterate all active streams and try to send pending body data
1100    async fn flush_pending_data(&mut self) -> Result<()> {
1101        if !self.streams.values().any(|stream| {
1102            stream.body_offset < stream.pending_body.len() || stream.request_stream.is_some()
1103        }) {
1104            return Ok(());
1105        }
1106
1107        // Collect IDs to avoid borrow conflict
1108        let stream_ids: Vec<u32> = self.streams.keys().cloned().collect();
1109
1110        for stream_id in stream_ids {
1111            // Keep sending chunks for this stream until blocked or done
1112            loop {
1113                // Check if we have data to send
1114                let (has_data, offset) = if let Some(stream) = self.streams.get(&stream_id) {
1115                    (
1116                        stream.body_offset < stream.pending_body.len(),
1117                        stream.body_offset,
1118                    )
1119                } else {
1120                    (false, 0)
1121                };
1122
1123                if !has_data {
1124                    break;
1125                }
1126
1127                // Prepare arguments for send_data
1128                // We clone the Bytes handle which is cheap
1129                let pending_body = {
1130                    let s = self.streams.get(&stream_id).unwrap();
1131                    s.pending_body.clone()
1132                };
1133
1134                let remaining = &pending_body[offset..];
1135                let is_last_chunk = true;
1136
1137                // send_data returns bytes sent. If 0, it means blocked.
1138                let sent = self
1139                    .connection
1140                    .send_data(stream_id, remaining, is_last_chunk)
1141                    .await?;
1142
1143                if sent > 0 {
1144                    if let Some(stream) = self.streams.get_mut(&stream_id) {
1145                        stream.body_offset += sent;
1146                    }
1147                    // Loop again to send next chunk
1148                } else {
1149                    // Blocked by flow control
1150                    break;
1151                }
1152            }
1153
1154            self.flush_streaming_request_body(stream_id).await?;
1155        }
1156        Ok(())
1157    }
1158
1159    async fn flush_streaming_request_body(&mut self, stream_id: u32) -> Result<()> {
1160        loop {
1161            if self
1162                .streams
1163                .get(&stream_id)
1164                .and_then(|stream| stream.request_stream.as_ref())
1165                .is_none()
1166            {
1167                return Ok(());
1168            }
1169
1170            let available = self.connection.available_send_window(stream_id).await?;
1171            if available <= 0 {
1172                return Ok(());
1173            }
1174
1175            let batch_limit = (available as usize).min(self.connection.max_data_frame_size());
1176            if batch_limit == 0 {
1177                return Ok(());
1178            }
1179
1180            let mut batch = BytesMut::with_capacity(batch_limit);
1181            let mut end_stream = false;
1182            let mut remove_request_stream_after_send = false;
1183
1184            while batch.len() < batch_limit {
1185                let current = self
1186                    .streams
1187                    .get(&stream_id)
1188                    .and_then(|stream| stream.request_stream.as_ref())
1189                    .and_then(|body| {
1190                        body.current_chunk
1191                            .as_ref()
1192                            .map(|chunk| (chunk.clone(), body.current_offset))
1193                    });
1194
1195                if let Some((chunk, offset)) = current {
1196                    let remaining = &chunk[offset..];
1197                    let take = remaining.len().min(batch_limit - batch.len());
1198                    batch.extend_from_slice(&remaining[..take]);
1199
1200                    let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1201                    let body = stream
1202                        .request_stream
1203                        .as_mut()
1204                        .expect("request stream exists");
1205                    body.current_offset += take;
1206                    body.sent += take as u64;
1207                    if body.current_offset >= chunk.len() {
1208                        body.current_chunk = None;
1209                        body.current_offset = 0;
1210                    }
1211                    continue;
1212                }
1213
1214                let poll_result = {
1215                    let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1216                    let body = stream
1217                        .request_stream
1218                        .as_mut()
1219                        .expect("request stream exists");
1220                    if body.finished {
1221                        Poll::Ready(None)
1222                    } else {
1223                        let mut cx = std::task::Context::from_waker(&self.request_body_waker);
1224                        body.stream.as_mut().poll_next(&mut cx)
1225                    }
1226                };
1227
1228                match poll_result {
1229                    Poll::Pending => break,
1230                    Poll::Ready(Some(Ok(chunk))) => {
1231                        if chunk.is_empty() {
1232                            continue;
1233                        }
1234                        let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1235                        let body = stream
1236                            .request_stream
1237                            .as_mut()
1238                            .expect("request stream exists");
1239                        body.current_chunk = Some(chunk);
1240                        body.current_offset = 0;
1241                    }
1242                    Poll::Ready(Some(Err(error))) => {
1243                        self.fail_stream(
1244                            stream_id,
1245                            format!("request body stream error: {}", error),
1246                        )
1247                        .await;
1248                        return Ok(());
1249                    }
1250                    Poll::Ready(None) => {
1251                        let (valid_len, sent, expected, already_sent_end) = {
1252                            let stream = self.streams.get_mut(&stream_id).expect("stream exists");
1253                            let body = stream
1254                                .request_stream
1255                                .as_mut()
1256                                .expect("request stream exists");
1257                            body.finished = true;
1258                            (
1259                                body.content_length
1260                                    .map(|expected| expected == body.sent)
1261                                    .unwrap_or(true),
1262                                body.sent,
1263                                body.content_length,
1264                                body.end_stream_sent,
1265                            )
1266                        };
1267                        if !valid_len {
1268                            self.fail_stream(
1269                                stream_id,
1270                                format!(
1271                                    "sized streaming request body length mismatch: sent {} bytes, Content-Length is {}",
1272                                    sent,
1273                                    expected.unwrap_or_default()
1274                                ),
1275                            )
1276                            .await;
1277                            return Ok(());
1278                        }
1279                        if already_sent_end {
1280                            return Ok(());
1281                        }
1282                        end_stream = true;
1283                        remove_request_stream_after_send = true;
1284                        break;
1285                    }
1286                }
1287            }
1288
1289            if batch.is_empty() {
1290                if end_stream {
1291                    let sent = self.connection.send_data(stream_id, &[], true).await?;
1292                    debug_assert_eq!(sent, 0);
1293                    if let Some(stream) = self.streams.get_mut(&stream_id) {
1294                        if let Some(body) = stream.request_stream.as_mut() {
1295                            body.end_stream_sent = true;
1296                        }
1297                        stream.request_stream = None;
1298                    }
1299                }
1300                return Ok(());
1301            }
1302
1303            let sent = self
1304                .connection
1305                .send_data(stream_id, &batch, end_stream)
1306                .await?;
1307            if sent == 0 {
1308                return Ok(());
1309            }
1310            if sent != batch.len() {
1311                return Err(Error::HttpProtocol(
1312                    "short DATA write after preflighted flow-control window".into(),
1313                ));
1314            }
1315
1316            if remove_request_stream_after_send {
1317                if let Some(stream) = self.streams.get_mut(&stream_id) {
1318                    if let Some(body) = stream.request_stream.as_mut() {
1319                        body.end_stream_sent = true;
1320                    }
1321                    stream.request_stream = None;
1322                }
1323                return Ok(());
1324            }
1325        }
1326    }
1327
1328    async fn flush_tunnel_data(&mut self) -> Result<()> {
1329        if self.tunnels.is_empty() {
1330            return Ok(());
1331        }
1332        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1333
1334        for stream_id in stream_ids {
1335            loop {
1336                let outbound = match self
1337                    .tunnels
1338                    .get_mut(&stream_id)
1339                    .and_then(|tunnel| tunnel.pending_outbound.pop_front())
1340                {
1341                    Some(outbound) => outbound,
1342                    None => break,
1343                };
1344
1345                let sent = self
1346                    .connection
1347                    .send_data(stream_id, &outbound.bytes, outbound.end_stream)
1348                    .await?;
1349
1350                if outbound.bytes.is_empty() {
1351                    continue;
1352                }
1353
1354                if sent == 0 {
1355                    if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1356                        tunnel.pending_outbound.push_front(outbound);
1357                    }
1358                    break;
1359                }
1360
1361                if sent < outbound.bytes.len() {
1362                    if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1363                        tunnel.pending_outbound.push_front(H2TunnelOutbound {
1364                            bytes: outbound.bytes.slice(sent..),
1365                            end_stream: outbound.end_stream,
1366                        });
1367                    }
1368                    break;
1369                }
1370            }
1371        }
1372
1373        Ok(())
1374    }
1375
1376    async fn flush_tunnel_inbound(&mut self) -> Result<()> {
1377        if self.tunnels.is_empty() {
1378            return Ok(());
1379        }
1380
1381        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1382        for stream_id in stream_ids {
1383            let status = self
1384                .tunnels
1385                .get_mut(&stream_id)
1386                .map(DriverTunnelState::flush_inbound)
1387                .unwrap_or(TunnelInboundStatus::Open);
1388            self.apply_tunnel_inbound_status(stream_id, status).await?;
1389        }
1390
1391        Ok(())
1392    }
1393
1394    async fn apply_tunnel_inbound_status(
1395        &mut self,
1396        stream_id: u32,
1397        status: TunnelInboundStatus,
1398    ) -> Result<()> {
1399        match status {
1400            TunnelInboundStatus::Open | TunnelInboundStatus::Blocked => {}
1401            TunnelInboundStatus::Remove => {
1402                self.tunnels.remove(&stream_id);
1403                self.connection.remove_stream(stream_id);
1404                self.process_pending_requests().await?;
1405            }
1406            TunnelInboundStatus::Closed => {
1407                self.tunnels.remove(&stream_id);
1408                self.connection.remove_stream(stream_id);
1409                if let Err(e) = self
1410                    .connection
1411                    .send_rst_stream(stream_id, ErrorCode::Cancel)
1412                    .await
1413                {
1414                    tracing::warn!("Failed to send RST_STREAM for dropped tunnel: {:?}", e);
1415                }
1416                self.process_pending_requests().await?;
1417            }
1418        }
1419        Ok(())
1420    }
1421
1422    async fn apply_released_body_credits(&mut self) -> Result<()> {
1423        let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1424        for stream_id in stream_ids {
1425            let (released, closed) = self
1426                .streams
1427                .get(&stream_id)
1428                .and_then(|stream| stream.streaming_body.as_ref())
1429                .map(|body| (body.take_released_recv_bytes(), body.is_closed()))
1430                .unwrap_or((0, false));
1431
1432            if closed {
1433                self.cancel_stream_for_dropped_receiver(stream_id).await;
1434                continue;
1435            }
1436
1437            if released == 0 {
1438                continue;
1439            }
1440
1441            let refresh_threshold = self.connection.flow_control_refresh_threshold();
1442            let window_update_step = self.connection.stream_window_update_step();
1443            let increment = self.streams.get_mut(&stream_id).and_then(|stream| {
1444                stream.pending_recv_window_update =
1445                    stream.pending_recv_window_update.saturating_add(released);
1446                let should_update = stream.pending_recv_window_update >= window_update_step
1447                    || stream.recv_window < refresh_threshold;
1448                if should_update {
1449                    let increment = stream.pending_recv_window_update.min(u32::MAX as usize);
1450                    stream.pending_recv_window_update -= increment;
1451                    stream.recv_window = stream.recv_window.saturating_add(increment as i32);
1452                    Some(increment as u32)
1453                } else {
1454                    None
1455                }
1456            });
1457
1458            if let Some(increment) = increment {
1459                self.connection
1460                    .send_stream_window_update(stream_id, increment)
1461                    .await?;
1462            }
1463        }
1464
1465        Ok(())
1466    }
1467
1468    async fn apply_released_tunnel_credits(&mut self) -> Result<()> {
1469        let stream_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1470        for stream_id in stream_ids {
1471            let (released, closed) = self
1472                .tunnels
1473                .get(&stream_id)
1474                .map(|tunnel| {
1475                    (
1476                        tunnel.credit.take_released_recv_bytes(),
1477                        tunnel.inbound_tx.is_closed(),
1478                    )
1479                })
1480                .unwrap_or((0, false));
1481
1482            if closed {
1483                self.apply_tunnel_inbound_status(stream_id, TunnelInboundStatus::Closed)
1484                    .await?;
1485                continue;
1486            }
1487
1488            if released == 0 {
1489                continue;
1490            }
1491
1492            let refresh_threshold = self.connection.flow_control_refresh_threshold();
1493            let window_update_step = self.connection.stream_window_update_step();
1494            let increment = self.tunnels.get_mut(&stream_id).and_then(|tunnel| {
1495                tunnel.pending_recv_window_update =
1496                    tunnel.pending_recv_window_update.saturating_add(released);
1497                let should_update = tunnel.pending_recv_window_update >= window_update_step
1498                    || tunnel.recv_window < refresh_threshold;
1499                if should_update {
1500                    let increment = tunnel.pending_recv_window_update.min(u32::MAX as usize);
1501                    tunnel.pending_recv_window_update -= increment;
1502                    tunnel.recv_window = tunnel.recv_window.saturating_add(increment as i32);
1503                    Some(increment as u32)
1504                } else {
1505                    None
1506                }
1507            });
1508
1509            if let Some(increment) = increment {
1510                self.connection
1511                    .send_stream_window_update(stream_id, increment)
1512                    .await?;
1513            }
1514        }
1515
1516        Ok(())
1517    }
1518
1519    async fn fail_stream(&mut self, stream_id: u32, message: String) {
1520        self.connection.remove_stream(stream_id);
1521        if let Some(mut stream) = self.streams.remove(&stream_id) {
1522            Self::note_stream_removed(&stream, &self.inline_active);
1523            if let Some(tx) = stream.response_tx.take() {
1524                let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1525            }
1526            if let Some(tx) = stream.streaming_headers_tx.take() {
1527                let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1528            }
1529            if let Some(tx) = stream.trailers_tx.take() {
1530                let _ = tx.send(Err(Error::HttpProtocol(message.clone())));
1531            }
1532            if let Some(body) = stream.streaming_body.take() {
1533                let _ = body.fail(Error::HttpProtocol(message));
1534            }
1535        }
1536    }
1537
1538    async fn cancel_stream_for_dropped_receiver(&mut self, stream_id: u32) {
1539        if let Some(state) = self.streams.remove(&stream_id) {
1540            Self::note_stream_removed(&state, &self.inline_active);
1541        }
1542        self.connection.remove_stream(stream_id);
1543        if let Err(e) = self
1544            .connection
1545            .send_rst_stream(stream_id, ErrorCode::Cancel)
1546            .await
1547        {
1548            tracing::warn!("Failed to send RST_STREAM for dropped receiver: {:?}", e);
1549        }
1550    }
1551
1552    async fn flush_pending_streaming_bodies(&mut self) -> Result<()> {
1553        if !self.has_pending_streaming_body() {
1554            return Ok(());
1555        }
1556        let stream_ids: Vec<u32> = self
1557            .streams
1558            .iter()
1559            .filter_map(|(stream_id, stream)| {
1560                if stream.streaming_body.is_some()
1561                    && (!stream.pending_streaming_body.is_empty() || stream.pending_streaming_end)
1562                {
1563                    Some(*stream_id)
1564                } else {
1565                    None
1566                }
1567            })
1568            .collect();
1569
1570        for stream_id in stream_ids {
1571            let mut should_cancel = false;
1572            let mut should_complete = false;
1573
1574            if let Some(stream) = self.streams.get_mut(&stream_id) {
1575                let Some(body) = stream.streaming_body.as_ref() else {
1576                    continue;
1577                };
1578
1579                if body.is_closed() {
1580                    should_cancel = true;
1581                } else {
1582                    while let Some(item) = stream.pending_streaming_body.pop_front() {
1583                        match body.push(item) {
1584                            H2BodyPush::Accepted => {}
1585                            H2BodyPush::Full(item) => {
1586                                stream.pending_streaming_body.push_front(item);
1587                                break;
1588                            }
1589                            H2BodyPush::Closed => {
1590                                should_cancel = true;
1591                                break;
1592                            }
1593                        }
1594                    }
1595
1596                    should_complete =
1597                        stream.pending_streaming_end && stream.pending_streaming_body.is_empty();
1598                }
1599            }
1600
1601            if should_cancel {
1602                self.cancel_stream_for_dropped_receiver(stream_id).await;
1603            } else if should_complete {
1604                if let Some(body) = self
1605                    .streams
1606                    .get(&stream_id)
1607                    .and_then(|stream| stream.streaming_body.as_ref())
1608                {
1609                    body.finish();
1610                }
1611                self.complete_stream(stream_id);
1612            }
1613        }
1614
1615        Ok(())
1616    }
1617
1618    /// Handle a single frame
1619    async fn handle_frame(&mut self, header: FrameHeader, mut payload: Bytes) -> Result<()> {
1620        // Drain any inline registrations so a freshly registered stream is
1621        // visible before we try to route this frame to it.
1622        self.drain_inline_registrations();
1623
1624        // Check if receiver has been dropped (is_closed) for this stream before frame processing.
1625        // If dropped, send RST_STREAM(CANCEL) and evict.
1626        if header.stream_id != 0 {
1627            if let Some(stream) = self.streams.get(&header.stream_id) {
1628                if let Some(ref body) = stream.streaming_body {
1629                    if body.is_closed() {
1630                        let stream_id = header.stream_id;
1631                        self.cancel_stream_for_dropped_receiver(stream_id).await;
1632                        return Ok(());
1633                    }
1634                }
1635            }
1636        }
1637
1638        // 1. Check control frames that modify connection state
1639        if matches!(
1640            header.frame_type,
1641            FrameType::Settings
1642                | FrameType::WindowUpdate
1643                | FrameType::Ping
1644                | FrameType::GoAway
1645                | FrameType::RstStream
1646                | FrameType::PushPromise
1647        ) {
1648            match self
1649                .connection
1650                .handle_control_frame(&header, payload.clone())
1651                .await?
1652            {
1653                ControlAction::RstStream(sid, code) => {
1654                    if let Some(tunnel) = self.tunnels.remove(&sid) {
1655                        let _ = tunnel
1656                            .inbound_tx
1657                            .send(Ok(H2TunnelEvent::Reset(format!("{:?}", code))))
1658                            .await;
1659                    }
1660                    // Notify stream of reset
1661                    self.fail_stream(sid, format!("Stream reset by peer: {:?}", code))
1662                        .await;
1663                    // Stream slot freed, try to process pending
1664                    self.process_pending_requests().await?;
1665                    return Ok(());
1666                }
1667                ControlAction::GoAway(last_sid) => {
1668                    self.goaway_received
1669                        .store(true, std::sync::atomic::Ordering::Relaxed);
1670                    let stream_ids: Vec<u32> = self.streams.keys().copied().collect();
1671                    for sid in stream_ids {
1672                        if let Some(stream) = self.streams.get(&sid) {
1673                            if stream.streaming_body.is_some()
1674                                && stream
1675                                    .streaming_body
1676                                    .as_ref()
1677                                    .is_some_and(|body| body.is_slot_available())
1678                            {
1679                                if let Some(body) = stream.streaming_body.as_ref() {
1680                                    body.finish();
1681                                }
1682                            }
1683                        }
1684                    }
1685                    let tunnel_ids: Vec<u32> = self.tunnels.keys().copied().collect();
1686                    for sid in tunnel_ids {
1687                        if sid > last_sid {
1688                            if let Some(tunnel) = self.tunnels.remove(&sid) {
1689                                let _ = tunnel
1690                                    .inbound_tx
1691                                    .send(Ok(H2TunnelEvent::GoAway {
1692                                        last_stream_id: last_sid,
1693                                    }))
1694                                    .await;
1695                            }
1696                        }
1697                    }
1698                    // Close all streams > last_sid
1699                    let sids: Vec<u32> = self.streams.keys().cloned().collect();
1700                    for sid in sids {
1701                        if sid > last_sid {
1702                            self.fail_stream(sid, "GOAWAY received".into()).await;
1703                        }
1704                    }
1705                    // Driver continues processing existing streams until they complete.
1706                    // A future enhancement could implement immediate shutdown on GOAWAY.
1707                    return Ok(());
1708                }
1709                ControlAction::RefusePush(_stream_id, promised_id) => {
1710                    // Send RST_STREAM for the promised stream
1711                    // RFC 9113 8.4: RST_STREAM with REFUSED_STREAM
1712                    if let Err(e) = self
1713                        .connection
1714                        .send_rst_stream(promised_id, ErrorCode::RefusedStream)
1715                        .await
1716                    {
1717                        tracing::warn!(
1718                            "Failed to send RST_STREAM for refused push promise: {:?}",
1719                            e
1720                        );
1721                    }
1722                }
1723                ControlAction::PingAck(data) => {
1724                    if self
1725                        .pending_ping
1726                        .is_some_and(|(pending_data, _)| pending_data == data)
1727                    {
1728                        self.pending_ping = None;
1729                    }
1730                    return Ok(());
1731                }
1732                ControlAction::None => {
1733                    // Continue to specific processing
1734                }
1735            }
1736        }
1737
1738        // 2. Data / Headers routing
1739        match header.frame_type {
1740            FrameType::Headers => {
1741                let stream_id = header.stream_id;
1742
1743                // Handle CONTINUATION frames if needed (END_HEADERS flag not set).
1744                // CONTINUATION frames are collected in the loop below; this branch handles
1745                // the initial HEADERS frame that starts a header block.
1746                if (header.flags & flags::END_HEADERS) == 0 {
1747                    // Loop to read CONTINUATION frames
1748                    // This inner loop blocks the driver select! loop, which is expected
1749                    // per RFC 9113 Section 6.2 (CONTINUATION frames must be processed sequentially).
1750                    let mut block = BytesMut::from(payload);
1751                    loop {
1752                        let (next_header, next_payload) = self.connection.read_next_frame().await?;
1753                        if next_header.frame_type != FrameType::Continuation {
1754                            return Err(Error::HttpProtocol("Expected CONTINUATION frame".into()));
1755                        }
1756                        if next_header.stream_id != stream_id {
1757                            return Err(Error::HttpProtocol(
1758                                "CONTINUATION frame stream ID mismatch".into(),
1759                            ));
1760                        }
1761                        block.extend_from_slice(&next_payload);
1762                        if (next_header.flags & flags::END_HEADERS) != 0 {
1763                            break;
1764                        }
1765                    }
1766                    payload = block.freeze();
1767                }
1768
1769                let decoded = self.connection.decode_header_block(payload)?;
1770
1771                // Parse pseudo-headers
1772                let mut status = 0u16;
1773                let mut regular_headers = Vec::new();
1774
1775                for (name, value) in decoded {
1776                    if name == ":status" {
1777                        status = value.parse().unwrap_or(0);
1778                    } else if !name.starts_with(':') {
1779                        regular_headers.push((name, value));
1780                    }
1781                }
1782
1783                if let Some(stream) = self.streams.get_mut(&stream_id) {
1784                    if status >= 200 {
1785                        let header_end_stream = (header.flags & flags::END_STREAM) != 0
1786                            || self.goaway_received.load(Ordering::Relaxed);
1787                        if let Some(tx) = stream.streaming_headers_tx.take() {
1788                            let _ = tx.send(Ok((status, regular_headers)));
1789                            if header_end_stream {
1790                                if let Some(body) = stream.streaming_body.as_ref() {
1791                                    body.finish();
1792                                }
1793                            }
1794                        } else {
1795                            stream.status = Some(status);
1796                            stream.headers = regular_headers;
1797                        }
1798                    } else if status > 0 {
1799                        // 1xx informational status
1800                        tracing::debug!("H2Driver: Ignoring informational status {}", status);
1801                    } else {
1802                        // status == 0, likely trailers HEADERS frame (no :status)
1803                        tracing::debug!("H2Driver: Received trailers for stream {}", stream_id);
1804                        // Route the already-decoded trailer block out via the
1805                        // side channel when the caller requested trailers. When
1806                        // `trailers_tx` is `None` (no `te: trailers`), this is a
1807                        // no-op and `regular_headers` is dropped exactly as
1808                        // before, off the DATA hot path.
1809                        if let Some(tx) = stream.trailers_tx.take() {
1810                            let _ = tx.send(Ok(regular_headers));
1811                        }
1812                        if (header.flags & flags::END_STREAM) != 0 {
1813                            if let Some(body) = stream.streaming_body.as_ref() {
1814                                body.finish();
1815                            }
1816                        }
1817                    }
1818
1819                    if (header.flags & flags::END_STREAM) != 0 {
1820                        self.complete_stream(stream_id);
1821                    }
1822                }
1823            }
1824            FrameType::Data => {
1825                let stream_id = header.stream_id;
1826                let end_stream = (header.flags & flags::END_STREAM) != 0;
1827
1828                let data =
1829                    self.connection
1830                        .parse_inbound_data_payload(stream_id, header.flags, payload)?;
1831                let data_len = data.len();
1832                if let Some(increment) = self
1833                    .connection
1834                    .apply_conn_inbound_flow_control_delta(data_len)
1835                {
1836                    self.connection
1837                        .send_connection_window_update(increment)
1838                        .await?;
1839                }
1840
1841                if let Some(tunnel) = self.tunnels.get_mut(&stream_id) {
1842                    if data_len > 0 {
1843                        tunnel.recv_window -= data_len as i32;
1844                    }
1845
1846                    let mut status = TunnelInboundStatus::Open;
1847                    if !data.is_empty() {
1848                        status = tunnel.push_inbound(H2TunnelEvent::Data(data));
1849                    }
1850                    if status == TunnelInboundStatus::Open && end_stream {
1851                        status = tunnel.push_inbound(H2TunnelEvent::EndStream);
1852                    }
1853                    self.apply_tunnel_inbound_status(stream_id, status).await?;
1854                    return Ok(());
1855                }
1856
1857                let mut should_cancel = false;
1858                let mut should_complete = false;
1859                let mut should_finish_body = false;
1860                let mut should_yield_before_complete = false;
1861
1862                if let Some(stream) = self.streams.get_mut(&stream_id) {
1863                    if data_len > 0 {
1864                        stream.recv_window -= data_len as i32;
1865                    }
1866
1867                    if let Some(body) = stream.streaming_body.as_ref() {
1868                        if !data.is_empty() {
1869                            if stream.pending_streaming_body.is_empty() {
1870                                let push = body.push_data(data, end_stream);
1871                                match push {
1872                                    H2BodyDataPush::Accepted { .. } => {}
1873                                    H2BodyDataPush::Full(data) => {
1874                                        stream.pending_streaming_body.push_back(Ok(data));
1875                                    }
1876                                    H2BodyDataPush::Closed => {
1877                                        should_cancel = true;
1878                                    }
1879                                }
1880                            } else {
1881                                stream.pending_streaming_body.push_back(Ok(data));
1882                            }
1883                        }
1884
1885                        if end_stream {
1886                            if stream.pending_streaming_body.is_empty() {
1887                                should_complete = true;
1888                                should_finish_body = data_len == 0;
1889                                should_yield_before_complete = true;
1890                            } else {
1891                                stream.pending_streaming_end = true;
1892                            }
1893                        }
1894                    } else {
1895                        stream.body.extend_from_slice(&data);
1896                        should_complete = end_stream;
1897                    }
1898                }
1899
1900                if should_cancel {
1901                    self.cancel_stream_for_dropped_receiver(stream_id).await;
1902                    return Ok(());
1903                }
1904
1905                if should_complete {
1906                    if should_finish_body {
1907                        if let Some(body) = self
1908                            .streams
1909                            .get(&stream_id)
1910                            .and_then(|stream| stream.streaming_body.as_ref())
1911                        {
1912                            body.finish();
1913                        }
1914                    }
1915                    if should_yield_before_complete {
1916                        tokio::task::yield_now().await;
1917                    }
1918                    self.complete_stream(stream_id);
1919                }
1920            }
1921            FrameType::WindowUpdate => {
1922                // Window update received and processed by handle_control_frame,
1923                // which updates the connection/stream window in self.connection.
1924                // Flush any pending data that was previously blocked by flow control.
1925                self.flush_pending_data().await?;
1926                self.flush_tunnel_data().await?;
1927            }
1928            _ => {} // Other frames handled by handle_control_frame (or ignored)
1929        }
1930
1931        Ok(())
1932    }
1933
1934    /// Complete a stream: build response and send
1935    fn complete_stream(&mut self, stream_id: u32) {
1936        if let Some(mut stream) = self.streams.remove(&stream_id) {
1937            if !stream.inline {
1938                self.connection.remove_stream(stream_id);
1939            }
1940            Self::note_stream_removed(&stream, &self.inline_active);
1941            if let Some(tx) = stream.response_tx.take() {
1942                // If no status was received, this is a protocol violation
1943                // Return an error rather than defaulting to 200
1944                let response = match stream.status {
1945                    Some(status) => Ok(StreamResponse {
1946                        status,
1947                        headers: stream.headers,
1948                        body: stream.body.freeze(),
1949                    }),
1950                    None => Err(Error::HttpProtocol(format!(
1951                        "Stream {} completed without status code",
1952                        stream_id
1953                    ))),
1954                };
1955                if tx.send(response).is_err() {
1956                    tracing::debug!("Response channel closed while completing stream");
1957                }
1958            }
1959        }
1960        // Stream slot is now available. The main loop will call process_pending_requests
1961        // to process any queued requests waiting for available stream slots.
1962    }
1963}