1use crate::error::{RepError, Result};
7use crate::node_type::NodeType;
8use crate::rep_node::RepNode;
9
/// Kind of membership change carried by a `ProtocolMessage::GroupChange`.
///
/// Each variant is written to the wire as a single byte; the values are
/// assigned by `encode_change_type`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupChangeType {
    /// Wire value `0`.
    Add,
    /// Wire value `1`.
    Remove,
    /// Wire value `2`.
    Update,
}
20
21#[derive(Debug, Clone, PartialEq)]
27pub enum ProtocolMessage {
28 Handshake { node_name: String, group_name: String, node_type: NodeType },
31 HandshakeResponse { accepted: bool, reason: Option<String> },
33
34 Heartbeat { master_vlsn: u64, timestamp_ms: u64 },
37 HeartbeatResponse { replica_vlsn: u64, timestamp_ms: u64 },
39
40 LogEntry { vlsn: u64, entry_type: u8, data: Vec<u8> },
43 Ack { vlsn: u64 },
45
46 GroupChange { change_type: GroupChangeType, node: RepNode },
49 GroupChangeResponse { accepted: bool },
51
52 ElectionProposal { node_name: String, vlsn: u64, priority: u32, term: u64 },
55 ElectionVote { voter: String, granted: bool, term: u64 },
57 ElectionResult { master: String, term: u64 },
59
60 Shutdown { reason: String },
63}
64
// Wire tags: the first byte of every encoded `ProtocolMessage` is one of
// these values and selects the variant that follows.

// Connection setup.
const TAG_HANDSHAKE: u8 = 0x01;
const TAG_HANDSHAKE_RESPONSE: u8 = 0x02;

// Liveness.
const TAG_HEARTBEAT: u8 = 0x03;
const TAG_HEARTBEAT_RESPONSE: u8 = 0x04;

// Log shipping.
const TAG_LOG_ENTRY: u8 = 0x05;
const TAG_ACK: u8 = 0x06;

// Group membership.
const TAG_GROUP_CHANGE: u8 = 0x07;
const TAG_GROUP_CHANGE_RESPONSE: u8 = 0x08;

// Elections.
const TAG_ELECTION_PROPOSAL: u8 = 0x09;
const TAG_ELECTION_VOTE: u8 = 0x0A;
const TAG_ELECTION_RESULT: u8 = 0x0B;

// Teardown.
const TAG_SHUTDOWN: u8 = 0x0C;
78
79impl ProtocolMessage {
80 pub fn encode(&self) -> Vec<u8> {
87 let mut buf = Vec::new();
88 match self {
89 ProtocolMessage::Handshake { node_name, group_name, node_type } => {
90 buf.push(TAG_HANDSHAKE);
91 encode_string(&mut buf, node_name);
92 encode_string(&mut buf, group_name);
93 buf.push(encode_node_type(node_type));
94 }
95 ProtocolMessage::HandshakeResponse { accepted, reason } => {
96 buf.push(TAG_HANDSHAKE_RESPONSE);
97 buf.push(if *accepted { 1 } else { 0 });
98 match reason {
99 Some(r) => {
100 buf.push(1); encode_string(&mut buf, r);
102 }
103 None => {
104 buf.push(0); }
106 }
107 }
108 ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms } => {
109 buf.push(TAG_HEARTBEAT);
110 buf.extend_from_slice(&master_vlsn.to_le_bytes());
111 buf.extend_from_slice(×tamp_ms.to_le_bytes());
112 }
113 ProtocolMessage::HeartbeatResponse {
114 replica_vlsn,
115 timestamp_ms,
116 } => {
117 buf.push(TAG_HEARTBEAT_RESPONSE);
118 buf.extend_from_slice(&replica_vlsn.to_le_bytes());
119 buf.extend_from_slice(×tamp_ms.to_le_bytes());
120 }
121 ProtocolMessage::LogEntry { vlsn, entry_type, data } => {
122 buf.push(TAG_LOG_ENTRY);
123 buf.extend_from_slice(&vlsn.to_le_bytes());
124 buf.push(*entry_type);
125 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
126 buf.extend_from_slice(data);
127 }
128 ProtocolMessage::Ack { vlsn } => {
129 buf.push(TAG_ACK);
130 buf.extend_from_slice(&vlsn.to_le_bytes());
131 }
132 ProtocolMessage::GroupChange { change_type, node } => {
133 buf.push(TAG_GROUP_CHANGE);
134 buf.push(encode_change_type(change_type));
135 encode_rep_node(&mut buf, node);
136 }
137 ProtocolMessage::GroupChangeResponse { accepted } => {
138 buf.push(TAG_GROUP_CHANGE_RESPONSE);
139 buf.push(if *accepted { 1 } else { 0 });
140 }
141 ProtocolMessage::ElectionProposal {
142 node_name,
143 vlsn,
144 priority,
145 term,
146 } => {
147 buf.push(TAG_ELECTION_PROPOSAL);
148 encode_string(&mut buf, node_name);
149 buf.extend_from_slice(&vlsn.to_le_bytes());
150 buf.extend_from_slice(&priority.to_le_bytes());
151 buf.extend_from_slice(&term.to_le_bytes());
152 }
153 ProtocolMessage::ElectionVote { voter, granted, term } => {
154 buf.push(TAG_ELECTION_VOTE);
155 encode_string(&mut buf, voter);
156 buf.push(if *granted { 1 } else { 0 });
157 buf.extend_from_slice(&term.to_le_bytes());
158 }
159 ProtocolMessage::ElectionResult { master, term } => {
160 buf.push(TAG_ELECTION_RESULT);
161 encode_string(&mut buf, master);
162 buf.extend_from_slice(&term.to_le_bytes());
163 }
164 ProtocolMessage::Shutdown { reason } => {
165 buf.push(TAG_SHUTDOWN);
166 encode_string(&mut buf, reason);
167 }
168 }
169 buf
170 }
171
172 pub fn decode(data: &[u8]) -> Result<Self> {
174 if data.is_empty() {
175 return Err(RepError::ProtocolError("empty message".to_string()));
176 }
177 let tag = data[0];
178 let mut pos = 1;
179
180 match tag {
181 TAG_HANDSHAKE => {
182 let node_name = decode_string(data, &mut pos)?;
183 let group_name = decode_string(data, &mut pos)?;
184 let node_type = decode_node_type(data, &mut pos)?;
185 Ok(ProtocolMessage::Handshake {
186 node_name,
187 group_name,
188 node_type,
189 })
190 }
191 TAG_HANDSHAKE_RESPONSE => {
192 let accepted = decode_bool(data, &mut pos)?;
193 let has_reason = decode_bool(data, &mut pos)?;
194 let reason = if has_reason {
195 Some(decode_string(data, &mut pos)?)
196 } else {
197 None
198 };
199 Ok(ProtocolMessage::HandshakeResponse { accepted, reason })
200 }
201 TAG_HEARTBEAT => {
202 let master_vlsn = decode_u64(data, &mut pos)?;
203 let timestamp_ms = decode_u64(data, &mut pos)?;
204 Ok(ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms })
205 }
206 TAG_HEARTBEAT_RESPONSE => {
207 let replica_vlsn = decode_u64(data, &mut pos)?;
208 let timestamp_ms = decode_u64(data, &mut pos)?;
209 Ok(ProtocolMessage::HeartbeatResponse {
210 replica_vlsn,
211 timestamp_ms,
212 })
213 }
214 TAG_LOG_ENTRY => {
215 let vlsn = decode_u64(data, &mut pos)?;
216 let entry_type = decode_u8(data, &mut pos)?;
217 let data_len = decode_u32(data, &mut pos)? as usize;
218 let payload = decode_bytes(data, &mut pos, data_len)?;
219 Ok(ProtocolMessage::LogEntry {
220 vlsn,
221 entry_type,
222 data: payload,
223 })
224 }
225 TAG_ACK => {
226 let vlsn = decode_u64(data, &mut pos)?;
227 Ok(ProtocolMessage::Ack { vlsn })
228 }
229 TAG_GROUP_CHANGE => {
230 let change_type = decode_change_type(data, &mut pos)?;
231 let node = decode_rep_node(data, &mut pos)?;
232 Ok(ProtocolMessage::GroupChange { change_type, node })
233 }
234 TAG_GROUP_CHANGE_RESPONSE => {
235 let accepted = decode_bool(data, &mut pos)?;
236 Ok(ProtocolMessage::GroupChangeResponse { accepted })
237 }
238 TAG_ELECTION_PROPOSAL => {
239 let node_name = decode_string(data, &mut pos)?;
240 let vlsn = decode_u64(data, &mut pos)?;
241 let priority = decode_u32(data, &mut pos)?;
242 let term = decode_u64(data, &mut pos)?;
243 Ok(ProtocolMessage::ElectionProposal {
244 node_name,
245 vlsn,
246 priority,
247 term,
248 })
249 }
250 TAG_ELECTION_VOTE => {
251 let voter = decode_string(data, &mut pos)?;
252 let granted = decode_bool(data, &mut pos)?;
253 let term = decode_u64(data, &mut pos)?;
254 Ok(ProtocolMessage::ElectionVote { voter, granted, term })
255 }
256 TAG_ELECTION_RESULT => {
257 let master = decode_string(data, &mut pos)?;
258 let term = decode_u64(data, &mut pos)?;
259 Ok(ProtocolMessage::ElectionResult { master, term })
260 }
261 TAG_SHUTDOWN => {
262 let reason = decode_string(data, &mut pos)?;
263 Ok(ProtocolMessage::Shutdown { reason })
264 }
265 _ => Err(RepError::ProtocolError(format!(
266 "unknown message tag: {}",
267 tag
268 ))),
269 }
270 }
271}
272
/// Appends `s` to `buf` as a little-endian `u32` byte length followed by the
/// string's UTF-8 bytes.
///
/// NOTE(review): the length is produced with an `as u32` cast, so a string
/// longer than `u32::MAX` bytes would get a truncated length prefix.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
280
281fn encode_node_type(nt: &NodeType) -> u8 {
282 match nt {
283 NodeType::Electable => 0,
284 NodeType::Monitor => 1,
285 NodeType::Secondary => 2,
286 NodeType::Arbiter => 3,
287 }
288}
289
290fn encode_change_type(ct: &GroupChangeType) -> u8 {
291 match ct {
292 GroupChangeType::Add => 0,
293 GroupChangeType::Remove => 1,
294 GroupChangeType::Update => 2,
295 }
296}
297
298fn encode_rep_node(buf: &mut Vec<u8>, node: &RepNode) {
299 encode_string(buf, &node.name);
300 buf.push(encode_node_type(&node.node_type));
301 encode_string(buf, &node.host);
302 buf.extend_from_slice(&node.port.to_le_bytes());
303 buf.extend_from_slice(&node.node_id.to_le_bytes());
304}
305
306fn ensure_remaining(data: &[u8], pos: usize, needed: usize) -> Result<()> {
309 if pos + needed > data.len() {
310 Err(RepError::ProtocolError(format!(
311 "unexpected end of message at offset {}, need {} more bytes",
312 pos, needed
313 )))
314 } else {
315 Ok(())
316 }
317}
318
319fn decode_u8(data: &[u8], pos: &mut usize) -> Result<u8> {
320 ensure_remaining(data, *pos, 1)?;
321 let val = data[*pos];
322 *pos += 1;
323 Ok(val)
324}
325
326fn decode_bool(data: &[u8], pos: &mut usize) -> Result<bool> {
327 let val = decode_u8(data, pos)?;
328 Ok(val != 0)
329}
330
331fn decode_u16(data: &[u8], pos: &mut usize) -> Result<u16> {
332 ensure_remaining(data, *pos, 2)?;
333 let val = u16::from_le_bytes([data[*pos], data[*pos + 1]]);
334 *pos += 2;
335 Ok(val)
336}
337
338fn decode_u32(data: &[u8], pos: &mut usize) -> Result<u32> {
339 ensure_remaining(data, *pos, 4)?;
340 let val = u32::from_le_bytes([
341 data[*pos],
342 data[*pos + 1],
343 data[*pos + 2],
344 data[*pos + 3],
345 ]);
346 *pos += 4;
347 Ok(val)
348}
349
350fn decode_u64(data: &[u8], pos: &mut usize) -> Result<u64> {
351 ensure_remaining(data, *pos, 8)?;
352 let val = u64::from_le_bytes([
353 data[*pos],
354 data[*pos + 1],
355 data[*pos + 2],
356 data[*pos + 3],
357 data[*pos + 4],
358 data[*pos + 5],
359 data[*pos + 6],
360 data[*pos + 7],
361 ]);
362 *pos += 8;
363 Ok(val)
364}
365
366fn decode_string(data: &[u8], pos: &mut usize) -> Result<String> {
367 let len = decode_u32(data, pos)? as usize;
368 let bytes = decode_bytes(data, pos, len)?;
369 String::from_utf8(bytes).map_err(|e| {
370 RepError::ProtocolError(format!("invalid UTF-8 in string: {}", e))
371 })
372}
373
374fn decode_bytes(data: &[u8], pos: &mut usize, len: usize) -> Result<Vec<u8>> {
375 ensure_remaining(data, *pos, len)?;
376 let bytes = data[*pos..*pos + len].to_vec();
377 *pos += len;
378 Ok(bytes)
379}
380
381fn decode_node_type(data: &[u8], pos: &mut usize) -> Result<NodeType> {
382 let val = decode_u8(data, pos)?;
383 match val {
384 0 => Ok(NodeType::Electable),
385 1 => Ok(NodeType::Monitor),
386 2 => Ok(NodeType::Secondary),
387 3 => Ok(NodeType::Arbiter),
388 _ => {
389 Err(RepError::ProtocolError(format!("unknown node type: {}", val)))
390 }
391 }
392}
393
394fn decode_change_type(data: &[u8], pos: &mut usize) -> Result<GroupChangeType> {
395 let val = decode_u8(data, pos)?;
396 match val {
397 0 => Ok(GroupChangeType::Add),
398 1 => Ok(GroupChangeType::Remove),
399 2 => Ok(GroupChangeType::Update),
400 _ => Err(RepError::ProtocolError(format!(
401 "unknown change type: {}",
402 val
403 ))),
404 }
405}
406
407fn decode_rep_node(data: &[u8], pos: &mut usize) -> Result<RepNode> {
408 let name = decode_string(data, pos)?;
409 let node_type = decode_node_type(data, pos)?;
410 let host = decode_string(data, pos)?;
411 let port = decode_u16(data, pos)?;
412 let node_id = decode_u32(data, pos)?;
413 Ok(RepNode::new(name, node_type, host, port, node_id))
414}
415
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `msg`, decodes the bytes again and asserts the result equals
    /// the original message.
    fn round_trip(msg: &ProtocolMessage) {
        let encoded = msg.encode();
        let decoded = ProtocolMessage::decode(&encoded).unwrap();
        assert_eq!(*msg, decoded);
    }

    #[test]
    fn test_handshake_round_trip() {
        round_trip(&ProtocolMessage::Handshake {
            node_name: "node1".to_string(),
            group_name: "group1".to_string(),
            node_type: NodeType::Electable,
        });
    }

    #[test]
    fn test_handshake_all_node_types() {
        for nt in &[
            NodeType::Electable,
            NodeType::Monitor,
            NodeType::Secondary,
            NodeType::Arbiter,
        ] {
            round_trip(&ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: *nt,
            });
        }
    }

    #[test]
    fn test_handshake_response_accepted() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: true,
            reason: None,
        });
    }

    #[test]
    fn test_handshake_response_rejected() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: false,
            reason: Some("group mismatch".to_string()),
        });
    }

    #[test]
    fn test_heartbeat_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 12345,
            timestamp_ms: 1700000000000,
        });
    }

    #[test]
    fn test_heartbeat_response_round_trip() {
        round_trip(&ProtocolMessage::HeartbeatResponse {
            replica_vlsn: 12340,
            timestamp_ms: 1700000000001,
        });
    }

    #[test]
    fn test_log_entry_round_trip() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 100,
            entry_type: 42,
            data: vec![1, 2, 3, 4, 5],
        });
    }

    #[test]
    fn test_log_entry_empty_data() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 1,
            entry_type: 0,
            data: vec![],
        });
    }

    #[test]
    fn test_log_entry_large_data() {
        let data = vec![0xAB; 10000];
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: u64::MAX,
            entry_type: 255,
            data,
        });
    }

    #[test]
    fn test_ack_round_trip() {
        round_trip(&ProtocolMessage::Ack { vlsn: 999 });
    }

    #[test]
    fn test_group_change_add() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Add,
            node: RepNode::new(
                "new_node".to_string(),
                NodeType::Electable,
                "10.0.0.5".to_string(),
                5001,
                7,
            ),
        });
    }

    #[test]
    fn test_group_change_remove() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Remove,
            node: RepNode::new(
                "old_node".to_string(),
                NodeType::Monitor,
                "localhost".to_string(),
                6000,
                3,
            ),
        });
    }

    #[test]
    fn test_group_change_update() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Update,
            node: RepNode::new(
                "node1".to_string(),
                NodeType::Secondary,
                "192.168.1.1".to_string(),
                7000,
                1,
            ),
        });
    }

    #[test]
    fn test_group_change_response_accepted() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: true });
    }

    #[test]
    fn test_group_change_response_rejected() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: false });
    }

    #[test]
    fn test_election_proposal_round_trip() {
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "candidate1".to_string(),
            vlsn: 5000,
            priority: 10,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_granted() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter1".to_string(),
            granted: true,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_denied() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter2".to_string(),
            granted: false,
            term: 2,
        });
    }

    #[test]
    fn test_election_result_round_trip() {
        round_trip(&ProtocolMessage::ElectionResult {
            master: "new_master".to_string(),
            term: 4,
        });
    }

    #[test]
    fn test_shutdown_round_trip() {
        round_trip(&ProtocolMessage::Shutdown {
            reason: "maintenance window".to_string(),
        });
    }

    #[test]
    fn test_decode_empty_data() {
        let result = ProtocolMessage::decode(&[]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => assert!(msg.contains("empty")),
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_unknown_tag() {
        let result = ProtocolMessage::decode(&[255]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unknown message tag"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_heartbeat() {
        // Only two of the sixteen payload bytes are present.
        let result = ProtocolMessage::decode(&[TAG_HEARTBEAT, 0, 0]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unexpected end"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_string() {
        // The length prefix announces 100 bytes but only two follow.
        let mut data = vec![TAG_HANDSHAKE];
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(b"ab");
        let result = ProtocolMessage::decode(&data);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unexpected end"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_invalid_utf8_string() {
        // A one-byte string whose only byte (0xFF) is never valid UTF-8.
        let mut data = vec![TAG_SHUTDOWN];
        data.extend_from_slice(&1u32.to_le_bytes());
        data.push(0xFF);
        let result = ProtocolMessage::decode(&data);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("invalid UTF-8"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_encode_produces_non_empty() {
        let msgs = vec![
            ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: NodeType::Electable,
            },
            ProtocolMessage::Ack { vlsn: 0 },
            ProtocolMessage::Shutdown {
                reason: "done".to_string(),
            },
        ];
        for msg in &msgs {
            assert!(!msg.encode().is_empty());
        }
    }

    #[test]
    fn test_group_change_type_debug() {
        assert_eq!(format!("{:?}", GroupChangeType::Add), "Add");
        assert_eq!(format!("{:?}", GroupChangeType::Remove), "Remove");
        assert_eq!(format!("{:?}", GroupChangeType::Update), "Update");
    }

    #[test]
    fn test_unicode_string_round_trip() {
        // Use genuinely non-ASCII text (2-, 3- and 4-byte UTF-8 sequences) so
        // the byte-length prefix differs from the character count. Escapes
        // keep this source file ASCII-only.
        let reason = "arr\u{ea}t planifi\u{e9} \u{2014} \u{505c}\u{673a} \u{1f6d1}".to_string();
        assert!(reason.len() > reason.chars().count());
        round_trip(&ProtocolMessage::Shutdown { reason });
    }

    #[test]
    fn test_max_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: u64::MAX,
            timestamp_ms: u64::MAX,
        });
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "x".to_string(),
            vlsn: u64::MAX,
            priority: u32::MAX,
            term: u64::MAX,
        });
    }

    #[test]
    fn test_zero_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 0,
            timestamp_ms: 0,
        });
        round_trip(&ProtocolMessage::Ack { vlsn: 0 });
    }
}