1use std::collections::{HashMap, VecDeque};
7use std::time::Duration;
8
9use ringline_h2::hpack::HeaderField;
10use ringline_h2::settings::Settings;
11use ringline_h2::{ErrorCode, H2Connection, H2Event};
12
13use crate::error::{GrpcError, GrpcStatus};
14use crate::message::{self, BufferDecode, MessageBuffer};
15
16#[derive(Debug)]
19struct StreamState {
20 buffer: MessageBuffer,
21 encoding: Option<String>,
23 response_seen: bool,
27}
28
29impl StreamState {
30 fn new(max_message_size: usize) -> Self {
31 Self {
32 buffer: MessageBuffer::new(max_message_size),
33 encoding: None,
34 response_seen: false,
35 }
36 }
37}
38
39#[derive(Debug)]
43#[non_exhaustive]
44pub enum GrpcEvent {
45 Ready,
47 Response {
49 stream_id: u32,
50 metadata: Vec<HeaderField>,
51 },
52 Message { stream_id: u32, data: Vec<u8> },
54 Status {
56 stream_id: u32,
57 status: GrpcStatus,
58 message: String,
59 metadata: Vec<HeaderField>,
60 },
61 GoAway {
63 last_stream_id: u32,
64 error_code: ErrorCode,
65 debug_data: Vec<u8>,
66 },
67 Error(GrpcError),
69}
70
71pub struct GrpcConnection {
73 h2: H2Connection,
74 ready: bool,
75 buffers: HashMap<u32, StreamState>,
77 events: VecDeque<GrpcEvent>,
79 max_message_size: usize,
83}
84
85impl GrpcConnection {
86 pub fn new(settings: Settings) -> Self {
88 Self {
89 h2: H2Connection::new(settings),
90 ready: false,
91 buffers: HashMap::new(),
92 events: VecDeque::new(),
93 max_message_size: crate::message::DEFAULT_MAX_MESSAGE_SIZE,
94 }
95 }
96
97 pub fn set_max_message_size(&mut self, n: usize) {
101 self.max_message_size = n;
102 }
103
104 pub fn recv(&mut self, data: &[u8]) -> Result<(), GrpcError> {
106 self.h2.recv(data)?;
107 self.translate_events();
108 Ok(())
109 }
110
111 pub fn poll_event(&mut self) -> Option<GrpcEvent> {
113 self.events.pop_front()
114 }
115
116 pub fn take_pending_send(&mut self) -> Vec<u8> {
118 self.h2.take_pending_send()
119 }
120
121 pub fn has_pending_send(&self) -> bool {
123 self.h2.has_pending_send()
124 }
125
126 pub fn send_unary(
133 &mut self,
134 service: &str,
135 method: &str,
136 body: &[u8],
137 metadata: &[HeaderField],
138 ) -> Result<u32, GrpcError> {
139 self.send_unary_inner(service, method, body, metadata, None)
140 }
141
142 pub fn send_unary_with_deadline(
150 &mut self,
151 service: &str,
152 method: &str,
153 body: &[u8],
154 metadata: &[HeaderField],
155 deadline: Duration,
156 ) -> Result<u32, GrpcError> {
157 self.send_unary_inner(service, method, body, metadata, Some(deadline))
158 }
159
160 fn send_unary_inner(
161 &mut self,
162 service: &str,
163 method: &str,
164 body: &[u8],
165 metadata: &[HeaderField],
166 deadline: Option<Duration>,
167 ) -> Result<u32, GrpcError> {
168 if !self.ready {
169 return Err(GrpcError::NotReady);
170 }
171 let stream_id =
172 self.send_headers_with_deadline(service, method, metadata, false, deadline)?;
173
174 let mut framed = Vec::new();
176 message::encode(body, &mut framed).map_err(|e| GrpcError::InvalidMessage(e.to_string()))?;
177
178 self.h2.send_data(stream_id, &framed, true)?;
179
180 self.buffers
182 .insert(stream_id, StreamState::new(self.max_message_size));
183
184 Ok(stream_id)
185 }
186
187 pub fn start_request(
193 &mut self,
194 service: &str,
195 method: &str,
196 metadata: &[HeaderField],
197 ) -> Result<u32, GrpcError> {
198 if !self.ready {
199 return Err(GrpcError::NotReady);
200 }
201 let stream_id = self.send_headers(service, method, metadata, false)?;
202 self.buffers
203 .insert(stream_id, StreamState::new(self.max_message_size));
204 Ok(stream_id)
205 }
206
207 pub fn send_message(
209 &mut self,
210 stream_id: u32,
211 body: &[u8],
212 end_stream: bool,
213 ) -> Result<(), GrpcError> {
214 let mut framed = Vec::new();
215 message::encode(body, &mut framed).map_err(|e| GrpcError::InvalidMessage(e.to_string()))?;
216 self.h2.send_data(stream_id, &framed, end_stream)?;
217 Ok(())
218 }
219
220 pub fn cancel(&mut self, stream_id: u32) {
224 self.h2.reset_stream(stream_id, ErrorCode::Cancel);
225 if self.buffers.remove(&stream_id).is_some() {
226 self.events.push_back(GrpcEvent::Status {
227 stream_id,
228 status: GrpcStatus::Cancelled,
229 message: "cancelled by client".into(),
230 metadata: Vec::new(),
231 });
232 }
233 }
234
235 fn send_headers(
238 &mut self,
239 service: &str,
240 method: &str,
241 metadata: &[HeaderField],
242 end_stream: bool,
243 ) -> Result<u32, GrpcError> {
244 self.send_headers_with_deadline(service, method, metadata, end_stream, None)
245 }
246
247 fn send_headers_with_deadline(
248 &mut self,
249 service: &str,
250 method: &str,
251 metadata: &[HeaderField],
252 end_stream: bool,
253 deadline: Option<Duration>,
254 ) -> Result<u32, GrpcError> {
255 let path = format!("/{service}/{method}");
256 let mut headers = vec![
257 HeaderField::new(b":method", b"POST"),
258 HeaderField::new(b":path", path.as_bytes()),
259 HeaderField::new(b":scheme", b"https"),
260 HeaderField::new(b"content-type", b"application/grpc"),
261 HeaderField::new(b"te", b"trailers"),
262 ];
263 if let Some(enc) = crate::compress::accept_encoding_value() {
264 headers.push(HeaderField::new(b"grpc-accept-encoding", enc.as_bytes()));
265 }
266 if let Some(d) = deadline {
267 let encoded = encode_grpc_timeout(d);
268 headers.push(HeaderField::new(b"grpc-timeout", encoded.as_bytes()));
269 }
270 headers.extend_from_slice(metadata);
271
272 let stream_id = self.h2.send_request(&headers, end_stream)?;
273 Ok(stream_id)
274 }
275
276 fn translate_events(&mut self) {
277 while let Some(h2_event) = self.h2.poll_event() {
278 match h2_event {
279 H2Event::SettingsAcknowledged => {
280 self.ready = true;
281 self.events.push_back(GrpcEvent::Ready);
282 }
283 H2Event::Response {
284 stream_id,
285 headers,
286 end_stream,
287 } => {
288 self.handle_response(stream_id, headers, end_stream);
289 }
290 H2Event::Data {
291 stream_id,
292 data,
293 end_stream,
294 } => {
295 self.handle_data(stream_id, &data, end_stream);
296 }
297 H2Event::Trailers { stream_id, headers } => {
298 self.handle_trailers(stream_id, headers);
299 }
300 H2Event::StreamReset {
301 stream_id,
302 error_code,
303 } => {
304 self.buffers.remove(&stream_id);
305 self.events.push_back(GrpcEvent::Status {
306 stream_id,
307 status: GrpcStatus::Internal,
308 message: format!("stream reset: {error_code:?}"),
309 metadata: Vec::new(),
310 });
311 }
312 H2Event::GoAway {
313 last_stream_id,
314 error_code,
315 debug_data,
316 } => {
317 let stranded: Vec<u32> = self
322 .buffers
323 .keys()
324 .copied()
325 .filter(|id| *id > last_stream_id)
326 .collect();
327 for stream_id in stranded {
328 self.buffers.remove(&stream_id);
329 self.events.push_back(GrpcEvent::Status {
330 stream_id,
331 status: GrpcStatus::Unavailable,
332 message: format!("GoAway: {error_code:?}"),
333 metadata: Vec::new(),
334 });
335 }
336 self.events.push_back(GrpcEvent::GoAway {
337 last_stream_id,
338 error_code,
339 debug_data,
340 });
341 }
342 H2Event::Error(e) => {
343 self.events.push_back(GrpcEvent::Error(GrpcError::H2(e)));
344 }
345 H2Event::PingAcknowledged { .. } => {}
346 }
347 }
348 }
349
350 fn handle_response(&mut self, stream_id: u32, headers: Vec<HeaderField>, end_stream: bool) {
351 let state = self
353 .buffers
354 .entry(stream_id)
355 .or_insert_with(|| StreamState::new(self.max_message_size));
356 state.response_seen = true;
357
358 let http_status_override = http_status_to_grpc_status(&headers);
363
364 for h in &headers {
367 if header_name_eq(&h.name, b"grpc-encoding") {
368 state.encoding = Some(String::from_utf8_lossy(&h.value).into_owned());
369 }
370 }
371
372 if end_stream {
373 let (status, message) = derive_status(&headers, http_status_override);
377 let metadata: Vec<HeaderField> = headers
378 .iter()
379 .filter(|h| {
380 !header_name_eq(&h.name, b"grpc-status")
381 && !header_name_eq(&h.name, b"grpc-message")
382 })
383 .cloned()
384 .collect();
385 self.events.push_back(GrpcEvent::Response {
386 stream_id,
387 metadata: metadata.clone(),
388 });
389 self.buffers.remove(&stream_id);
390 self.events.push_back(GrpcEvent::Status {
391 stream_id,
392 status,
393 message,
394 metadata,
395 });
396 return;
397 }
398
399 self.events.push_back(GrpcEvent::Response {
400 stream_id,
401 metadata: headers,
402 });
403 }
404
405 fn handle_data(&mut self, stream_id: u32, data: &[u8], end_stream: bool) {
406 let response_seen = self
411 .buffers
412 .get(&stream_id)
413 .map(|s| s.response_seen)
414 .unwrap_or(false);
415 if !response_seen {
416 self.h2.reset_stream(stream_id, ErrorCode::ProtocolError);
417 self.buffers.remove(&stream_id);
418 self.events.push_back(GrpcEvent::Status {
419 stream_id,
420 status: GrpcStatus::Internal,
421 message: "received DATA before HEADERS".into(),
422 metadata: Vec::new(),
423 });
424 return;
425 }
426
427 let max = self.max_message_size;
428 if let Some(state) = self.buffers.get_mut(&stream_id) {
429 if let Err(e) = state.buffer.push(data) {
433 self.fail_stream(stream_id, GrpcStatus::ResourceExhausted, e.to_string());
434 return;
435 }
436 loop {
437 match state.buffer.try_decode() {
438 BufferDecode::Complete(payload, compressed) => {
439 let data = if compressed {
440 match &state.encoding {
441 Some(enc) => {
446 match crate::compress::decompress(enc, &payload, max) {
447 Ok(d) => d,
448 Err(e) => {
449 self.fail_stream(
450 stream_id,
451 GrpcStatus::Internal,
452 format!("decompression failed: {e}"),
453 );
454 break;
455 }
456 }
457 }
458 None => {
462 self.fail_stream(
463 stream_id,
464 GrpcStatus::Internal,
465 "compressed flag set but no grpc-encoding header".into(),
466 );
467 break;
468 }
469 }
470 } else {
471 payload
472 };
473 self.events
474 .push_back(GrpcEvent::Message { stream_id, data });
475 }
476 BufferDecode::Incomplete => break,
477 BufferDecode::TooLarge(n) => {
478 self.fail_stream(
479 stream_id,
480 GrpcStatus::ResourceExhausted,
481 format!("message length {n} exceeds cap {max}"),
482 );
483 break;
484 }
485 }
486 }
487 }
488
489 if end_stream {
490 self.emit_status_from_cleanup(stream_id, &[]);
491 }
492 }
493
494 fn handle_trailers(&mut self, stream_id: u32, headers: Vec<HeaderField>) {
495 let max = self.max_message_size;
500 if let Some(state) = self.buffers.get_mut(&stream_id) {
501 while let BufferDecode::Complete(payload, compressed) = state.buffer.try_decode() {
502 let data = if compressed {
503 match &state.encoding {
504 Some(enc) => {
505 match crate::compress::decompress(enc, &payload, max) {
506 Ok(d) => d,
507 Err(_) => break, }
509 }
510 None => break,
511 }
512 } else {
513 payload
514 };
515 self.events
516 .push_back(GrpcEvent::Message { stream_id, data });
517 }
518 }
519
520 let status = extract_grpc_status(&headers);
522 let message = extract_grpc_message(&headers);
523 let remaining: Vec<HeaderField> = headers
524 .into_iter()
525 .filter(|h| h.name != b"grpc-status" && h.name != b"grpc-message")
526 .collect();
527
528 self.events.push_back(GrpcEvent::Status {
529 stream_id,
530 status,
531 message,
532 metadata: remaining,
533 });
534 self.buffers.remove(&stream_id);
535 }
536
537 fn emit_status_from_cleanup(&mut self, stream_id: u32, _headers: &[HeaderField]) {
543 let truncated = self
544 .buffers
545 .get(&stream_id)
546 .map(|s| !s.buffer.is_empty())
547 .unwrap_or(false);
548 self.buffers.remove(&stream_id);
549 let message = if truncated {
550 "stream ended mid-message without trailers".into()
551 } else {
552 "stream ended without trailers".into()
553 };
554 self.events.push_back(GrpcEvent::Status {
555 stream_id,
556 status: GrpcStatus::Internal,
557 message,
558 metadata: Vec::new(),
559 });
560 }
561
562 fn fail_stream(&mut self, stream_id: u32, status: GrpcStatus, message: String) {
567 self.h2.reset_stream(stream_id, ErrorCode::Cancel);
568 self.buffers.remove(&stream_id);
569 self.events.push_back(GrpcEvent::Status {
570 stream_id,
571 status,
572 message,
573 metadata: Vec::new(),
574 });
575 }
576}
577
578fn extract_grpc_status(headers: &[HeaderField]) -> GrpcStatus {
583 headers
584 .iter()
585 .find(|h| h.name == b"grpc-status")
586 .and_then(|h| std::str::from_utf8(&h.value).ok())
587 .and_then(|s| s.parse::<u8>().ok())
588 .map(GrpcStatus::from_u8)
589 .unwrap_or(GrpcStatus::Unknown)
590}
591
592fn header_name_eq(a: &[u8], b: &[u8]) -> bool {
596 a.eq_ignore_ascii_case(b)
597}
598
599fn derive_status(
607 headers: &[HeaderField],
608 http_status_override: Option<(GrpcStatus, String)>,
609) -> (GrpcStatus, String) {
610 if let Some((s, msg)) = http_status_override {
611 return (s, msg);
612 }
613 let raw = headers
614 .iter()
615 .find(|h| header_name_eq(&h.name, b"grpc-status"))
616 .map(|h| h.value.clone());
617 let status = match raw {
618 Some(bytes) => match std::str::from_utf8(&bytes)
619 .ok()
620 .and_then(|s| s.parse::<u32>().ok())
621 {
622 Some(n) if n <= 16 => GrpcStatus::from_u8(n as u8),
623 Some(n) => {
624 return (
625 GrpcStatus::Internal,
626 format!("invalid grpc-status value: {n}"),
627 );
628 }
629 None => {
630 return (
631 GrpcStatus::Internal,
632 format!(
633 "invalid grpc-status value: {:?}",
634 String::from_utf8_lossy(&bytes)
635 ),
636 );
637 }
638 },
639 None => {
640 return (GrpcStatus::Internal, "missing grpc-status trailer".into());
641 }
642 };
643 let message = extract_grpc_message(headers);
644 (status, message)
645}
646
647fn extract_grpc_message(headers: &[HeaderField]) -> String {
652 let raw = match headers
653 .iter()
654 .find(|h| header_name_eq(&h.name, b"grpc-message"))
655 {
656 Some(h) => &h.value[..],
657 None => return String::new(),
658 };
659 percent_decode_to_string(raw)
660}
661
662fn percent_decode_to_string(input: &[u8]) -> String {
663 let mut out: Vec<u8> = Vec::with_capacity(input.len());
664 let mut i = 0;
665 while i < input.len() {
666 if input[i] == b'%' && i + 2 < input.len() {
667 let hi = hex_value(input[i + 1]);
668 let lo = hex_value(input[i + 2]);
669 if let (Some(hi), Some(lo)) = (hi, lo) {
670 out.push((hi << 4) | lo);
671 i += 3;
672 continue;
673 }
674 }
675 out.push(input[i]);
676 i += 1;
677 }
678 String::from_utf8_lossy(&out).into_owned()
679}
680
681fn hex_value(c: u8) -> Option<u8> {
682 match c {
683 b'0'..=b'9' => Some(c - b'0'),
684 b'a'..=b'f' => Some(c - b'a' + 10),
685 b'A'..=b'F' => Some(c - b'A' + 10),
686 _ => None,
687 }
688}
689
690fn encode_grpc_timeout(d: Duration) -> String {
696 let nanos = d.as_nanos();
698 if nanos <= 99_999_999 {
699 return format!("{nanos}n");
700 }
701 let micros = d.as_micros();
702 if micros <= 99_999_999 {
703 return format!("{micros}u");
704 }
705 let millis = d.as_millis();
706 if millis <= 99_999_999 {
707 return format!("{millis}m");
708 }
709 let secs = d.as_secs() as u128;
710 if secs <= 99_999_999 {
711 return format!("{secs}S");
712 }
713 let minutes = secs / 60;
714 if minutes <= 99_999_999 {
715 return format!("{minutes}M");
716 }
717 let hours = secs / 3600;
718 if hours <= 99_999_999 {
719 return format!("{hours}H");
720 }
721 "99999999H".into()
722}
723
724fn http_status_to_grpc_status(headers: &[HeaderField]) -> Option<(GrpcStatus, String)> {
729 let raw = headers
730 .iter()
731 .find(|h| h.name == b":status")
732 .map(|h| h.value.clone())?;
733 let code = std::str::from_utf8(&raw).ok()?.parse::<u16>().ok()?;
734 if code == 200 {
735 return None;
736 }
737 let status = match code {
738 400 => GrpcStatus::Internal,
739 401 => GrpcStatus::Unauthenticated,
740 403 => GrpcStatus::PermissionDenied,
741 404 => GrpcStatus::Unimplemented,
742 429 | 502 | 503 | 504 => GrpcStatus::Unavailable,
743 _ => GrpcStatus::Unknown,
744 };
745 Some((status, format!("HTTP/2 :status {code}")))
746}
747
748#[cfg(test)]
749mod tests {
750 use super::*;
751
752 #[test]
753 fn derive_status_ok() {
754 let headers = vec![HeaderField::new(b"grpc-status", b"0")];
755 let (status, message) = derive_status(&headers, None);
756 assert_eq!(status, GrpcStatus::Ok);
757 assert_eq!(message, "");
758 }
759
760 #[test]
761 fn derive_status_not_found() {
762 let headers = vec![
763 HeaderField::new(b"grpc-status", b"5"),
764 HeaderField::new(b"grpc-message", b"service not found"),
765 ];
766 let (status, message) = derive_status(&headers, None);
767 assert_eq!(status, GrpcStatus::NotFound);
768 assert_eq!(message, "service not found");
769 }
770
771 #[test]
772 fn derive_status_missing_is_internal_not_ok() {
773 let headers: Vec<HeaderField> = vec![];
775 let (status, message) = derive_status(&headers, None);
776 assert_eq!(status, GrpcStatus::Internal);
777 assert!(
778 message.contains("missing grpc-status"),
779 "wrong message: {message}"
780 );
781 }
782
783 #[test]
784 fn derive_status_invalid_value_is_internal() {
785 let headers = vec![HeaderField::new(b"grpc-status", b"not-a-number")];
786 let (status, _) = derive_status(&headers, None);
787 assert_eq!(status, GrpcStatus::Internal);
788 }
789
790 #[test]
791 fn derive_status_out_of_range_is_internal() {
792 let headers = vec![HeaderField::new(b"grpc-status", b"99")];
793 let (status, _) = derive_status(&headers, None);
794 assert_eq!(status, GrpcStatus::Internal);
795 }
796
797 #[test]
798 fn http_status_override_takes_priority() {
799 let headers = vec![
802 HeaderField::new(b":status", b"503"),
803 HeaderField::new(b"grpc-status", b"0"),
804 ];
805 let override_ = http_status_to_grpc_status(&headers);
806 let (status, _msg) = derive_status(&headers, override_);
807 assert_eq!(status, GrpcStatus::Unavailable);
808 }
809
810 #[test]
811 fn http_status_200_no_override() {
812 let headers = vec![HeaderField::new(b":status", b"200")];
813 assert!(http_status_to_grpc_status(&headers).is_none());
814 }
815
816 #[test]
817 fn http_status_codes_map_per_spec() {
818 for (code, expected) in [
819 (401u16, GrpcStatus::Unauthenticated),
820 (403, GrpcStatus::PermissionDenied),
821 (404, GrpcStatus::Unimplemented),
822 (429, GrpcStatus::Unavailable),
823 (502, GrpcStatus::Unavailable),
824 (503, GrpcStatus::Unavailable),
825 (504, GrpcStatus::Unavailable),
826 (418, GrpcStatus::Unknown), ] {
828 let val = code.to_string();
829 let headers = vec![HeaderField::new(b":status", val.as_bytes())];
830 let (status, _) = http_status_to_grpc_status(&headers).expect("non-200");
831 assert_eq!(status, expected, "code {code}");
832 }
833 }
834
835 #[test]
836 fn percent_decode_grpc_message() {
837 let headers = vec![HeaderField::new(
839 b"grpc-message",
840 b"hello%20%E2%9C%93%20done",
841 )];
842 assert_eq!(extract_grpc_message(&headers), "hello ✓ done");
843 }
844
845 #[test]
846 fn percent_decode_invalid_escape_passthrough() {
847 let headers = vec![HeaderField::new(b"grpc-message", b"a%ZZb")];
849 assert_eq!(extract_grpc_message(&headers), "a%ZZb");
850 }
851
852 #[test]
853 fn grpc_timeout_encoding() {
854 assert_eq!(encode_grpc_timeout(Duration::from_nanos(500)), "500n");
856 assert_eq!(encode_grpc_timeout(Duration::from_micros(500)), "500000n");
858 assert_eq!(encode_grpc_timeout(Duration::from_secs(1)), "1000000u");
860 assert_eq!(encode_grpc_timeout(Duration::from_millis(100)), "100000u");
862 assert_eq!(encode_grpc_timeout(Duration::from_secs(3600)), "3600000m");
864 let huge = Duration::from_secs(u64::MAX);
866 assert!(encode_grpc_timeout(huge).ends_with('H'));
867 }
868
869 #[test]
870 fn trailers_only_response_extracts_grpc_status() {
871 use ringline_h2::hpack::Encoder;
872 use ringline_h2::{Frame, Settings};
873
874 let mut grpc = GrpcConnection::new(Settings::client_default());
875 let _ = grpc.take_pending_send();
876
877 let peer_settings = {
882 let f = Frame::Settings {
883 ack: false,
884 settings: Settings::default(),
885 };
886 let mut buf = Vec::new();
887 f.encode(&mut buf);
888 buf
889 };
890 grpc.recv(&peer_settings).unwrap();
891 let _ = grpc.take_pending_send();
892 let settings_ack = {
893 let f = Frame::Settings {
894 ack: true,
895 settings: Settings::default(),
896 };
897 let mut buf = Vec::new();
898 f.encode(&mut buf);
899 buf
900 };
901 grpc.recv(&settings_ack).unwrap();
902 let _ = grpc.take_pending_send();
903 while grpc.poll_event().is_some() {}
905
906 let stream_id = grpc.start_request("test.Service", "Method", &[]).unwrap();
908 let _ = grpc.take_pending_send();
909
910 let mut enc = Encoder::new(4096);
913 let mut encoded = Vec::new();
914 enc.encode(
915 &[
916 HeaderField::new(b":status", b"200"),
917 HeaderField::new(b"grpc-status", b"5"),
918 HeaderField::new(b"grpc-message", b"not found"),
919 ],
920 &mut encoded,
921 );
922 let frame = Frame::Headers {
923 stream_id,
924 encoded,
925 end_stream: true,
926 end_headers: true,
927 priority: None,
928 };
929 let mut resp_buf = Vec::new();
930 frame.encode(&mut resp_buf);
931 grpc.recv(&resp_buf).unwrap();
932
933 match grpc.poll_event() {
935 Some(GrpcEvent::Response { .. }) => {}
936 other => panic!("expected Response, got {other:?}"),
937 }
938 match grpc.poll_event() {
939 Some(GrpcEvent::Status {
940 status, message, ..
941 }) => {
942 assert_eq!(status, GrpcStatus::NotFound, "wrong grpc-status");
943 assert_eq!(message, "not found", "wrong grpc-message");
944 }
945 other => panic!("expected Status, got {other:?}"),
946 }
947 }
948}