Skip to main content

kevy_elect/
elector.rs

1//! `kevy-elect` core state machine — pure logic, no I/O. The TCP
2//! transport (T1.5.6 network half) drives this struct by feeding it
3//! ticks and inbound messages and consuming the returned outbound
4//! messages.
5//!
//! Pulling the algorithm out of the network layer means every quorum /
//! split-brain / dueling / rejoin scenario can be covered by purely
//! in-memory unit tests that are deterministic and run in microseconds.
//! The integration tests (T1.5.12-17) layer real sockets on top once
//! the algorithm is validated.
11//!
//! Naming: peers reference each other by `node_id: String` (the
//! operator-declared stable identity). All time is `std::time::Instant`
//! — the receiver-local monotonic clock, never wall-clock — so there
//! are no cross-host clock-sync assumptions.
16//!
17//! See [`docs/protocol.md`](../../docs/protocol.md) for the wire-level
18//! spec this struct implements.
19
20use std::collections::HashMap;
21use std::time::{Duration, Instant};
22
23use crate::message::{Message, Role};
24
/// Timing knobs for the election state machine. The defaults follow
/// the protocol spec; operators are meant to override them through the
/// `[cluster]` config section once the kevy-server adapter (a separate
/// task) wires the live config in.
#[derive(Debug, Clone, Copy)]
pub struct ElectConfig {
    /// How often an `HB` is sent to each peer. Default 200 ms.
    pub hb_interval: Duration,
    /// A peer is flagged DOWN once this long passes without an inbound
    /// `HB` from it. Default 5 s, i.e. 25 × the default `hb_interval`,
    /// so a transient 1 s blip does not trigger an election.
    pub down_after: Duration,
    /// How long a candidate waits for a quorum of `ACCEPT`s before it
    /// gives up and backs off. Default 3 s.
    pub election_timeout: Duration,
    /// Minimum wait after a failed election attempt. Default 1 s. The
    /// real wait adds up to `election_backoff_jitter` on top so that
    /// dueling candidates do not retry in lock-step.
    pub election_backoff: Duration,
    /// Upper bound on the jitter added to `election_backoff` for each
    /// attempt. Default 4 s (so the effective range is 1–5 s).
    pub election_backoff_jitter: Duration,
}

impl Default for ElectConfig {
    /// Protocol-spec defaults: 200 ms heartbeat, 5 s DOWN threshold,
    /// 3 s election timeout, 1 s backoff floor, 4 s backoff jitter.
    fn default() -> Self {
        let hb_interval = Duration::from_millis(200);
        let down_after = Duration::from_secs(5);
        let election_timeout = Duration::from_secs(3);
        let election_backoff = Duration::from_secs(1);
        let election_backoff_jitter = Duration::from_secs(4);
        Self {
            hb_interval,
            down_after,
            election_timeout,
            election_backoff,
            election_backoff_jitter,
        }
    }
}
59
/// Per-peer scratch state the elector keeps, one entry per remote
/// node in `Elector::peer_views`. Per the original design note it is
/// refreshed on every inbound `HB` (the handler is not part of this
/// excerpt).
///
/// `last_epoch` / `last_role` are recorded for future observability
/// surfaces (INFO replication's "seen-from peer" panel) — the
/// election algorithm itself only consults `last_seen` (DOWN
/// detector) and `last_repl_offset` (candidate selection).
#[derive(Debug, Clone)]
// The observability-only fields above are written but not yet read,
// hence the blanket allow.
#[allow(dead_code)]
pub(crate) struct PeerView {
    /// Most recent `HB` reception time, on this node's monotonic clock.
    pub(crate) last_seen: Instant,
    /// Epoch the peer claimed in its most recent `HB`.
    pub(crate) last_epoch: u64,
    /// Role the peer claimed in its most recent `HB`.
    pub(crate) last_role: Role,
    /// `repl_offset` the peer claimed in its most recent `HB`.
    pub(crate) last_repl_offset: u64,
}
77
/// Top-level state machine for a single kevy node in the v3-cluster
/// Phase 1.5 election. One per process (election is per-node, not
/// per-shard).
///
/// The struct performs no I/O: callers feed it time via `tick` and
/// inbound traffic via `on_message`, and deliver the `Outbound`
/// batches those calls return.
pub struct Elector {
    /// This node's stable id.
    pub(crate) node_id: String,
    /// Operator-declared peer set, by id. **Includes** this node —
    /// the elector filters self at run-time. Length = `N` (quorum
    /// = `N / 2 + 1`).
    pub(crate) peer_ids: Vec<String>,
    /// Tunable timeouts.
    pub(crate) config: ElectConfig,
    /// Self-perceived role.
    pub(crate) role: Role,
    /// Election epoch this node believes is current. Starts at 1.
    /// Bumped only by own `OFFER`s; updated to a higher seen value on
    /// inbound `OFFER`/`ACCEPT`/`ANNOUNCE`.
    pub(crate) epoch: u64,
    /// `Some(id)` ⇒ this node knows `id` is currently the primary.
    /// `None` until the first `ANNOUNCE` is seen (or the node was
    /// configured-primary at boot). Set to this node's own id when it
    /// wins an election.
    pub(crate) current_primary: Option<String>,
    /// This node's most recent `repl_offset` — set externally by the
    /// kevy-server adapter from the live replication source / runner.
    /// Advertised in outbound `HB` and `OFFER` messages.
    pub(crate) my_repl_offset: u64,
    /// Last outbound `HB` time per peer (per-peer schedule, to allow
    /// staggering rather than thundering-herd at every tick).
    pub(crate) last_hb_sent: HashMap<String, Instant>,
    /// Inbound observations per peer.
    pub(crate) peer_views: HashMap<String, PeerView>,
    /// While `Candidate`: ACCEPT vote tally for the current epoch,
    /// keyed by voter id (a map with unit values, i.e. used as a set).
    /// Seeded with this node's own implicit vote when a candidacy
    /// starts. Cleared on transition out of Candidate.
    pub(crate) accept_votes: HashMap<String, ()>,
    /// While `Candidate`: when the OFFER was broadcast (election
    /// times out at `offer_at + election_timeout`).
    pub(crate) offer_at: Option<Instant>,
    /// While in election backoff: don't start another candidacy
    /// before this. Set on election timeout to
    /// `now + election_backoff + jitter`.
    pub(crate) backoff_until: Option<Instant>,
    /// Last epoch this node has cast an ACCEPT for (one vote per
    /// epoch — prevents two candidates from both winning quorum in
    /// the same round).
    pub(crate) last_accept_epoch: Option<u64>,
    /// Address (`host:port` of the kevy compat port) advertised in
    /// this node's `ANNOUNCE` when it becomes primary. Set
    /// externally by the kevy-server adapter at startup.
    pub(crate) my_advertised_addr: String,
    /// Source of election-backoff jitter — operators (and tests)
    /// inject it; the elector doesn't read the system random.
    pub(crate) jitter: ElectJitter,
}
129
/// Source of jitter for election backoff. Tests use
/// [`ElectJitter::Fixed`]; production uses [`ElectJitter::System`],
/// which hashes the caller-supplied `Instant` together with the node
/// id as poor-man's entropy. Pure-Rust, zero-dependency — no `rand`
/// crate.
#[derive(Debug, Clone)]
pub enum ElectJitter {
    /// Fixed value (test-friendly, deterministic). Clamped to the
    /// requested maximum when sampled.
    Fixed(Duration),
    /// Hash of `(now, node_id)` reduced into `[0, max_jitter)`. The
    /// same inputs always give the same output, but the value changes
    /// from attempt to attempt (different `now`) and from node to node
    /// (different `node_id`), which is what breaks up dueling
    /// candidates.
    System,
}

impl ElectJitter {
    /// Sample a jitter value no larger than `max`.
    ///
    /// * `Fixed(d)` returns `min(d, max)`.
    /// * `System` returns a value in `[0, max)` (exactly zero when
    ///   `max` is zero), derived from a hash of `now` and `node_id`.
    ///
    /// `now` is the caller's current monotonic time and `node_id` the
    /// sampling node's stable id; neither is used by `Fixed`.
    fn sample(&self, max: Duration, now: Instant, node_id: &str) -> Duration {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        match self {
            Self::Fixed(d) => (*d).min(max),
            Self::System => {
                // `DefaultHasher::new()` is keyed identically on every
                // call, so this stays a pure function of its inputs.
                // `Instant` implements `Hash`, which gives per-attempt
                // entropy without needing an anchor to subtract from.
                // A well-mixed hash matters here: ids that differ only
                // in their last byte ("node-1" / "node-2") must not
                // land within nanoseconds of each other.
                let mut hasher = DefaultHasher::new();
                now.hash(&mut hasher);
                node_id.hash(&mut hasher);

                // Saturate instead of truncating: a plain `as u64`
                // could wrap an enormous `max` down to 0 and make the
                // modulo below panic. `.max(1)` covers `max == 0`.
                let span_ns = u64::try_from(max.as_nanos())
                    .unwrap_or(u64::MAX)
                    .max(1);
                Duration::from_nanos(hasher.finish() % span_ns)
            }
        }
    }
}
171
/// One message + recipient that the elector wants to send.
/// `Elector::tick` and `Elector::on_message` return these in
/// `Vec<Outbound>` batches; the transport layer (T1.5.6 network half)
/// is expected to drain each batch every loop iteration and write the
/// messages to the per-peer TCP connections.
#[derive(Debug, Clone)]
pub struct Outbound {
    /// Recipient node id, or the [`Outbound::BROADCAST`] sentinel.
    /// `"*"` (never a valid node_id since they're ASCII ≤ 32 B and
    /// operators don't use stars) means "broadcast to every peer
    /// except self". The transport expands the sentinel on its end.
    pub to: String,
    /// The message to send.
    pub msg: Message,
}

impl Outbound {
    /// Sentinel value of `to` meaning broadcast-to-all (every peer
    /// except self).
    pub const BROADCAST: &'static str = "*";
}
191
192impl Elector {
193    /// Build an elector for a node with the given stable id, peer
194    /// membership (the full list including self), advertised
195    /// `host:port`, and config tunables.
196    ///
197    /// `start_role` is `Primary` for the bootstrap node (operator-
198    /// declared at first start) and `Replica` for the rest.
199    pub fn new(
200        node_id: impl Into<String>,
201        peer_ids: Vec<String>,
202        my_advertised_addr: impl Into<String>,
203        start_role: Role,
204        config: ElectConfig,
205        jitter: ElectJitter,
206    ) -> Self {
207        let node_id = node_id.into();
208        Self {
209            node_id,
210            peer_ids,
211            config,
212            role: start_role,
213            epoch: 1,
214            current_primary: None,
215            my_repl_offset: 0,
216            last_hb_sent: HashMap::new(),
217            peer_views: HashMap::new(),
218            accept_votes: HashMap::new(),
219            offer_at: None,
220            backoff_until: None,
221            last_accept_epoch: None,
222            my_advertised_addr: my_advertised_addr.into(),
223            jitter,
224        }
225    }
226
    /// Update this node's `repl_offset` (called by the kevy-server
    /// adapter when the replication source / runner advances).
    ///
    /// The value is stored as-is — there is no monotonicity check —
    /// and is what subsequent outbound `HB` and `OFFER` messages
    /// advertise.
    pub fn set_repl_offset(&mut self, offset: u64) {
        self.my_repl_offset = offset;
    }
232
    /// Current self-perceived role, returned by value. This is the
    /// node's own belief; it is the role advertised in outbound `HB`s.
    pub fn role(&self) -> Role {
        self.role
    }
237
    /// Election epoch this node currently believes in. Starts at 1 and
    /// is incremented each time this node starts a candidacy.
    pub fn epoch(&self) -> u64 {
        self.epoch
    }
242
    /// Last-known primary id (`None` until first ANNOUNCE / boot
    /// declaration). Becomes this node's own id once it wins an
    /// election. Borrowed from the elector; clone if it must outlive
    /// the borrow.
    pub fn current_primary(&self) -> Option<&str> {
        self.current_primary.as_deref()
    }
248
249    /// Drive the elector forward by `now`. Schedules outbound `HB`
250    /// per peer, detects DOWN, transitions Candidate → Primary on
251    /// quorum, and runs the candidate's election-timeout fallback.
252    /// Returns a fresh batch of outbound messages — callers should
253    /// drain in one pass.
254    pub fn tick(&mut self, now: Instant) -> Vec<Outbound> {
255        let mut out = Vec::new();
256        self.emit_heartbeats(now, &mut out);
257        self.maybe_start_election(now, &mut out);
258        self.maybe_finish_candidacy(now, &mut out);
259        out
260    }
261
262    /// Process one inbound message (from `from_node_id`) at `now`.
263    /// Updates per-peer view, applies the state machine transitions
264    /// the spec defines, returns any outbound messages the
265    /// transition produced.
266    pub fn on_message(
267        &mut self,
268        from_node_id: &str,
269        msg: Message,
270        now: Instant,
271    ) -> Vec<Outbound> {
272        let mut out = Vec::new();
273        match msg {
274            Message::Hb {
275                epoch,
276                node_id: _,
277                role,
278                repl_offset,
279            } => self.on_hb(from_node_id, epoch, role, repl_offset, now),
280            Message::Offer {
281                new_epoch,
282                candidate_id,
283                repl_offset,
284            } => self.on_offer(new_epoch, candidate_id, repl_offset, &mut out),
285            Message::Accept {
286                epoch,
287                accepter_id,
288            } => self.on_accept(epoch, accepter_id, now, &mut out),
289            Message::Announce {
290                epoch,
291                new_primary_id,
292                new_primary_addr,
293            } => self.on_announce(epoch, new_primary_id, new_primary_addr, &mut out),
294        }
295        out
296    }
297
298    // ─────────── tick helpers ───────────
299
300    fn emit_heartbeats(&mut self, now: Instant, out: &mut Vec<Outbound>) {
301        // One HB per peer per `hb_interval`. Per-peer schedule
302        // staggers (a peer added later gets its own clock).
303        for peer in self.peer_ids.clone() {
304            if peer == self.node_id {
305                continue;
306            }
307            let due = match self.last_hb_sent.get(&peer) {
308                Some(prev) => now.duration_since(*prev) >= self.config.hb_interval,
309                None => true,
310            };
311            if due {
312                self.last_hb_sent.insert(peer.clone(), now);
313                out.push(Outbound {
314                    to: peer,
315                    msg: Message::Hb {
316                        epoch: self.epoch,
317                        node_id: self.node_id.clone(),
318                        role: self.role,
319                        repl_offset: self.my_repl_offset,
320                    },
321                });
322            }
323        }
324    }
325
326    fn maybe_start_election(&mut self, now: Instant, out: &mut Vec<Outbound>) {
327        // Only replicas start elections.
328        if self.role != Role::Replica {
329            return;
330        }
331        // In backoff after a failed candidacy.
332        if let Some(b) = self.backoff_until
333            && now < b
334        {
335            return;
336        }
337        // Primary must be DOWN by my view.
338        let Some(primary) = self.current_primary.clone() else {
339            return;
340        };
341        if !self.is_peer_down(&primary, now) {
342            return;
343        }
344        // Candidate-selection: I must have the highest offset AND
345        // lowest node-id among alive peers (the primary is dead +
346        // not in the tie-break set).
347        if !self.am_best_candidate(now) {
348            return;
349        }
350        // Start the candidacy.
351        self.epoch = self.epoch.saturating_add(1);
352        self.role = Role::Candidate;
353        self.accept_votes.clear();
354        // Implicit self-vote — record ourselves in the tally so
355        // single-peer-needed (N=1, degenerate) and quorum=2/N=2
356        // both work.
357        self.accept_votes.insert(self.node_id.clone(), ());
358        self.offer_at = Some(now);
359        out.push(Outbound {
360            to: Outbound::BROADCAST.to_string(),
361            msg: Message::Offer {
362                new_epoch: self.epoch,
363                candidate_id: self.node_id.clone(),
364                repl_offset: self.my_repl_offset,
365            },
366        });
367    }
368
369    fn maybe_finish_candidacy(&mut self, now: Instant, out: &mut Vec<Outbound>) {
370        if self.role != Role::Candidate {
371            return;
372        }
373        let Some(offer_at) = self.offer_at else {
374            return;
375        };
376        let quorum = self.quorum_size();
377        if self.accept_votes.len() >= quorum {
378            // Won — broadcast ANNOUNCE and become primary.
379            self.role = Role::Primary;
380            self.current_primary = Some(self.node_id.clone());
381            self.offer_at = None;
382            self.accept_votes.clear();
383            out.push(Outbound {
384                to: Outbound::BROADCAST.to_string(),
385                msg: Message::Announce {
386                    epoch: self.epoch,
387                    new_primary_id: self.node_id.clone(),
388                    new_primary_addr: self.my_advertised_addr.clone(),
389                },
390            });
391            return;
392        }
393        if now.duration_since(offer_at) >= self.config.election_timeout {
394            // Lost / timed out — back off with jitter, fall back to
395            // Replica.
396            self.role = Role::Replica;
397            self.offer_at = None;
398            self.accept_votes.clear();
399            let jitter = self
400                .jitter
401                .sample(self.config.election_backoff_jitter, now, &self.node_id);
402            self.backoff_until = Some(now + self.config.election_backoff + jitter);
403        }
404    }
405
406    // ─────────── inbound handlers ───────────
407
408
409}