Skip to main content

kevy_elect/
message.rs

1//! Wire message types for `kevy-elect`'s control plane.
2//!
3//! All messages travel as RESP2 multi-bulk arrays (same format as
4//! the kevy keyspace plane), so encode/decode reuses `kevy-resp`'s
5//! borrowed parser. See [`docs/protocol.md`](../../docs/protocol.md)
6//! for the wire shape per variant and the state machine that
7//! consumes them.
8//!
9//! Verbs (UPPERCASE bulk strings on the wire) — uniform with kevy's
10//! existing command shape:
11//!
12//! - `HB <epoch> <node_id> <role> <repl_offset>`
13//! - `OFFER <new_epoch> <candidate_id> <repl_offset>`
14//! - `ACCEPT <epoch> <accepter_id>`
15//! - `ANNOUNCE <epoch> <new_primary_id> <new_primary_addr>`
16//!
17//! The numeric fields (epoch, offset) ride as RESP bulk-string
18//! decimals — same convention as `kevy-replicate`'s
19//! `REPLICATE FROM <offset> ID <replica_id>` handshake. Keeps every
20//! frame text-friendly for tcpdump / strace debugging.
21
22/// Self-perceived role of a node in its heartbeat. The state
23/// machine in `kevy-elect`'s reactor decides which transitions are
24/// legal; this enum is just what gets put on the wire.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum Role {
27    /// This node currently accepts writes.
28    Primary,
29    /// This node mirrors a primary.
30    Replica,
31    /// This node has sent `OFFER` for the current epoch and is
32    /// waiting for quorum `ACCEPT`. Transitional — once enough
33    /// ACCEPTs arrive it flips to `Primary` and broadcasts
34    /// `ANNOUNCE`; if the election times out it flips back to
35    /// `Replica` and re-arms its DOWN detector.
36    Candidate,
37}
38
39impl Role {
40    /// The wire-form lowercase ASCII for this role.
41    pub fn as_str(self) -> &'static str {
42        match self {
43            Self::Primary => "primary",
44            Self::Replica => "replica",
45            Self::Candidate => "candidate",
46        }
47    }
48
49    /// Parse the wire-form (case-insensitive).
50    pub fn parse(s: &[u8]) -> Option<Self> {
51        if s.eq_ignore_ascii_case(b"primary") {
52            Some(Self::Primary)
53        } else if s.eq_ignore_ascii_case(b"replica") {
54            Some(Self::Replica)
55        } else if s.eq_ignore_ascii_case(b"candidate") {
56            Some(Self::Candidate)
57        } else {
58            None
59        }
60    }
61}
62
63/// One decoded message off the control wire. The four variants
64/// mirror the four verbs in the protocol spec.
65#[derive(Debug, Clone)]
66pub enum Message {
67    /// `HB <epoch> <node_id> <role> <repl_offset>` — heartbeat.
68    /// Sent every `hb_interval_ms` (default 200 ms) by every node
69    /// to every other peer. Receiver updates its per-peer
70    /// last-seen + cached view; there is no ACK.
71    Hb {
72        /// Election epoch the sender believes is current.
73        epoch: u64,
74        /// Sender's node id (operator-declared, stable, unique).
75        node_id: String,
76        /// Sender's self-perceived role.
77        role: Role,
78        /// Highest applied replication offset on the sender.
79        repl_offset: u64,
80    },
81
82    /// `OFFER <new_epoch> <candidate_id> <repl_offset>` — a
83    /// replica that flagged the primary DOWN AND won candidate-
84    /// selection (highest offset → lowest node-id) broadcasts
85    /// this to ask for quorum ACCEPT.
86    Offer {
87        /// Strictly greater than every previously-seen epoch.
88        new_epoch: u64,
89        /// Candidate's node id.
90        candidate_id: String,
91        /// Candidate's `repl_offset` — peers reject the OFFER if
92        /// they themselves have a higher offset (a better
93        /// candidate must exist).
94        repl_offset: u64,
95    },
96
97    /// `ACCEPT <epoch> <accepter_id>` — a peer's vote for an
98    /// `OFFER`. Each peer casts at most ONE accept per epoch
99    /// (prevents two candidates from gathering quorum in the same
100    /// round).
101    Accept {
102        /// The epoch being voted for.
103        epoch: u64,
104        /// The voter's node id.
105        accepter_id: String,
106    },
107
108    /// `ANNOUNCE <epoch> <new_primary_id> <new_primary_addr>` —
109    /// the winning candidate broadcasts this on hitting quorum
110    /// `N/2 + 1` ACCEPTs. Peers update their `current_epoch` and
111    /// `current_primary`, then retarget `kevy-replicate` at the
112    /// new primary. The old primary (if alive) sees this with a
113    /// newer epoch and demotes.
114    Announce {
115        /// The new election epoch.
116        epoch: u64,
117        /// New primary's node id.
118        new_primary_id: String,
119        /// New primary's `host:port` (the kevy compat port, where
120        /// the v1.18 `REPLICAOF` handshake connects).
121        new_primary_addr: String,
122    },
123}
124
125#[cfg(test)]
126mod tests {
127    use super::*;
128
129    #[test]
130    fn role_round_trip() {
131        for r in [Role::Primary, Role::Replica, Role::Candidate] {
132            assert_eq!(Role::parse(r.as_str().as_bytes()), Some(r));
133        }
134    }
135
136    #[test]
137    fn role_parse_case_insensitive() {
138        assert_eq!(Role::parse(b"PRIMARY"), Some(Role::Primary));
139        assert_eq!(Role::parse(b"Replica"), Some(Role::Replica));
140        assert_eq!(Role::parse(b"caNDidaTE"), Some(Role::Candidate));
141    }
142
143    #[test]
144    fn role_parse_unknown_is_none() {
145        assert_eq!(Role::parse(b"leader"), None);
146        assert_eq!(Role::parse(b""), None);
147    }
148}