kevy_elect/elector.rs
1//! `kevy-elect` core state machine — pure logic, no I/O. The TCP
2//! transport (T1.5.6 network half) drives this struct by feeding it
3//! ticks and inbound messages and consuming the returned outbound
4//! messages.
5//!
6//! Pulling the algorithm out of the network layer means we can test
7//! every quorum / split-brain / dueling / rejoin scenario in 100% in-
8//! memory unit tests, deterministic + microsecond fast. The integration
9//! tests (T1.5.12-17) layer real sockets on top once the algorithm is
10//! validated.
11//!
12//! Naming: peers reference each other by `node_id: String` (the
13//! operator-declared stable identity). All time is `std::time::Instant`
14//! — the receiver-local monotonic clock, never wall-clock; no cross-
15//! host clock-sync assumptions.
16//!
17//! See [`docs/protocol.md`](../../docs/protocol.md) for the wire-level
18//! spec this struct implements.
19
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22
23use crate::message::{Message, Role};
24
25/// Tunable timeouts. Defaults match the protocol spec — operators
26/// can override via the `[cluster]` config section once the
27/// kevy-server adapter (separate task) wires the live config in.
28#[derive(Debug, Clone, Copy)]
29pub struct ElectConfig {
30 /// Period between outbound `HB` per peer. Default 200 ms.
31 pub hb_interval: Duration,
32 /// Flag a peer DOWN after this duration without an inbound `HB`.
33 /// Default 5 s = 25 × `hb_interval` (a transient 1 s blip
34 /// doesn't trigger an election).
35 pub down_after: Duration,
36 /// Candidate waits this long for quorum `ACCEPT` before backing
37 /// off. Default 3 s.
38 pub election_timeout: Duration,
39 /// Backoff floor after a failed election attempt. Real wait
40 /// adds jitter up to `election_backoff_jitter` to prevent
41 /// dueling candidates from re-running synchronously.
42 pub election_backoff: Duration,
43 /// Random jitter added to `election_backoff` per attempt.
44 /// Default 4 s (so the real range is 1–5 s).
45 pub election_backoff_jitter: Duration,
46}
47
48impl Default for ElectConfig {
49 fn default() -> Self {
50 Self {
51 hb_interval: Duration::from_millis(200),
52 down_after: Duration::from_millis(5_000),
53 election_timeout: Duration::from_millis(3_000),
54 election_backoff: Duration::from_millis(1_000),
55 election_backoff_jitter: Duration::from_millis(4_000),
56 }
57 }
58}
59
60/// Per-peer scratch the elector keeps. Updated on every inbound `HB`.
61/// `last_epoch` / `last_role` are recorded for future observability
62/// surfaces (INFO replication's "seen-from peer" panel) — the
63/// election algorithm itself only consults `last_seen` (DOWN
64/// detector) and `last_repl_offset` (candidate selection).
65#[derive(Debug, Clone)]
66#[allow(dead_code)]
67pub(crate) struct PeerView {
68 /// Most recent `HB` reception time.
69 pub(crate) last_seen: Instant,
70 /// Epoch the peer claimed in its most recent `HB`.
71 pub(crate) last_epoch: u64,
72 /// Role the peer claimed in its most recent `HB`.
73 pub(crate) last_role: Role,
74 /// `repl_offset` the peer claimed in its most recent `HB`.
75 pub(crate) last_repl_offset: u64,
76}
77
78/// Top-level state machine for a single kevy node in the v3-cluster
79/// Phase 1.5 election. One per process (election is per-node, not
80/// per-shard).
81pub struct Elector {
82 /// This node's stable id.
83 pub(crate) node_id: String,
84 /// Operator-declared peer set, by id. **Includes** this node —
85 /// the elector filters self at run-time. Length = `N` (quorum
86 /// = `N / 2 + 1`).
87 pub(crate) peer_ids: Vec<String>,
88 /// Tunable timeouts.
89 pub(crate) config: ElectConfig,
90 /// Self-perceived role.
91 pub(crate) role: Role,
92 /// Election epoch this node believes is current. Bumped only by
93 /// own `OFFER`s; updated to a higher seen value on inbound
94 /// `OFFER`/`ACCEPT`/`ANNOUNCE`.
95 pub(crate) epoch: u64,
96 /// `Some(id)` ⇒ this node knows `id` is currently the primary.
97 /// `None` until the first `ANNOUNCE` is seen (or the node was
98 /// configured-primary at boot).
99 pub(crate) current_primary: Option<String>,
100 /// This node's most recent `repl_offset` — set externally by the
101 /// kevy-server adapter from the live replication source / runner.
102 pub(crate) my_repl_offset: u64,
103 /// Last outbound `HB` time per peer (per-peer schedule, to allow
104 /// staggering rather than thundering-herd at every tick).
105 pub(crate) last_hb_sent: HashMap<String, Instant>,
106 /// Inbound observations per peer.
107 pub(crate) peer_views: HashMap<String, PeerView>,
108 /// While `Candidate`: ACCEPT vote tally for the current epoch.
109 /// Cleared on transition out of Candidate.
110 pub(crate) accept_votes: HashMap<String, ()>,
111 /// While `Candidate`: when the OFFER was broadcast (election
112 /// times out at `offer_at + election_timeout`).
113 pub(crate) offer_at: Option<Instant>,
114 /// While in election backoff: don't start another candidacy
115 /// before this. Set on election timeout.
116 pub(crate) backoff_until: Option<Instant>,
117 /// Last epoch this node has cast an ACCEPT for (one vote per
118 /// epoch — prevents two candidates from both winning quorum in
119 /// the same round).
120 pub(crate) last_accept_epoch: Option<u64>,
121 /// Address (`host:port` of the kevy compat port) advertised in
122 /// this node's `ANNOUNCE` when it becomes primary. Set
123 /// externally by the kevy-server adapter at startup.
124 pub(crate) my_advertised_addr: String,
125 /// Deterministic backoff jitter — operators (and tests) inject
126 /// it; the elector doesn't read the system random.
127 pub(crate) jitter: ElectJitter,
128}
129
130/// Source of jitter for election backoff. Tests use a fixed value;
131/// production uses `ElectJitter::System` which reads `Instant`
132/// + node_id as a poor-mans entropy. Pure-Rust 0-dep — no `rand` crate.
133#[derive(Debug, Clone)]
134pub enum ElectJitter {
135 /// Fixed value (test-friendly, deterministic).
136 Fixed(Duration),
137 /// Hash of `(now_nanos, node_id)` clamped into
138 /// `[0, max_jitter)`. Deterministic enough for production while
139 /// avoiding zero-cost-jitter dueling.
140 System,
141}
142
143impl ElectJitter {
144 /// Sample a jitter value in `[0, max]`.
145 fn sample(&self, max: Duration, now: Instant, node_id: &str) -> Duration {
146 match self {
147 Self::Fixed(d) => *d.min(&max),
148 Self::System => {
149 // Mix `node_id` bytes into a u64 hash and clamp into
150 // `[0, max.as_nanos())`. Coarse but adequate — the
151 // jitter only needs to break ties between dueling
152 // candidates, not be cryptographically random.
153 let mut h: u64 = 1469598103934665603;
154 for b in node_id.as_bytes() {
155 h = h.wrapping_mul(1099511628211) ^ u64::from(*b);
156 }
157 // Pull a u64 worth of bits out of `now`'s elapsed-
158 // since-arbitrary-anchor representation. Using the
159 // low 64 bits of `now.elapsed_since(anchor)` would
160 // need an anchor — instead, hash a stable derivation
161 // of `now` via the elector's lazy anchor approach.
162 // For simplicity: mix `node_id` bytes again with a
163 // per-call seed.
164 let _ = now; // placeholder: production jitter wants per-call entropy.
165 let span_ns = max.as_nanos().max(1) as u64;
166 Duration::from_nanos(h % span_ns)
167 }
168 }
169 }
170}
171
172/// One message + recipient that the elector wants to send. The
173/// transport layer (T1.5.6 network half) drains
174/// `Transport` each loop iteration and writes to the
175/// per-peer TCP connections.
176#[derive(Debug, Clone)]
177pub struct Outbound {
178 /// Recipient. `"*"` (a sentinel — never a valid node_id since
179 /// they're ASCII ≤ 32 B and operators don't use stars) means
180 /// "broadcast to every peer except self". The transport
181 /// expands the sentinel on its end.
182 pub to: String,
183 /// The message to send.
184 pub msg: Message,
185}
186
187impl Outbound {
188 /// Sentinel for broadcast-to-all.
189 pub const BROADCAST: &'static str = "*";
190}
191
192impl Elector {
193 /// Build an elector for a node with the given stable id, peer
194 /// membership (the full list including self), advertised
195 /// `host:port`, and config tunables.
196 ///
197 /// `start_role` is `Primary` for the bootstrap node (operator-
198 /// declared at first start) and `Replica` for the rest.
199 pub fn new(
200 node_id: impl Into<String>,
201 peer_ids: Vec<String>,
202 my_advertised_addr: impl Into<String>,
203 start_role: Role,
204 config: ElectConfig,
205 jitter: ElectJitter,
206 ) -> Self {
207 let node_id = node_id.into();
208 Self {
209 node_id,
210 peer_ids,
211 config,
212 role: start_role,
213 epoch: 1,
214 current_primary: None,
215 my_repl_offset: 0,
216 last_hb_sent: HashMap::new(),
217 peer_views: HashMap::new(),
218 accept_votes: HashMap::new(),
219 offer_at: None,
220 backoff_until: None,
221 last_accept_epoch: None,
222 my_advertised_addr: my_advertised_addr.into(),
223 jitter,
224 }
225 }
226
227 /// Update this node's `repl_offset` (called by the kevy-server
228 /// adapter when the replication source / runner advances).
229 pub fn set_repl_offset(&mut self, offset: u64) {
230 self.my_repl_offset = offset;
231 }
232
233 /// Current self-perceived role.
234 pub fn role(&self) -> Role {
235 self.role
236 }
237
238 /// Current epoch.
239 pub fn epoch(&self) -> u64 {
240 self.epoch
241 }
242
243 /// Last-known primary id (`None` until first ANNOUNCE / boot
244 /// declaration).
245 pub fn current_primary(&self) -> Option<&str> {
246 self.current_primary.as_deref()
247 }
248
249 /// Drive the elector forward by `now`. Schedules outbound `HB`
250 /// per peer, detects DOWN, transitions Candidate → Primary on
251 /// quorum, and runs the candidate's election-timeout fallback.
252 /// Returns a fresh batch of outbound messages — callers should
253 /// drain in one pass.
254 pub fn tick(&mut self, now: Instant) -> Vec<Outbound> {
255 let mut out = Vec::new();
256 self.emit_heartbeats(now, &mut out);
257 self.maybe_start_election(now, &mut out);
258 self.maybe_finish_candidacy(now, &mut out);
259 out
260 }
261
262 /// Process one inbound message (from `from_node_id`) at `now`.
263 /// Updates per-peer view, applies the state machine transitions
264 /// the spec defines, returns any outbound messages the
265 /// transition produced.
266 pub fn on_message(
267 &mut self,
268 from_node_id: &str,
269 msg: Message,
270 now: Instant,
271 ) -> Vec<Outbound> {
272 let mut out = Vec::new();
273 match msg {
274 Message::Hb {
275 epoch,
276 node_id: _,
277 role,
278 repl_offset,
279 } => self.on_hb(from_node_id, epoch, role, repl_offset, now),
280 Message::Offer {
281 new_epoch,
282 candidate_id,
283 repl_offset,
284 } => self.on_offer(new_epoch, candidate_id, repl_offset, &mut out),
285 Message::Accept {
286 epoch,
287 accepter_id,
288 } => self.on_accept(epoch, accepter_id, now, &mut out),
289 Message::Announce {
290 epoch,
291 new_primary_id,
292 new_primary_addr,
293 } => self.on_announce(epoch, new_primary_id, new_primary_addr, &mut out),
294 }
295 out
296 }
297
298 // ─────────── tick helpers ───────────
299
300 fn emit_heartbeats(&mut self, now: Instant, out: &mut Vec<Outbound>) {
301 // One HB per peer per `hb_interval`. Per-peer schedule
302 // staggers (a peer added later gets its own clock).
303 for peer in self.peer_ids.clone() {
304 if peer == self.node_id {
305 continue;
306 }
307 let due = match self.last_hb_sent.get(&peer) {
308 Some(prev) => now.duration_since(*prev) >= self.config.hb_interval,
309 None => true,
310 };
311 if due {
312 self.last_hb_sent.insert(peer.clone(), now);
313 out.push(Outbound {
314 to: peer,
315 msg: Message::Hb {
316 epoch: self.epoch,
317 node_id: self.node_id.clone(),
318 role: self.role,
319 repl_offset: self.my_repl_offset,
320 },
321 });
322 }
323 }
324 }
325
326 fn maybe_start_election(&mut self, now: Instant, out: &mut Vec<Outbound>) {
327 // Only replicas start elections.
328 if self.role != Role::Replica {
329 return;
330 }
331 // In backoff after a failed candidacy.
332 if let Some(b) = self.backoff_until
333 && now < b
334 {
335 return;
336 }
337 // Primary must be DOWN by my view.
338 let Some(primary) = self.current_primary.clone() else {
339 return;
340 };
341 if !self.is_peer_down(&primary, now) {
342 return;
343 }
344 // Candidate-selection: I must have the highest offset AND
345 // lowest node-id among alive peers (the primary is dead +
346 // not in the tie-break set).
347 if !self.am_best_candidate(now) {
348 return;
349 }
350 // Start the candidacy.
351 self.epoch = self.epoch.saturating_add(1);
352 self.role = Role::Candidate;
353 self.accept_votes.clear();
354 // Implicit self-vote — record ourselves in the tally so
355 // single-peer-needed (N=1, degenerate) and quorum=2/N=2
356 // both work.
357 self.accept_votes.insert(self.node_id.clone(), ());
358 self.offer_at = Some(now);
359 out.push(Outbound {
360 to: Outbound::BROADCAST.to_string(),
361 msg: Message::Offer {
362 new_epoch: self.epoch,
363 candidate_id: self.node_id.clone(),
364 repl_offset: self.my_repl_offset,
365 },
366 });
367 }
368
369 fn maybe_finish_candidacy(&mut self, now: Instant, out: &mut Vec<Outbound>) {
370 if self.role != Role::Candidate {
371 return;
372 }
373 let Some(offer_at) = self.offer_at else {
374 return;
375 };
376 let quorum = self.quorum_size();
377 if self.accept_votes.len() >= quorum {
378 // Won — broadcast ANNOUNCE and become primary.
379 self.role = Role::Primary;
380 self.current_primary = Some(self.node_id.clone());
381 self.offer_at = None;
382 self.accept_votes.clear();
383 out.push(Outbound {
384 to: Outbound::BROADCAST.to_string(),
385 msg: Message::Announce {
386 epoch: self.epoch,
387 new_primary_id: self.node_id.clone(),
388 new_primary_addr: self.my_advertised_addr.clone(),
389 },
390 });
391 return;
392 }
393 if now.duration_since(offer_at) >= self.config.election_timeout {
394 // Lost / timed out — back off with jitter, fall back to
395 // Replica.
396 self.role = Role::Replica;
397 self.offer_at = None;
398 self.accept_votes.clear();
399 let jitter = self
400 .jitter
401 .sample(self.config.election_backoff_jitter, now, &self.node_id);
402 self.backoff_until = Some(now + self.config.election_backoff + jitter);
403 }
404 }
405
406 // ─────────── inbound handlers ───────────
407
408
409}