Skip to main content

jokoway_core/
grpc.rs

1use bytes::{Bytes, BytesMut};
2
3/// Defines the direction of the gRPC message stream.
4#[derive(Debug, Clone, Copy, PartialEq, Eq)]
5pub enum GrpcDirection {
6    /// Request flowing from the client to the upstream server.
7    ClientToUpstream,
8    /// Response flowing from the upstream server back to the client.
9    UpstreamToClient,
10}
11
12/// Represents a single length-prefixed gRPC message payload.
13#[derive(Debug, Clone)]
14pub struct GrpcMessage {
15    /// Whether the message is compressed, indicated by the first byte of the Length-Prefixed-Message header.
16    pub compressed: bool,
17    /// The actual message payload (compressed or uncompressed, depending on flags).
18    pub payload: Bytes,
19}
20
21/// Actions a middleware can take after intercepting a gRPC message.
22#[derive(Debug)]
23pub enum GrpcMessageAction {
24    /// Forward the message along the processing chain. It can be potentially modified.
25    Forward(GrpcMessage),
26    /// Silently drop the message, meaning it won't reach its intended destination.
27    Drop,
28    /// Return an immediate gRPC error to the client with the specified status code and message.
29    Error(u32, String),
30}
31
32/// Parses a single gRPC message out of a `BytesMut` buffer.
33/// Returns `Ok(Some(message))` if a full message is available, consuming those bytes from the buffer.
34/// Returns `Ok(None)` if more data is needed to complete the next message.
35/// Returns `Err(String)` if the message exceeds `max_size` (if provided).
36pub fn parse_grpc_message(
37    buf: &mut BytesMut,
38    max_size: Option<usize>,
39) -> Result<Option<GrpcMessage>, String> {
40    if buf.len() < 5 {
41        return Ok(None);
42    }
43
44    let compressed_flag = buf[0];
45    let length = u32::from_be_bytes([buf[1], buf[2], buf[3], buf[4]]) as usize;
46
47    if let Some(max) = max_size
48        && length > max
49    {
50        return Err(format!(
51            "gRPC message size {} exceeds maximum allowed {}",
52            length, max
53        ));
54    }
55
56    if buf.len() < 5 + length {
57        return Ok(None);
58    }
59
60    // We have a full message
61    let compressed = compressed_flag == 1;
62
63    // Discard the 5-byte header
64    let _ = buf.split_to(5);
65
66    // Extract the payload
67    let payload = buf.split_to(length).freeze();
68
69    Ok(Some(GrpcMessage {
70        compressed,
71        payload,
72    }))
73}
74
75/// Encodes a `GrpcMessage` back into length-prefixed bytes.
76pub fn encode_grpc_message(msg: &GrpcMessage) -> Bytes {
77    let mut out = BytesMut::with_capacity(5 + msg.payload.len());
78    let compressed_flag: u8 = if msg.compressed { 1 } else { 0 };
79    out.extend_from_slice(&[compressed_flag]);
80    out.extend_from_slice(&(msg.payload.len() as u32).to_be_bytes());
81    out.extend_from_slice(&msg.payload);
82    out.freeze()
83}