Skip to main content

noxu_rep/
protocol.rs

//! Replication protocol messages.
//!
//! Defines the replication protocol message types and the helpers used to
//! encode and decode them. Uses a simple tag+length+value binary encoding.
5
6use crate::error::{RepError, Result};
7use crate::node_type::NodeType;
8use crate::rep_node::RepNode;
9
/// Type of change to the replication group membership.
///
/// Carried in `ProtocolMessage::GroupChange`. On the wire each variant is a
/// single byte: `Add` = 0, `Remove` = 1, `Update` = 2.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GroupChangeType {
    /// A node is being added to the group.
    Add,
    /// A node is being removed from the group.
    Remove,
    /// A node's information is being updated.
    Update,
}
20
/// A message in the replication protocol.
///
/// These messages are exchanged between master and replica nodes for
/// handshaking, heartbeats, log replication, elections, and group
/// management.
///
/// Every variant is serialized as a one-byte tag followed by its fields in
/// declaration order; see `encode` and `decode` for the exact layout.
#[derive(Debug, Clone, PartialEq)]
pub enum ProtocolMessage {
    // --- Handshake ---
    /// Initial handshake from a node joining the replication stream.
    ///
    /// Both names are length-prefixed strings; the node type is one byte.
    Handshake { node_name: String, group_name: String, node_type: NodeType },
    /// Response to a handshake.
    ///
    /// `reason` is optional: on the wire it is preceded by a one-byte
    /// presence flag and is only written when it is `Some`.
    HandshakeResponse { accepted: bool, reason: Option<String> },

    // --- Heartbeat ---
    /// Heartbeat from master to replica.
    Heartbeat { master_vlsn: u64, timestamp_ms: u64 },
    /// Heartbeat response from replica to master.
    HeartbeatResponse { replica_vlsn: u64, timestamp_ms: u64 },

    // --- Replication stream ---
    /// A log entry being replicated from master to replica.
    ///
    /// `data` is written with a `u32` little-endian length prefix.
    LogEntry { vlsn: u64, entry_type: u8, data: Vec<u8> },
    /// Acknowledgment of a replicated log entry.
    Ack { vlsn: u64 },

    // --- Group management ---
    /// A group membership change request.
    GroupChange { change_type: GroupChangeType, node: RepNode },
    /// Response to a group change request.
    GroupChangeResponse { accepted: bool },

    // --- Election ---
    /// An election proposal from a candidate.
    ElectionProposal { node_name: String, vlsn: u64, priority: u32, term: u64 },
    /// A vote in response to an election proposal.
    ElectionVote { voter: String, granted: bool, term: u64 },
    /// The result of an election.
    ElectionResult { master: String, term: u64 },

    // --- Shutdown ---
    /// Graceful shutdown notification.
    Shutdown { reason: String },
}
64
// --- Wire format tags ---
// The first byte of every encoded message is one of these tags. It selects
// which `ProtocolMessage` variant `decode` reconstructs; any other value is
// rejected with a "unknown message tag" protocol error. The numeric values
// are part of the wire format.
const TAG_HANDSHAKE: u8 = 1;
const TAG_HANDSHAKE_RESPONSE: u8 = 2;
const TAG_HEARTBEAT: u8 = 3;
const TAG_HEARTBEAT_RESPONSE: u8 = 4;
const TAG_LOG_ENTRY: u8 = 5;
const TAG_ACK: u8 = 6;
const TAG_GROUP_CHANGE: u8 = 7;
const TAG_GROUP_CHANGE_RESPONSE: u8 = 8;
const TAG_ELECTION_PROPOSAL: u8 = 9;
const TAG_ELECTION_VOTE: u8 = 10;
const TAG_ELECTION_RESULT: u8 = 11;
const TAG_SHUTDOWN: u8 = 12;
78
79impl ProtocolMessage {
80    /// Encodes this message into a byte vector.
81    ///
82    /// Format: `[tag: u8][payload...]`
83    /// Strings are encoded as `[len: u32 LE][utf8 bytes]`.
84    /// Booleans as a single byte (0 or 1).
85    /// Integers as little-endian fixed-width.
86    pub fn encode(&self) -> Vec<u8> {
87        let mut buf = Vec::new();
88        match self {
89            ProtocolMessage::Handshake { node_name, group_name, node_type } => {
90                buf.push(TAG_HANDSHAKE);
91                encode_string(&mut buf, node_name);
92                encode_string(&mut buf, group_name);
93                buf.push(encode_node_type(node_type));
94            }
95            ProtocolMessage::HandshakeResponse { accepted, reason } => {
96                buf.push(TAG_HANDSHAKE_RESPONSE);
97                buf.push(if *accepted { 1 } else { 0 });
98                match reason {
99                    Some(r) => {
100                        buf.push(1); // has reason
101                        encode_string(&mut buf, r);
102                    }
103                    None => {
104                        buf.push(0); // no reason
105                    }
106                }
107            }
108            ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms } => {
109                buf.push(TAG_HEARTBEAT);
110                buf.extend_from_slice(&master_vlsn.to_le_bytes());
111                buf.extend_from_slice(&timestamp_ms.to_le_bytes());
112            }
113            ProtocolMessage::HeartbeatResponse {
114                replica_vlsn,
115                timestamp_ms,
116            } => {
117                buf.push(TAG_HEARTBEAT_RESPONSE);
118                buf.extend_from_slice(&replica_vlsn.to_le_bytes());
119                buf.extend_from_slice(&timestamp_ms.to_le_bytes());
120            }
121            ProtocolMessage::LogEntry { vlsn, entry_type, data } => {
122                buf.push(TAG_LOG_ENTRY);
123                buf.extend_from_slice(&vlsn.to_le_bytes());
124                buf.push(*entry_type);
125                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
126                buf.extend_from_slice(data);
127            }
128            ProtocolMessage::Ack { vlsn } => {
129                buf.push(TAG_ACK);
130                buf.extend_from_slice(&vlsn.to_le_bytes());
131            }
132            ProtocolMessage::GroupChange { change_type, node } => {
133                buf.push(TAG_GROUP_CHANGE);
134                buf.push(encode_change_type(change_type));
135                encode_rep_node(&mut buf, node);
136            }
137            ProtocolMessage::GroupChangeResponse { accepted } => {
138                buf.push(TAG_GROUP_CHANGE_RESPONSE);
139                buf.push(if *accepted { 1 } else { 0 });
140            }
141            ProtocolMessage::ElectionProposal {
142                node_name,
143                vlsn,
144                priority,
145                term,
146            } => {
147                buf.push(TAG_ELECTION_PROPOSAL);
148                encode_string(&mut buf, node_name);
149                buf.extend_from_slice(&vlsn.to_le_bytes());
150                buf.extend_from_slice(&priority.to_le_bytes());
151                buf.extend_from_slice(&term.to_le_bytes());
152            }
153            ProtocolMessage::ElectionVote { voter, granted, term } => {
154                buf.push(TAG_ELECTION_VOTE);
155                encode_string(&mut buf, voter);
156                buf.push(if *granted { 1 } else { 0 });
157                buf.extend_from_slice(&term.to_le_bytes());
158            }
159            ProtocolMessage::ElectionResult { master, term } => {
160                buf.push(TAG_ELECTION_RESULT);
161                encode_string(&mut buf, master);
162                buf.extend_from_slice(&term.to_le_bytes());
163            }
164            ProtocolMessage::Shutdown { reason } => {
165                buf.push(TAG_SHUTDOWN);
166                encode_string(&mut buf, reason);
167            }
168        }
169        buf
170    }
171
172    /// Decodes a message from a byte slice.
173    pub fn decode(data: &[u8]) -> Result<Self> {
174        if data.is_empty() {
175            return Err(RepError::ProtocolError("empty message".to_string()));
176        }
177        let tag = data[0];
178        let mut pos = 1;
179
180        match tag {
181            TAG_HANDSHAKE => {
182                let node_name = decode_string(data, &mut pos)?;
183                let group_name = decode_string(data, &mut pos)?;
184                let node_type = decode_node_type(data, &mut pos)?;
185                Ok(ProtocolMessage::Handshake {
186                    node_name,
187                    group_name,
188                    node_type,
189                })
190            }
191            TAG_HANDSHAKE_RESPONSE => {
192                let accepted = decode_bool(data, &mut pos)?;
193                let has_reason = decode_bool(data, &mut pos)?;
194                let reason = if has_reason {
195                    Some(decode_string(data, &mut pos)?)
196                } else {
197                    None
198                };
199                Ok(ProtocolMessage::HandshakeResponse { accepted, reason })
200            }
201            TAG_HEARTBEAT => {
202                let master_vlsn = decode_u64(data, &mut pos)?;
203                let timestamp_ms = decode_u64(data, &mut pos)?;
204                Ok(ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms })
205            }
206            TAG_HEARTBEAT_RESPONSE => {
207                let replica_vlsn = decode_u64(data, &mut pos)?;
208                let timestamp_ms = decode_u64(data, &mut pos)?;
209                Ok(ProtocolMessage::HeartbeatResponse {
210                    replica_vlsn,
211                    timestamp_ms,
212                })
213            }
214            TAG_LOG_ENTRY => {
215                let vlsn = decode_u64(data, &mut pos)?;
216                let entry_type = decode_u8(data, &mut pos)?;
217                let data_len = decode_u32(data, &mut pos)? as usize;
218                let payload = decode_bytes(data, &mut pos, data_len)?;
219                Ok(ProtocolMessage::LogEntry {
220                    vlsn,
221                    entry_type,
222                    data: payload,
223                })
224            }
225            TAG_ACK => {
226                let vlsn = decode_u64(data, &mut pos)?;
227                Ok(ProtocolMessage::Ack { vlsn })
228            }
229            TAG_GROUP_CHANGE => {
230                let change_type = decode_change_type(data, &mut pos)?;
231                let node = decode_rep_node(data, &mut pos)?;
232                Ok(ProtocolMessage::GroupChange { change_type, node })
233            }
234            TAG_GROUP_CHANGE_RESPONSE => {
235                let accepted = decode_bool(data, &mut pos)?;
236                Ok(ProtocolMessage::GroupChangeResponse { accepted })
237            }
238            TAG_ELECTION_PROPOSAL => {
239                let node_name = decode_string(data, &mut pos)?;
240                let vlsn = decode_u64(data, &mut pos)?;
241                let priority = decode_u32(data, &mut pos)?;
242                let term = decode_u64(data, &mut pos)?;
243                Ok(ProtocolMessage::ElectionProposal {
244                    node_name,
245                    vlsn,
246                    priority,
247                    term,
248                })
249            }
250            TAG_ELECTION_VOTE => {
251                let voter = decode_string(data, &mut pos)?;
252                let granted = decode_bool(data, &mut pos)?;
253                let term = decode_u64(data, &mut pos)?;
254                Ok(ProtocolMessage::ElectionVote { voter, granted, term })
255            }
256            TAG_ELECTION_RESULT => {
257                let master = decode_string(data, &mut pos)?;
258                let term = decode_u64(data, &mut pos)?;
259                Ok(ProtocolMessage::ElectionResult { master, term })
260            }
261            TAG_SHUTDOWN => {
262                let reason = decode_string(data, &mut pos)?;
263                Ok(ProtocolMessage::Shutdown { reason })
264            }
265            _ => Err(RepError::ProtocolError(format!(
266                "unknown message tag: {}",
267                tag
268            ))),
269        }
270    }
271}
272
273// --- Encoding helpers ---
274
/// Appends `s` to `buf` as a `u32` little-endian byte length followed by the
/// string's UTF-8 bytes.
///
/// The length is the byte count (not the character count) and is cast with
/// `as u32`, so a string of 4 GiB or more would get a truncated prefix.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let byte_len = s.len() as u32;
    buf.extend_from_slice(&byte_len.to_le_bytes());
    buf.extend_from_slice(s.as_bytes());
}
280
281fn encode_node_type(nt: &NodeType) -> u8 {
282    match nt {
283        NodeType::Electable => 0,
284        NodeType::Monitor => 1,
285        NodeType::Secondary => 2,
286        NodeType::Arbiter => 3,
287    }
288}
289
290fn encode_change_type(ct: &GroupChangeType) -> u8 {
291    match ct {
292        GroupChangeType::Add => 0,
293        GroupChangeType::Remove => 1,
294        GroupChangeType::Update => 2,
295    }
296}
297
298fn encode_rep_node(buf: &mut Vec<u8>, node: &RepNode) {
299    encode_string(buf, &node.name);
300    buf.push(encode_node_type(&node.node_type));
301    encode_string(buf, &node.host);
302    buf.extend_from_slice(&node.port.to_le_bytes());
303    buf.extend_from_slice(&node.node_id.to_le_bytes());
304}
305
306// --- Decoding helpers ---
307
308fn ensure_remaining(data: &[u8], pos: usize, needed: usize) -> Result<()> {
309    if pos + needed > data.len() {
310        Err(RepError::ProtocolError(format!(
311            "unexpected end of message at offset {}, need {} more bytes",
312            pos, needed
313        )))
314    } else {
315        Ok(())
316    }
317}
318
319fn decode_u8(data: &[u8], pos: &mut usize) -> Result<u8> {
320    ensure_remaining(data, *pos, 1)?;
321    let val = data[*pos];
322    *pos += 1;
323    Ok(val)
324}
325
326fn decode_bool(data: &[u8], pos: &mut usize) -> Result<bool> {
327    let val = decode_u8(data, pos)?;
328    Ok(val != 0)
329}
330
331fn decode_u16(data: &[u8], pos: &mut usize) -> Result<u16> {
332    ensure_remaining(data, *pos, 2)?;
333    let val = u16::from_le_bytes([data[*pos], data[*pos + 1]]);
334    *pos += 2;
335    Ok(val)
336}
337
338fn decode_u32(data: &[u8], pos: &mut usize) -> Result<u32> {
339    ensure_remaining(data, *pos, 4)?;
340    let val = u32::from_le_bytes([
341        data[*pos],
342        data[*pos + 1],
343        data[*pos + 2],
344        data[*pos + 3],
345    ]);
346    *pos += 4;
347    Ok(val)
348}
349
350fn decode_u64(data: &[u8], pos: &mut usize) -> Result<u64> {
351    ensure_remaining(data, *pos, 8)?;
352    let val = u64::from_le_bytes([
353        data[*pos],
354        data[*pos + 1],
355        data[*pos + 2],
356        data[*pos + 3],
357        data[*pos + 4],
358        data[*pos + 5],
359        data[*pos + 6],
360        data[*pos + 7],
361    ]);
362    *pos += 8;
363    Ok(val)
364}
365
366fn decode_string(data: &[u8], pos: &mut usize) -> Result<String> {
367    let len = decode_u32(data, pos)? as usize;
368    let bytes = decode_bytes(data, pos, len)?;
369    String::from_utf8(bytes).map_err(|e| {
370        RepError::ProtocolError(format!("invalid UTF-8 in string: {}", e))
371    })
372}
373
374fn decode_bytes(data: &[u8], pos: &mut usize, len: usize) -> Result<Vec<u8>> {
375    ensure_remaining(data, *pos, len)?;
376    let bytes = data[*pos..*pos + len].to_vec();
377    *pos += len;
378    Ok(bytes)
379}
380
381fn decode_node_type(data: &[u8], pos: &mut usize) -> Result<NodeType> {
382    let val = decode_u8(data, pos)?;
383    match val {
384        0 => Ok(NodeType::Electable),
385        1 => Ok(NodeType::Monitor),
386        2 => Ok(NodeType::Secondary),
387        3 => Ok(NodeType::Arbiter),
388        _ => {
389            Err(RepError::ProtocolError(format!("unknown node type: {}", val)))
390        }
391    }
392}
393
394fn decode_change_type(data: &[u8], pos: &mut usize) -> Result<GroupChangeType> {
395    let val = decode_u8(data, pos)?;
396    match val {
397        0 => Ok(GroupChangeType::Add),
398        1 => Ok(GroupChangeType::Remove),
399        2 => Ok(GroupChangeType::Update),
400        _ => Err(RepError::ProtocolError(format!(
401            "unknown change type: {}",
402            val
403        ))),
404    }
405}
406
407fn decode_rep_node(data: &[u8], pos: &mut usize) -> Result<RepNode> {
408    let name = decode_string(data, pos)?;
409    let node_type = decode_node_type(data, pos)?;
410    let host = decode_string(data, pos)?;
411    let port = decode_u16(data, pos)?;
412    let node_id = decode_u32(data, pos)?;
413    Ok(RepNode::new(name, node_type, host, port, node_id))
414}
415
#[cfg(test)]
mod tests {
    //! Round-trip and error-path tests for the protocol wire format.

    use super::*;

    /// Helper: encode then decode, assert round-trip equality.
    fn round_trip(msg: &ProtocolMessage) {
        let encoded = msg.encode();
        let decoded = ProtocolMessage::decode(&encoded).unwrap();
        assert_eq!(*msg, decoded);
    }

    #[test]
    fn test_handshake_round_trip() {
        round_trip(&ProtocolMessage::Handshake {
            node_name: "node1".to_string(),
            group_name: "group1".to_string(),
            node_type: NodeType::Electable,
        });
    }

    #[test]
    fn test_handshake_all_node_types() {
        for nt in &[
            NodeType::Electable,
            NodeType::Monitor,
            NodeType::Secondary,
            NodeType::Arbiter,
        ] {
            round_trip(&ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: *nt,
            });
        }
    }

    #[test]
    fn test_handshake_response_accepted() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: true,
            reason: None,
        });
    }

    #[test]
    fn test_handshake_response_rejected() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: false,
            reason: Some("group mismatch".to_string()),
        });
    }

    #[test]
    fn test_heartbeat_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 12345,
            timestamp_ms: 1700000000000,
        });
    }

    #[test]
    fn test_heartbeat_response_round_trip() {
        round_trip(&ProtocolMessage::HeartbeatResponse {
            replica_vlsn: 12340,
            timestamp_ms: 1700000000001,
        });
    }

    #[test]
    fn test_log_entry_round_trip() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 100,
            entry_type: 42,
            data: vec![1, 2, 3, 4, 5],
        });
    }

    #[test]
    fn test_log_entry_empty_data() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 1,
            entry_type: 0,
            data: vec![],
        });
    }

    #[test]
    fn test_log_entry_large_data() {
        let data = vec![0xAB; 10000];
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: u64::MAX,
            entry_type: 255,
            data,
        });
    }

    #[test]
    fn test_ack_round_trip() {
        round_trip(&ProtocolMessage::Ack { vlsn: 999 });
    }

    #[test]
    fn test_group_change_add() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Add,
            node: RepNode::new(
                "new_node".to_string(),
                NodeType::Electable,
                "10.0.0.5".to_string(),
                5001,
                7,
            ),
        });
    }

    #[test]
    fn test_group_change_remove() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Remove,
            node: RepNode::new(
                "old_node".to_string(),
                NodeType::Monitor,
                "localhost".to_string(),
                6000,
                3,
            ),
        });
    }

    #[test]
    fn test_group_change_update() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Update,
            node: RepNode::new(
                "node1".to_string(),
                NodeType::Secondary,
                "192.168.1.1".to_string(),
                7000,
                1,
            ),
        });
    }

    #[test]
    fn test_group_change_response_accepted() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: true });
    }

    #[test]
    fn test_group_change_response_rejected() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: false });
    }

    #[test]
    fn test_election_proposal_round_trip() {
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "candidate1".to_string(),
            vlsn: 5000,
            priority: 10,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_granted() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter1".to_string(),
            granted: true,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_denied() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter2".to_string(),
            granted: false,
            term: 2,
        });
    }

    #[test]
    fn test_election_result_round_trip() {
        round_trip(&ProtocolMessage::ElectionResult {
            master: "new_master".to_string(),
            term: 4,
        });
    }

    #[test]
    fn test_shutdown_round_trip() {
        round_trip(&ProtocolMessage::Shutdown {
            reason: "maintenance window".to_string(),
        });
    }

    #[test]
    fn test_decode_empty_data() {
        let result = ProtocolMessage::decode(&[]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => assert!(msg.contains("empty")),
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_unknown_tag() {
        let result = ProtocolMessage::decode(&[255]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unknown message tag"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_heartbeat() {
        // Tag for heartbeat, but missing payload.
        let result = ProtocolMessage::decode(&[TAG_HEARTBEAT, 0, 0]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unexpected end"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_string() {
        // Handshake tag + string length says 100 bytes but only 2 provided.
        let mut data = vec![TAG_HANDSHAKE];
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(b"ab");
        let result = ProtocolMessage::decode(&data);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_invalid_utf8_string() {
        // Shutdown tag + a 2-byte string whose bytes are not valid UTF-8.
        let mut data = vec![TAG_SHUTDOWN];
        data.extend_from_slice(&2u32.to_le_bytes());
        data.extend_from_slice(&[0xFF, 0xFE]);
        match ProtocolMessage::decode(&data) {
            Err(RepError::ProtocolError(msg)) => {
                assert!(msg.contains("invalid UTF-8"))
            }
            other => panic!("unexpected result: {:?}", other),
        }
    }

    #[test]
    fn test_encode_produces_non_empty() {
        let msgs = vec![
            ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: NodeType::Electable,
            },
            ProtocolMessage::Ack { vlsn: 0 },
            ProtocolMessage::Shutdown { reason: "done".to_string() },
        ];
        for msg in &msgs {
            assert!(!msg.encode().is_empty());
        }
    }

    #[test]
    fn test_group_change_type_debug() {
        assert_eq!(format!("{:?}", GroupChangeType::Add), "Add");
        assert_eq!(format!("{:?}", GroupChangeType::Remove), "Remove");
        assert_eq!(format!("{:?}", GroupChangeType::Update), "Update");
    }

    #[test]
    fn test_unicode_string_round_trip() {
        // Genuinely non-ASCII text: multi-byte characters make the byte
        // length differ from the character count, which is what the length
        // prefix has to get right.
        let reason = "arr\u{ea}t planifi\u{e9} \u{2014} \u{8a08}\u{753b}\u{505c}\u{6b62}";
        assert!(!reason.is_ascii());
        assert!(reason.len() > reason.chars().count());

        let msg = ProtocolMessage::Shutdown { reason: reason.to_string() };
        round_trip(&msg);

        // The u32 prefix after the tag byte must hold the UTF-8 byte length.
        let encoded = msg.encode();
        let mut prefix = [0u8; 4];
        prefix.copy_from_slice(&encoded[1..5]);
        assert_eq!(u32::from_le_bytes(prefix) as usize, reason.len());
    }

    #[test]
    fn test_max_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: u64::MAX,
            timestamp_ms: u64::MAX,
        });
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "x".to_string(),
            vlsn: u64::MAX,
            priority: u32::MAX,
            term: u64::MAX,
        });
    }

    #[test]
    fn test_zero_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 0,
            timestamp_ms: 0,
        });
        round_trip(&ProtocolMessage::Ack { vlsn: 0 });
    }
}