nodedb_cluster/rpc_codec/
header.rs1use crate::cluster_epoch::{current_local_cluster_epoch, observe_peer_cluster_epoch};
18use crate::error::{ClusterError, Result};
19
20pub const HEADER_SIZE: usize = 18;
22
23const RPC_FRAME_VERSION: u8 = 3;
25
26pub const MAX_RPC_PAYLOAD_SIZE: u32 = 64 * 1024 * 1024;
30
31pub fn write_frame(rpc_type: u8, payload: &[u8], out: &mut Vec<u8>) -> Result<()> {
37 let payload_len: u32 = payload.len().try_into().map_err(|_| ClusterError::Codec {
38 detail: format!("payload too large: {} bytes", payload.len()),
39 })?;
40 let crc = crc32c::crc32c(payload);
41 let epoch = current_local_cluster_epoch();
42 out.push(RPC_FRAME_VERSION);
43 out.push(rpc_type);
44 out.extend_from_slice(&payload_len.to_le_bytes());
45 out.extend_from_slice(&crc.to_le_bytes());
46 out.extend_from_slice(&epoch.to_le_bytes());
47 out.extend_from_slice(payload);
48 Ok(())
49}
50
51pub fn parse_frame(data: &[u8]) -> Result<(u8, &[u8])> {
58 if data.is_empty() {
59 return Err(ClusterError::Codec {
60 detail: format!("frame too short: 0 bytes, need {HEADER_SIZE}"),
61 });
62 }
63
64 let version = data[0];
65 if version != RPC_FRAME_VERSION {
66 return Err(ClusterError::UnsupportedWireVersion {
67 got: version,
68 supported_min: RPC_FRAME_VERSION,
69 supported_max: RPC_FRAME_VERSION,
70 });
71 }
72
73 if data.len() < HEADER_SIZE {
74 return Err(ClusterError::Codec {
75 detail: format!("frame too short: {} bytes, need {HEADER_SIZE}", data.len()),
76 });
77 }
78
79 let rpc_type = data[1];
80 let payload_len = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
81 let expected_crc = u32::from_le_bytes([data[6], data[7], data[8], data[9]]);
82 let peer_epoch = u64::from_le_bytes([
83 data[10], data[11], data[12], data[13], data[14], data[15], data[16], data[17],
84 ]);
85
86 if payload_len > MAX_RPC_PAYLOAD_SIZE {
87 return Err(ClusterError::Codec {
88 detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
89 });
90 }
91
92 let expected_total = HEADER_SIZE + payload_len as usize;
93 if data.len() < expected_total {
94 return Err(ClusterError::Codec {
95 detail: format!(
96 "frame truncated: got {} bytes, expected {expected_total}",
97 data.len()
98 ),
99 });
100 }
101
102 let payload = &data[HEADER_SIZE..expected_total];
103 let actual_crc = crc32c::crc32c(payload);
104 if actual_crc != expected_crc {
105 return Err(ClusterError::Codec {
106 detail: format!(
107 "CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}"
108 ),
109 });
110 }
111
112 if peer_epoch > 0 {
113 observe_peer_cluster_epoch(peer_epoch);
114 }
115
116 Ok((rpc_type, payload))
117}
118
119pub fn frame_size(header: &[u8; HEADER_SIZE]) -> Result<usize> {
123 let payload_len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]);
124 if payload_len > MAX_RPC_PAYLOAD_SIZE {
125 return Err(ClusterError::Codec {
126 detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
127 });
128 }
129 Ok(HEADER_SIZE + payload_len as usize)
130}
131
132#[cfg(test)]
137mod tests {
138 use super::*;
139
140 fn make_short_frame(version: u8, payload: &[u8]) -> Vec<u8> {
143 let mut out = Vec::with_capacity(10 + payload.len());
144 let payload_len = payload.len() as u32;
145 let crc = crc32c::crc32c(payload);
146 out.push(version);
147 out.push(0xAB); out.extend_from_slice(&payload_len.to_le_bytes());
149 out.extend_from_slice(&crc.to_le_bytes());
150 out.extend_from_slice(payload);
151 out
152 }
153
154 fn make_frame(payload: &[u8], epoch: u64) -> Vec<u8> {
156 let mut out = Vec::with_capacity(HEADER_SIZE + payload.len());
157 let payload_len = payload.len() as u32;
158 let crc = crc32c::crc32c(payload);
159 out.push(RPC_FRAME_VERSION);
160 out.push(0xAB);
161 out.extend_from_slice(&payload_len.to_le_bytes());
162 out.extend_from_slice(&crc.to_le_bytes());
163 out.extend_from_slice(&epoch.to_le_bytes());
164 out.extend_from_slice(payload);
165 out
166 }
167
168 #[test]
169 fn parse_frame_accepts_current_version() {
170 let payload = b"world";
171 let frame = make_frame(payload, 0);
172 let (_t, body) = parse_frame(&frame).expect("current version must be accepted");
173 assert_eq!(body, payload);
174 }
175
176 #[test]
177 fn parse_frame_rejects_n_plus_one() {
178 let payload = b"future";
179 let frame = make_short_frame(RPC_FRAME_VERSION.saturating_add(1), payload);
180 let err = parse_frame(&frame).expect_err("N+1 must be rejected");
181 match err {
182 ClusterError::UnsupportedWireVersion {
183 got,
184 supported_min,
185 supported_max,
186 } => {
187 assert_eq!(got, RPC_FRAME_VERSION.saturating_add(1));
188 assert_eq!(supported_min, RPC_FRAME_VERSION);
189 assert_eq!(supported_max, RPC_FRAME_VERSION);
190 }
191 other => panic!("expected UnsupportedWireVersion, got {other:?}"),
192 }
193 }
194
195 #[test]
196 fn parse_frame_rejects_v1_v2() {
197 for version in [1u8, 2u8] {
198 let payload = b"old";
199 let frame = make_short_frame(version, payload);
200 let err = parse_frame(&frame).expect_err(&format!("v{version} must be rejected"));
201 assert!(
202 matches!(
203 err,
204 ClusterError::UnsupportedWireVersion { got, .. } if got == version
205 ),
206 "expected UnsupportedWireVersion for v{version}, got {err:?}"
207 );
208 }
209 }
210
211 #[test]
212 fn parse_frame_rejects_version_zero() {
213 let payload = b"zero";
214 let frame = make_short_frame(0, payload);
215 let err = parse_frame(&frame).expect_err("version 0 must be rejected");
216 assert!(matches!(
217 err,
218 ClusterError::UnsupportedWireVersion { got: 0, .. }
219 ));
220 }
221
222 #[test]
223 fn v3_frame_round_trips_with_epoch() {
224 use crate::cluster_epoch::set_local_cluster_epoch;
225 set_local_cluster_epoch(0);
226 set_local_cluster_epoch(7);
227 let payload = b"epoch-bound";
228 let mut buf = Vec::new();
229 write_frame(0xAB, payload, &mut buf).unwrap();
230 set_local_cluster_epoch(0);
231 let (rpc_type, body) = parse_frame(&buf).unwrap();
232 assert_eq!(rpc_type, 0xAB);
233 assert_eq!(body, payload);
234 assert_eq!(crate::cluster_epoch::current_local_cluster_epoch(), 7);
235 set_local_cluster_epoch(0);
236 }
237
238 #[test]
239 fn frame_size_current_version() {
240 let frame = make_frame(b"abcde", 1);
241 let mut hdr = [0u8; HEADER_SIZE];
242 hdr.copy_from_slice(&frame[..HEADER_SIZE]);
243 assert_eq!(frame_size(&hdr).unwrap(), HEADER_SIZE + 5);
244 }
245}