Skip to main content

nodedb_cluster/rpc_codec/
header.rs

1// SPDX-License-Identifier: BUSL-1.1
2
3//! RPC frame header layout and framing helpers.
4//!
5//! # Current layout (18 bytes)
6//!
7//! ```text
//! ┌─────────┬──────────┬─────────────┬────────────┬───────────────┬─────────────────────┐
//! │ version │ rpc_type │ payload_len │ crc32c     │ cluster_epoch │ rkyv payload bytes  │
//! │  1 byte │  1 byte  │ 4 bytes LE  │ 4 bytes LE │   8 bytes LE  │  payload_len bytes  │
//! └─────────┴──────────┴─────────────┴────────────┴───────────────┴─────────────────────┘
12//! ```
13//!
14//! Every frame is emitted with `RPC_FRAME_VERSION` in the version byte.
15//! Frames with any other version byte are rejected immediately.
16
17use crate::cluster_epoch::{current_local_cluster_epoch, observe_peer_cluster_epoch};
18use crate::error::{ClusterError, Result};
19
/// Size of the fixed frame header in bytes.
///
/// Sum of the field widths, in wire order: version (1), rpc_type (1),
/// payload_len (4), crc32c (4), cluster_epoch (8).
pub const HEADER_SIZE: usize = 1 + 1 + 4 + 4 + 8;

/// Wire-version byte written into every outbound frame; inbound frames must
/// carry exactly this value.
const RPC_FRAME_VERSION: u8 = 0x03;

/// Upper bound on an RPC message payload (64 MiB). Distinct from WAL's MAX_RPC_PAYLOAD_SIZE.
///
/// Keeps a corrupt length field from triggering a degenerate allocation.
pub const MAX_RPC_PAYLOAD_SIZE: u32 = 64 << 20;
30
31/// Write a framed v3 header + payload into `out`.
32///
33/// `rpc_type` is the discriminant byte; `payload` is the already-serialized
34/// body. The current cluster epoch (read from
35/// [`current_local_cluster_epoch`]) is stamped into the header.
36pub fn write_frame(rpc_type: u8, payload: &[u8], out: &mut Vec<u8>) -> Result<()> {
37    let payload_len: u32 = payload.len().try_into().map_err(|_| ClusterError::Codec {
38        detail: format!("payload too large: {} bytes", payload.len()),
39    })?;
40    let crc = crc32c::crc32c(payload);
41    let epoch = current_local_cluster_epoch();
42    out.push(RPC_FRAME_VERSION);
43    out.push(rpc_type);
44    out.extend_from_slice(&payload_len.to_le_bytes());
45    out.extend_from_slice(&crc.to_le_bytes());
46    out.extend_from_slice(&epoch.to_le_bytes());
47    out.extend_from_slice(payload);
48    Ok(())
49}
50
51/// Validate the CRC32C of an inbound frame and return the payload slice.
52///
53/// `data` must start at byte 0 (version byte). Returns `(rpc_type, payload)`.
54///
55/// Side effect: observes the peer's cluster epoch via
56/// [`observe_peer_cluster_epoch`] (monotonic max).
57pub fn parse_frame(data: &[u8]) -> Result<(u8, &[u8])> {
58    if data.is_empty() {
59        return Err(ClusterError::Codec {
60            detail: format!("frame too short: 0 bytes, need {HEADER_SIZE}"),
61        });
62    }
63
64    let version = data[0];
65    if version != RPC_FRAME_VERSION {
66        return Err(ClusterError::UnsupportedWireVersion {
67            got: version,
68            supported_min: RPC_FRAME_VERSION,
69            supported_max: RPC_FRAME_VERSION,
70        });
71    }
72
73    if data.len() < HEADER_SIZE {
74        return Err(ClusterError::Codec {
75            detail: format!("frame too short: {} bytes, need {HEADER_SIZE}", data.len()),
76        });
77    }
78
79    let rpc_type = data[1];
80    let payload_len = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
81    let expected_crc = u32::from_le_bytes([data[6], data[7], data[8], data[9]]);
82    let peer_epoch = u64::from_le_bytes([
83        data[10], data[11], data[12], data[13], data[14], data[15], data[16], data[17],
84    ]);
85
86    if payload_len > MAX_RPC_PAYLOAD_SIZE {
87        return Err(ClusterError::Codec {
88            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
89        });
90    }
91
92    let expected_total = HEADER_SIZE + payload_len as usize;
93    if data.len() < expected_total {
94        return Err(ClusterError::Codec {
95            detail: format!(
96                "frame truncated: got {} bytes, expected {expected_total}",
97                data.len()
98            ),
99        });
100    }
101
102    let payload = &data[HEADER_SIZE..expected_total];
103    let actual_crc = crc32c::crc32c(payload);
104    if actual_crc != expected_crc {
105        return Err(ClusterError::Codec {
106            detail: format!(
107                "CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}"
108            ),
109        });
110    }
111
112    if peer_epoch > 0 {
113        observe_peer_cluster_epoch(peer_epoch);
114    }
115
116    Ok((rpc_type, payload))
117}
118
119/// Return the total frame size for a buffer that starts with a valid header.
120///
121/// The buffer must be at least [`HEADER_SIZE`] bytes.
122pub fn frame_size(header: &[u8; HEADER_SIZE]) -> Result<usize> {
123    let payload_len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]);
124    if payload_len > MAX_RPC_PAYLOAD_SIZE {
125        return Err(ClusterError::Codec {
126            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
127        });
128    }
129    Ok(HEADER_SIZE + payload_len as usize)
130}
131
132// rkyv_deserialize and rkyv_serialize are macros in each sub-module because
133// rkyv's generic bounds for Serialize and Deserialize are cumbersome to
134// express generically across all types. Each sub-module calls rkyv directly.
135
#[cfg(test)]
mod tests {
    use super::*;

    /// Arbitrary discriminant byte used by every hand-built frame below.
    const TEST_RPC_TYPE: u8 = 0xAB;

    /// Build a frame with the given version byte and a 10-byte (old short) header shape.
    /// Used to test rejection of non-current version bytes.
    fn make_short_frame(version: u8, payload: &[u8]) -> Vec<u8> {
        let payload_len = u32::try_from(payload.len()).expect("test payload fits in u32");
        let mut out = Vec::with_capacity(10 + payload.len());
        out.push(version);
        out.push(TEST_RPC_TYPE);
        out.extend_from_slice(&payload_len.to_le_bytes());
        out.extend_from_slice(&crc32c::crc32c(payload).to_le_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Build a correct current-version frame (18-byte header).
    fn make_frame(payload: &[u8], epoch: u64) -> Vec<u8> {
        let payload_len = u32::try_from(payload.len()).expect("test payload fits in u32");
        let mut out = Vec::with_capacity(HEADER_SIZE + payload.len());
        out.push(RPC_FRAME_VERSION);
        out.push(TEST_RPC_TYPE);
        out.extend_from_slice(&payload_len.to_le_bytes());
        out.extend_from_slice(&crc32c::crc32c(payload).to_le_bytes());
        out.extend_from_slice(&epoch.to_le_bytes());
        out.extend_from_slice(payload);
        out
    }

    /// Build a bare current-version header declaring `payload_len`, with the
    /// CRC and epoch fields left as zero.
    fn header_declaring(payload_len: u32) -> [u8; HEADER_SIZE] {
        let mut hdr = [0u8; HEADER_SIZE];
        hdr[0] = RPC_FRAME_VERSION;
        hdr[1] = TEST_RPC_TYPE;
        hdr[2..6].copy_from_slice(&payload_len.to_le_bytes());
        hdr
    }

    /// Assert that `result` is a `ClusterError::Codec` failure.
    fn assert_codec_error<T: std::fmt::Debug>(result: Result<T>, what: &str) {
        match result {
            Err(ClusterError::Codec { .. }) => {}
            other => panic!("{what}: expected Codec error, got {other:?}"),
        }
    }

    #[test]
    fn parse_frame_accepts_current_version() {
        let payload = b"world";
        let frame = make_frame(payload, 0);
        let (_t, body) = parse_frame(&frame).expect("current version must be accepted");
        assert_eq!(body, payload);
    }

    #[test]
    fn parse_frame_accepts_empty_payload() {
        let frame = make_frame(b"", 0);
        let (rpc_type, body) = parse_frame(&frame).expect("empty payload must be accepted");
        assert_eq!(rpc_type, TEST_RPC_TYPE);
        assert!(body.is_empty());
    }

    #[test]
    fn parse_frame_rejects_n_plus_one() {
        let payload = b"future";
        let frame = make_short_frame(RPC_FRAME_VERSION.saturating_add(1), payload);
        let err = parse_frame(&frame).expect_err("N+1 must be rejected");
        match err {
            ClusterError::UnsupportedWireVersion {
                got,
                supported_min,
                supported_max,
            } => {
                assert_eq!(got, RPC_FRAME_VERSION.saturating_add(1));
                assert_eq!(supported_min, RPC_FRAME_VERSION);
                assert_eq!(supported_max, RPC_FRAME_VERSION);
            }
            other => panic!("expected UnsupportedWireVersion, got {other:?}"),
        }
    }

    #[test]
    fn parse_frame_rejects_v1_v2() {
        for version in [1u8, 2u8] {
            let payload = b"old";
            let frame = make_short_frame(version, payload);
            // A `match` keeps the failure message lazy; it is only formatted on panic.
            let err = match parse_frame(&frame) {
                Ok(_) => panic!("v{version} must be rejected"),
                Err(err) => err,
            };
            assert!(
                matches!(
                    err,
                    ClusterError::UnsupportedWireVersion { got, .. } if got == version
                ),
                "expected UnsupportedWireVersion for v{version}, got {err:?}"
            );
        }
    }

    #[test]
    fn parse_frame_rejects_version_zero() {
        let payload = b"zero";
        let frame = make_short_frame(0, payload);
        let err = parse_frame(&frame).expect_err("version 0 must be rejected");
        assert!(matches!(
            err,
            ClusterError::UnsupportedWireVersion { got: 0, .. }
        ));
    }

    #[test]
    fn parse_frame_rejects_empty_input() {
        assert_codec_error(parse_frame(&[]), "empty input");
    }

    #[test]
    fn parse_frame_rejects_short_header() {
        // Correct version byte, but one byte short of a full header.
        let frame = make_frame(b"", 0);
        assert_codec_error(parse_frame(&frame[..HEADER_SIZE - 1]), "short header");
    }

    #[test]
    fn parse_frame_rejects_truncated_payload() {
        let mut frame = make_frame(b"world", 0);
        frame.pop();
        assert_codec_error(parse_frame(&frame), "truncated payload");
    }

    #[test]
    fn parse_frame_rejects_crc_mismatch() {
        let mut frame = make_frame(b"world", 0);
        let last = frame.len() - 1;
        frame[last] ^= 0xFF; // corrupt one payload byte; the stored CRC no longer matches
        assert_codec_error(parse_frame(&frame), "corrupt payload");
    }

    #[test]
    fn parse_frame_rejects_oversized_length() {
        let hdr = header_declaring(MAX_RPC_PAYLOAD_SIZE + 1);
        assert_codec_error(parse_frame(&hdr), "oversized declared length");
    }

    #[test]
    fn v3_frame_round_trips_with_epoch() {
        // NOTE(review): this test mutates the process-wide cluster epoch and may
        // interact with other tests that touch it when run in parallel — confirm.
        use crate::cluster_epoch::set_local_cluster_epoch;
        set_local_cluster_epoch(0);
        set_local_cluster_epoch(7);
        let payload = b"epoch-bound";
        let mut buf = Vec::new();
        write_frame(TEST_RPC_TYPE, payload, &mut buf).unwrap();
        set_local_cluster_epoch(0);
        let (rpc_type, body) = parse_frame(&buf).unwrap();
        assert_eq!(rpc_type, TEST_RPC_TYPE);
        assert_eq!(body, payload);
        assert_eq!(crate::cluster_epoch::current_local_cluster_epoch(), 7);
        set_local_cluster_epoch(0);
    }

    #[test]
    fn frame_size_current_version() {
        let frame = make_frame(b"abcde", 1);
        let mut hdr = [0u8; HEADER_SIZE];
        hdr.copy_from_slice(&frame[..HEADER_SIZE]);
        assert_eq!(frame_size(&hdr).unwrap(), HEADER_SIZE + 5);
    }

    #[test]
    fn frame_size_rejects_oversized_length() {
        let hdr = header_declaring(MAX_RPC_PAYLOAD_SIZE + 1);
        assert_codec_error(frame_size(&hdr), "oversized declared length");
    }
}