kevy_elect/message.rs
1//! Wire message types for `kevy-elect`'s control plane.
2//!
3//! All messages travel as RESP2 multi-bulk arrays (same format as
4//! the kevy keyspace plane), so encode/decode reuses `kevy-resp`'s
5//! borrowed parser. See [`docs/protocol.md`](../../docs/protocol.md)
6//! for the wire shape per variant and the state machine that
7//! consumes them.
8//!
9//! Verbs (UPPERCASE bulk strings on the wire) — uniform with kevy's
10//! existing command shape:
11//!
12//! - `HB <epoch> <node_id> <role> <repl_offset>`
13//! - `OFFER <new_epoch> <candidate_id> <repl_offset>`
14//! - `ACCEPT <epoch> <accepter_id>`
15//! - `ANNOUNCE <epoch> <new_primary_id> <new_primary_addr>`
16//!
17//! The numeric fields (epoch, offset) ride as RESP bulk-string
18//! decimals — same convention as `kevy-replicate`'s
19//! `REPLICATE FROM <offset> ID <replica_id>` handshake. Keeps every
20//! frame text-friendly for tcpdump / strace debugging.
21
22/// Self-perceived role of a node in its heartbeat. The state
23/// machine in `kevy-elect`'s reactor decides which transitions are
24/// legal; this enum is just what gets put on the wire.
25#[derive(Debug, Clone, Copy, PartialEq, Eq)]
26pub enum Role {
27 /// This node currently accepts writes.
28 Primary,
29 /// This node mirrors a primary.
30 Replica,
31 /// This node has sent `OFFER` for the current epoch and is
32 /// waiting for quorum `ACCEPT`. Transitional — once enough
33 /// ACCEPTs arrive it flips to `Primary` and broadcasts
34 /// `ANNOUNCE`; if the election times out it flips back to
35 /// `Replica` and re-arms its DOWN detector.
36 Candidate,
37}
38
39impl Role {
40 /// The wire-form lowercase ASCII for this role.
41 pub fn as_str(self) -> &'static str {
42 match self {
43 Self::Primary => "primary",
44 Self::Replica => "replica",
45 Self::Candidate => "candidate",
46 }
47 }
48
49 /// Parse the wire-form (case-insensitive).
50 pub fn parse(s: &[u8]) -> Option<Self> {
51 if s.eq_ignore_ascii_case(b"primary") {
52 Some(Self::Primary)
53 } else if s.eq_ignore_ascii_case(b"replica") {
54 Some(Self::Replica)
55 } else if s.eq_ignore_ascii_case(b"candidate") {
56 Some(Self::Candidate)
57 } else {
58 None
59 }
60 }
61}
62
63/// One decoded message off the control wire. The four variants
64/// mirror the four verbs in the protocol spec.
65#[derive(Debug, Clone)]
66pub enum Message {
67 /// `HB <epoch> <node_id> <role> <repl_offset>` — heartbeat.
68 /// Sent every `hb_interval_ms` (default 200 ms) by every node
69 /// to every other peer. Receiver updates its per-peer
70 /// last-seen + cached view; there is no ACK.
71 Hb {
72 /// Election epoch the sender believes is current.
73 epoch: u64,
74 /// Sender's node id (operator-declared, stable, unique).
75 node_id: String,
76 /// Sender's self-perceived role.
77 role: Role,
78 /// Highest applied replication offset on the sender.
79 repl_offset: u64,
80 },
81
82 /// `OFFER <new_epoch> <candidate_id> <repl_offset>` — a
83 /// replica that flagged the primary DOWN AND won candidate-
84 /// selection (highest offset → lowest node-id) broadcasts
85 /// this to ask for quorum ACCEPT.
86 Offer {
87 /// Strictly greater than every previously-seen epoch.
88 new_epoch: u64,
89 /// Candidate's node id.
90 candidate_id: String,
91 /// Candidate's `repl_offset` — peers reject the OFFER if
92 /// they themselves have a higher offset (a better
93 /// candidate must exist).
94 repl_offset: u64,
95 },
96
97 /// `ACCEPT <epoch> <accepter_id>` — a peer's vote for an
98 /// `OFFER`. Each peer casts at most ONE accept per epoch
99 /// (prevents two candidates from gathering quorum in the same
100 /// round).
101 Accept {
102 /// The epoch being voted for.
103 epoch: u64,
104 /// The voter's node id.
105 accepter_id: String,
106 },
107
108 /// `ANNOUNCE <epoch> <new_primary_id> <new_primary_addr>` —
109 /// the winning candidate broadcasts this on hitting quorum
110 /// `N/2 + 1` ACCEPTs. Peers update their `current_epoch` and
111 /// `current_primary`, then retarget `kevy-replicate` at the
112 /// new primary. The old primary (if alive) sees this with a
113 /// newer epoch and demotes.
114 Announce {
115 /// The new election epoch.
116 epoch: u64,
117 /// New primary's node id.
118 new_primary_id: String,
119 /// New primary's `host:port` (the kevy compat port, where
120 /// the v1.18 `REPLICAOF` handshake connects).
121 new_primary_addr: String,
122 },
123}
124
125#[cfg(test)]
126mod tests {
127 use super::*;
128
129 #[test]
130 fn role_round_trip() {
131 for r in [Role::Primary, Role::Replica, Role::Candidate] {
132 assert_eq!(Role::parse(r.as_str().as_bytes()), Some(r));
133 }
134 }
135
136 #[test]
137 fn role_parse_case_insensitive() {
138 assert_eq!(Role::parse(b"PRIMARY"), Some(Role::Primary));
139 assert_eq!(Role::parse(b"Replica"), Some(Role::Replica));
140 assert_eq!(Role::parse(b"caNDidaTE"), Some(Role::Candidate));
141 }
142
143 #[test]
144 fn role_parse_unknown_is_none() {
145 assert_eq!(Role::parse(b"leader"), None);
146 assert_eq!(Role::parse(b""), None);
147 }
148}