jokoway_core/grpc.rs
1use bytes::{Bytes, BytesMut};
2
3/// Defines the direction of the gRPC message stream.
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub enum GrpcDirection {
6 /// Request flowing from the client to the upstream server.
7 ClientToUpstream,
8 /// Response flowing from the upstream server back to the client.
9 UpstreamToClient,
10}
11
12/// Represents a single length-prefixed gRPC message payload.
13#[derive(Debug, Clone)]
14pub struct GrpcMessage {
15 /// Whether the message is compressed, indicated by the first byte of the Length-Prefixed-Message header.
16 pub compressed: bool,
17 /// The actual message payload (compressed or uncompressed, depending on flags).
18 pub payload: Bytes,
19}
20
21/// Actions a middleware can take after intercepting a gRPC message.
22#[derive(Debug)]
23pub enum GrpcMessageAction {
24 /// Forward the message along the processing chain. It can be potentially modified.
25 Forward(GrpcMessage),
26 /// Silently drop the message, meaning it won't reach its intended destination.
27 Drop,
28 /// Return an immediate gRPC error to the client with the specified status code and message.
29 Error(u32, String),
30}
31
32/// Parses a single gRPC message out of a `BytesMut` buffer.
33/// Returns `Ok(Some(message))` if a full message is available, consuming those bytes from the buffer.
34/// Returns `Ok(None)` if more data is needed to complete the next message.
35/// Returns `Err(String)` if the message exceeds `max_size` (if provided).
36pub fn parse_grpc_message(
37 buf: &mut BytesMut,
38 max_size: Option<usize>,
39) -> Result<Option<GrpcMessage>, String> {
40 if buf.len() < 5 {
41 return Ok(None);
42 }
43
44 let compressed_flag = buf[0];
45 let length = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
46
47 if let Some(max) = max_size
48 && length > max
49 {
50 return Err(format!(
51 "gRPC message size {} exceeds maximum allowed {}",
52 length, max
53 ));
54 }
55
56 if buf.len() < 5 + length {
57 return Ok(None);
58 }
59
60 // We have a full message
61 let compressed = compressed_flag == 1;
62
63 // Discard the 5-byte header
64 let _ = buf.split_to(5);
65
66 // Extract the payload
67 let payload = buf.split_to(length).freeze();
68
69 Ok(Some(GrpcMessage {
70 compressed,
71 payload,
72 }))
73}
74
75/// Encodes a `GrpcMessage` back into length-prefixed bytes.
76pub fn encode_grpc_message(msg: &GrpcMessage) -> Bytes {
77 let mut out = BytesMut::with_capacity(5 + msg.payload.len());
78 let compressed_flag: u8 = if msg.compressed { 1 } else { 0 };
79 out.extend_from_slice(&[compressed_flag]);
80 out.extend_from_slice(&(msg.payload.len() as u32).to_be_bytes());
81 out.extend_from_slice(&msg.payload);
82 out.freeze()
83}