Skip to main content

nodedb_cluster/
rpc_codec.rs

1//! Raft RPC binary codec.
2//!
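//! Encodes/decodes all RPC messages (Raft consensus and cluster management)
//! into a compact binary wire format using rkyv. On receipt, payloads are
//! copied into an aligned buffer, validated, and deserialized into owned
//! types; `VShardEnvelope` payloads are passed through as raw bytes. Every
//! frame includes a CRC32C integrity checksum and a version field for
//! protocol evolution.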
6//!
//! Wire layout (10-byte header + payload):
8//!
9//! ```text
10//! ┌─────────┬──────────┬────────────┬──────────┬─────────────────────┐
11//! │ version │ rpc_type │ payload_len│ crc32c   │ rkyv payload bytes  │
12//! │  1 byte │  1 byte  │  4 bytes   │ 4 bytes  │  payload_len bytes  │
13//! └─────────┴──────────┴────────────┴──────────┴─────────────────────┘
14//! ```
15//!
16//! - `version`: Wire protocol version (currently `1`).
17//! - `rpc_type`: Discriminant for [`RaftRpc`] variant.
18//! - `payload_len`: Little-endian u32, byte count of the rkyv payload.
//! - `crc32c`: Little-endian u32 CRC32C over the payload bytes only; the
//!   header itself is not covered by the checksum.
20
21use crate::error::{ClusterError, Result};
22use crate::wire::WIRE_VERSION;
23use nodedb_raft::message::{
24    AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
25    RequestVoteRequest, RequestVoteResponse,
26};
27
/// Frame header length in bytes, spelled out field by field:
/// `version` (1) + `rpc_type` (1) + `payload_len` (4) + `crc32c` (4).
pub const HEADER_SIZE: usize = 1 + 1 + 4 + 4;

/// Largest RPC payload accepted on the wire: 64 MiB.
///
/// Kept separate from the WAL's `MAX_WAL_PAYLOAD_SIZE` on purpose. Checking
/// the header's `payload_len` against this bound prevents degenerate
/// allocations from corrupt frames.
const MAX_RPC_PAYLOAD_SIZE: u32 = 64 << 20;

// RPC type discriminants — the value of the `rpc_type` header byte.
// These numbers are part of the wire format; never renumber or reuse one.

// Raft consensus.
const RPC_APPEND_ENTRIES_REQ: u8 = 1;
const RPC_APPEND_ENTRIES_RESP: u8 = 2;
const RPC_REQUEST_VOTE_REQ: u8 = 3;
const RPC_REQUEST_VOTE_RESP: u8 = 4;
const RPC_INSTALL_SNAPSHOT_REQ: u8 = 5;
const RPC_INSTALL_SNAPSHOT_RESP: u8 = 6;
// Cluster join.
const RPC_JOIN_REQ: u8 = 7;
const RPC_JOIN_RESP: u8 = 8;
// Health check.
const RPC_PING: u8 = 9;
const RPC_PONG: u8 = 10;
// Topology broadcast.
const RPC_TOPOLOGY_UPDATE: u8 = 11;
const RPC_TOPOLOGY_ACK: u8 = 12;
// Query forwarding.
const RPC_FORWARD_REQ: u8 = 13;
const RPC_FORWARD_RESP: u8 = 14;
// Opaque vShard envelope (raw bytes, not rkyv).
const RPC_VSHARD_ENVELOPE: u8 = 15;
// Metadata-group proposal forwarding.
const RPC_METADATA_PROPOSE_REQ: u8 = 16;
const RPC_METADATA_PROPOSE_RESP: u8 = 17;
54
55// ── Cluster management wire types ───────────────────────────────────
56
/// Forward a SQL query to the leader node for a vShard.
///
/// Used when a client connects to a non-leader node. The receiving node
/// re-plans and executes the SQL locally against its Data Plane.
///
/// Sent as `RaftRpc::ForwardRequest` (wire type `RPC_FORWARD_REQ`) and
/// answered with a [`ForwardResponse`]. The rkyv archived layout follows
/// field declaration order, so reordering or inserting fields changes the
/// wire format.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct ForwardRequest {
    /// The SQL statement to execute.
    pub sql: String,
    /// Tenant ID (authenticated on the originating node, trusted here).
    pub tenant_id: u32,
    /// Milliseconds remaining until the client's deadline.
    pub deadline_remaining_ms: u64,
    /// Distributed trace ID for observability.
    pub trace_id: u64,
}
72
/// Response to a forwarded SQL query.
///
/// Sent as `RaftRpc::ForwardResponse` (wire type `RPC_FORWARD_RESP`).
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct ForwardResponse {
    /// True if the query succeeded.
    pub success: bool,
    /// Result payloads — one per result set produced by the query.
    /// Each payload is the raw bytes from the Data Plane response.
    pub payloads: Vec<Vec<u8>>,
    /// Failure detail; non-empty if `success == false`.
    pub error_message: String,
}
84
/// Forward an opaque metadata-group proposal payload to the
/// metadata-group leader. Used by `RaftLoop::propose_to_metadata_group_via_leader`
/// when the local node is not the leader of the metadata raft
/// group (group 0). The receiving node MUST be the current leader;
/// if it is not, it replies with a failed response built via
/// `MetadataProposeResponse::err`, optionally carrying a `leader_hint`.
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct MetadataProposeRequest {
    /// Encoded `MetadataEntry` bytes (as produced by
    /// `metadata_group::codec::encode_entry`). Opaque to this codec.
    pub bytes: Vec<u8>,
}
96
/// Response to a forwarded metadata-group proposal.
///
/// `success == true` means the leader accepted the proposal and
/// `log_index` is the assigned raft log index. `error_message` is
/// always empty in that case.
///
/// `success == false` means the proposal failed. `log_index` is `0`
/// and `error_message` carries the failure detail. Common cases:
/// the receiving node is not the leader (`leader_hint` may carry
/// a redirect), the proposal failed validation, or the underlying
/// raft propose returned an error.
///
/// Prefer the [`MetadataProposeResponse::ok`] and
/// [`MetadataProposeResponse::err`] constructors, which set the
/// `success`/`log_index`/`error_message` combinations described above.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct MetadataProposeResponse {
    /// Whether the leader accepted the proposal.
    pub success: bool,
    /// Assigned raft log index on success; `0` on failure.
    pub log_index: u64,
    /// Optional redirect on failure. NOTE(review): presumably the node id of
    /// the current metadata-group leader — confirm against the producer.
    pub leader_hint: Option<u64>,
    /// Failure detail; empty on success.
    pub error_message: String,
}
115
116impl MetadataProposeResponse {
117    pub fn ok(log_index: u64) -> Self {
118        Self {
119            success: true,
120            log_index,
121            leader_hint: None,
122            error_message: String::new(),
123        }
124    }
125
126    pub fn err(message: impl Into<String>, leader_hint: Option<u64>) -> Self {
127        Self {
128            success: false,
129            log_index: 0,
130            leader_hint,
131            error_message: message.into(),
132        }
133    }
134}
135
/// Health check ping (wire type `RPC_PING`), answered with a [`PongResponse`].
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct PingRequest {
    /// Node id of the sender.
    pub sender_id: u64,
    /// Sender's current topology version — lets the responder detect staleness.
    pub topology_version: u64,
}
143
/// Health check pong (wire type `RPC_PONG`), the reply to a [`PingRequest`].
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct PongResponse {
    /// Node id of the responder.
    pub responder_id: u64,
    /// NOTE(review): presumably the responder's current topology version,
    /// mirroring `PingRequest::topology_version` — confirm.
    pub topology_version: u64,
}
150
/// Push topology update to a peer (wire type `RPC_TOPOLOGY_UPDATE`),
/// acknowledged with a [`TopologyAck`].
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct TopologyUpdate {
    /// Topology version this update describes.
    pub version: u64,
    /// Node entries, reusing the join-response wire type.
    /// NOTE(review): presumably the full node list at `version` — confirm.
    pub nodes: Vec<JoinNodeInfo>,
}
157
/// Acknowledgement of a [`TopologyUpdate`] (wire type `RPC_TOPOLOGY_ACK`).
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct TopologyAck {
    /// Node id of the acknowledging peer.
    pub responder_id: u64,
    /// Topology version the peer reports as accepted.
    pub accepted_version: u64,
}
164
/// Request to join an existing cluster (wire type `RPC_JOIN_REQ`),
/// answered with a [`JoinResponse`].
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinRequest {
    /// Node id of the joining node.
    pub node_id: u64,
    /// Listen address for Raft RPCs (e.g. "10.0.0.5:9400").
    pub listen_addr: String,
    /// Wire format version the joiner is running. The leader
    /// stamps this onto the joiner's `NodeInfo` so every peer
    /// sees the correct version in the topology snapshot they
    /// receive back. See
    /// `topology::CLUSTER_WIRE_FORMAT_VERSION`.
    pub wire_version: u16,
}
178
/// Wire-level redirect contract between the join-flow producer
/// (`raft_loop::join::join_flow`) and the client-side parser
/// (`bootstrap::join::parse_leader_hint`).
///
/// When a non-leader receives a `JoinRequest`, it returns a
/// `JoinResponse { success: false, error: format!("{LEADER_REDIRECT_PREFIX}{addr}") }`.
/// The client looks for this exact prefix to decide whether to
/// follow a hint or treat the rejection as a hard failure. Both
/// sides MUST import this constant — never inline the literal, or
/// a refactor on one side will silently break the other.
///
/// The trailing space is part of the prefix: the leader address follows
/// it directly, with no further separator.
pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
190
/// Response to a join request — carries full cluster state
/// (wire type `RPC_JOIN_RESP`).
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinResponse {
    /// Whether the join was accepted.
    pub success: bool,
    /// Rejection detail when `success` is false. A non-leader rejection is
    /// `LEADER_REDIRECT_PREFIX` followed by the leader address.
    pub error: String,
    /// Unique id of the cluster this node has joined. The client
    /// persists this via `ClusterCatalog::save_cluster_id` so a
    /// subsequent restart takes the `restart()` path (via
    /// `is_bootstrapped`) instead of running a fresh bootstrap.
    /// Zero on rejection responses (where nothing was joined).
    pub cluster_id: u64,
    /// All nodes in the cluster.
    pub nodes: Vec<JoinNodeInfo>,
    /// vShard → Raft group mapping (1024 entries).
    /// NOTE(review): presumably indexed by vShard id with the group id as
    /// value — confirm against the producer.
    pub vshard_to_group: Vec<u64>,
    /// Raft group membership.
    pub groups: Vec<JoinGroupInfo>,
}
209
/// Node info in the join response wire format (also reused by
/// [`TopologyUpdate`]).
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinNodeInfo {
    /// Node id.
    pub node_id: u64,
    /// Node address as a string.
    pub addr: String,
    /// NodeState as u8 (0=Joining, 1=Active, 2=Draining, 3=Decommissioned).
    pub state: u8,
    /// Ids of the Raft groups associated with this node.
    pub raft_groups: Vec<u64>,
    /// Mirror of `NodeInfo::wire_version` so joiners learn the
    /// version of every peer in one RPC round-trip and never
    /// silently fall back to the minimum-supported default.
    pub wire_version: u16,
}
223
/// Raft group membership in the join response wire format.
///
/// `members` are voting members; `learners` are non-voting catch-up peers
/// (see `nodedb-raft` learner semantics). A joining node that finds its
/// own id in `learners` creates the local Raft group in the `Learner`
/// role and waits for a subsequent `PromoteLearner` conf-change.
///
/// Field order is part of the rkyv archived layout; do not reorder fields.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinGroupInfo {
    /// Raft group id.
    pub group_id: u64,
    /// NOTE(review): presumably the node id of the group's current leader —
    /// confirm, including what value is sent when the leader is unknown.
    pub leader: u64,
    /// Voting member node ids.
    pub members: Vec<u64>,
    /// Non-voting learner node ids.
    pub learners: Vec<u64>,
}
237
238// ── RPC enum ────────────────────────────────────────────────────────
239
/// An RPC message — Raft consensus or cluster management.
///
/// Each variant maps to a fixed `rpc_type` header byte via
/// `RaftRpc::rpc_type`. Those discriminants are explicit constants, so the
/// declaration order of the variants below does not affect the wire format.
#[derive(Debug, Clone)]
pub enum RaftRpc {
    // Raft consensus
    AppendEntriesRequest(AppendEntriesRequest),
    AppendEntriesResponse(AppendEntriesResponse),
    RequestVoteRequest(RequestVoteRequest),
    RequestVoteResponse(RequestVoteResponse),
    InstallSnapshotRequest(InstallSnapshotRequest),
    InstallSnapshotResponse(InstallSnapshotResponse),
    // Cluster management
    JoinRequest(JoinRequest),
    JoinResponse(JoinResponse),
    // Health check
    Ping(PingRequest),
    Pong(PongResponse),
    // Topology broadcast
    TopologyUpdate(TopologyUpdate),
    TopologyAck(TopologyAck),
    // Query forwarding
    ForwardRequest(ForwardRequest),
    ForwardResponse(ForwardResponse),
    // VShardEnvelope — carries graph BSP, timeseries scatter-gather, migration,
    // retention, and archival messages. The inner VShardMessageType determines
    // the handler. Unlike every other variant, this codec does not rkyv-encode
    // these bytes: they are framed and CRC-checked exactly as given.
    VShardEnvelope(Vec<u8>), // Serialized VShardEnvelope bytes.
    // Metadata-group proposal forwarding (group 0). Used by
    // `RaftLoop::propose_to_metadata_group_via_leader` to forward
    // a `MetadataEntry` payload from a follower to the current
    // leader of the metadata raft group.
    MetadataProposeRequest(MetadataProposeRequest),
    MetadataProposeResponse(MetadataProposeResponse),
}
273
274impl RaftRpc {
275    fn rpc_type(&self) -> u8 {
276        match self {
277            Self::AppendEntriesRequest(_) => RPC_APPEND_ENTRIES_REQ,
278            Self::AppendEntriesResponse(_) => RPC_APPEND_ENTRIES_RESP,
279            Self::RequestVoteRequest(_) => RPC_REQUEST_VOTE_REQ,
280            Self::RequestVoteResponse(_) => RPC_REQUEST_VOTE_RESP,
281            Self::InstallSnapshotRequest(_) => RPC_INSTALL_SNAPSHOT_REQ,
282            Self::InstallSnapshotResponse(_) => RPC_INSTALL_SNAPSHOT_RESP,
283            Self::JoinRequest(_) => RPC_JOIN_REQ,
284            Self::JoinResponse(_) => RPC_JOIN_RESP,
285            Self::Ping(_) => RPC_PING,
286            Self::Pong(_) => RPC_PONG,
287            Self::TopologyUpdate(_) => RPC_TOPOLOGY_UPDATE,
288            Self::TopologyAck(_) => RPC_TOPOLOGY_ACK,
289            Self::ForwardRequest(_) => RPC_FORWARD_REQ,
290            Self::ForwardResponse(_) => RPC_FORWARD_RESP,
291            Self::VShardEnvelope(_) => RPC_VSHARD_ENVELOPE,
292            Self::MetadataProposeRequest(_) => RPC_METADATA_PROPOSE_REQ,
293            Self::MetadataProposeResponse(_) => RPC_METADATA_PROPOSE_RESP,
294        }
295    }
296}
297
298/// Encode a [`RaftRpc`] into a framed binary message.
299pub fn encode(rpc: &RaftRpc) -> Result<Vec<u8>> {
300    let payload = serialize_payload(rpc)?;
301    let payload_len: u32 = payload.len().try_into().map_err(|_| ClusterError::Codec {
302        detail: format!("payload too large: {} bytes", payload.len()),
303    })?;
304
305    let crc = crc32c::crc32c(&payload);
306
307    let mut frame = Vec::with_capacity(HEADER_SIZE + payload.len());
308    // Version field is 1 byte on the wire (see header diagram); narrowing cast is intentional.
309    frame.push(WIRE_VERSION as u8);
310    frame.push(rpc.rpc_type());
311    frame.extend_from_slice(&payload_len.to_le_bytes());
312    frame.extend_from_slice(&crc.to_le_bytes());
313    frame.extend_from_slice(&payload);
314
315    Ok(frame)
316}
317
318/// Decode a framed binary message into a [`RaftRpc`].
319pub fn decode(data: &[u8]) -> Result<RaftRpc> {
320    if data.len() < HEADER_SIZE {
321        return Err(ClusterError::Codec {
322            detail: format!("frame too short: {} bytes, need {HEADER_SIZE}", data.len()),
323        });
324    }
325
326    let version = data[0];
327    if version != WIRE_VERSION as u8 {
328        return Err(ClusterError::Codec {
329            detail: format!("unsupported wire version: {version}, expected {WIRE_VERSION}"),
330        });
331    }
332
333    let rpc_type = data[1];
334    let payload_len = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
335    let expected_crc = u32::from_le_bytes([data[6], data[7], data[8], data[9]]);
336
337    if payload_len > MAX_RPC_PAYLOAD_SIZE {
338        return Err(ClusterError::Codec {
339            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
340        });
341    }
342
343    let expected_total = HEADER_SIZE + payload_len as usize;
344    if data.len() < expected_total {
345        return Err(ClusterError::Codec {
346            detail: format!(
347                "frame truncated: got {} bytes, expected {expected_total}",
348                data.len()
349            ),
350        });
351    }
352
353    let payload = &data[HEADER_SIZE..expected_total];
354
355    let actual_crc = crc32c::crc32c(payload);
356    if actual_crc != expected_crc {
357        return Err(ClusterError::Codec {
358            detail: format!(
359                "CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}"
360            ),
361        });
362    }
363
364    deserialize_payload(rpc_type, payload)
365}
366
367/// Return the total frame size for a buffer that starts with a valid header.
368/// Useful for stream framing — read the header, then read the remaining payload.
369pub fn frame_size(header: &[u8; HEADER_SIZE]) -> Result<usize> {
370    let payload_len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]);
371    if payload_len > MAX_RPC_PAYLOAD_SIZE {
372        return Err(ClusterError::Codec {
373            detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
374        });
375    }
376    Ok(HEADER_SIZE + payload_len as usize)
377}
378
379// ── Serialization helpers ───────────────────────────────────────────
380
381fn serialize_payload(rpc: &RaftRpc) -> Result<Vec<u8>> {
382    let bytes = match rpc {
383        RaftRpc::AppendEntriesRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
384        RaftRpc::AppendEntriesResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
385        RaftRpc::RequestVoteRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
386        RaftRpc::RequestVoteResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
387        RaftRpc::InstallSnapshotRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
388        RaftRpc::InstallSnapshotResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
389        RaftRpc::JoinRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
390        RaftRpc::JoinResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
391        RaftRpc::Ping(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
392        RaftRpc::Pong(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
393        RaftRpc::TopologyUpdate(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
394        RaftRpc::TopologyAck(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
395        RaftRpc::ForwardRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
396        RaftRpc::ForwardResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
397        RaftRpc::VShardEnvelope(bytes) => return Ok(bytes.clone()), // Already serialized.
398        RaftRpc::MetadataProposeRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
399        RaftRpc::MetadataProposeResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
400    };
401    bytes.map(|b| b.to_vec()).map_err(|e| ClusterError::Codec {
402        detail: format!("rkyv serialize failed: {e}"),
403    })
404}
405
406fn deserialize_payload(rpc_type: u8, payload: &[u8]) -> Result<RaftRpc> {
407    // rkyv requires aligned data for zero-copy access. Network-received slices
408    // are not guaranteed to be aligned, so copy into an AlignedVec first.
409    let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity(payload.len());
410    aligned.extend_from_slice(payload);
411
412    match rpc_type {
413        RPC_APPEND_ENTRIES_REQ => {
414            let msg = rkyv::from_bytes::<AppendEntriesRequest, rkyv::rancor::Error>(&aligned)
415                .map_err(|e| ClusterError::Codec {
416                    detail: format!("rkyv deserialize AppendEntriesRequest: {e}"),
417                })?;
418            Ok(RaftRpc::AppendEntriesRequest(msg))
419        }
420        RPC_APPEND_ENTRIES_RESP => {
421            let msg = rkyv::from_bytes::<AppendEntriesResponse, rkyv::rancor::Error>(&aligned)
422                .map_err(|e| ClusterError::Codec {
423                    detail: format!("rkyv deserialize AppendEntriesResponse: {e}"),
424                })?;
425            Ok(RaftRpc::AppendEntriesResponse(msg))
426        }
427        RPC_REQUEST_VOTE_REQ => {
428            let msg = rkyv::from_bytes::<RequestVoteRequest, rkyv::rancor::Error>(&aligned)
429                .map_err(|e| ClusterError::Codec {
430                    detail: format!("rkyv deserialize RequestVoteRequest: {e}"),
431                })?;
432            Ok(RaftRpc::RequestVoteRequest(msg))
433        }
434        RPC_REQUEST_VOTE_RESP => {
435            let msg = rkyv::from_bytes::<RequestVoteResponse, rkyv::rancor::Error>(&aligned)
436                .map_err(|e| ClusterError::Codec {
437                    detail: format!("rkyv deserialize RequestVoteResponse: {e}"),
438                })?;
439            Ok(RaftRpc::RequestVoteResponse(msg))
440        }
441        RPC_INSTALL_SNAPSHOT_REQ => {
442            let msg = rkyv::from_bytes::<InstallSnapshotRequest, rkyv::rancor::Error>(&aligned)
443                .map_err(|e| ClusterError::Codec {
444                    detail: format!("rkyv deserialize InstallSnapshotRequest: {e}"),
445                })?;
446            Ok(RaftRpc::InstallSnapshotRequest(msg))
447        }
448        RPC_INSTALL_SNAPSHOT_RESP => {
449            let msg = rkyv::from_bytes::<InstallSnapshotResponse, rkyv::rancor::Error>(&aligned)
450                .map_err(|e| ClusterError::Codec {
451                    detail: format!("rkyv deserialize InstallSnapshotResponse: {e}"),
452                })?;
453            Ok(RaftRpc::InstallSnapshotResponse(msg))
454        }
455        RPC_JOIN_REQ => {
456            let msg =
457                rkyv::from_bytes::<JoinRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
458                    ClusterError::Codec {
459                        detail: format!("rkyv deserialize JoinRequest: {e}"),
460                    }
461                })?;
462            Ok(RaftRpc::JoinRequest(msg))
463        }
464        RPC_JOIN_RESP => {
465            let msg =
466                rkyv::from_bytes::<JoinResponse, rkyv::rancor::Error>(&aligned).map_err(|e| {
467                    ClusterError::Codec {
468                        detail: format!("rkyv deserialize JoinResponse: {e}"),
469                    }
470                })?;
471            Ok(RaftRpc::JoinResponse(msg))
472        }
473        RPC_PING => {
474            let msg =
475                rkyv::from_bytes::<PingRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
476                    ClusterError::Codec {
477                        detail: format!("rkyv deserialize PingRequest: {e}"),
478                    }
479                })?;
480            Ok(RaftRpc::Ping(msg))
481        }
482        RPC_PONG => {
483            let msg =
484                rkyv::from_bytes::<PongResponse, rkyv::rancor::Error>(&aligned).map_err(|e| {
485                    ClusterError::Codec {
486                        detail: format!("rkyv deserialize PongResponse: {e}"),
487                    }
488                })?;
489            Ok(RaftRpc::Pong(msg))
490        }
491        RPC_TOPOLOGY_UPDATE => {
492            let msg =
493                rkyv::from_bytes::<TopologyUpdate, rkyv::rancor::Error>(&aligned).map_err(|e| {
494                    ClusterError::Codec {
495                        detail: format!("rkyv deserialize TopologyUpdate: {e}"),
496                    }
497                })?;
498            Ok(RaftRpc::TopologyUpdate(msg))
499        }
500        RPC_TOPOLOGY_ACK => {
501            let msg =
502                rkyv::from_bytes::<TopologyAck, rkyv::rancor::Error>(&aligned).map_err(|e| {
503                    ClusterError::Codec {
504                        detail: format!("rkyv deserialize TopologyAck: {e}"),
505                    }
506                })?;
507            Ok(RaftRpc::TopologyAck(msg))
508        }
509        RPC_FORWARD_REQ => {
510            let msg =
511                rkyv::from_bytes::<ForwardRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
512                    ClusterError::Codec {
513                        detail: format!("rkyv deserialize ForwardRequest: {e}"),
514                    }
515                })?;
516            Ok(RaftRpc::ForwardRequest(msg))
517        }
518        RPC_FORWARD_RESP => {
519            let msg = rkyv::from_bytes::<ForwardResponse, rkyv::rancor::Error>(&aligned).map_err(
520                |e| ClusterError::Codec {
521                    detail: format!("rkyv deserialize ForwardResponse: {e}"),
522                },
523            )?;
524            Ok(RaftRpc::ForwardResponse(msg))
525        }
526        RPC_VSHARD_ENVELOPE => {
527            // VShardEnvelope is already in its own binary format — pass through raw.
528            Ok(RaftRpc::VShardEnvelope(payload.to_vec()))
529        }
530        RPC_METADATA_PROPOSE_REQ => {
531            let msg = rkyv::from_bytes::<MetadataProposeRequest, rkyv::rancor::Error>(&aligned)
532                .map_err(|e| ClusterError::Codec {
533                    detail: format!("rkyv deserialize MetadataProposeRequest: {e}"),
534                })?;
535            Ok(RaftRpc::MetadataProposeRequest(msg))
536        }
537        RPC_METADATA_PROPOSE_RESP => {
538            let msg = rkyv::from_bytes::<MetadataProposeResponse, rkyv::rancor::Error>(&aligned)
539                .map_err(|e| ClusterError::Codec {
540                    detail: format!("rkyv deserialize MetadataProposeResponse: {e}"),
541                })?;
542            Ok(RaftRpc::MetadataProposeResponse(msg))
543        }
544        _ => Err(ClusterError::Codec {
545            detail: format!("unknown rpc_type: {rpc_type}"),
546        }),
547    }
548}
549
550#[cfg(test)]
551mod tests {
552    use super::*;
553    use nodedb_raft::message::LogEntry;
554
555    #[test]
556    fn roundtrip_append_entries_request() {
557        let req = AppendEntriesRequest {
558            term: 5,
559            leader_id: 1,
560            prev_log_index: 99,
561            prev_log_term: 4,
562            entries: vec![
563                LogEntry {
564                    term: 5,
565                    index: 100,
566                    data: b"put x=1".to_vec(),
567                },
568                LogEntry {
569                    term: 5,
570                    index: 101,
571                    data: b"put y=2".to_vec(),
572                },
573            ],
574            leader_commit: 98,
575            group_id: 7,
576        };
577
578        let rpc = RaftRpc::AppendEntriesRequest(req.clone());
579        let encoded = encode(&rpc).unwrap();
580        let decoded = decode(&encoded).unwrap();
581
582        match decoded {
583            RaftRpc::AppendEntriesRequest(d) => {
584                assert_eq!(d.term, req.term);
585                assert_eq!(d.leader_id, req.leader_id);
586                assert_eq!(d.prev_log_index, req.prev_log_index);
587                assert_eq!(d.prev_log_term, req.prev_log_term);
588                assert_eq!(d.entries.len(), 2);
589                assert_eq!(d.entries[0].data, b"put x=1");
590                assert_eq!(d.entries[1].data, b"put y=2");
591                assert_eq!(d.leader_commit, req.leader_commit);
592                assert_eq!(d.group_id, req.group_id);
593            }
594            other => panic!("expected AppendEntriesRequest, got {other:?}"),
595        }
596    }
597
598    #[test]
599    fn roundtrip_append_entries_heartbeat() {
600        let req = AppendEntriesRequest {
601            term: 3,
602            leader_id: 1,
603            prev_log_index: 10,
604            prev_log_term: 2,
605            entries: vec![],
606            leader_commit: 8,
607            group_id: 0,
608        };
609
610        let rpc = RaftRpc::AppendEntriesRequest(req);
611        let encoded = encode(&rpc).unwrap();
612        let decoded = decode(&encoded).unwrap();
613
614        match decoded {
615            RaftRpc::AppendEntriesRequest(d) => {
616                assert!(d.entries.is_empty());
617                assert_eq!(d.term, 3);
618            }
619            other => panic!("expected heartbeat, got {other:?}"),
620        }
621    }
622
623    #[test]
624    fn roundtrip_append_entries_response() {
625        let resp = AppendEntriesResponse {
626            term: 5,
627            success: true,
628            last_log_index: 100,
629        };
630
631        let rpc = RaftRpc::AppendEntriesResponse(resp);
632        let encoded = encode(&rpc).unwrap();
633        let decoded = decode(&encoded).unwrap();
634
635        match decoded {
636            RaftRpc::AppendEntriesResponse(d) => {
637                assert_eq!(d.term, 5);
638                assert!(d.success);
639                assert_eq!(d.last_log_index, 100);
640            }
641            other => panic!("expected AppendEntriesResponse, got {other:?}"),
642        }
643    }
644
645    #[test]
646    fn roundtrip_request_vote_request() {
647        let req = RequestVoteRequest {
648            term: 10,
649            candidate_id: 3,
650            last_log_index: 200,
651            last_log_term: 9,
652            group_id: 42,
653        };
654
655        let rpc = RaftRpc::RequestVoteRequest(req);
656        let encoded = encode(&rpc).unwrap();
657        let decoded = decode(&encoded).unwrap();
658
659        match decoded {
660            RaftRpc::RequestVoteRequest(d) => {
661                assert_eq!(d.term, 10);
662                assert_eq!(d.candidate_id, 3);
663                assert_eq!(d.last_log_index, 200);
664                assert_eq!(d.last_log_term, 9);
665                assert_eq!(d.group_id, 42);
666            }
667            other => panic!("expected RequestVoteRequest, got {other:?}"),
668        }
669    }
670
671    #[test]
672    fn roundtrip_request_vote_response() {
673        let resp = RequestVoteResponse {
674            term: 10,
675            vote_granted: true,
676        };
677
678        let rpc = RaftRpc::RequestVoteResponse(resp);
679        let encoded = encode(&rpc).unwrap();
680        let decoded = decode(&encoded).unwrap();
681
682        match decoded {
683            RaftRpc::RequestVoteResponse(d) => {
684                assert_eq!(d.term, 10);
685                assert!(d.vote_granted);
686            }
687            other => panic!("expected RequestVoteResponse, got {other:?}"),
688        }
689    }
690
691    #[test]
692    fn roundtrip_install_snapshot_request() {
693        let data: Vec<u8> = [0xDE, 0xAD, 0xBE, 0xEF]
694            .iter()
695            .copied()
696            .cycle()
697            .take(1024)
698            .collect();
699        let req = InstallSnapshotRequest {
700            term: 7,
701            leader_id: 1,
702            last_included_index: 500,
703            last_included_term: 6,
704            offset: 0,
705            data: data.clone(),
706            done: false,
707            group_id: 3,
708        };
709
710        let rpc = RaftRpc::InstallSnapshotRequest(req);
711        let encoded = encode(&rpc).unwrap();
712        let decoded = decode(&encoded).unwrap();
713
714        match decoded {
715            RaftRpc::InstallSnapshotRequest(d) => {
716                assert_eq!(d.term, 7);
717                assert_eq!(d.leader_id, 1);
718                assert_eq!(d.last_included_index, 500);
719                assert_eq!(d.last_included_term, 6);
720                assert_eq!(d.offset, 0);
721                assert_eq!(d.data, data);
722                assert!(!d.done);
723                assert_eq!(d.group_id, 3);
724            }
725            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
726        }
727    }
728
729    #[test]
730    fn roundtrip_install_snapshot_final_chunk() {
731        let req = InstallSnapshotRequest {
732            term: 7,
733            leader_id: 1,
734            last_included_index: 500,
735            last_included_term: 6,
736            offset: 4096,
737            data: vec![0xFF; 128],
738            done: true,
739            group_id: 3,
740        };
741
742        let rpc = RaftRpc::InstallSnapshotRequest(req);
743        let encoded = encode(&rpc).unwrap();
744        let decoded = decode(&encoded).unwrap();
745
746        match decoded {
747            RaftRpc::InstallSnapshotRequest(d) => {
748                assert!(d.done);
749                assert_eq!(d.offset, 4096);
750            }
751            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
752        }
753    }
754
755    #[test]
756    fn roundtrip_install_snapshot_response() {
757        let resp = InstallSnapshotResponse { term: 7 };
758
759        let rpc = RaftRpc::InstallSnapshotResponse(resp);
760        let encoded = encode(&rpc).unwrap();
761        let decoded = decode(&encoded).unwrap();
762
763        match decoded {
764            RaftRpc::InstallSnapshotResponse(d) => {
765                assert_eq!(d.term, 7);
766            }
767            other => panic!("expected InstallSnapshotResponse, got {other:?}"),
768        }
769    }
770
771    #[test]
772    fn crc_corruption_detected() {
773        let rpc = RaftRpc::RequestVoteResponse(RequestVoteResponse {
774            term: 1,
775            vote_granted: false,
776        });
777        let mut encoded = encode(&rpc).unwrap();
778
779        // Flip a bit in the payload.
780        if let Some(last) = encoded.last_mut() {
781            *last ^= 0x01;
782        }
783
784        let err = decode(&encoded).unwrap_err();
785        assert!(err.to_string().contains("CRC32C mismatch"), "{err}");
786    }
787
788    #[test]
789    fn version_mismatch_rejected() {
790        let rpc = RaftRpc::RequestVoteResponse(RequestVoteResponse {
791            term: 1,
792            vote_granted: false,
793        });
794        let mut encoded = encode(&rpc).unwrap();
795
796        // Set version to 99.
797        encoded[0] = 99;
798
799        let err = decode(&encoded).unwrap_err();
800        assert!(
801            err.to_string().contains("unsupported wire version"),
802            "{err}"
803        );
804    }
805
806    #[test]
807    fn truncated_frame_rejected() {
808        let err = decode(&[1, 2, 3]).unwrap_err();
809        assert!(err.to_string().contains("frame too short"), "{err}");
810    }
811
812    #[test]
813    fn unknown_rpc_type_rejected() {
814        let rpc = RaftRpc::RequestVoteResponse(RequestVoteResponse {
815            term: 1,
816            vote_granted: false,
817        });
818        let mut encoded = encode(&rpc).unwrap();
819
820        // Set rpc_type to 255.
821        encoded[1] = 255;
822
823        // CRC will mismatch because we didn't change payload — but the rpc_type
824        // byte is in the header, not covered by CRC. The decode will fail on
825        // unknown rpc_type after CRC passes. Actually, CRC only covers payload,
826        // so the type corruption is caught by the type discriminant check.
827        // However, the CRC is still valid (payload unchanged), so we get the
828        // unknown type error.
829        let err = decode(&encoded).unwrap_err();
830        assert!(err.to_string().contains("unknown rpc_type"), "{err}");
831    }
832
833    #[test]
834    fn payload_too_large_rejected() {
835        // Craft a header claiming a massive payload.
836        let mut frame = vec![0u8; HEADER_SIZE];
837        frame[0] = WIRE_VERSION as u8;
838        frame[1] = RPC_APPEND_ENTRIES_REQ;
839        let huge: u32 = MAX_RPC_PAYLOAD_SIZE + 1;
840        frame[2..6].copy_from_slice(&huge.to_le_bytes());
841
842        let err = decode(&frame).unwrap_err();
843        assert!(err.to_string().contains("exceeds maximum"), "{err}");
844    }
845
846    #[test]
847    fn frame_size_helper() {
848        let rpc = RaftRpc::AppendEntriesResponse(AppendEntriesResponse {
849            term: 1,
850            success: true,
851            last_log_index: 5,
852        });
853        let encoded = encode(&rpc).unwrap();
854
855        let header: [u8; HEADER_SIZE] = encoded[..HEADER_SIZE].try_into().unwrap();
856        let size = frame_size(&header).unwrap();
857        assert_eq!(size, encoded.len());
858    }
859
860    #[test]
861    fn large_snapshot_roundtrip() {
862        // 1 MiB snapshot chunk.
863        let data = vec![0xAB; 1024 * 1024];
864        let req = InstallSnapshotRequest {
865            term: 100,
866            leader_id: 5,
867            last_included_index: 999_999,
868            last_included_term: 99,
869            offset: 0,
870            data: data.clone(),
871            done: false,
872            group_id: 0,
873        };
874
875        let rpc = RaftRpc::InstallSnapshotRequest(req);
876        let encoded = encode(&rpc).unwrap();
877        let decoded = decode(&encoded).unwrap();
878
879        match decoded {
880            RaftRpc::InstallSnapshotRequest(d) => {
881                assert_eq!(d.data.len(), 1024 * 1024);
882                assert_eq!(d.data, data);
883            }
884            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
885        }
886    }
887
888    #[test]
889    fn roundtrip_join_request() {
890        let req = JoinRequest {
891            node_id: 42,
892            listen_addr: "10.0.0.5:9400".into(),
893            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
894        };
895
896        let rpc = RaftRpc::JoinRequest(req);
897        let encoded = encode(&rpc).unwrap();
898        let decoded = decode(&encoded).unwrap();
899
900        match decoded {
901            RaftRpc::JoinRequest(d) => {
902                assert_eq!(d.node_id, 42);
903                assert_eq!(d.listen_addr, "10.0.0.5:9400");
904            }
905            other => panic!("expected JoinRequest, got {other:?}"),
906        }
907    }
908
909    #[test]
910    fn roundtrip_join_response() {
911        let resp = JoinResponse {
912            success: true,
913            error: String::new(),
914            cluster_id: 12345,
915            nodes: vec![
916                JoinNodeInfo {
917                    node_id: 1,
918                    addr: "10.0.0.1:9400".into(),
919                    state: 1,
920                    raft_groups: vec![0, 1],
921                    wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
922                },
923                JoinNodeInfo {
924                    node_id: 2,
925                    addr: "10.0.0.2:9400".into(),
926                    state: 1,
927                    raft_groups: vec![0, 1],
928                    wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
929                },
930            ],
931            vshard_to_group: (0..1024u64).map(|i| i % 4).collect(),
932            groups: vec![JoinGroupInfo {
933                group_id: 0,
934                leader: 1,
935                members: vec![1, 2],
936                learners: vec![],
937            }],
938        };
939
940        let rpc = RaftRpc::JoinResponse(resp);
941        let encoded = encode(&rpc).unwrap();
942        let decoded = decode(&encoded).unwrap();
943
944        match decoded {
945            RaftRpc::JoinResponse(d) => {
946                assert!(d.success);
947                assert_eq!(d.nodes.len(), 2);
948                assert_eq!(d.vshard_to_group.len(), 1024);
949                assert_eq!(d.groups.len(), 1);
950                assert_eq!(d.groups[0].leader, 1);
951            }
952            other => panic!("expected JoinResponse, got {other:?}"),
953        }
954    }
955}