1use crate::error::{ClusterError, Result};
22use crate::wire::WIRE_VERSION;
23use nodedb_raft::message::{
24 AppendEntriesRequest, AppendEntriesResponse, InstallSnapshotRequest, InstallSnapshotResponse,
25 RequestVoteRequest, RequestVoteResponse,
26};
27
/// Size in bytes of the fixed header that precedes every frame payload.
///
/// The header written by [`encode`] and parsed by [`decode`] consists of:
/// wire version (1 byte) + RPC type (1 byte) + payload length (4 bytes,
/// little-endian) + CRC32C of the payload (4 bytes, little-endian).
pub const HEADER_SIZE: usize = 1 + 1 + 4 + 4;

/// Largest payload length (64 MiB) a frame header may declare; [`decode`] and
/// [`frame_size`] reject headers announcing anything bigger.
const MAX_RPC_PAYLOAD_SIZE: u32 = 64 << 20;
35
// RPC type discriminants: the single byte stored at offset 1 of the frame
// header. `RaftRpc::rpc_type` maps each enum variant to one of these values and
// `deserialize_payload` dispatches on them, so the numbers are part of the wire
// format and peers must agree on them.

// Raft consensus messages (types come from `nodedb_raft::message`).
const RPC_APPEND_ENTRIES_REQ: u8 = 1;
const RPC_APPEND_ENTRIES_RESP: u8 = 2;
const RPC_REQUEST_VOTE_REQ: u8 = 3;
const RPC_REQUEST_VOTE_RESP: u8 = 4;
const RPC_INSTALL_SNAPSHOT_REQ: u8 = 5;
const RPC_INSTALL_SNAPSHOT_RESP: u8 = 6;
// Cluster join handshake (`JoinRequest` / `JoinResponse`).
const RPC_JOIN_REQ: u8 = 7;
const RPC_JOIN_RESP: u8 = 8;
// Ping / pong exchange (`PingRequest` / `PongResponse`).
const RPC_PING: u8 = 9;
const RPC_PONG: u8 = 10;
// Topology distribution (`TopologyUpdate` / `TopologyAck`).
const RPC_TOPOLOGY_UPDATE: u8 = 11;
const RPC_TOPOLOGY_ACK: u8 = 12;
// Request forwarding (`ForwardRequest` / `ForwardResponse`).
const RPC_FORWARD_REQ: u8 = 13;
const RPC_FORWARD_RESP: u8 = 14;
// Opaque envelope whose payload is carried as raw bytes, without rkyv.
const RPC_VSHARD_ENVELOPE: u8 = 15;
// Metadata proposals (`MetadataProposeRequest` / `MetadataProposeResponse`).
const RPC_METADATA_PROPOSE_REQ: u8 = 16;
const RPC_METADATA_PROPOSE_RESP: u8 = 17;
54
/// Payload of an `RPC_FORWARD_REQ` frame, carried by `RaftRpc::ForwardRequest`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
///
/// NOTE(review): the per-field descriptions below are inferred from the field
/// names only — confirm against the code that handles forwarded requests.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct ForwardRequest {
    /// SQL text being forwarded.
    pub sql: String,
    /// Identifier of the tenant the request belongs to.
    pub tenant_id: u32,
    /// Presumably the time budget left for the request, in milliseconds.
    pub deadline_remaining_ms: u64,
    /// Presumably a correlation id for tracing the request across nodes.
    pub trace_id: u64,
}
72
/// Payload of an `RPC_FORWARD_RESP` frame, carried by `RaftRpc::ForwardResponse`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct ForwardResponse {
    /// Whether the forwarded request succeeded.
    pub success: bool,
    /// Result data as a list of byte buffers; this codec does not interpret them.
    pub payloads: Vec<Vec<u8>>,
    /// Error description — presumably empty when `success` is true (TODO confirm).
    pub error_message: String,
}
84
/// Payload of an `RPC_METADATA_PROPOSE_REQ` frame, carried by
/// `RaftRpc::MetadataProposeRequest`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct MetadataProposeRequest {
    /// Opaque bytes of the proposal; this codec does not interpret them.
    pub bytes: Vec<u8>,
}
96
/// Payload of an `RPC_METADATA_PROPOSE_RESP` frame, carried by
/// `RaftRpc::MetadataProposeResponse`.
///
/// Build values with [`MetadataProposeResponse::ok`] or
/// [`MetadataProposeResponse::err`]. Serialized with rkyv (see
/// `serialize_payload`), so both peers must agree on this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct MetadataProposeResponse {
    /// `true` for responses built by `ok`, `false` for those built by `err`.
    pub success: bool,
    /// Log index reported on success; `err` sets it to `0`.
    pub log_index: u64,
    /// Optional node id hint supplied to `err` — presumably the current leader
    /// the caller should retry against (TODO confirm); `ok` sets it to `None`.
    pub leader_hint: Option<u64>,
    /// Error description; `ok` leaves it empty.
    pub error_message: String,
}
115
116impl MetadataProposeResponse {
117 pub fn ok(log_index: u64) -> Self {
118 Self {
119 success: true,
120 log_index,
121 leader_hint: None,
122 error_message: String::new(),
123 }
124 }
125
126 pub fn err(message: impl Into<String>, leader_hint: Option<u64>) -> Self {
127 Self {
128 success: false,
129 log_index: 0,
130 leader_hint,
131 error_message: message.into(),
132 }
133 }
134}
135
/// Payload of an `RPC_PING` frame, carried by `RaftRpc::Ping`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct PingRequest {
    /// Node id of the sender.
    pub sender_id: u64,
    /// Topology version number — presumably the version the sender currently
    /// holds (TODO confirm).
    pub topology_version: u64,
}
143
/// Payload of an `RPC_PONG` frame, carried by `RaftRpc::Pong`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct PongResponse {
    /// Node id of the responder.
    pub responder_id: u64,
    /// Topology version number — presumably the version the responder
    /// currently holds (TODO confirm).
    pub topology_version: u64,
}
150
/// Payload of an `RPC_TOPOLOGY_UPDATE` frame, carried by
/// `RaftRpc::TopologyUpdate`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct TopologyUpdate {
    /// Version number of the topology being sent.
    pub version: u64,
    /// Node descriptors, using the same `JoinNodeInfo` shape as `JoinResponse`.
    pub nodes: Vec<JoinNodeInfo>,
}
157
/// Payload of an `RPC_TOPOLOGY_ACK` frame, carried by `RaftRpc::TopologyAck`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct TopologyAck {
    /// Node id of the responder.
    pub responder_id: u64,
    /// Topology version the responder reports as accepted.
    pub accepted_version: u64,
}
164
/// Payload of an `RPC_JOIN_REQ` frame, carried by `RaftRpc::JoinRequest`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinRequest {
    /// Node id of the joining node.
    pub node_id: u64,
    /// Address of the joining node as a string (the tests in this file use
    /// the `host:port` form).
    pub listen_addr: String,
    /// Wire-format version of the joining node; the tests in this file fill it
    /// from `crate::topology::CLUSTER_WIRE_FORMAT_VERSION`.
    pub wire_version: u16,
}
178
/// Fixed prefix of a "not leader" redirect message.
///
/// NOTE(review): nothing in this file uses the constant. Presumably a
/// non-leader node places it at the start of `JoinResponse::error`, followed by
/// the address to retry at, and the joining side strips it to find the leader —
/// confirm against the join handler before relying on this.
pub const LEADER_REDIRECT_PREFIX: &str = "not leader; retry at ";
190
/// Payload of an `RPC_JOIN_RESP` frame, carried by `RaftRpc::JoinResponse`.
///
/// Serialized with rkyv (see `serialize_payload`), so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinResponse {
    /// Whether the join was accepted.
    pub success: bool,
    /// Error description — presumably empty on success (TODO confirm).
    pub error: String,
    /// Identifier of the cluster.
    pub cluster_id: u64,
    /// Descriptors of the nodes in the cluster.
    pub nodes: Vec<JoinNodeInfo>,
    /// Presumably indexed by vshard id, with each element holding the id of the
    /// Raft group that owns that vshard (TODO confirm).
    pub vshard_to_group: Vec<u64>,
    /// Descriptors of the Raft groups.
    pub groups: Vec<JoinGroupInfo>,
}
209
/// Description of one cluster node, embedded in `JoinResponse` and
/// `TopologyUpdate`.
///
/// Serialized with rkyv as part of those messages, so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinNodeInfo {
    /// Node id.
    pub node_id: u64,
    /// Address of the node as a string.
    pub addr: String,
    /// Numeric node state; the meaning of each value is defined outside this file.
    pub state: u8,
    /// Ids of Raft groups associated with this node — presumably the groups it
    /// participates in (TODO confirm).
    pub raft_groups: Vec<u64>,
    /// Wire-format version associated with this node.
    pub wire_version: u16,
}
223
/// Description of one Raft group, embedded in `JoinResponse`.
///
/// Serialized with rkyv as part of that message, so both peers must agree on
/// this definition.
#[derive(Debug, Clone, rkyv::Archive, rkyv::Serialize, rkyv::Deserialize)]
pub struct JoinGroupInfo {
    /// Raft group id.
    pub group_id: u64,
    /// Node id of the group's leader.
    pub leader: u64,
    /// Node ids of the group's members.
    pub members: Vec<u64>,
    /// Node ids of the group's learners.
    pub learners: Vec<u64>,
}
237
/// Every message this codec can put on the wire.
///
/// Each variant corresponds to exactly one `RPC_*` discriminant (see
/// `RaftRpc::rpc_type`). All variants except `VShardEnvelope` are serialized
/// with rkyv; `VShardEnvelope` is written and read as raw bytes.
#[derive(Debug, Clone)]
pub enum RaftRpc {
    /// Raft AppendEntries request.
    AppendEntriesRequest(AppendEntriesRequest),
    /// Raft AppendEntries response.
    AppendEntriesResponse(AppendEntriesResponse),
    /// Raft RequestVote request.
    RequestVoteRequest(RequestVoteRequest),
    /// Raft RequestVote response.
    RequestVoteResponse(RequestVoteResponse),
    /// Raft InstallSnapshot request.
    InstallSnapshotRequest(InstallSnapshotRequest),
    /// Raft InstallSnapshot response.
    InstallSnapshotResponse(InstallSnapshotResponse),
    /// Cluster join request.
    JoinRequest(JoinRequest),
    /// Cluster join response.
    JoinResponse(JoinResponse),
    /// Ping.
    Ping(PingRequest),
    /// Reply to a ping.
    Pong(PongResponse),
    /// Topology update.
    TopologyUpdate(TopologyUpdate),
    /// Acknowledgement of a topology update.
    TopologyAck(TopologyAck),
    /// Forwarded request.
    ForwardRequest(ForwardRequest),
    /// Response to a forwarded request.
    ForwardResponse(ForwardResponse),
    /// Opaque bytes copied verbatim into and out of the frame payload; this
    /// codec does not interpret them.
    VShardEnvelope(Vec<u8>),
    /// Metadata proposal request.
    MetadataProposeRequest(MetadataProposeRequest),
    /// Metadata proposal response.
    MetadataProposeResponse(MetadataProposeResponse),
}
273
274impl RaftRpc {
275 fn rpc_type(&self) -> u8 {
276 match self {
277 Self::AppendEntriesRequest(_) => RPC_APPEND_ENTRIES_REQ,
278 Self::AppendEntriesResponse(_) => RPC_APPEND_ENTRIES_RESP,
279 Self::RequestVoteRequest(_) => RPC_REQUEST_VOTE_REQ,
280 Self::RequestVoteResponse(_) => RPC_REQUEST_VOTE_RESP,
281 Self::InstallSnapshotRequest(_) => RPC_INSTALL_SNAPSHOT_REQ,
282 Self::InstallSnapshotResponse(_) => RPC_INSTALL_SNAPSHOT_RESP,
283 Self::JoinRequest(_) => RPC_JOIN_REQ,
284 Self::JoinResponse(_) => RPC_JOIN_RESP,
285 Self::Ping(_) => RPC_PING,
286 Self::Pong(_) => RPC_PONG,
287 Self::TopologyUpdate(_) => RPC_TOPOLOGY_UPDATE,
288 Self::TopologyAck(_) => RPC_TOPOLOGY_ACK,
289 Self::ForwardRequest(_) => RPC_FORWARD_REQ,
290 Self::ForwardResponse(_) => RPC_FORWARD_RESP,
291 Self::VShardEnvelope(_) => RPC_VSHARD_ENVELOPE,
292 Self::MetadataProposeRequest(_) => RPC_METADATA_PROPOSE_REQ,
293 Self::MetadataProposeResponse(_) => RPC_METADATA_PROPOSE_RESP,
294 }
295 }
296}
297
298pub fn encode(rpc: &RaftRpc) -> Result<Vec<u8>> {
300 let payload = serialize_payload(rpc)?;
301 let payload_len: u32 = payload.len().try_into().map_err(|_| ClusterError::Codec {
302 detail: format!("payload too large: {} bytes", payload.len()),
303 })?;
304
305 let crc = crc32c::crc32c(&payload);
306
307 let mut frame = Vec::with_capacity(HEADER_SIZE + payload.len());
308 frame.push(WIRE_VERSION as u8);
310 frame.push(rpc.rpc_type());
311 frame.extend_from_slice(&payload_len.to_le_bytes());
312 frame.extend_from_slice(&crc.to_le_bytes());
313 frame.extend_from_slice(&payload);
314
315 Ok(frame)
316}
317
318pub fn decode(data: &[u8]) -> Result<RaftRpc> {
320 if data.len() < HEADER_SIZE {
321 return Err(ClusterError::Codec {
322 detail: format!("frame too short: {} bytes, need {HEADER_SIZE}", data.len()),
323 });
324 }
325
326 let version = data[0];
327 if version != WIRE_VERSION as u8 {
328 return Err(ClusterError::Codec {
329 detail: format!("unsupported wire version: {version}, expected {WIRE_VERSION}"),
330 });
331 }
332
333 let rpc_type = data[1];
334 let payload_len = u32::from_le_bytes([data[2], data[3], data[4], data[5]]);
335 let expected_crc = u32::from_le_bytes([data[6], data[7], data[8], data[9]]);
336
337 if payload_len > MAX_RPC_PAYLOAD_SIZE {
338 return Err(ClusterError::Codec {
339 detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
340 });
341 }
342
343 let expected_total = HEADER_SIZE + payload_len as usize;
344 if data.len() < expected_total {
345 return Err(ClusterError::Codec {
346 detail: format!(
347 "frame truncated: got {} bytes, expected {expected_total}",
348 data.len()
349 ),
350 });
351 }
352
353 let payload = &data[HEADER_SIZE..expected_total];
354
355 let actual_crc = crc32c::crc32c(payload);
356 if actual_crc != expected_crc {
357 return Err(ClusterError::Codec {
358 detail: format!(
359 "CRC32C mismatch: expected {expected_crc:#010x}, got {actual_crc:#010x}"
360 ),
361 });
362 }
363
364 deserialize_payload(rpc_type, payload)
365}
366
367pub fn frame_size(header: &[u8; HEADER_SIZE]) -> Result<usize> {
370 let payload_len = u32::from_le_bytes([header[2], header[3], header[4], header[5]]);
371 if payload_len > MAX_RPC_PAYLOAD_SIZE {
372 return Err(ClusterError::Codec {
373 detail: format!("payload length {payload_len} exceeds maximum {MAX_RPC_PAYLOAD_SIZE}"),
374 });
375 }
376 Ok(HEADER_SIZE + payload_len as usize)
377}
378
379fn serialize_payload(rpc: &RaftRpc) -> Result<Vec<u8>> {
382 let bytes = match rpc {
383 RaftRpc::AppendEntriesRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
384 RaftRpc::AppendEntriesResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
385 RaftRpc::RequestVoteRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
386 RaftRpc::RequestVoteResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
387 RaftRpc::InstallSnapshotRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
388 RaftRpc::InstallSnapshotResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
389 RaftRpc::JoinRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
390 RaftRpc::JoinResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
391 RaftRpc::Ping(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
392 RaftRpc::Pong(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
393 RaftRpc::TopologyUpdate(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
394 RaftRpc::TopologyAck(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
395 RaftRpc::ForwardRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
396 RaftRpc::ForwardResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
397 RaftRpc::VShardEnvelope(bytes) => return Ok(bytes.clone()), RaftRpc::MetadataProposeRequest(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
399 RaftRpc::MetadataProposeResponse(msg) => rkyv::to_bytes::<rkyv::rancor::Error>(msg),
400 };
401 bytes.map(|b| b.to_vec()).map_err(|e| ClusterError::Codec {
402 detail: format!("rkyv serialize failed: {e}"),
403 })
404}
405
406fn deserialize_payload(rpc_type: u8, payload: &[u8]) -> Result<RaftRpc> {
407 let mut aligned = rkyv::util::AlignedVec::<16>::with_capacity(payload.len());
410 aligned.extend_from_slice(payload);
411
412 match rpc_type {
413 RPC_APPEND_ENTRIES_REQ => {
414 let msg = rkyv::from_bytes::<AppendEntriesRequest, rkyv::rancor::Error>(&aligned)
415 .map_err(|e| ClusterError::Codec {
416 detail: format!("rkyv deserialize AppendEntriesRequest: {e}"),
417 })?;
418 Ok(RaftRpc::AppendEntriesRequest(msg))
419 }
420 RPC_APPEND_ENTRIES_RESP => {
421 let msg = rkyv::from_bytes::<AppendEntriesResponse, rkyv::rancor::Error>(&aligned)
422 .map_err(|e| ClusterError::Codec {
423 detail: format!("rkyv deserialize AppendEntriesResponse: {e}"),
424 })?;
425 Ok(RaftRpc::AppendEntriesResponse(msg))
426 }
427 RPC_REQUEST_VOTE_REQ => {
428 let msg = rkyv::from_bytes::<RequestVoteRequest, rkyv::rancor::Error>(&aligned)
429 .map_err(|e| ClusterError::Codec {
430 detail: format!("rkyv deserialize RequestVoteRequest: {e}"),
431 })?;
432 Ok(RaftRpc::RequestVoteRequest(msg))
433 }
434 RPC_REQUEST_VOTE_RESP => {
435 let msg = rkyv::from_bytes::<RequestVoteResponse, rkyv::rancor::Error>(&aligned)
436 .map_err(|e| ClusterError::Codec {
437 detail: format!("rkyv deserialize RequestVoteResponse: {e}"),
438 })?;
439 Ok(RaftRpc::RequestVoteResponse(msg))
440 }
441 RPC_INSTALL_SNAPSHOT_REQ => {
442 let msg = rkyv::from_bytes::<InstallSnapshotRequest, rkyv::rancor::Error>(&aligned)
443 .map_err(|e| ClusterError::Codec {
444 detail: format!("rkyv deserialize InstallSnapshotRequest: {e}"),
445 })?;
446 Ok(RaftRpc::InstallSnapshotRequest(msg))
447 }
448 RPC_INSTALL_SNAPSHOT_RESP => {
449 let msg = rkyv::from_bytes::<InstallSnapshotResponse, rkyv::rancor::Error>(&aligned)
450 .map_err(|e| ClusterError::Codec {
451 detail: format!("rkyv deserialize InstallSnapshotResponse: {e}"),
452 })?;
453 Ok(RaftRpc::InstallSnapshotResponse(msg))
454 }
455 RPC_JOIN_REQ => {
456 let msg =
457 rkyv::from_bytes::<JoinRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
458 ClusterError::Codec {
459 detail: format!("rkyv deserialize JoinRequest: {e}"),
460 }
461 })?;
462 Ok(RaftRpc::JoinRequest(msg))
463 }
464 RPC_JOIN_RESP => {
465 let msg =
466 rkyv::from_bytes::<JoinResponse, rkyv::rancor::Error>(&aligned).map_err(|e| {
467 ClusterError::Codec {
468 detail: format!("rkyv deserialize JoinResponse: {e}"),
469 }
470 })?;
471 Ok(RaftRpc::JoinResponse(msg))
472 }
473 RPC_PING => {
474 let msg =
475 rkyv::from_bytes::<PingRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
476 ClusterError::Codec {
477 detail: format!("rkyv deserialize PingRequest: {e}"),
478 }
479 })?;
480 Ok(RaftRpc::Ping(msg))
481 }
482 RPC_PONG => {
483 let msg =
484 rkyv::from_bytes::<PongResponse, rkyv::rancor::Error>(&aligned).map_err(|e| {
485 ClusterError::Codec {
486 detail: format!("rkyv deserialize PongResponse: {e}"),
487 }
488 })?;
489 Ok(RaftRpc::Pong(msg))
490 }
491 RPC_TOPOLOGY_UPDATE => {
492 let msg =
493 rkyv::from_bytes::<TopologyUpdate, rkyv::rancor::Error>(&aligned).map_err(|e| {
494 ClusterError::Codec {
495 detail: format!("rkyv deserialize TopologyUpdate: {e}"),
496 }
497 })?;
498 Ok(RaftRpc::TopologyUpdate(msg))
499 }
500 RPC_TOPOLOGY_ACK => {
501 let msg =
502 rkyv::from_bytes::<TopologyAck, rkyv::rancor::Error>(&aligned).map_err(|e| {
503 ClusterError::Codec {
504 detail: format!("rkyv deserialize TopologyAck: {e}"),
505 }
506 })?;
507 Ok(RaftRpc::TopologyAck(msg))
508 }
509 RPC_FORWARD_REQ => {
510 let msg =
511 rkyv::from_bytes::<ForwardRequest, rkyv::rancor::Error>(&aligned).map_err(|e| {
512 ClusterError::Codec {
513 detail: format!("rkyv deserialize ForwardRequest: {e}"),
514 }
515 })?;
516 Ok(RaftRpc::ForwardRequest(msg))
517 }
518 RPC_FORWARD_RESP => {
519 let msg = rkyv::from_bytes::<ForwardResponse, rkyv::rancor::Error>(&aligned).map_err(
520 |e| ClusterError::Codec {
521 detail: format!("rkyv deserialize ForwardResponse: {e}"),
522 },
523 )?;
524 Ok(RaftRpc::ForwardResponse(msg))
525 }
526 RPC_VSHARD_ENVELOPE => {
527 Ok(RaftRpc::VShardEnvelope(payload.to_vec()))
529 }
530 RPC_METADATA_PROPOSE_REQ => {
531 let msg = rkyv::from_bytes::<MetadataProposeRequest, rkyv::rancor::Error>(&aligned)
532 .map_err(|e| ClusterError::Codec {
533 detail: format!("rkyv deserialize MetadataProposeRequest: {e}"),
534 })?;
535 Ok(RaftRpc::MetadataProposeRequest(msg))
536 }
537 RPC_METADATA_PROPOSE_RESP => {
538 let msg = rkyv::from_bytes::<MetadataProposeResponse, rkyv::rancor::Error>(&aligned)
539 .map_err(|e| ClusterError::Codec {
540 detail: format!("rkyv deserialize MetadataProposeResponse: {e}"),
541 })?;
542 Ok(RaftRpc::MetadataProposeResponse(msg))
543 }
544 _ => Err(ClusterError::Codec {
545 detail: format!("unknown rpc_type: {rpc_type}"),
546 }),
547 }
548}
549
#[cfg(test)]
mod tests {
    //! Round-trip and rejection tests for the frame codec.
    //!
    //! Every `RaftRpc` variant family has at least one round-trip test, and
    //! each rejection branch of `decode` / `frame_size` has a dedicated test.

    use super::*;
    use nodedb_raft::message::LogEntry;

    /// Encodes `rpc`, decodes the resulting frame and returns the decoded
    /// message, panicking if either step fails.
    fn roundtrip(rpc: RaftRpc) -> RaftRpc {
        let frame = encode(&rpc).unwrap();
        decode(&frame).unwrap()
    }

    /// A small valid frame used as the starting point for corruption tests.
    fn vote_response_frame() -> Vec<u8> {
        encode(&RaftRpc::RequestVoteResponse(RequestVoteResponse {
            term: 1,
            vote_granted: false,
        }))
        .unwrap()
    }

    /// Node descriptor fixture shared by the join and topology tests.
    fn node(node_id: u64, addr: &str) -> JoinNodeInfo {
        JoinNodeInfo {
            node_id,
            addr: addr.into(),
            state: 1,
            raft_groups: vec![0, 1],
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
        }
    }

    #[test]
    fn roundtrip_append_entries_request() {
        let req = AppendEntriesRequest {
            term: 5,
            leader_id: 1,
            prev_log_index: 99,
            prev_log_term: 4,
            entries: vec![
                LogEntry {
                    term: 5,
                    index: 100,
                    data: b"put x=1".to_vec(),
                },
                LogEntry {
                    term: 5,
                    index: 101,
                    data: b"put y=2".to_vec(),
                },
            ],
            leader_commit: 98,
            group_id: 7,
        };

        match roundtrip(RaftRpc::AppendEntriesRequest(req.clone())) {
            RaftRpc::AppendEntriesRequest(d) => {
                assert_eq!(d.term, req.term);
                assert_eq!(d.leader_id, req.leader_id);
                assert_eq!(d.prev_log_index, req.prev_log_index);
                assert_eq!(d.prev_log_term, req.prev_log_term);
                assert_eq!(d.entries.len(), 2);
                assert_eq!(d.entries[0].data, b"put x=1");
                assert_eq!(d.entries[1].data, b"put y=2");
                assert_eq!(d.leader_commit, req.leader_commit);
                assert_eq!(d.group_id, req.group_id);
            }
            other => panic!("expected AppendEntriesRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_append_entries_heartbeat() {
        let req = AppendEntriesRequest {
            term: 3,
            leader_id: 1,
            prev_log_index: 10,
            prev_log_term: 2,
            entries: vec![],
            leader_commit: 8,
            group_id: 0,
        };

        match roundtrip(RaftRpc::AppendEntriesRequest(req)) {
            RaftRpc::AppendEntriesRequest(d) => {
                assert!(d.entries.is_empty());
                assert_eq!(d.term, 3);
            }
            other => panic!("expected heartbeat, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_append_entries_response() {
        let resp = AppendEntriesResponse {
            term: 5,
            success: true,
            last_log_index: 100,
        };

        match roundtrip(RaftRpc::AppendEntriesResponse(resp)) {
            RaftRpc::AppendEntriesResponse(d) => {
                assert_eq!(d.term, 5);
                assert!(d.success);
                assert_eq!(d.last_log_index, 100);
            }
            other => panic!("expected AppendEntriesResponse, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_request_vote_request() {
        let req = RequestVoteRequest {
            term: 10,
            candidate_id: 3,
            last_log_index: 200,
            last_log_term: 9,
            group_id: 42,
        };

        match roundtrip(RaftRpc::RequestVoteRequest(req)) {
            RaftRpc::RequestVoteRequest(d) => {
                assert_eq!(d.term, 10);
                assert_eq!(d.candidate_id, 3);
                assert_eq!(d.last_log_index, 200);
                assert_eq!(d.last_log_term, 9);
                assert_eq!(d.group_id, 42);
            }
            other => panic!("expected RequestVoteRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_request_vote_response() {
        let resp = RequestVoteResponse {
            term: 10,
            vote_granted: true,
        };

        match roundtrip(RaftRpc::RequestVoteResponse(resp)) {
            RaftRpc::RequestVoteResponse(d) => {
                assert_eq!(d.term, 10);
                assert!(d.vote_granted);
            }
            other => panic!("expected RequestVoteResponse, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_install_snapshot_request() {
        let data: Vec<u8> = [0xDE, 0xAD, 0xBE, 0xEF]
            .iter()
            .copied()
            .cycle()
            .take(1024)
            .collect();
        let req = InstallSnapshotRequest {
            term: 7,
            leader_id: 1,
            last_included_index: 500,
            last_included_term: 6,
            offset: 0,
            data: data.clone(),
            done: false,
            group_id: 3,
        };

        match roundtrip(RaftRpc::InstallSnapshotRequest(req)) {
            RaftRpc::InstallSnapshotRequest(d) => {
                assert_eq!(d.term, 7);
                assert_eq!(d.leader_id, 1);
                assert_eq!(d.last_included_index, 500);
                assert_eq!(d.last_included_term, 6);
                assert_eq!(d.offset, 0);
                assert_eq!(d.data, data);
                assert!(!d.done);
                assert_eq!(d.group_id, 3);
            }
            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_install_snapshot_final_chunk() {
        let req = InstallSnapshotRequest {
            term: 7,
            leader_id: 1,
            last_included_index: 500,
            last_included_term: 6,
            offset: 4096,
            data: vec![0xFF; 128],
            done: true,
            group_id: 3,
        };

        match roundtrip(RaftRpc::InstallSnapshotRequest(req)) {
            RaftRpc::InstallSnapshotRequest(d) => {
                assert!(d.done);
                assert_eq!(d.offset, 4096);
            }
            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_install_snapshot_response() {
        let resp = InstallSnapshotResponse { term: 7 };

        match roundtrip(RaftRpc::InstallSnapshotResponse(resp)) {
            RaftRpc::InstallSnapshotResponse(d) => {
                assert_eq!(d.term, 7);
            }
            other => panic!("expected InstallSnapshotResponse, got {other:?}"),
        }
    }

    #[test]
    fn crc_corruption_detected() {
        let mut encoded = vote_response_frame();

        // Flip one bit in the last payload byte; the header CRC no longer matches.
        if let Some(last) = encoded.last_mut() {
            *last ^= 0x01;
        }

        let err = decode(&encoded).unwrap_err();
        assert!(err.to_string().contains("CRC32C mismatch"), "{err}");
    }

    #[test]
    fn version_mismatch_rejected() {
        let mut encoded = vote_response_frame();

        // Byte 0 is the wire version.
        encoded[0] = 99;

        let err = decode(&encoded).unwrap_err();
        assert!(
            err.to_string().contains("unsupported wire version"),
            "{err}"
        );
    }

    #[test]
    fn truncated_frame_rejected() {
        let err = decode(&[1, 2, 3]).unwrap_err();
        assert!(err.to_string().contains("frame too short"), "{err}");
    }

    #[test]
    fn truncated_payload_rejected() {
        let encoded = vote_response_frame();
        assert!(encoded.len() > HEADER_SIZE, "fixture must carry a payload");

        // The header is intact but one payload byte is missing.
        let err = decode(&encoded[..encoded.len() - 1]).unwrap_err();
        assert!(err.to_string().contains("frame truncated"), "{err}");
    }

    #[test]
    fn unknown_rpc_type_rejected() {
        let mut encoded = vote_response_frame();

        // Byte 1 is the RPC type; the CRC covers only the payload, so the
        // frame still passes the checksum and fails at type dispatch.
        encoded[1] = 255;

        let err = decode(&encoded).unwrap_err();
        assert!(err.to_string().contains("unknown rpc_type"), "{err}");
    }

    #[test]
    fn payload_too_large_rejected() {
        let mut frame = vec![0u8; HEADER_SIZE];
        frame[0] = WIRE_VERSION as u8;
        frame[1] = RPC_APPEND_ENTRIES_REQ;
        let huge: u32 = MAX_RPC_PAYLOAD_SIZE + 1;
        frame[2..6].copy_from_slice(&huge.to_le_bytes());

        let err = decode(&frame).unwrap_err();
        assert!(err.to_string().contains("exceeds maximum"), "{err}");
    }

    #[test]
    fn frame_size_helper() {
        let rpc = RaftRpc::AppendEntriesResponse(AppendEntriesResponse {
            term: 1,
            success: true,
            last_log_index: 5,
        });
        let encoded = encode(&rpc).unwrap();

        let header: [u8; HEADER_SIZE] = encoded[..HEADER_SIZE].try_into().unwrap();
        let size = frame_size(&header).unwrap();
        assert_eq!(size, encoded.len());
    }

    #[test]
    fn frame_size_rejects_oversized_header() {
        let mut header = [0u8; HEADER_SIZE];
        header[0] = WIRE_VERSION as u8;
        header[1] = RPC_APPEND_ENTRIES_REQ;
        header[2..6].copy_from_slice(&(MAX_RPC_PAYLOAD_SIZE + 1).to_le_bytes());

        let err = frame_size(&header).unwrap_err();
        assert!(err.to_string().contains("exceeds maximum"), "{err}");
    }

    #[test]
    fn large_snapshot_roundtrip() {
        let data = vec![0xAB; 1024 * 1024];
        let req = InstallSnapshotRequest {
            term: 100,
            leader_id: 5,
            last_included_index: 999_999,
            last_included_term: 99,
            offset: 0,
            data: data.clone(),
            done: false,
            group_id: 0,
        };

        match roundtrip(RaftRpc::InstallSnapshotRequest(req)) {
            RaftRpc::InstallSnapshotRequest(d) => {
                assert_eq!(d.data.len(), 1024 * 1024);
                assert_eq!(d.data, data);
            }
            other => panic!("expected InstallSnapshotRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_join_request() {
        let req = JoinRequest {
            node_id: 42,
            listen_addr: "10.0.0.5:9400".into(),
            wire_version: crate::topology::CLUSTER_WIRE_FORMAT_VERSION,
        };

        match roundtrip(RaftRpc::JoinRequest(req)) {
            RaftRpc::JoinRequest(d) => {
                assert_eq!(d.node_id, 42);
                assert_eq!(d.listen_addr, "10.0.0.5:9400");
                assert_eq!(
                    d.wire_version,
                    crate::topology::CLUSTER_WIRE_FORMAT_VERSION
                );
            }
            other => panic!("expected JoinRequest, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_join_response() {
        let resp = JoinResponse {
            success: true,
            error: String::new(),
            cluster_id: 12345,
            nodes: vec![node(1, "10.0.0.1:9400"), node(2, "10.0.0.2:9400")],
            vshard_to_group: (0..1024u64).map(|i| i % 4).collect(),
            groups: vec![JoinGroupInfo {
                group_id: 0,
                leader: 1,
                members: vec![1, 2],
                learners: vec![],
            }],
        };

        match roundtrip(RaftRpc::JoinResponse(resp)) {
            RaftRpc::JoinResponse(d) => {
                assert!(d.success);
                assert_eq!(d.nodes.len(), 2);
                assert_eq!(d.vshard_to_group.len(), 1024);
                assert_eq!(d.groups.len(), 1);
                assert_eq!(d.groups[0].leader, 1);
            }
            other => panic!("expected JoinResponse, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_ping_pong() {
        let ping = PingRequest {
            sender_id: 7,
            topology_version: 3,
        };
        match roundtrip(RaftRpc::Ping(ping)) {
            RaftRpc::Ping(d) => {
                assert_eq!(d.sender_id, 7);
                assert_eq!(d.topology_version, 3);
            }
            other => panic!("expected Ping, got {other:?}"),
        }

        let pong = PongResponse {
            responder_id: 8,
            topology_version: 4,
        };
        match roundtrip(RaftRpc::Pong(pong)) {
            RaftRpc::Pong(d) => {
                assert_eq!(d.responder_id, 8);
                assert_eq!(d.topology_version, 4);
            }
            other => panic!("expected Pong, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_topology_update_and_ack() {
        let update = TopologyUpdate {
            version: 11,
            nodes: vec![node(1, "10.0.0.1:9400")],
        };
        match roundtrip(RaftRpc::TopologyUpdate(update)) {
            RaftRpc::TopologyUpdate(d) => {
                assert_eq!(d.version, 11);
                assert_eq!(d.nodes.len(), 1);
                assert_eq!(d.nodes[0].node_id, 1);
                assert_eq!(d.nodes[0].addr, "10.0.0.1:9400");
                assert_eq!(d.nodes[0].raft_groups, vec![0, 1]);
            }
            other => panic!("expected TopologyUpdate, got {other:?}"),
        }

        let ack = TopologyAck {
            responder_id: 2,
            accepted_version: 11,
        };
        match roundtrip(RaftRpc::TopologyAck(ack)) {
            RaftRpc::TopologyAck(d) => {
                assert_eq!(d.responder_id, 2);
                assert_eq!(d.accepted_version, 11);
            }
            other => panic!("expected TopologyAck, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_forward_request_and_response() {
        let req = ForwardRequest {
            sql: "SELECT 1".into(),
            tenant_id: 9,
            deadline_remaining_ms: 250,
            trace_id: 0xABCD,
        };
        match roundtrip(RaftRpc::ForwardRequest(req)) {
            RaftRpc::ForwardRequest(d) => {
                assert_eq!(d.sql, "SELECT 1");
                assert_eq!(d.tenant_id, 9);
                assert_eq!(d.deadline_remaining_ms, 250);
                assert_eq!(d.trace_id, 0xABCD);
            }
            other => panic!("expected ForwardRequest, got {other:?}"),
        }

        let resp = ForwardResponse {
            success: false,
            payloads: vec![vec![1, 2, 3], vec![]],
            error_message: "boom".into(),
        };
        match roundtrip(RaftRpc::ForwardResponse(resp)) {
            RaftRpc::ForwardResponse(d) => {
                assert!(!d.success);
                assert_eq!(d.payloads, vec![vec![1, 2, 3], vec![]]);
                assert_eq!(d.error_message, "boom");
            }
            other => panic!("expected ForwardResponse, got {other:?}"),
        }
    }

    #[test]
    fn roundtrip_vshard_envelope() {
        // Lengths chosen to include the empty payload and sizes that are not
        // multiples of the 16-byte alignment used for rkyv payloads.
        for len in [0usize, 1, 15, 16, 17, 1000] {
            let raw: Vec<u8> = (0..len).map(|i| (i % 251) as u8).collect();

            let frame = encode(&RaftRpc::VShardEnvelope(raw.clone())).unwrap();
            assert_eq!(frame.len(), HEADER_SIZE + len);
            assert_eq!(frame[1], RPC_VSHARD_ENVELOPE);

            match decode(&frame).unwrap() {
                RaftRpc::VShardEnvelope(d) => assert_eq!(d, raw),
                other => panic!("expected VShardEnvelope, got {other:?}"),
            }
        }
    }

    #[test]
    fn roundtrip_metadata_propose() {
        let req = MetadataProposeRequest {
            bytes: vec![9, 8, 7],
        };
        match roundtrip(RaftRpc::MetadataProposeRequest(req)) {
            RaftRpc::MetadataProposeRequest(d) => assert_eq!(d.bytes, vec![9, 8, 7]),
            other => panic!("expected MetadataProposeRequest, got {other:?}"),
        }

        match roundtrip(RaftRpc::MetadataProposeResponse(
            MetadataProposeResponse::ok(42),
        )) {
            RaftRpc::MetadataProposeResponse(d) => {
                assert!(d.success);
                assert_eq!(d.log_index, 42);
                assert_eq!(d.leader_hint, None);
                assert!(d.error_message.is_empty());
            }
            other => panic!("expected MetadataProposeResponse, got {other:?}"),
        }

        match roundtrip(RaftRpc::MetadataProposeResponse(
            MetadataProposeResponse::err("not leader", Some(3)),
        )) {
            RaftRpc::MetadataProposeResponse(d) => {
                assert!(!d.success);
                assert_eq!(d.log_index, 0);
                assert_eq!(d.leader_hint, Some(3));
                assert_eq!(d.error_message, "not leader");
            }
            other => panic!("expected MetadataProposeResponse, got {other:?}"),
        }
    }
}