Skip to main content

nodedb_cluster/rpc_codec/
header.rs

1//! RPC frame header layout and framing helpers.
2//!
3//! Wire layout (10-byte header + payload):
4//!
5//! ```text
6//! ┌─────────┬──────────┬────────────┬──────────┬─────────────────────┐
7//! │ version │ rpc_type │ payload_len│ crc32c   │ rkyv payload bytes  │
8//! │  1 byte │  1 byte  │  4 bytes   │ 4 bytes  │  payload_len bytes  │
9//! └─────────┴──────────┴────────────┴──────────┴─────────────────────┘
10//! ```
11
12use crate::error::{ClusterError, Result};
13use crate::wire::WIRE_VERSION;
14
15/// Header size in bytes: version(1) + rpc_type(1) + payload_len(4) + crc32c(4).
16pub const HEADER_SIZE: usize = 10;
17
18/// Maximum RPC message payload size (64 MiB). Distinct from WAL's MAX_RPC_PAYLOAD_SIZE.
19///
20/// Prevents degenerate allocations from corrupt frames.
21pub const MAX_RPC_PAYLOAD_SIZE: u32 = 64 * 1024 * 1024;
22
23/// Write a framed header + payload into `out`.
24///
25/// `rpc_type` is the discriminant byte; `payload` is the already-serialized body.
26pub fn write_frame(rpc_type: u8, payload: &[u8], out: &mut Vec<u8>) -> Result<()> {
27    let payload_len: u32 = payload.len().try_into().map_err(|_| ClusterError::Codec {
28        detail: format!("payload too large: {} bytes", payload.len()),
29    })?;
30    let crc = crc32c::crc32c(payload);
31    // Version field is 1 byte on the wire; narrowing cast is intentional.
32    out.push(WIRE_VERSION as u8);
33    out.push(rpc_type);
34    out.extend_from_slice(&payload_len.to_le_bytes());
35    out.extend_from_slice(&crc.to_le_bytes());
36    out.extend_from_slice(payload);
37    Ok(())
38}
39
40/// Validate the CRC32C of an inbound frame and return the payload slice.
41///
42/// `data` must start at byte 0 (version byte). Returns `(rpc_type, payload)`.
43pub fn parse_frame(data: &[u8]) -> Result<(u8, &[u8])> {
44    if data.len() < HEADER_SIZE {
45        return Err(ClusterError::Codec {
46            detail: format!("frame too short: {} bytes, need {HEADER_SIZE}", data.len()),
47        });
48    }
49
50    let version = data[0];
51    if version != WIRE_VERSION as u8 {
52        return Err(ClusterError::Codec {
53            detail: format!("unsupported wire version: {version}, expected {WIRE_VERSION}"),
54        });
55    }
56
57    let rpc_type = data[1];
58    let payload_len = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
59    let expected_crc = u32::from_le_bytes([data[6], data[7], data[8], data[9]]);
60
61    if payload_len > MAX_RPC_PAYLOAD_SIZE {
62        return Err(ClusterError::Codec {
63            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
64        });
65    }
66
67    let expected_total = HEADER_SIZE + payload_len as usize;
68    if data.len() < expected_total {
69        return Err(ClusterError::Codec {
70            detail: format!(
71                "frame truncated: got {} bytes, expected {expected_total}",
72                data.len()
73            ),
74        });
75    }
76
77    let payload = &data[HEADER_SIZE..expected_total];
78    let actual_crc = crc32c::crc32c(payload);
79    if actual_crc != expected_crc {
80        return Err(ClusterError::Codec {
81            detail: format!(
82                "CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}"
83            ),
84        });
85    }
86
87    Ok((rpc_type, payload))
88}
89
90/// Return the total frame size for a buffer that starts with a valid header.
91pub fn frame_size(header: &[u8; HEADER_SIZE]) -> Result<usize> {
92    let payload_len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]);
93    if payload_len > MAX_RPC_PAYLOAD_SIZE {
94        return Err(ClusterError::Codec {
95            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
96        });
97    }
98    Ok(HEADER_SIZE + payload_len as usize)
99}
100
101// rkyv_deserialize and rkyv_serialize are macros in each sub-module because
102// rkyv's generic bounds for Serialize and Deserialize are cumbersome to
103// express generically across all types. Each sub-module calls rkyv directly.