Skip to main content

ringline_grpc/
connection.rs

1//! gRPC client connection state machine.
2//!
3//! `GrpcConnection` wraps an `H2Connection` and adds gRPC message framing,
4//! header conventions, and status extraction from trailers.
5
6use std::collections::{HashMap, VecDeque};
7use std::time::Duration;
8
9use ringline_h2::hpack::HeaderField;
10use ringline_h2::settings::Settings;
11use ringline_h2::{ErrorCode, H2Connection, H2Event};
12
13use crate::error::{GrpcError, GrpcStatus};
14use crate::message::{self, BufferDecode, MessageBuffer};
15
16/// Per-stream state for tracking the server's chosen encoding and the
17/// progress of the inbound message reassembly.
18#[derive(Debug)]
19struct StreamState {
20    buffer: MessageBuffer,
21    /// The encoding advertised by the server for this stream (from `grpc-encoding` header).
22    encoding: Option<String>,
23    /// `true` once we've delivered the initial `Response` event for this
24    /// stream — used to gate DATA-before-HEADERS and to detect
25    /// trailers-only paths.
26    response_seen: bool,
27}
28
29impl StreamState {
30    fn new(max_message_size: usize) -> Self {
31        Self {
32            buffer: MessageBuffer::new(max_message_size),
33            encoding: None,
34            response_seen: false,
35        }
36    }
37}
38
39/// Events produced by the gRPC connection for the application.
40///
41/// Marked `#[non_exhaustive]` because the crate is still evolving.
42#[derive(Debug)]
43#[non_exhaustive]
44pub enum GrpcEvent {
45    /// HTTP/2 settings exchange complete; connection is ready.
46    Ready,
47    /// Initial response metadata received.
48    Response {
49        stream_id: u32,
50        metadata: Vec<HeaderField>,
51    },
52    /// A complete gRPC message (length-prefix stripped).
53    Message { stream_id: u32, data: Vec<u8> },
54    /// Stream completed with a gRPC status (from trailers).
55    Status {
56        stream_id: u32,
57        status: GrpcStatus,
58        message: String,
59        metadata: Vec<HeaderField>,
60    },
61    /// Connection-level shutdown.
62    GoAway {
63        last_stream_id: u32,
64        error_code: ErrorCode,
65        debug_data: Vec<u8>,
66    },
67    /// Error event.
68    Error(GrpcError),
69}
70
71/// Sans-IO gRPC client connection wrapping an `H2Connection`.
72pub struct GrpcConnection {
73    h2: H2Connection,
74    ready: bool,
75    /// Per-stream message reassembly buffers.
76    buffers: HashMap<u32, StreamState>,
77    /// Pending gRPC events.
78    events: VecDeque<GrpcEvent>,
79    /// Cap on a single received gRPC message's size, both pre-decompression
80    /// (header length prefix) and post-decompression. Defaults to 4 MiB
81    /// per the gRPC standard `max_receive_message_length`.
82    max_message_size: usize,
83}
84
85impl GrpcConnection {
86    /// Create a new gRPC connection with the given HTTP/2 settings.
87    pub fn new(settings: Settings) -> Self {
88        Self {
89            h2: H2Connection::new(settings),
90            ready: false,
91            buffers: HashMap::new(),
92            events: VecDeque::new(),
93            max_message_size: crate::message::DEFAULT_MAX_MESSAGE_SIZE,
94        }
95    }
96
97    /// Override the cap on a single received gRPC message. Defaults to
98    /// 4 MiB. Applied to the length-prefix decode (raw frame size) and to
99    /// the decompressed output if `grpc-encoding` is set.
100    pub fn set_max_message_size(&mut self, n: usize) {
101        self.max_message_size = n;
102    }
103
104    /// Feed received bytes from the transport.
105    pub fn recv(&mut self, data: &[u8]) -> Result<(), GrpcError> {
106        self.h2.recv(data)?;
107        self.translate_events();
108        Ok(())
109    }
110
111    /// Poll the next gRPC event, if any.
112    pub fn poll_event(&mut self) -> Option<GrpcEvent> {
113        self.events.pop_front()
114    }
115
116    /// Take all pending bytes to send to the transport.
117    pub fn take_pending_send(&mut self) -> Vec<u8> {
118        self.h2.take_pending_send()
119    }
120
121    /// Whether there are bytes pending to send.
122    pub fn has_pending_send(&self) -> bool {
123        self.h2.has_pending_send()
124    }
125
126    /// Send a unary gRPC request (headers + length-prefixed body + end_stream).
127    ///
128    /// Returns the stream ID. Fails with `GrpcError::NotReady` if the HTTP/2
129    /// settings exchange hasn't completed yet — sending before SETTINGS ACK
130    /// can violate frame-size or stream-concurrency limits the server is
131    /// about to advertise.
132    pub fn send_unary(
133        &mut self,
134        service: &str,
135        method: &str,
136        body: &[u8],
137        metadata: &[HeaderField],
138    ) -> Result<u32, GrpcError> {
139        self.send_unary_inner(service, method, body, metadata, None)
140    }
141
142    /// Same as [`send_unary`](Self::send_unary) but also encodes a
143    /// `grpc-timeout` header so the server knows when to cancel work on
144    /// the application's behalf. The encoded timeout uses the smallest
145    /// gRPC unit (n/u/m/S/M/H) that fits the value in ≤ 8 digits per
146    /// the gRPC spec. Note: this only *advertises* the deadline to the
147    /// server. Client-side enforcement (cancel + emit Status with
148    /// DeadlineExceeded) is the caller's responsibility.
149    pub fn send_unary_with_deadline(
150        &mut self,
151        service: &str,
152        method: &str,
153        body: &[u8],
154        metadata: &[HeaderField],
155        deadline: Duration,
156    ) -> Result<u32, GrpcError> {
157        self.send_unary_inner(service, method, body, metadata, Some(deadline))
158    }
159
160    fn send_unary_inner(
161        &mut self,
162        service: &str,
163        method: &str,
164        body: &[u8],
165        metadata: &[HeaderField],
166        deadline: Option<Duration>,
167    ) -> Result<u32, GrpcError> {
168        if !self.ready {
169            return Err(GrpcError::NotReady);
170        }
171        let stream_id =
172            self.send_headers_with_deadline(service, method, metadata, false, deadline)?;
173
174        // Encode the gRPC length-prefixed message.
175        let mut framed = Vec::new();
176        message::encode(body, &mut framed).map_err(|e| GrpcError::InvalidMessage(e.to_string()))?;
177
178        self.h2.send_data(stream_id, &framed, true)?;
179
180        // Allocate a message buffer for the response.
181        self.buffers
182            .insert(stream_id, StreamState::new(self.max_message_size));
183
184        Ok(stream_id)
185    }
186
187    /// Start a streaming gRPC request (headers only, no end_stream).
188    ///
189    /// Returns the stream ID. Use `send_message()` to send body frames.
190    /// Fails with `GrpcError::NotReady` if the HTTP/2 settings exchange
191    /// hasn't completed yet.
192    pub fn start_request(
193        &mut self,
194        service: &str,
195        method: &str,
196        metadata: &[HeaderField],
197    ) -> Result<u32, GrpcError> {
198        if !self.ready {
199            return Err(GrpcError::NotReady);
200        }
201        let stream_id = self.send_headers(service, method, metadata, false)?;
202        self.buffers
203            .insert(stream_id, StreamState::new(self.max_message_size));
204        Ok(stream_id)
205    }
206
207    /// Send a gRPC message on an open stream.
208    pub fn send_message(
209        &mut self,
210        stream_id: u32,
211        body: &[u8],
212        end_stream: bool,
213    ) -> Result<(), GrpcError> {
214        let mut framed = Vec::new();
215        message::encode(body, &mut framed).map_err(|e| GrpcError::InvalidMessage(e.to_string()))?;
216        self.h2.send_data(stream_id, &framed, end_stream)?;
217        Ok(())
218    }
219
220    /// Cancel a stream with RST_STREAM CANCEL. Pushes a terminal
221    /// `Status::Cancelled` event so any caller awaiting completion
222    /// observes the cancellation rather than hanging.
223    pub fn cancel(&mut self, stream_id: u32) {
224        self.h2.reset_stream(stream_id, ErrorCode::Cancel);
225        if self.buffers.remove(&stream_id).is_some() {
226            self.events.push_back(GrpcEvent::Status {
227                stream_id,
228                status: GrpcStatus::Cancelled,
229                message: "cancelled by client".into(),
230                metadata: Vec::new(),
231            });
232        }
233    }
234
235    // -- Internal --
236
237    fn send_headers(
238        &mut self,
239        service: &str,
240        method: &str,
241        metadata: &[HeaderField],
242        end_stream: bool,
243    ) -> Result<u32, GrpcError> {
244        self.send_headers_with_deadline(service, method, metadata, end_stream, None)
245    }
246
247    fn send_headers_with_deadline(
248        &mut self,
249        service: &str,
250        method: &str,
251        metadata: &[HeaderField],
252        end_stream: bool,
253        deadline: Option<Duration>,
254    ) -> Result<u32, GrpcError> {
255        let path = format!("/{service}/{method}");
256        let mut headers = vec![
257            HeaderField::new(b":method", b"POST"),
258            HeaderField::new(b":path", path.as_bytes()),
259            HeaderField::new(b":scheme", b"https"),
260            HeaderField::new(b"content-type", b"application/grpc"),
261            HeaderField::new(b"te", b"trailers"),
262        ];
263        if let Some(enc) = crate::compress::accept_encoding_value() {
264            headers.push(HeaderField::new(b"grpc-accept-encoding", enc.as_bytes()));
265        }
266        if let Some(d) = deadline {
267            let encoded = encode_grpc_timeout(d);
268            headers.push(HeaderField::new(b"grpc-timeout", encoded.as_bytes()));
269        }
270        headers.extend_from_slice(metadata);
271
272        let stream_id = self.h2.send_request(&headers, end_stream)?;
273        Ok(stream_id)
274    }
275
276    fn translate_events(&mut self) {
277        while let Some(h2_event) = self.h2.poll_event() {
278            match h2_event {
279                H2Event::SettingsAcknowledged => {
280                    self.ready = true;
281                    self.events.push_back(GrpcEvent::Ready);
282                }
283                H2Event::Response {
284                    stream_id,
285                    headers,
286                    end_stream,
287                } => {
288                    self.handle_response(stream_id, headers, end_stream);
289                }
290                H2Event::Data {
291                    stream_id,
292                    data,
293                    end_stream,
294                } => {
295                    self.handle_data(stream_id, &data, end_stream);
296                }
297                H2Event::Trailers { stream_id, headers } => {
298                    self.handle_trailers(stream_id, headers);
299                }
300                H2Event::StreamReset {
301                    stream_id,
302                    error_code,
303                } => {
304                    self.buffers.remove(&stream_id);
305                    self.events.push_back(GrpcEvent::Status {
306                        stream_id,
307                        status: GrpcStatus::Internal,
308                        message: format!("stream reset: {error_code:?}"),
309                        metadata: Vec::new(),
310                    });
311                }
312                H2Event::GoAway {
313                    last_stream_id,
314                    error_code,
315                    debug_data,
316                } => {
317                    // Streams above `last_stream_id` will not be processed
318                    // by the server. Emit a terminal `Status::Unavailable`
319                    // for each so callers don't hang awaiting completion,
320                    // and drop the per-stream buffers we'd otherwise leak.
321                    let stranded: Vec<u32> = self
322                        .buffers
323                        .keys()
324                        .copied()
325                        .filter(|id| *id > last_stream_id)
326                        .collect();
327                    for stream_id in stranded {
328                        self.buffers.remove(&stream_id);
329                        self.events.push_back(GrpcEvent::Status {
330                            stream_id,
331                            status: GrpcStatus::Unavailable,
332                            message: format!("GoAway: {error_code:?}"),
333                            metadata: Vec::new(),
334                        });
335                    }
336                    self.events.push_back(GrpcEvent::GoAway {
337                        last_stream_id,
338                        error_code,
339                        debug_data,
340                    });
341                }
342                H2Event::Error(e) => {
343                    self.events.push_back(GrpcEvent::Error(GrpcError::H2(e)));
344                }
345                H2Event::PingAcknowledged { .. } => {}
346            }
347        }
348    }
349
350    fn handle_response(&mut self, stream_id: u32, headers: Vec<HeaderField>, end_stream: bool) {
351        // Ensure we have a buffer even for server-push scenarios.
352        let state = self
353            .buffers
354            .entry(stream_id)
355            .or_insert_with(|| StreamState::new(self.max_message_size));
356        state.response_seen = true;
357
358        // Inspect :status before grpc-status — per the gRPC-over-HTTP/2
359        // spec, a non-200 transport status maps to a specific grpc-status
360        // and overrides what trailers say (or substitutes for them on a
361        // trailers-only path).
362        let http_status_override = http_status_to_grpc_status(&headers);
363
364        // Capture grpc-encoding for the message-decode path even when
365        // end_stream is true; harmless and keeps the path symmetric.
366        for h in &headers {
367            if header_name_eq(&h.name, b"grpc-encoding") {
368                state.encoding = Some(String::from_utf8_lossy(&h.value).into_owned());
369            }
370        }
371
372        if end_stream {
373            // Trailers-only response: HEADERS+END_STREAM carries grpc-status
374            // (gRPC spec §2). Filter grpc-status/grpc-message out of the
375            // metadata view to match the two-headers path.
376            let (status, message) = derive_status(&headers, http_status_override);
377            let metadata: Vec<HeaderField> = headers
378                .iter()
379                .filter(|h| {
380                    !header_name_eq(&h.name, b"grpc-status")
381                        && !header_name_eq(&h.name, b"grpc-message")
382                })
383                .cloned()
384                .collect();
385            self.events.push_back(GrpcEvent::Response {
386                stream_id,
387                metadata: metadata.clone(),
388            });
389            self.buffers.remove(&stream_id);
390            self.events.push_back(GrpcEvent::Status {
391                stream_id,
392                status,
393                message,
394                metadata,
395            });
396            return;
397        }
398
399        self.events.push_back(GrpcEvent::Response {
400            stream_id,
401            metadata: headers,
402        });
403    }
404
405    fn handle_data(&mut self, stream_id: u32, data: &[u8], end_stream: bool) {
406        // DATA before HEADERS is a protocol violation — the server has
407        // not yet identified the response. Reset and surface an Internal
408        // status instead of emitting Message events for an unannounced
409        // stream.
410        let response_seen = self
411            .buffers
412            .get(&stream_id)
413            .map(|s| s.response_seen)
414            .unwrap_or(false);
415        if !response_seen {
416            self.h2.reset_stream(stream_id, ErrorCode::ProtocolError);
417            self.buffers.remove(&stream_id);
418            self.events.push_back(GrpcEvent::Status {
419                stream_id,
420                status: GrpcStatus::Internal,
421                message: "received DATA before HEADERS".into(),
422                metadata: Vec::new(),
423            });
424            return;
425        }
426
427        let max = self.max_message_size;
428        if let Some(state) = self.buffers.get_mut(&stream_id) {
429            // If the per-stream buffer fills up — the peer is
430            // sending data faster than message boundaries
431            // arrive — fail the stream rather than OOM.
432            if let Err(e) = state.buffer.push(data) {
433                self.fail_stream(stream_id, GrpcStatus::ResourceExhausted, e.to_string());
434                return;
435            }
436            loop {
437                match state.buffer.try_decode() {
438                    BufferDecode::Complete(payload, compressed) => {
439                        let data = if compressed {
440                            match &state.encoding {
441                                // Compressed flag set with a known encoding — decompress
442                                // or fail the stream with INTERNAL (do NOT silently fall
443                                // back to raw bytes; the application can't tell the
444                                // difference between a real message and our garbage).
445                                Some(enc) => {
446                                    match crate::compress::decompress(enc, &payload, max) {
447                                        Ok(d) => d,
448                                        Err(e) => {
449                                            self.fail_stream(
450                                                stream_id,
451                                                GrpcStatus::Internal,
452                                                format!("decompression failed: {e}"),
453                                            );
454                                            break;
455                                        }
456                                    }
457                                }
458                                // Compressed flag set but no grpc-encoding header — peer
459                                // is malformed; treat as INTERNAL rather than silently
460                                // delivering raw compressed bytes as the message.
461                                None => {
462                                    self.fail_stream(
463                                        stream_id,
464                                        GrpcStatus::Internal,
465                                        "compressed flag set but no grpc-encoding header".into(),
466                                    );
467                                    break;
468                                }
469                            }
470                        } else {
471                            payload
472                        };
473                        self.events
474                            .push_back(GrpcEvent::Message { stream_id, data });
475                    }
476                    BufferDecode::Incomplete => break,
477                    BufferDecode::TooLarge(n) => {
478                        self.fail_stream(
479                            stream_id,
480                            GrpcStatus::ResourceExhausted,
481                            format!("message length {n} exceeds cap {max}"),
482                        );
483                        break;
484                    }
485                }
486            }
487        }
488
489        if end_stream {
490            self.emit_status_from_cleanup(stream_id, &[]);
491        }
492    }
493
494    fn handle_trailers(&mut self, stream_id: u32, headers: Vec<HeaderField>) {
495        // Drain any remaining buffered messages. Decompression
496        // failures on these final messages fall through to the
497        // status emission below (we'd want to surface them but
498        // the trailer arrival is the authoritative end signal).
499        let max = self.max_message_size;
500        if let Some(state) = self.buffers.get_mut(&stream_id) {
501            while let BufferDecode::Complete(payload, compressed) = state.buffer.try_decode() {
502                let data = if compressed {
503                    match &state.encoding {
504                        Some(enc) => {
505                            match crate::compress::decompress(enc, &payload, max) {
506                                Ok(d) => d,
507                                Err(_) => break, // surfaced via grpc-status below
508                            }
509                        }
510                        None => break,
511                    }
512                } else {
513                    payload
514                };
515                self.events
516                    .push_back(GrpcEvent::Message { stream_id, data });
517            }
518        }
519
520        // Extract grpc-status and grpc-message from trailers.
521        let status = extract_grpc_status(&headers);
522        let message = extract_grpc_message(&headers);
523        let remaining: Vec<HeaderField> = headers
524            .into_iter()
525            .filter(|h| h.name != b"grpc-status" && h.name != b"grpc-message")
526            .collect();
527
528        self.events.push_back(GrpcEvent::Status {
529            stream_id,
530            status,
531            message,
532            metadata: remaining,
533        });
534        self.buffers.remove(&stream_id);
535    }
536
537    /// Emit a status event when the stream ends without explicit trailers
538    /// (e.g., end_stream on a DATA frame). Per the gRPC spec, every
539    /// stream must end with a HEADERS frame carrying trailers — missing
540    /// trailers indicates a malformed response. If the reassembly buffer
541    /// still has bytes (a partial message), surface that distinctly.
542    fn emit_status_from_cleanup(&mut self, stream_id: u32, _headers: &[HeaderField]) {
543        let truncated = self
544            .buffers
545            .get(&stream_id)
546            .map(|s| !s.buffer.is_empty())
547            .unwrap_or(false);
548        self.buffers.remove(&stream_id);
549        let message = if truncated {
550            "stream ended mid-message without trailers".into()
551        } else {
552            "stream ended without trailers".into()
553        };
554        self.events.push_back(GrpcEvent::Status {
555            stream_id,
556            status: GrpcStatus::Internal,
557            message,
558            metadata: Vec::new(),
559        });
560    }
561
562    /// Reset a stream from the gRPC layer with an explicit status and
563    /// reason. Sends RST_STREAM(CANCEL) at the H2 layer, removes the
564    /// per-stream buffer, and emits a Status event so the caller learns
565    /// the RPC outcome.
566    fn fail_stream(&mut self, stream_id: u32, status: GrpcStatus, message: String) {
567        self.h2.reset_stream(stream_id, ErrorCode::Cancel);
568        self.buffers.remove(&stream_id);
569        self.events.push_back(GrpcEvent::Status {
570            stream_id,
571            status,
572            message,
573            metadata: Vec::new(),
574        });
575    }
576}
577
578/// Extract `grpc-status` from trailer headers. A missing or malformed
579/// status is treated as `Unknown` — defaulting to `Ok` would let a
580/// misbehaving server's incomplete trailers look like a successful RPC.
581/// The gRPC spec requires every response to carry a `grpc-status` trailer.
582fn extract_grpc_status(headers: &[HeaderField]) -> GrpcStatus {
583    headers
584        .iter()
585        .find(|h| h.name == b"grpc-status")
586        .and_then(|h| std::str::from_utf8(&h.value).ok())
587        .and_then(|s| s.parse::<u8>().ok())
588        .map(GrpcStatus::from_u8)
589        .unwrap_or(GrpcStatus::Unknown)
590}
591
/// Compare two header names, ignoring ASCII case. The h2 layer already
/// refuses uppercase names, which would make a plain byte comparison
/// enough, but trailer validation happens on a separate code path, so the
/// case-insensitive form is used as the cautious option.
fn header_name_eq(a: &[u8], b: &[u8]) -> bool {
    if a.len() != b.len() {
        return false;
    }
    a.iter()
        .zip(b.iter())
        .all(|(x, y)| x.to_ascii_lowercase() == y.to_ascii_lowercase())
}
598
599/// Derive `(status, message)` from a header section. Per gRPC spec
600/// `grpc-status` is mandatory in any terminal HEADERS — missing or
601/// unparseable values default to `Internal` (NOT `Ok`) with a diagnostic
602/// so a peer that silently drops the trailer can't be mistaken for
603/// success. An `http_status_override` (set by [`http_status_to_grpc_status`])
604/// preempts trailer values when the HTTP-level transport reported an
605/// error.
606fn derive_status(
607    headers: &[HeaderField],
608    http_status_override: Option<(GrpcStatus, String)>,
609) -> (GrpcStatus, String) {
610    if let Some((s, msg)) = http_status_override {
611        return (s, msg);
612    }
613    let raw = headers
614        .iter()
615        .find(|h| header_name_eq(&h.name, b"grpc-status"))
616        .map(|h| h.value.clone());
617    let status = match raw {
618        Some(bytes) => match std::str::from_utf8(&bytes)
619            .ok()
620            .and_then(|s| s.parse::<u32>().ok())
621        {
622            Some(n) if n <= 16 => GrpcStatus::from_u8(n as u8),
623            Some(n) => {
624                return (
625                    GrpcStatus::Internal,
626                    format!("invalid grpc-status value: {n}"),
627                );
628            }
629            None => {
630                return (
631                    GrpcStatus::Internal,
632                    format!(
633                        "invalid grpc-status value: {:?}",
634                        String::from_utf8_lossy(&bytes)
635                    ),
636                );
637            }
638        },
639        None => {
640            return (GrpcStatus::Internal, "missing grpc-status trailer".into());
641        }
642    };
643    let message = extract_grpc_message(headers);
644    (status, message)
645}
646
647/// Extract `grpc-message` from a header section. The value is
648/// percent-encoded per gRPC spec (so arbitrary bytes survive the HPACK
649/// ASCII restriction). Decode `%XX` byte escapes and produce a lossy
650/// UTF-8 string. Returns the empty string when absent.
651fn extract_grpc_message(headers: &[HeaderField]) -> String {
652    let raw = match headers
653        .iter()
654        .find(|h| header_name_eq(&h.name, b"grpc-message"))
655    {
656        Some(h) => &h.value[..],
657        None => return String::new(),
658    };
659    percent_decode_to_string(raw)
660}
661
662fn percent_decode_to_string(input: &[u8]) -> String {
663    let mut out: Vec<u8> = Vec::with_capacity(input.len());
664    let mut i = 0;
665    while i < input.len() {
666        if input[i] == b'%' && i + 2 < input.len() {
667            let hi = hex_value(input[i + 1]);
668            let lo = hex_value(input[i + 2]);
669            if let (Some(hi), Some(lo)) = (hi, lo) {
670                out.push((hi << 4) | lo);
671                i += 3;
672                continue;
673            }
674        }
675        out.push(input[i]);
676        i += 1;
677    }
678    String::from_utf8_lossy(&out).into_owned()
679}
680
/// Numeric value (0–15) of an ASCII hex digit, upper or lower case;
/// `None` for any other byte.
fn hex_value(c: u8) -> Option<u8> {
    char::from(c)
        .to_digit(16)
        .and_then(|digit| u8::try_from(digit).ok())
}
689
/// Render a duration as a `grpc-timeout` value: an integer of at most
/// eight digits followed by a unit letter (`n` nanoseconds, `u`
/// microseconds, `m` milliseconds, `S` seconds, `M` minutes, `H` hours).
/// The finest unit whose count fits in eight digits is chosen; coarser
/// units truncate towards zero. Durations too long even for hours
/// saturate at `99999999H`, and a zero duration is written as `0n`.
fn encode_grpc_timeout(d: Duration) -> String {
    /// Largest value expressible in eight decimal digits.
    const MAX_VALUE: u128 = 99_999_999;

    let whole_secs = u128::from(d.as_secs());
    // Ordered from finest to coarsest; the first candidate that fits wins.
    let candidates: [(u128, char); 6] = [
        (d.as_nanos(), 'n'),
        (d.as_micros(), 'u'),
        (d.as_millis(), 'm'),
        (whole_secs, 'S'),
        (whole_secs / 60, 'M'),
        (whole_secs / 3600, 'H'),
    ];

    candidates
        .iter()
        .find(|(value, _)| *value <= MAX_VALUE)
        .map(|(value, unit)| format!("{value}{unit}"))
        .unwrap_or_else(|| "99999999H".into())
}
723
724/// HTTP/2 `:status` to gRPC status mapping per gRPC-over-HTTP/2 spec.
725/// Returns `Some((status, message))` for any non-200 status, `None`
726/// otherwise. The trailer `grpc-status` is ignored when this returns
727/// `Some` (transport-level failure trumps semantic).
728fn http_status_to_grpc_status(headers: &[HeaderField]) -> Option<(GrpcStatus, String)> {
729    let raw = headers
730        .iter()
731        .find(|h| h.name == b":status")
732        .map(|h| h.value.clone())?;
733    let code = std::str::from_utf8(&raw).ok()?.parse::<u16>().ok()?;
734    if code == 200 {
735        return None;
736    }
737    let status = match code {
738        400 => GrpcStatus::Internal,
739        401 => GrpcStatus::Unauthenticated,
740        403 => GrpcStatus::PermissionDenied,
741        404 => GrpcStatus::Unimplemented,
742        429 | 502 | 503 | 504 => GrpcStatus::Unavailable,
743        _ => GrpcStatus::Unknown,
744    };
745    Some((status, format!("HTTP/2 :status {code}")))
746}
747
#[cfg(test)]
mod tests {
    use super::*;

    /// `grpc-status: 0` without a `grpc-message` resolves to `Ok` and an
    /// empty message.
    #[test]
    fn derive_status_ok() {
        let trailers = vec![HeaderField::new(b"grpc-status", b"0")];
        let (status, message) = derive_status(&trailers, None);
        assert_eq!(status, GrpcStatus::Ok);
        assert_eq!(message, "");
    }

    /// A non-zero `grpc-status` is reported together with its `grpc-message`.
    #[test]
    fn derive_status_not_found() {
        let status_field = HeaderField::new(b"grpc-status", b"5");
        let message_field = HeaderField::new(b"grpc-message", b"service not found");
        let trailers = vec![status_field, message_field];

        let (status, message) = derive_status(&trailers, None);
        assert_eq!(status, GrpcStatus::NotFound);
        assert_eq!(message, "service not found");
    }

    /// Regression guard: an absent `grpc-status` used to be reported as `Ok`;
    /// it must surface as `Internal` with an explanatory message instead.
    #[test]
    fn derive_status_missing_is_internal_not_ok() {
        let trailers = Vec::<HeaderField>::new();
        let (status, message) = derive_status(&trailers, None);
        assert_eq!(status, GrpcStatus::Internal);
        assert!(
            message.contains("missing grpc-status"),
            "wrong message: {message}"
        );
    }

    /// A `grpc-status` value that is not a number maps to `Internal`.
    #[test]
    fn derive_status_invalid_value_is_internal() {
        let trailers = vec![HeaderField::new(b"grpc-status", b"not-a-number")];
        let status = derive_status(&trailers, None).0;
        assert_eq!(status, GrpcStatus::Internal);
    }

    /// A numeric `grpc-status` that is not a known code maps to `Internal`.
    #[test]
    fn derive_status_out_of_range_is_internal() {
        let trailers = vec![HeaderField::new(b"grpc-status", b"99")];
        let status = derive_status(&trailers, None).0;
        assert_eq!(status, GrpcStatus::Internal);
    }

    /// When the server answers `:status 503` alongside `grpc-status 0`, the
    /// transport-level verdict (Unavailable) beats the trailer's "success".
    #[test]
    fn http_status_override_takes_priority() {
        let response = vec![
            HeaderField::new(b":status", b"503"),
            HeaderField::new(b"grpc-status", b"0"),
        ];
        let status = derive_status(&response, http_status_to_grpc_status(&response)).0;
        assert_eq!(status, GrpcStatus::Unavailable);
    }

    /// `:status 200` produces no override.
    #[test]
    fn http_status_200_no_override() {
        let response = vec![HeaderField::new(b":status", b"200")];
        let mapped = http_status_to_grpc_status(&response);
        assert!(mapped.is_none());
    }

    /// Each listed non-200 HTTP status maps to the expected gRPC status.
    #[test]
    fn http_status_codes_map_per_spec() {
        let cases: [(u16, GrpcStatus); 8] = [
            (401, GrpcStatus::Unauthenticated),
            (403, GrpcStatus::PermissionDenied),
            (404, GrpcStatus::Unimplemented),
            (429, GrpcStatus::Unavailable),
            (502, GrpcStatus::Unavailable),
            (503, GrpcStatus::Unavailable),
            (504, GrpcStatus::Unavailable),
            // 418 (teapot) has no dedicated mapping and is expected to be Unknown.
            (418, GrpcStatus::Unknown),
        ];

        for (code, expected) in cases {
            let rendered = code.to_string();
            let response = vec![HeaderField::new(b":status", rendered.as_bytes())];
            let (status, _) = http_status_to_grpc_status(&response).expect("non-200");
            assert_eq!(status, expected, "code {code}");
        }
    }

    /// Percent-escapes in `grpc-message` are decoded: `%20` becomes a space
    /// and `%E2%9C%93` becomes U+2713 (✓).
    #[test]
    fn percent_decode_grpc_message() {
        let field = HeaderField::new(b"grpc-message", b"hello%20%E2%9C%93%20done");
        let trailers = vec![field];
        assert_eq!(extract_grpc_message(&trailers), "hello ✓ done");
    }

    /// `%ZZ` is not a valid hex pair, so it is expected to come through
    /// literally rather than being decoded or dropped.
    #[test]
    fn percent_decode_invalid_escape_passthrough() {
        let field = HeaderField::new(b"grpc-message", b"a%ZZb");
        let trailers = vec![field];
        assert_eq!(extract_grpc_message(&trailers), "a%ZZb");
    }

    /// The timeout encoder picks the finest unit whose value still fits in
    /// eight digits.
    #[test]
    fn grpc_timeout_encoding() {
        let cases = [
            // Fits in nanoseconds (≤ 8 digits).
            (Duration::from_nanos(500), "500n"),
            // 500us = 500_000 ns (6 digits): still nanoseconds.
            (Duration::from_micros(500), "500000n"),
            // 1s = 10^9 ns (10 digits) is too long; 1_000_000 us (7 digits) fits.
            (Duration::from_secs(1), "1000000u"),
            // 100ms = 10^8 ns (9 digits) is too long; 100_000 us (6 digits) fits.
            (Duration::from_millis(100), "100000u"),
            // 1 hour: 3.6e9 us is too long; 3_600_000 ms (7 digits) fits.
            (Duration::from_secs(3600), "3600000m"),
        ];
        for (timeout, expected) in cases {
            assert_eq!(encode_grpc_timeout(timeout), expected);
        }

        // An extreme duration must still be expressed with the hours suffix.
        let saturated = encode_grpc_timeout(Duration::from_secs(u64::MAX));
        assert!(saturated.ends_with('H'));
    }

    /// A trailers-only response (a single HEADERS frame with END_STREAM)
    /// yields a `Response` event followed by a `Status` event carrying the
    /// `grpc-status` and `grpc-message` from that frame.
    #[test]
    fn trailers_only_response_extracts_grpc_status() {
        use ringline_h2::hpack::Encoder;
        use ringline_h2::{Frame, Settings};

        // Serialises a SETTINGS frame (default settings) with the given ACK flag.
        let settings_frame = |ack: bool| {
            let frame = Frame::Settings {
                ack,
                settings: Settings::default(),
            };
            let mut wire = Vec::new();
            frame.encode(&mut wire);
            wire
        };

        let mut grpc = GrpcConnection::new(Settings::client_default());
        let _ = grpc.take_pending_send();

        // Complete the settings handshake: the peer first sends its own
        // SETTINGS, then ACKs ours. The h2 layer emits SettingsAcknowledged
        // only on the ACK arm, and the grpc layer refuses `start_request`
        // until that has fired (G12).
        grpc.recv(&settings_frame(false)).unwrap();
        let _ = grpc.take_pending_send();
        grpc.recv(&settings_frame(true)).unwrap();
        let _ = grpc.take_pending_send();
        // Discard the Ready event (and anything else queued so far).
        while grpc.poll_event().is_some() {}

        // Open a request stream.
        let stream_id = grpc.start_request("test.Service", "Method", &[]).unwrap();
        let _ = grpc.take_pending_send();

        // The server replies trailers-only: one HEADERS frame with
        // END_STREAM carrying :status, grpc-status and grpc-message.
        let trailer_fields = [
            HeaderField::new(b":status", b"200"),
            HeaderField::new(b"grpc-status", b"5"),
            HeaderField::new(b"grpc-message", b"not found"),
        ];
        let mut hpack = Encoder::new(4096);
        let mut header_block = Vec::new();
        hpack.encode(&trailer_fields, &mut header_block);

        let headers_frame = Frame::Headers {
            stream_id,
            encoded: header_block,
            end_stream: true,
            end_headers: true,
            priority: None,
        };
        let mut wire = Vec::new();
        headers_frame.encode(&mut wire);
        grpc.recv(&wire).unwrap();

        // First event: the initial Response.
        match grpc.poll_event() {
            Some(GrpcEvent::Response { .. }) => (),
            other => panic!("expected Response, got {other:?}"),
        }

        // Second event: the Status, which must report NotFound.
        let (status, message) = match grpc.poll_event() {
            Some(GrpcEvent::Status {
                status, message, ..
            }) => (status, message),
            other => panic!("expected Status, got {other:?}"),
        };
        assert_eq!(status, GrpcStatus::NotFound, "wrong grpc-status");
        assert_eq!(message, "not found", "wrong grpc-message");
    }
}