Skip to main content

noxu_rep/
protocol.rs

1//! Replication protocol messages.
2//!
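//! Defines the replication protocol message types and their wire codec.
//! Messages use a simple binary encoding: a one-byte tag followed by the
//! payload fields, with strings and byte payloads length-prefixed.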
5
6use crate::error::{RepError, Result};
7use crate::node_type::NodeType;
8use crate::rep_node::RepNode;
9
/// Kind of membership change applied to the replication group.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum GroupChangeType {
    /// Adds a node to the group.
    Add,
    /// Removes a node from the group.
    Remove,
    /// Updates the information held for a node.
    Update,
}
20
21/// A message in the replication protocol.
22///
23/// These messages are exchanged between master and replica nodes for
24/// handshaking, heartbeats, log replication, elections, and group
25/// management.
26#[derive(Debug, Clone, PartialEq)]
27pub enum ProtocolMessage {
28    // --- Handshake ---
29    /// Initial handshake from a node joining the replication stream.
30    Handshake { node_name: String, group_name: String, node_type: NodeType },
31    /// Response to a handshake.
32    HandshakeResponse { accepted: bool, reason: Option<String> },
33
34    // --- Heartbeat ---
35    /// Heartbeat from master to replica.
36    Heartbeat { master_vlsn: u64, timestamp_ms: u64 },
37    /// Heartbeat response from replica to master.
38    HeartbeatResponse { replica_vlsn: u64, timestamp_ms: u64 },
39
40    // --- Replication stream ---
41    /// A log entry being replicated from master to replica.
42    LogEntry { vlsn: u64, entry_type: u8, data: Vec<u8> },
43    /// Acknowledgment of a replicated log entry.
44    Ack { vlsn: u64 },
45
46    // --- Group management ---
47    /// A group membership change request.
48    GroupChange { change_type: GroupChangeType, node: RepNode },
49    /// Response to a group change request.
50    GroupChangeResponse { accepted: bool },
51
52    // --- Election ---
53    /// An election proposal from a candidate.
54    ElectionProposal {
55        node_name: String,
56        vlsn: u64,
57        priority: u32,
58        term: u64,
59        /// DTVLSN (the major election-ranking key, D2). 0 = UNINITIALIZED.
60        dtvlsn: u64,
61    },
62    /// A vote in response to an election proposal.
63    ElectionVote { voter: String, granted: bool, term: u64 },
64    /// The result of an election.
65    ElectionResult { master: String, term: u64 },
66
67    // --- Shutdown ---
68    /// Graceful shutdown notification.
69    Shutdown { reason: String },
70}
71
// --- Wire format tags ---
// The first byte of every encoded message is one of these tags:
// `ProtocolMessage::encode` writes it before the payload and
// `ProtocolMessage::decode` dispatches on it. Any other leading byte is
// rejected by `decode` as an unknown message tag.
const TAG_HANDSHAKE: u8 = 1; // ProtocolMessage::Handshake
const TAG_HANDSHAKE_RESPONSE: u8 = 2; // ProtocolMessage::HandshakeResponse
const TAG_HEARTBEAT: u8 = 3; // ProtocolMessage::Heartbeat
const TAG_HEARTBEAT_RESPONSE: u8 = 4; // ProtocolMessage::HeartbeatResponse
const TAG_LOG_ENTRY: u8 = 5; // ProtocolMessage::LogEntry
const TAG_ACK: u8 = 6; // ProtocolMessage::Ack
const TAG_GROUP_CHANGE: u8 = 7; // ProtocolMessage::GroupChange
const TAG_GROUP_CHANGE_RESPONSE: u8 = 8; // ProtocolMessage::GroupChangeResponse
const TAG_ELECTION_PROPOSAL: u8 = 9; // ProtocolMessage::ElectionProposal
const TAG_ELECTION_VOTE: u8 = 10; // ProtocolMessage::ElectionVote
const TAG_ELECTION_RESULT: u8 = 11; // ProtocolMessage::ElectionResult
const TAG_SHUTDOWN: u8 = 12; // ProtocolMessage::Shutdown
85
86impl ProtocolMessage {
87    /// Encodes this message into a byte vector.
88    ///
89    /// Format: `[tag: u8][payload...]`
90    /// Strings are encoded as `[len: u32 LE][utf8 bytes]`.
91    /// Booleans as a single byte (0 or 1).
92    /// Integers as little-endian fixed-width.
93    pub fn encode(&self) -> Vec<u8> {
94        let mut buf = Vec::new();
95        match self {
96            ProtocolMessage::Handshake { node_name, group_name, node_type } => {
97                buf.push(TAG_HANDSHAKE);
98                encode_string(&mut buf, node_name);
99                encode_string(&mut buf, group_name);
100                buf.push(encode_node_type(node_type));
101            }
102            ProtocolMessage::HandshakeResponse { accepted, reason } => {
103                buf.push(TAG_HANDSHAKE_RESPONSE);
104                buf.push(if *accepted { 1 } else { 0 });
105                match reason {
106                    Some(r) => {
107                        buf.push(1); // has reason
108                        encode_string(&mut buf, r);
109                    }
110                    None => {
111                        buf.push(0); // no reason
112                    }
113                }
114            }
115            ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms } => {
116                buf.push(TAG_HEARTBEAT);
117                buf.extend_from_slice(&master_vlsn.to_le_bytes());
118                buf.extend_from_slice(&timestamp_ms.to_le_bytes());
119            }
120            ProtocolMessage::HeartbeatResponse {
121                replica_vlsn,
122                timestamp_ms,
123            } => {
124                buf.push(TAG_HEARTBEAT_RESPONSE);
125                buf.extend_from_slice(&replica_vlsn.to_le_bytes());
126                buf.extend_from_slice(&timestamp_ms.to_le_bytes());
127            }
128            ProtocolMessage::LogEntry { vlsn, entry_type, data } => {
129                buf.push(TAG_LOG_ENTRY);
130                buf.extend_from_slice(&vlsn.to_le_bytes());
131                buf.push(*entry_type);
132                buf.extend_from_slice(&(data.len() as u32).to_le_bytes());
133                buf.extend_from_slice(data);
134            }
135            ProtocolMessage::Ack { vlsn } => {
136                buf.push(TAG_ACK);
137                buf.extend_from_slice(&vlsn.to_le_bytes());
138            }
139            ProtocolMessage::GroupChange { change_type, node } => {
140                buf.push(TAG_GROUP_CHANGE);
141                buf.push(encode_change_type(change_type));
142                encode_rep_node(&mut buf, node);
143            }
144            ProtocolMessage::GroupChangeResponse { accepted } => {
145                buf.push(TAG_GROUP_CHANGE_RESPONSE);
146                buf.push(if *accepted { 1 } else { 0 });
147            }
148            ProtocolMessage::ElectionProposal {
149                node_name,
150                vlsn,
151                priority,
152                term,
153                dtvlsn,
154            } => {
155                buf.push(TAG_ELECTION_PROPOSAL);
156                encode_string(&mut buf, node_name);
157                buf.extend_from_slice(&vlsn.to_le_bytes());
158                buf.extend_from_slice(&priority.to_le_bytes());
159                buf.extend_from_slice(&term.to_le_bytes());
160                buf.extend_from_slice(&dtvlsn.to_le_bytes());
161            }
162            ProtocolMessage::ElectionVote { voter, granted, term } => {
163                buf.push(TAG_ELECTION_VOTE);
164                encode_string(&mut buf, voter);
165                buf.push(if *granted { 1 } else { 0 });
166                buf.extend_from_slice(&term.to_le_bytes());
167            }
168            ProtocolMessage::ElectionResult { master, term } => {
169                buf.push(TAG_ELECTION_RESULT);
170                encode_string(&mut buf, master);
171                buf.extend_from_slice(&term.to_le_bytes());
172            }
173            ProtocolMessage::Shutdown { reason } => {
174                buf.push(TAG_SHUTDOWN);
175                encode_string(&mut buf, reason);
176            }
177        }
178        buf
179    }
180
181    /// Decodes a message from a byte slice.
182    pub fn decode(data: &[u8]) -> Result<Self> {
183        if data.is_empty() {
184            return Err(RepError::ProtocolError("empty message".to_string()));
185        }
186        let tag = data[0];
187        let mut pos = 1;
188
189        match tag {
190            TAG_HANDSHAKE => {
191                let node_name = decode_string(data, &mut pos)?;
192                let group_name = decode_string(data, &mut pos)?;
193                let node_type = decode_node_type(data, &mut pos)?;
194                Ok(ProtocolMessage::Handshake {
195                    node_name,
196                    group_name,
197                    node_type,
198                })
199            }
200            TAG_HANDSHAKE_RESPONSE => {
201                let accepted = decode_bool(data, &mut pos)?;
202                let has_reason = decode_bool(data, &mut pos)?;
203                let reason = if has_reason {
204                    Some(decode_string(data, &mut pos)?)
205                } else {
206                    None
207                };
208                Ok(ProtocolMessage::HandshakeResponse { accepted, reason })
209            }
210            TAG_HEARTBEAT => {
211                let master_vlsn = decode_u64(data, &mut pos)?;
212                let timestamp_ms = decode_u64(data, &mut pos)?;
213                Ok(ProtocolMessage::Heartbeat { master_vlsn, timestamp_ms })
214            }
215            TAG_HEARTBEAT_RESPONSE => {
216                let replica_vlsn = decode_u64(data, &mut pos)?;
217                let timestamp_ms = decode_u64(data, &mut pos)?;
218                Ok(ProtocolMessage::HeartbeatResponse {
219                    replica_vlsn,
220                    timestamp_ms,
221                })
222            }
223            TAG_LOG_ENTRY => {
224                let vlsn = decode_u64(data, &mut pos)?;
225                let entry_type = decode_u8(data, &mut pos)?;
226                let data_len = decode_u32(data, &mut pos)? as usize;
227                let payload = decode_bytes(data, &mut pos, data_len)?;
228                Ok(ProtocolMessage::LogEntry {
229                    vlsn,
230                    entry_type,
231                    data: payload,
232                })
233            }
234            TAG_ACK => {
235                let vlsn = decode_u64(data, &mut pos)?;
236                Ok(ProtocolMessage::Ack { vlsn })
237            }
238            TAG_GROUP_CHANGE => {
239                let change_type = decode_change_type(data, &mut pos)?;
240                let node = decode_rep_node(data, &mut pos)?;
241                Ok(ProtocolMessage::GroupChange { change_type, node })
242            }
243            TAG_GROUP_CHANGE_RESPONSE => {
244                let accepted = decode_bool(data, &mut pos)?;
245                Ok(ProtocolMessage::GroupChangeResponse { accepted })
246            }
247            TAG_ELECTION_PROPOSAL => {
248                let node_name = decode_string(data, &mut pos)?;
249                let vlsn = decode_u64(data, &mut pos)?;
250                let priority = decode_u32(data, &mut pos)?;
251                let term = decode_u64(data, &mut pos)?;
252                let dtvlsn = decode_u64(data, &mut pos)?;
253                Ok(ProtocolMessage::ElectionProposal {
254                    node_name,
255                    vlsn,
256                    priority,
257                    term,
258                    dtvlsn,
259                })
260            }
261            TAG_ELECTION_VOTE => {
262                let voter = decode_string(data, &mut pos)?;
263                let granted = decode_bool(data, &mut pos)?;
264                let term = decode_u64(data, &mut pos)?;
265                Ok(ProtocolMessage::ElectionVote { voter, granted, term })
266            }
267            TAG_ELECTION_RESULT => {
268                let master = decode_string(data, &mut pos)?;
269                let term = decode_u64(data, &mut pos)?;
270                Ok(ProtocolMessage::ElectionResult { master, term })
271            }
272            TAG_SHUTDOWN => {
273                let reason = decode_string(data, &mut pos)?;
274                Ok(ProtocolMessage::Shutdown { reason })
275            }
276            _ => Err(RepError::ProtocolError(format!(
277                "unknown message tag: {}",
278                tag
279            ))),
280        }
281    }
282}
283
284// --- Encoding helpers ---
285
/// Appends `s` to `buf` as `[len: u32 LE][utf8 bytes]`.
///
/// The prefix is the UTF-8 byte count of `s`, narrowed with an `as u32`
/// cast.
fn encode_string(buf: &mut Vec<u8>, s: &str) {
    let byte_len = s.len() as u32;
    buf.extend_from_slice(&byte_len.to_le_bytes());
    buf.extend_from_slice(s.as_bytes());
}
291
292fn encode_node_type(nt: &NodeType) -> u8 {
293    match nt {
294        NodeType::Electable => 0,
295        NodeType::Monitor => 1,
296        NodeType::Secondary => 2,
297        NodeType::Arbiter => 3,
298    }
299}
300
301fn encode_change_type(ct: &GroupChangeType) -> u8 {
302    match ct {
303        GroupChangeType::Add => 0,
304        GroupChangeType::Remove => 1,
305        GroupChangeType::Update => 2,
306    }
307}
308
309fn encode_rep_node(buf: &mut Vec<u8>, node: &RepNode) {
310    encode_string(buf, &node.name);
311    buf.push(encode_node_type(&node.node_type));
312    encode_string(buf, &node.host);
313    buf.extend_from_slice(&node.port.to_le_bytes());
314    buf.extend_from_slice(&node.node_id.to_le_bytes());
315}
316
317// --- Decoding helpers ---
318
319fn ensure_remaining(data: &[u8], pos: usize, needed: usize) -> Result<()> {
320    if pos + needed > data.len() {
321        Err(RepError::ProtocolError(format!(
322            "unexpected end of message at offset {}, need {} more bytes",
323            pos, needed
324        )))
325    } else {
326        Ok(())
327    }
328}
329
330fn decode_u8(data: &[u8], pos: &mut usize) -> Result<u8> {
331    ensure_remaining(data, *pos, 1)?;
332    let val = data[*pos];
333    *pos += 1;
334    Ok(val)
335}
336
337fn decode_bool(data: &[u8], pos: &mut usize) -> Result<bool> {
338    let val = decode_u8(data, pos)?;
339    Ok(val != 0)
340}
341
342fn decode_u16(data: &[u8], pos: &mut usize) -> Result<u16> {
343    ensure_remaining(data, *pos, 2)?;
344    let val = u16::from_le_bytes([data[*pos], data[*pos + 1]]);
345    *pos += 2;
346    Ok(val)
347}
348
349fn decode_u32(data: &[u8], pos: &mut usize) -> Result<u32> {
350    ensure_remaining(data, *pos, 4)?;
351    let val = u32::from_le_bytes([
352        data[*pos],
353        data[*pos + 1],
354        data[*pos + 2],
355        data[*pos + 3],
356    ]);
357    *pos += 4;
358    Ok(val)
359}
360
361fn decode_u64(data: &[u8], pos: &mut usize) -> Result<u64> {
362    ensure_remaining(data, *pos, 8)?;
363    let val = u64::from_le_bytes([
364        data[*pos],
365        data[*pos + 1],
366        data[*pos + 2],
367        data[*pos + 3],
368        data[*pos + 4],
369        data[*pos + 5],
370        data[*pos + 6],
371        data[*pos + 7],
372    ]);
373    *pos += 8;
374    Ok(val)
375}
376
377fn decode_string(data: &[u8], pos: &mut usize) -> Result<String> {
378    let len = decode_u32(data, pos)? as usize;
379    let bytes = decode_bytes(data, pos, len)?;
380    String::from_utf8(bytes).map_err(|e| {
381        RepError::ProtocolError(format!("invalid UTF-8 in string: {}", e))
382    })
383}
384
385fn decode_bytes(data: &[u8], pos: &mut usize, len: usize) -> Result<Vec<u8>> {
386    ensure_remaining(data, *pos, len)?;
387    let bytes = data[*pos..*pos + len].to_vec();
388    *pos += len;
389    Ok(bytes)
390}
391
392fn decode_node_type(data: &[u8], pos: &mut usize) -> Result<NodeType> {
393    let val = decode_u8(data, pos)?;
394    match val {
395        0 => Ok(NodeType::Electable),
396        1 => Ok(NodeType::Monitor),
397        2 => Ok(NodeType::Secondary),
398        3 => Ok(NodeType::Arbiter),
399        _ => {
400            Err(RepError::ProtocolError(format!("unknown node type: {}", val)))
401        }
402    }
403}
404
405fn decode_change_type(data: &[u8], pos: &mut usize) -> Result<GroupChangeType> {
406    let val = decode_u8(data, pos)?;
407    match val {
408        0 => Ok(GroupChangeType::Add),
409        1 => Ok(GroupChangeType::Remove),
410        2 => Ok(GroupChangeType::Update),
411        _ => Err(RepError::ProtocolError(format!(
412            "unknown change type: {}",
413            val
414        ))),
415    }
416}
417
418fn decode_rep_node(data: &[u8], pos: &mut usize) -> Result<RepNode> {
419    let name = decode_string(data, pos)?;
420    let node_type = decode_node_type(data, pos)?;
421    let host = decode_string(data, pos)?;
422    let port = decode_u16(data, pos)?;
423    let node_id = decode_u32(data, pos)?;
424    Ok(RepNode::new(name, node_type, host, port, node_id))
425}
426
#[cfg(test)]
mod tests {
    //! Round-trip and malformed-input tests for the message codec.

    use super::*;

    /// Helper: encode then decode, assert round-trip equality.
    fn round_trip(msg: &ProtocolMessage) {
        let encoded = msg.encode();
        let decoded = ProtocolMessage::decode(&encoded).unwrap();
        assert_eq!(*msg, decoded);
    }

    #[test]
    fn test_handshake_round_trip() {
        round_trip(&ProtocolMessage::Handshake {
            node_name: "node1".to_string(),
            group_name: "group1".to_string(),
            node_type: NodeType::Electable,
        });
    }

    #[test]
    fn test_handshake_all_node_types() {
        for nt in &[
            NodeType::Electable,
            NodeType::Monitor,
            NodeType::Secondary,
            NodeType::Arbiter,
        ] {
            round_trip(&ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: *nt,
            });
        }
    }

    #[test]
    fn test_handshake_response_accepted() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: true,
            reason: None,
        });
    }

    #[test]
    fn test_handshake_response_rejected() {
        round_trip(&ProtocolMessage::HandshakeResponse {
            accepted: false,
            reason: Some("group mismatch".to_string()),
        });
    }

    #[test]
    fn test_heartbeat_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 12345,
            timestamp_ms: 1700000000000,
        });
    }

    #[test]
    fn test_heartbeat_response_round_trip() {
        round_trip(&ProtocolMessage::HeartbeatResponse {
            replica_vlsn: 12340,
            timestamp_ms: 1700000000001,
        });
    }

    #[test]
    fn test_log_entry_round_trip() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 100,
            entry_type: 42,
            data: vec![1, 2, 3, 4, 5],
        });
    }

    #[test]
    fn test_log_entry_empty_data() {
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: 1,
            entry_type: 0,
            data: vec![],
        });
    }

    #[test]
    fn test_log_entry_large_data() {
        let data = vec![0xAB; 10000];
        round_trip(&ProtocolMessage::LogEntry {
            vlsn: u64::MAX,
            entry_type: 255,
            data,
        });
    }

    #[test]
    fn test_ack_round_trip() {
        round_trip(&ProtocolMessage::Ack { vlsn: 999 });
    }

    #[test]
    fn test_group_change_add() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Add,
            node: RepNode::new(
                "new_node".to_string(),
                NodeType::Electable,
                "10.0.0.5".to_string(),
                5001,
                7,
            ),
        });
    }

    #[test]
    fn test_group_change_remove() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Remove,
            node: RepNode::new(
                "old_node".to_string(),
                NodeType::Monitor,
                "localhost".to_string(),
                6000,
                3,
            ),
        });
    }

    #[test]
    fn test_group_change_update() {
        round_trip(&ProtocolMessage::GroupChange {
            change_type: GroupChangeType::Update,
            node: RepNode::new(
                "node1".to_string(),
                NodeType::Secondary,
                "192.168.1.1".to_string(),
                7000,
                1,
            ),
        });
    }

    #[test]
    fn test_group_change_response_accepted() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: true });
    }

    #[test]
    fn test_group_change_response_rejected() {
        round_trip(&ProtocolMessage::GroupChangeResponse { accepted: false });
    }

    #[test]
    fn test_election_proposal_round_trip() {
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "candidate1".to_string(),
            vlsn: 5000,
            priority: 10,
            term: 3,
            dtvlsn: 4900,
        });
    }

    #[test]
    fn test_election_vote_granted() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter1".to_string(),
            granted: true,
            term: 3,
        });
    }

    #[test]
    fn test_election_vote_denied() {
        round_trip(&ProtocolMessage::ElectionVote {
            voter: "voter2".to_string(),
            granted: false,
            term: 2,
        });
    }

    #[test]
    fn test_election_result_round_trip() {
        round_trip(&ProtocolMessage::ElectionResult {
            master: "new_master".to_string(),
            term: 4,
        });
    }

    #[test]
    fn test_shutdown_round_trip() {
        round_trip(&ProtocolMessage::Shutdown {
            reason: "maintenance window".to_string(),
        });
    }

    #[test]
    fn test_decode_empty_data() {
        let result = ProtocolMessage::decode(&[]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => assert!(msg.contains("empty")),
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_unknown_tag() {
        let result = ProtocolMessage::decode(&[255]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unknown message tag"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_heartbeat() {
        // Tag for heartbeat, but missing payload.
        let result = ProtocolMessage::decode(&[TAG_HEARTBEAT, 0, 0]);
        assert!(result.is_err());
        match result.unwrap_err() {
            RepError::ProtocolError(msg) => {
                assert!(msg.contains("unexpected end"))
            }
            other => panic!("unexpected error: {:?}", other),
        }
    }

    #[test]
    fn test_decode_truncated_string() {
        // Handshake tag + string length says 100 bytes but only 2 provided.
        let mut data = vec![TAG_HANDSHAKE];
        data.extend_from_slice(&100u32.to_le_bytes());
        data.extend_from_slice(b"ab");
        let result = ProtocolMessage::decode(&data);
        assert!(result.is_err());
    }

    #[test]
    fn test_decode_invalid_utf8() {
        // Shutdown tag + a 2-byte string whose bytes are not valid UTF-8.
        let mut data = vec![TAG_SHUTDOWN];
        data.extend_from_slice(&2u32.to_le_bytes());
        data.extend_from_slice(&[0xFF, 0xFE]);
        match ProtocolMessage::decode(&data) {
            Err(RepError::ProtocolError(msg)) => {
                assert!(msg.contains("invalid UTF-8"))
            }
            other => panic!("unexpected result: {:?}", other),
        }
    }

    #[test]
    fn test_encode_produces_non_empty() {
        let msgs = vec![
            ProtocolMessage::Handshake {
                node_name: "n".to_string(),
                group_name: "g".to_string(),
                node_type: NodeType::Electable,
            },
            ProtocolMessage::Ack { vlsn: 0 },
            ProtocolMessage::Shutdown { reason: "done".to_string() },
        ];
        for msg in &msgs {
            assert!(!msg.encode().is_empty());
        }
    }

    #[test]
    fn test_group_change_type_debug() {
        assert_eq!(format!("{:?}", GroupChangeType::Add), "Add");
        assert_eq!(format!("{:?}", GroupChangeType::Remove), "Remove");
        assert_eq!(format!("{:?}", GroupChangeType::Update), "Update");
    }

    #[test]
    fn test_unicode_string_round_trip() {
        // Multi-byte characters: the byte length differs from the character
        // count, so this exercises the byte-length prefix.
        let reason = "arr\u{ea}t planifi\u{e9} \u{2014} \u{8a08}\u{753b}\u{505c}\u{6b62}";
        assert!(!reason.is_ascii());
        assert_ne!(reason.len(), reason.chars().count());

        let msg = ProtocolMessage::Shutdown { reason: reason.to_string() };
        let encoded = msg.encode();
        let prefix = u32::from_le_bytes([
            encoded[1],
            encoded[2],
            encoded[3],
            encoded[4],
        ]);
        assert_eq!(prefix as usize, reason.len());
        round_trip(&msg);
    }

    #[test]
    fn test_max_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: u64::MAX,
            timestamp_ms: u64::MAX,
        });
        round_trip(&ProtocolMessage::ElectionProposal {
            node_name: "x".to_string(),
            vlsn: u64::MAX,
            priority: u32::MAX,
            term: u64::MAX,
            dtvlsn: u64::MAX,
        });
    }

    #[test]
    fn test_zero_values_round_trip() {
        round_trip(&ProtocolMessage::Heartbeat {
            master_vlsn: 0,
            timestamp_ms: 0,
        });
        round_trip(&ProtocolMessage::Ack { vlsn: 0 });
    }
}