1use crate::error::{RepError, Result};
7use crate::node_type::NodeType;
8use crate::rep_node::RepNode;
9
/// Selects which kind of change a [`ProtocolMessage::GroupChange`] describes.
///
/// On the wire each variant is a single byte; the codes are listed per
/// variant below.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupChangeType {
    /// Wire code `0`.
    Add,
    /// Wire code `1`.
    Remove,
    /// Wire code `2`.
    Update,
}
20
21#[derive(Debug, Clone, PartialEq)]
27pub enum ProtocolMessage {
28 Handshake { node_name: String, group_name: String, node_type: NodeType },
31 HandshakeResponse { accepted: bool, reason: Option<String> },
33
34 Heartbeat { master_vlsn: u64, timestamp_ms: u64 },
37 HeartbeatResponse { replica_vlsn: u64, timestamp_ms: u64 },
39
40 LogEntry { vlsn: u64, entry_type: u8, data: Vec<u8> },
43 Ack { vlsn: u64 },
45
46 GroupChange { change_type: GroupChangeType, node: RepNode },
49 GroupChangeResponse { accepted: bool },
51
52 ElectionProposal {
55 node_name: String,
56 vlsn: u64,
57 priority: u32,
58 term: u64,
59 dtvlsn: u64,
61 },
62 ElectionVote { voter: String, granted: bool, term: u64 },
64 ElectionResult { master: String, term: u64 },
66
67 Shutdown { reason: String },
70}
71
// Wire tags: `ProtocolMessage::encode` writes one of these as the first byte
// of every message and `ProtocolMessage::decode` dispatches on it.

/// Tag for `ProtocolMessage::Handshake`.
const TAG_HANDSHAKE: u8 = 1;
/// Tag for `ProtocolMessage::HandshakeResponse`.
const TAG_HANDSHAKE_RESPONSE: u8 = 2;

/// Tag for `ProtocolMessage::Heartbeat`.
const TAG_HEARTBEAT: u8 = 3;
/// Tag for `ProtocolMessage::HeartbeatResponse`.
const TAG_HEARTBEAT_RESPONSE: u8 = 4;

/// Tag for `ProtocolMessage::LogEntry`.
const TAG_LOG_ENTRY: u8 = 5;
/// Tag for `ProtocolMessage::Ack`.
const TAG_ACK: u8 = 6;

/// Tag for `ProtocolMessage::GroupChange`.
const TAG_GROUP_CHANGE: u8 = 7;
/// Tag for `ProtocolMessage::GroupChangeResponse`.
const TAG_GROUP_CHANGE_RESPONSE: u8 = 8;

/// Tag for `ProtocolMessage::ElectionProposal`.
const TAG_ELECTION_PROPOSAL: u8 = 9;
/// Tag for `ProtocolMessage::ElectionVote`.
const TAG_ELECTION_VOTE: u8 = 10;
/// Tag for `ProtocolMessage::ElectionResult`.
const TAG_ELECTION_RESULT: u8 = 11;

/// Tag for `ProtocolMessage::Shutdown`.
const TAG_SHUTDOWN: u8 = 12;
85
86impl ProtocolMessage {
87 pub fn encode(&self) -> Vec<u8> {
94 let mut buf = Vec::new();
95 match self {
96 ProtocolMessage::Handshake { node_name, group_name, node_type } => {
97 buf.push(TAG_HANDSHAKE);
98 encode_string(&mut buf, node_name);
99 encode_string(&mut buf, group_name);
100 buf.push(encode_node_type(node_type));
101 }
102 ProtocolMessage::HandshakeResponse { accepted, reason } => {
103 buf.push(TAG_HANDSHAKE_RESPONSE);
104 buf.push(if *accepted { 1 } else { 0 });
105 match reason {
106 Some(r) => {
107 buf.push(1); encode_string(&mut buf, r);
109 }
110 None => {
111 buf.push(0); }
113 }
114 }
115 ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms } => {
116 buf.push(TAG_HEARTBEAT);
117 buf.extend_from_slice(&master_vlsn.to_le_bytes());
118 buf.extend_from_slice(×tamp_ms.to_le_bytes());
119 }
120 ProtocolMessage::HeartbeatResponse {
121 replica_vlsn,
122 timestamp_ms,
123 } => {
124 buf.push(TAG_HEARTBEAT_RESPONSE);
125 buf.extend_from_slice(&replica_vlsn.to_le_bytes());
126 buf.extend_from_slice(×tamp_ms.to_le_bytes());
127 }
128 ProtocolMessage::LogEntry { vlsn, entry_type, data } => {
129 buf.push(TAG_LOG_ENTRY);
130 buf.extend_from_slice(&vlsn.to_le_bytes());
131 buf.push(*entry_type);
132 buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
133 buf.extend_from_slice(data);
134 }
135 ProtocolMessage::Ack { vlsn } => {
136 buf.push(TAG_ACK);
137 buf.extend_from_slice(&vlsn.to_le_bytes());
138 }
139 ProtocolMessage::GroupChange { change_type, node } => {
140 buf.push(TAG_GROUP_CHANGE);
141 buf.push(encode_change_type(change_type));
142 encode_rep_node(&mut buf, node);
143 }
144 ProtocolMessage::GroupChangeResponse { accepted } => {
145 buf.push(TAG_GROUP_CHANGE_RESPONSE);
146 buf.push(if *accepted { 1 } else { 0 });
147 }
148 ProtocolMessage::ElectionProposal {
149 node_name,
150 vlsn,
151 priority,
152 term,
153 dtvlsn,
154 } => {
155 buf.push(TAG_ELECTION_PROPOSAL);
156 encode_string(&mut buf, node_name);
157 buf.extend_from_slice(&vlsn.to_le_bytes());
158 buf.extend_from_slice(&priority.to_le_bytes());
159 buf.extend_from_slice(&term.to_le_bytes());
160 buf.extend_from_slice(&dtvlsn.to_le_bytes());
161 }
162 ProtocolMessage::ElectionVote { voter, granted, term } => {
163 buf.push(TAG_ELECTION_VOTE);
164 encode_string(&mut buf, voter);
165 buf.push(if *granted { 1 } else { 0 });
166 buf.extend_from_slice(&term.to_le_bytes());
167 }
168 ProtocolMessage::ElectionResult { master, term } => {
169 buf.push(TAG_ELECTION_RESULT);
170 encode_string(&mut buf, master);
171 buf.extend_from_slice(&term.to_le_bytes());
172 }
173 ProtocolMessage::Shutdown { reason } => {
174 buf.push(TAG_SHUTDOWN);
175 encode_string(&mut buf, reason);
176 }
177 }
178 buf
179 }
180
181 pub fn decode(data: &[u8]) -> Result<Self> {
183 if data.is_empty() {
184 return Err(RepError::ProtocolError("empty message".to_string()));
185 }
186 let tag = data[0];
187 let mut pos = 1;
188
189 match tag {
190 TAG_HANDSHAKE => {
191 let node_name = decode_string(data, &mut pos)?;
192 let group_name = decode_string(data, &mut pos)?;
193 let node_type = decode_node_type(data, &mut pos)?;
194 Ok(ProtocolMessage::Handshake {
195 node_name,
196 group_name,
197 node_type,
198 })
199 }
200 TAG_HANDSHAKE_RESPONSE => {
201 let accepted = decode_bool(data, &mut pos)?;
202 let has_reason = decode_bool(data, &mut pos)?;
203 let reason = if has_reason {
204 Some(decode_string(data, &mut pos)?)
205 } else {
206 None
207 };
208 Ok(ProtocolMessage::HandshakeResponse { accepted, reason })
209 }
210 TAG_HEARTBEAT => {
211 let master_vlsn = decode_u64(data, &mut pos)?;
212 let timestamp_ms = decode_u64(data, &mut pos)?;
213 Ok(ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms })
214 }
215 TAG_HEARTBEAT_RESPONSE => {
216 let replica_vlsn = decode_u64(data, &mut pos)?;
217 let timestamp_ms = decode_u64(data, &mut pos)?;
218 Ok(ProtocolMessage::HeartbeatResponse {
219 replica_vlsn,
220 timestamp_ms,
221 })
222 }
223 TAG_LOG_ENTRY => {
224 let vlsn = decode_u64(data, &mut pos)?;
225 let entry_type = decode_u8(data, &mut pos)?;
226 let data_len = decode_u32(data, &mut pos)? as usize;
227 let payload = decode_bytes(data, &mut pos, data_len)?;
228 Ok(ProtocolMessage::LogEntry {
229 vlsn,
230 entry_type,
231 data: payload,
232 })
233 }
234 TAG_ACK => {
235 let vlsn = decode_u64(data, &mut pos)?;
236 Ok(ProtocolMessage::Ack { vlsn })
237 }
238 TAG_GROUP_CHANGE => {
239 let change_type = decode_change_type(data, &mut pos)?;
240 let node = decode_rep_node(data, &mut pos)?;
241 Ok(ProtocolMessage::GroupChange { change_type, node })
242 }
243 TAG_GROUP_CHANGE_RESPONSE => {
244 let accepted = decode_bool(data, &mut pos)?;
245 Ok(ProtocolMessage::GroupChangeResponse { accepted })
246 }
247 TAG_ELECTION_PROPOSAL => {
248 let node_name = decode_string(data, &mut pos)?;
249 let vlsn = decode_u64(data, &mut pos)?;
250 let priority = decode_u32(data, &mut pos)?;
251 let term = decode_u64(data, &mut pos)?;
252 let dtvlsn = decode_u64(data, &mut pos)?;
253 Ok(ProtocolMessage::ElectionProposal {
254 node_name,
255 vlsn,
256 priority,
257 term,
258 dtvlsn,
259 })
260 }
261 TAG_ELECTION_VOTE => {
262 let voter = decode_string(data, &mut pos)?;
263 let granted = decode_bool(data, &mut pos)?;
264 let term = decode_u64(data, &mut pos)?;
265 Ok(ProtocolMessage::ElectionVote { voter, granted, term })
266 }
267 TAG_ELECTION_RESULT => {
268 let master = decode_string(data, &mut pos)?;
269 let term = decode_u64(data, &mut pos)?;
270 Ok(ProtocolMessage::ElectionResult { master, term })
271 }
272 TAG_SHUTDOWN => {
273 let reason = decode_string(data, &mut pos)?;
274 Ok(ProtocolMessage::Shutdown { reason })
275 }
276 _ => Err(RepError::ProtocolError(format!(
277 "unknown message tag: {}",
278 tag
279 ))),
280 }
281 }
282}
283
/// Appends `s` to `buf` as a length-prefixed string.
///
/// The prefix is the UTF-8 byte length of `s` (cast to `u32`) in
/// little-endian order; the raw bytes of `s` follow.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let len_prefix = (s.len() as u32).to_le_bytes();
    buf.extend_from_slice(&len_prefix);
    buf.extend_from_slice(s.as_bytes());
}
291
292fn encode_node_type(nt: &NodeType) -> u8 {
293 match nt {
294 NodeType::Electable => 0,
295 NodeType::Monitor => 1,
296 NodeType::Secondary => 2,
297 NodeType::Arbiter => 3,
298 }
299}
300
301fn encode_change_type(ct: &GroupChangeType) -> u8 {
302 match ct {
303 GroupChangeType::Add => 0,
304 GroupChangeType::Remove => 1,
305 GroupChangeType::Update => 2,
306 }
307}
308
309fn encode_rep_node(buf: &mut Vec<u8>, node: &RepNode) {
310 encode_string(buf, &node.name);
311 buf.push(encode_node_type(&node.node_type));
312 encode_string(buf, &node.host);
313 buf.extend_from_slice(&node.port.to_le_bytes());
314 buf.extend_from_slice(&node.node_id.to_le_bytes());
315}
316
317fn ensure_remaining(data: &[u8], pos: usize, needed: usize) -> Result<()> {
320 if pos + needed > data.len() {
321 Err(RepError::ProtocolError(format!(
322 "unexpected end of message at offset {}, need {} more bytes",
323 pos, needed
324 )))
325 } else {
326 Ok(())
327 }
328}
329
330fn decode_u8(data: &[u8], pos: &mut usize) -> Result<u8> {
331 ensure_remaining(data, *pos, 1)?;
332 let val = data[*pos];
333 *pos += 1;
334 Ok(val)
335}
336
337fn decode_bool(data: &[u8], pos: &mut usize) -> Result<bool> {
338 let val = decode_u8(data, pos)?;
339 Ok(val != 0)
340}
341
342fn decode_u16(data: &[u8], pos: &mut usize) -> Result<u16> {
343 ensure_remaining(data, *pos, 2)?;
344 let val = u16::from_le_bytes([data[*pos], data[*pos + 1]]);
345 *pos += 2;
346 Ok(val)
347}
348
349fn decode_u32(data: &[u8], pos: &mut usize) -> Result<u32> {
350 ensure_remaining(data, *pos, 4)?;
351 let val = u32::from_le_bytes([
352 data[*pos],
353 data[*pos + 1],
354 data[*pos + 2],
355 data[*pos + 3],
356 ]);
357 *pos += 4;
358 Ok(val)
359}
360
361fn decode_u64(data: &[u8], pos: &mut usize) -> Result<u64> {
362 ensure_remaining(data, *pos, 8)?;
363 let val = u64::from_le_bytes([
364 data[*pos],
365 data[*pos + 1],
366 data[*pos + 2],
367 data[*pos + 3],
368 data[*pos + 4],
369 data[*pos + 5],
370 data[*pos + 6],
371 data[*pos + 7],
372 ]);
373 *pos += 8;
374 Ok(val)
375}
376
377fn decode_string(data: &[u8], pos: &mut usize) -> Result<String> {
378 let len = decode_u32(data, pos)? as usize;
379 let bytes = decode_bytes(data, pos, len)?;
380 String::from_utf8(bytes).map_err(|e| {
381 RepError::ProtocolError(format!("invalid UTF-8 in string: {}", e))
382 })
383}
384
385fn decode_bytes(data: &[u8], pos: &mut usize, len: usize) -> Result<Vec<u8>> {
386 ensure_remaining(data, *pos, len)?;
387 let bytes = data[*pos..*pos + len].to_vec();
388 *pos += len;
389 Ok(bytes)
390}
391
392fn decode_node_type(data: &[u8], pos: &mut usize) -> Result<NodeType> {
393 let val = decode_u8(data, pos)?;
394 match val {
395 0 => Ok(NodeType::Electable),
396 1 => Ok(NodeType::Monitor),
397 2 => Ok(NodeType::Secondary),
398 3 => Ok(NodeType::Arbiter),
399 _ => {
400 Err(RepError::ProtocolError(format!("unknown node type: {}", val)))
401 }
402 }
403}
404
405fn decode_change_type(data: &[u8], pos: &mut usize) -> Result<GroupChangeType> {
406 let val = decode_u8(data, pos)?;
407 match val {
408 0 => Ok(GroupChangeType::Add),
409 1 => Ok(GroupChangeType::Remove),
410 2 => Ok(GroupChangeType::Update),
411 _ => Err(RepError::ProtocolError(format!(
412 "unknown change type: {}",
413 val
414 ))),
415 }
416}
417
418fn decode_rep_node(data: &[u8], pos: &mut usize) -> Result<RepNode> {
419 let name = decode_string(data, pos)?;
420 let node_type = decode_node_type(data, pos)?;
421 let host = decode_string(data, pos)?;
422 let port = decode_u16(data, pos)?;
423 let node_id = decode_u32(data, pos)?;
424 Ok(RepNode::new(name, node_type, host, port, node_id))
425}
426
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `msg`, decodes the resulting bytes, and asserts the decoded
    /// message equals the original.
    fn round_trip(msg: &ProtocolMessage) {
        let bytes = msg.encode();
        let decoded = ProtocolMessage::decode(&bytes).unwrap();
        assert_eq!(*msg, decoded);
    }

    /// Asserts that decoding `bytes` fails with a `ProtocolError` whose
    /// message contains `needle`.
    fn assert_protocol_error(bytes: &[u8], needle: &str) {
        let err = ProtocolMessage::decode(bytes).expect_err("decode should have failed");
        match err {
            RepError::ProtocolError(msg) => assert!(
                msg.contains(needle),
                "error {:?} does not mention {:?}",
                msg,
                needle
            ),
            other => panic!("unexpected error: {:?}", other),
        }
    }

    /// Builds a `GroupChange` message around a freshly constructed node.
    fn group_change(
        change_type: GroupChangeType,
        name: &str,
        node_type: NodeType,
        host: &str,
        port: u16,
        node_id: u32,
    ) -> ProtocolMessage {
        ProtocolMessage::GroupChange {
            change_type,
            node: RepNode::new(name.to_string(), node_type, host.to_string(), port, node_id),
        }
    }

    // ---- round trips -----------------------------------------------------

    #[test]
    fn test_handshake_round_trip() {
        round_trip(&ProtocolMessage::Handshake {
            node_name: "node1".to_string(),
            group_name: "group1".to_string(),
            node_type: NodeType::Electable,
        });
    }

    #[test]
    fn test_handshake_all_node_types() {
        let all = [
            NodeType::Electable,
            NodeType::Monitor,
            NodeType::Secondary,
            NodeType::Arbiter,
        ];
        for &node_type in all.iter() {
            round_trip(&ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type,
            });
        }
    }

    #[test]
    fn test_handshake_response_accepted() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: true,
            reason: None,
        });
    }

    #[test]
    fn test_handshake_response_rejected() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: false,
            reason: Some("group mismatch".to_string()),
        });
    }

    #[test]
    fn test_heartbeat_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 12345,
            timestamp_ms: 1700000000000,
        });
    }

    #[test]
    fn test_heartbeat_response_round_trip() {
        round_trip(&ProtocolMessage::HeartbeatResponse {
            replica_vlsn: 12340,
            timestamp_ms: 1700000000001,
        });
    }

    #[test]
    fn test_log_entry_round_trip() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 100,
            entry_type: 42,
            data: vec![1, 2, 3, 4, 5],
        });
    }

    #[test]
    fn test_log_entry_empty_data() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 1,
            entry_type: 0,
            data: vec![],
        });
    }

    #[test]
    fn test_log_entry_large_data() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: u64::MAX,
            entry_type: 255,
            data: vec![0xAB; 10000],
        });
    }

    #[test]
    fn test_ack_round_trip() {
        round_trip(&ProtocolMessage::Ack { vlsn: 999 });
    }

    #[test]
    fn test_group_change_add() {
        round_trip(&group_change(
            GroupChangeType::Add,
            "new_node",
            NodeType::Electable,
            "10.0.0.5",
            5001,
            7,
        ));
    }

    #[test]
    fn test_group_change_remove() {
        round_trip(&group_change(
            GroupChangeType::Remove,
            "old_node",
            NodeType::Monitor,
            "localhost",
            6000,
            3,
        ));
    }

    #[test]
    fn test_group_change_update() {
        round_trip(&group_change(
            GroupChangeType::Update,
            "node1",
            NodeType::Secondary,
            "192.168.1.1",
            7000,
            1,
        ));
    }

    #[test]
    fn test_group_change_response_accepted() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: true });
    }

    #[test]
    fn test_group_change_response_rejected() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: false });
    }

    #[test]
    fn test_election_proposal_round_trip() {
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "candidate1".to_string(),
            vlsn: 5000,
            priority: 10,
            term: 3,
            dtvlsn: 4900,
        });
    }

    #[test]
    fn test_election_vote_granted() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter1".to_string(),
            granted: true,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_denied() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter2".to_string(),
            granted: false,
            term: 2,
        });
    }

    #[test]
    fn test_election_result_round_trip() {
        round_trip(&ProtocolMessage::ElectionResult {
            master: "new_master".to_string(),
            term: 4,
        });
    }

    #[test]
    fn test_shutdown_round_trip() {
        round_trip(&ProtocolMessage::Shutdown {
            reason: "maintenance window".to_string(),
        });
    }

    #[test]
    fn test_unicode_string_round_trip() {
        // Escapes keep the source ASCII-only while still exercising two-byte
        // (U+00EA, U+00E9) and four-byte (U+1F6A7) UTF-8 sequences.
        round_trip(&ProtocolMessage::Shutdown {
            reason: "arr\u{ea}t planifi\u{e9} \u{1f6a7}".to_string(),
        });
    }

    #[test]
    fn test_max_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: u64::MAX,
            timestamp_ms: u64::MAX,
        });
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "x".to_string(),
            vlsn: u64::MAX,
            priority: u32::MAX,
            term: u64::MAX,
            dtvlsn: u64::MAX,
        });
    }

    #[test]
    fn test_zero_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 0,
            timestamp_ms: 0,
        });
        round_trip(&ProtocolMessage::Ack { vlsn: 0 });
    }

    // ---- wire layout -----------------------------------------------------

    #[test]
    fn test_encode_produces_non_empty() {
        let msgs = vec![
            ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: NodeType::Electable,
            },
            ProtocolMessage::Ack { vlsn: 0 },
            ProtocolMessage::Shutdown { reason: "done".to_string() },
        ];
        assert!(msgs.iter().all(|msg| !msg.encode().is_empty()));
    }

    #[test]
    fn test_string_length_prefix_counts_bytes() {
        // U+00E9 is one character but two UTF-8 bytes; the prefix must say 2.
        let encoded = ProtocolMessage::Shutdown { reason: "\u{e9}".to_string() }.encode();
        assert_eq!(encoded, vec![TAG_SHUTDOWN, 2, 0, 0, 0, 0xC3, 0xA9]);
    }

    #[test]
    fn test_group_change_type_debug() {
        assert_eq!(format!("{:?}", GroupChangeType::Add), "Add");
        assert_eq!(format!("{:?}", GroupChangeType::Remove), "Remove");
        assert_eq!(format!("{:?}", GroupChangeType::Update), "Update");
    }

    // ---- malformed input -------------------------------------------------

    #[test]
    fn test_decode_empty_data() {
        assert_protocol_error(&[], "empty");
    }

    #[test]
    fn test_decode_unknown_tag() {
        assert_protocol_error(&[255], "unknown message tag");
    }

    #[test]
    fn test_decode_truncated_heartbeat() {
        assert_protocol_error(&[TAG_HEARTBEAT, 0, 0], "unexpected end");
    }

    #[test]
    fn test_decode_truncated_string() {
        // The prefix claims 100 bytes but only two follow.
        let mut data = vec![TAG_HANDSHAKE];
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(b"ab");
        assert_protocol_error(&data, "unexpected end");
    }

    #[test]
    fn test_decode_invalid_utf8() {
        let data = [TAG_SHUTDOWN, 2, 0, 0, 0, 0xFF, 0xFE];
        assert_protocol_error(&data, "invalid UTF-8");
    }

    #[test]
    fn test_decode_unknown_node_type() {
        let data = [TAG_HANDSHAKE, 1, 0, 0, 0, b'n', 1, 0, 0, 0, b'g', 9];
        assert_protocol_error(&data, "unknown node type");
    }

    #[test]
    fn test_decode_unknown_change_type() {
        assert_protocol_error(&[TAG_GROUP_CHANGE, 7], "unknown change type");
    }

    #[test]
    fn test_decode_rejects_truncated_prefixes() {
        // Every field is mandatory once its tag/presence byte is seen, so no
        // strict prefix of a valid encoding may decode successfully.
        let samples = vec![
            ProtocolMessage::Handshake {
                node_name: "node".to_string(),
                group_name: "group".to_string(),
                node_type: NodeType::Monitor,
            },
            ProtocolMessage::HandshakeResponse {
                accepted: false,
                reason: Some("no".to_string()),
            },
            ProtocolMessage::LogEntry {
                vlsn: 9,
                entry_type: 1,
                data: vec![1, 2, 3],
            },
            ProtocolMessage::ElectionProposal {
                node_name: "c".to_string(),
                vlsn: 1,
                priority: 2,
                term: 3,
                dtvlsn: 4,
            },
        ];
        for msg in &samples {
            let bytes = msg.encode();
            for cut in 0..bytes.len() {
                assert!(
                    ProtocolMessage::decode(&bytes[..cut]).is_err(),
                    "{}-byte prefix of {:?} decoded successfully",
                    cut,
                    msg
                );
            }
        }
    }
}