Skip to main content

pmetal_distributed/
election.rs

//! Distributed master election algorithm.
//!
//! Implements a seniority-based leader election protocol inspired by exo.
//! The election is meant to ensure that exactly one master is elected in the cluster,
//! with deterministic tiebreaking based on seniority, commands seen, and peer ID.
//!
//! # Algorithm
//!
//! 1. When a node starts or loses its connection to the master, it starts a campaign
//! 2. Nodes exchange election messages with their clock, seniority, and peer ID
//! 3. The node with the highest seniority wins; ties are broken by commands seen, then by peer ID
//! 4. An election timeout triggers a re-election if no winner emerges
13
14use anyhow::Result;
15use libp2p::PeerId;
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
/// How long candidate information may go unrefreshed during a campaign before the
/// election is considered timed out (3 seconds). Used as the default for
/// `ElectionConfig::election_timeout`.
pub const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(3);
26
/// Pause observed after a connection change before a new election is started
/// (200 milliseconds). Used as the default for `ElectionConfig::connection_delay`.
pub const CONNECTION_ELECTION_DELAY: Duration = Duration::from_millis(200);
29
/// Tunable settings for the election manager.
#[derive(Debug, Clone)]
pub struct ElectionConfig {
    /// Maximum age of candidate information during a campaign; once exceeded,
    /// the timeout check restarts the election.
    pub election_timeout: Duration,
    /// How long to wait after the master is lost before starting a new election.
    pub connection_delay: Duration,
    /// When `true`, losing the master automatically starts a new election.
    pub auto_elect_on_peer_loss: bool,
}
40
41impl Default for ElectionConfig {
42    fn default() -> Self {
43        Self {
44            election_timeout: DEFAULT_ELECTION_TIMEOUT,
45            connection_delay: CONNECTION_ELECTION_DELAY,
46            auto_elect_on_peer_loss: true,
47        }
48    }
49}
50
51/// Election message exchanged between nodes.
52#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ElectionMessage {
54    /// Logical clock for ordering.
55    pub clock: u64,
56    /// Node seniority (time since node started).
57    pub seniority: Duration,
58    /// Proposed session ID (prevents stale elections).
59    pub proposed_session: u64,
60    /// Number of commands seen (for consistency).
61    pub commands_seen: u64,
62    /// The peer ID of the sender.
63    pub sender: PeerId,
64    /// The peer ID this node is voting for.
65    pub vote_for: PeerId,
66}
67
/// Role of the local node with respect to the election.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElectionState {
    /// No election in progress and no role assigned.
    Idle,
    /// An election campaign is in progress.
    Campaigning,
    /// The local node is the elected master.
    Master,
    /// The local node follows a master elected elsewhere.
    Follower,
}
80
81/// Information about a candidate in the election.
82#[derive(Debug, Clone)]
83#[allow(dead_code)]
84struct CandidateInfo {
85    /// Peer ID of the candidate.
86    peer_id: PeerId,
87    /// Seniority of the candidate.
88    seniority: Duration,
89    /// Session proposed by this candidate.
90    proposed_session: u64,
91    /// Commands seen by this candidate.
92    commands_seen: u64,
93    /// When we last heard from this candidate.
94    last_seen: Instant,
95}
96
97/// Election events.
98#[derive(Debug, Clone)]
99pub enum ElectionEvent {
100    /// Election started.
101    Started { session: u64 },
102    /// We were elected as master.
103    ElectedMaster { session: u64 },
104    /// Another node was elected as master.
105    NewMaster { master: PeerId, session: u64 },
106    /// Election timed out.
107    Timeout { session: u64 },
108    /// Split brain detected.
109    SplitBrain { masters: Vec<PeerId> },
110}
111
112/// Election manager.
113pub struct ElectionManager {
114    /// Local peer ID.
115    local_peer_id: PeerId,
116    /// Configuration.
117    config: ElectionConfig,
118    /// Current state.
119    state: Arc<RwLock<ElectionState>>,
120    /// Current master (if known).
121    current_master: Arc<RwLock<Option<PeerId>>>,
122    /// Current session ID.
123    session_id: Arc<RwLock<u64>>,
124    /// When this node started (for seniority).
125    start_time: Instant,
126    /// Candidates in current election.
127    candidates: Arc<RwLock<HashMap<PeerId, CandidateInfo>>>,
128    /// Our vote in the current election.
129    current_vote: Arc<RwLock<Option<PeerId>>>,
130    /// Number of commands we've processed.
131    commands_seen: Arc<RwLock<u64>>,
132    /// Logical clock.
133    clock: Arc<RwLock<u64>>,
134    /// Event sender.
135    event_tx: mpsc::Sender<ElectionEvent>,
136}
137
138impl ElectionManager {
139    /// Create a new election manager.
140    pub fn new(
141        local_peer_id: PeerId,
142        config: ElectionConfig,
143        event_tx: mpsc::Sender<ElectionEvent>,
144    ) -> Self {
145        Self {
146            local_peer_id,
147            config,
148            state: Arc::new(RwLock::new(ElectionState::Idle)),
149            current_master: Arc::new(RwLock::new(None)),
150            session_id: Arc::new(RwLock::new(0)),
151            start_time: Instant::now(),
152            candidates: Arc::new(RwLock::new(HashMap::new())),
153            current_vote: Arc::new(RwLock::new(None)),
154            commands_seen: Arc::new(RwLock::new(0)),
155            clock: Arc::new(RwLock::new(0)),
156            event_tx,
157        }
158    }
159
160    /// Get the current election state.
161    pub fn state(&self) -> ElectionState {
162        *self.state.read()
163    }
164
165    /// Get the current master.
166    pub fn current_master(&self) -> Option<PeerId> {
167        *self.current_master.read()
168    }
169
170    /// Check if we are the master.
171    pub fn is_master(&self) -> bool {
172        *self.state.read() == ElectionState::Master
173    }
174
175    /// Check if we are a follower.
176    pub fn is_follower(&self) -> bool {
177        *self.state.read() == ElectionState::Follower
178    }
179
180    /// Get our seniority.
181    pub fn seniority(&self) -> Duration {
182        self.start_time.elapsed()
183    }
184
185    /// Get the current session ID.
186    pub fn session_id(&self) -> u64 {
187        *self.session_id.read()
188    }
189
190    /// Increment and get the logical clock.
191    fn tick(&self) -> u64 {
192        let mut clock = self.clock.write();
193        *clock += 1;
194        *clock
195    }
196
197    /// Update clock to max of local and received.
198    fn update_clock(&self, received: u64) {
199        let mut clock = self.clock.write();
200        *clock = (*clock).max(received) + 1;
201    }
202
203    /// Increment commands seen.
204    pub fn record_command(&self) {
205        *self.commands_seen.write() += 1;
206    }
207
208    /// Start a new election campaign.
209    pub async fn start_election(&self) -> Result<()> {
210        let new_session = {
211            let mut session = self.session_id.write();
212            *session += 1;
213            *session
214        };
215
216        {
217            let mut state = self.state.write();
218            *state = ElectionState::Campaigning;
219        }
220
221        // Clear previous candidates
222        self.candidates.write().clear();
223
224        // Vote for ourselves
225        *self.current_vote.write() = Some(self.local_peer_id);
226
227        // Add ourselves as a candidate
228        self.candidates.write().insert(
229            self.local_peer_id,
230            CandidateInfo {
231                peer_id: self.local_peer_id,
232                seniority: self.seniority(),
233                proposed_session: new_session,
234                commands_seen: *self.commands_seen.read(),
235                last_seen: Instant::now(),
236            },
237        );
238
239        info!(
240            "Starting election campaign, session={}, seniority={:?}",
241            new_session,
242            self.seniority()
243        );
244
245        let _ = self
246            .event_tx
247            .send(ElectionEvent::Started {
248                session: new_session,
249            })
250            .await;
251
252        Ok(())
253    }
254
255    /// Create an election message to broadcast.
256    pub fn create_message(&self) -> ElectionMessage {
257        let vote = self.current_vote.read().unwrap_or(self.local_peer_id);
258        ElectionMessage {
259            clock: self.tick(),
260            seniority: self.seniority(),
261            proposed_session: *self.session_id.read(),
262            commands_seen: *self.commands_seen.read(),
263            sender: self.local_peer_id,
264            vote_for: vote,
265        }
266    }
267
268    /// Process a received election message.
269    pub async fn process_message(&self, msg: ElectionMessage) -> Result<Option<ElectionMessage>> {
270        self.update_clock(msg.clock);
271
272        // Add/update candidate info
273        {
274            let mut candidates = self.candidates.write();
275            candidates.insert(
276                msg.sender,
277                CandidateInfo {
278                    peer_id: msg.sender,
279                    seniority: msg.seniority,
280                    proposed_session: msg.proposed_session,
281                    commands_seen: msg.commands_seen,
282                    last_seen: Instant::now(),
283                },
284            );
285        }
286
287        // If we're idle, join the election
288        if *self.state.read() == ElectionState::Idle {
289            self.start_election().await?;
290        }
291
292        // Update our vote based on candidates
293        let should_respond = self.update_vote();
294
295        if should_respond {
296            Ok(Some(self.create_message()))
297        } else {
298            Ok(None)
299        }
300    }
301
302    /// Update our vote based on current candidates.
303    fn update_vote(&self) -> bool {
304        let candidates = self.candidates.read();
305
306        // Find the best candidate
307        let best = candidates.values().max_by(|a, b| {
308            // Higher seniority wins
309            match a.seniority.cmp(&b.seniority) {
310                std::cmp::Ordering::Equal => {
311                    // Higher commands seen wins
312                    match a.commands_seen.cmp(&b.commands_seen) {
313                        std::cmp::Ordering::Equal => {
314                            // Lexicographically higher peer ID wins (deterministic)
315                            a.peer_id.cmp(&b.peer_id)
316                        }
317                        other => other,
318                    }
319                }
320                other => other,
321            }
322        });
323
324        if let Some(best_candidate) = best {
325            let mut current_vote = self.current_vote.write();
326            let old_vote = *current_vote;
327            *current_vote = Some(best_candidate.peer_id);
328
329            // Return true if vote changed
330            old_vote != Some(best_candidate.peer_id)
331        } else {
332            false
333        }
334    }
335
336    /// Check if election is complete (all candidates agree on winner).
337    pub async fn check_election_complete(&self, all_peers: &[PeerId]) -> Result<bool> {
338        // Extract values while holding locks, then drop locks before await
339        let winner_to_declare = {
340            let candidates = self.candidates.read();
341
342            // Need votes from all known peers
343            if candidates.len() < all_peers.len() {
344                return Ok(false);
345            }
346
347            // Check if all candidates vote for the same peer
348            let _votes: Vec<_> = candidates.values().map(|c| c.peer_id).collect();
349
350            // For now, simple majority - find the candidate with most votes for themselves
351            // In a proper implementation, we'd track explicit votes
352            let our_vote = self.current_vote.read();
353
354            if let Some(winner) = our_vote.as_ref() {
355                // Check if winner is in our candidates
356                if candidates.contains_key(winner) {
357                    Some(*winner)
358                } else {
359                    None
360                }
361            } else {
362                None
363            }
364        }; // locks dropped here
365
366        if let Some(winner) = winner_to_declare {
367            // Declare winner
368            self.declare_winner(winner).await?;
369            return Ok(true);
370        }
371
372        Ok(false)
373    }
374
375    /// Declare the election winner.
376    async fn declare_winner(&self, winner: PeerId) -> Result<()> {
377        let session = *self.session_id.read();
378
379        if winner == self.local_peer_id {
380            *self.state.write() = ElectionState::Master;
381            *self.current_master.write() = Some(winner);
382
383            info!("Elected as master, session={}", session);
384            let _ = self
385                .event_tx
386                .send(ElectionEvent::ElectedMaster { session })
387                .await;
388        } else {
389            *self.state.write() = ElectionState::Follower;
390            *self.current_master.write() = Some(winner);
391
392            info!(
393                "Following master {}, session={}",
394                winner.to_base58(),
395                session
396            );
397            let _ = self
398                .event_tx
399                .send(ElectionEvent::NewMaster {
400                    master: winner,
401                    session,
402                })
403                .await;
404        }
405
406        Ok(())
407    }
408
409    /// Handle master loss (triggers new election).
410    pub async fn handle_master_loss(&self) -> Result<()> {
411        if self.config.auto_elect_on_peer_loss {
412            warn!("Master lost, starting new election after delay");
413
414            // Wait for connection delay
415            tokio::time::sleep(self.config.connection_delay).await;
416
417            self.start_election().await?;
418        }
419
420        Ok(())
421    }
422
423    /// Check for election timeout.
424    pub async fn check_timeout(&self) -> Result<bool> {
425        if *self.state.read() != ElectionState::Campaigning {
426            return Ok(false);
427        }
428
429        // Check if any candidate info is stale - extract session while holding lock
430        let timed_out_session = {
431            let candidates = self.candidates.read();
432            let now = Instant::now();
433
434            let mut timed_out = None;
435            for candidate in candidates.values() {
436                if now.duration_since(candidate.last_seen) > self.config.election_timeout {
437                    timed_out = Some(*self.session_id.read());
438                    break;
439                }
440            }
441            timed_out
442        }; // locks dropped here
443
444        if let Some(session) = timed_out_session {
445            warn!("Election timeout, session={}", session);
446
447            let _ = self.event_tx.send(ElectionEvent::Timeout { session }).await;
448
449            // Start new election
450            self.start_election().await?;
451            return Ok(true);
452        }
453
454        Ok(false)
455    }
456
457    /// Force become master (for single-node clusters).
458    pub async fn force_master(&self) -> Result<()> {
459        *self.state.write() = ElectionState::Master;
460        *self.current_master.write() = Some(self.local_peer_id);
461
462        let session = {
463            let mut s = self.session_id.write();
464            *s += 1;
465            *s
466        };
467
468        info!("Forced master election (single node), session={}", session);
469        let _ = self
470            .event_tx
471            .send(ElectionEvent::ElectedMaster { session })
472            .await;
473
474        Ok(())
475    }
476
477    /// Step down from master role.
478    pub async fn step_down(&self) -> Result<()> {
479        if *self.state.read() == ElectionState::Master {
480            *self.state.write() = ElectionState::Idle;
481            *self.current_master.write() = None;
482
483            info!("Stepped down from master role");
484        }
485
486        Ok(())
487    }
488}
489
490impl std::fmt::Debug for ElectionManager {
491    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492        f.debug_struct("ElectionManager")
493            .field("local_peer_id", &self.local_peer_id.to_base58())
494            .field("state", &*self.state.read())
495            .field("current_master", &self.current_master())
496            .field("session_id", &*self.session_id.read())
497            .field("seniority", &self.seniority())
498            .finish()
499    }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Builds a manager with a random peer ID and default configuration, returning it
    /// together with that peer ID and the receiving end of its event channel.
    fn fixture() -> (ElectionManager, PeerId, mpsc::Receiver<ElectionEvent>) {
        let (event_tx, event_rx) = mpsc::channel(10);
        let local = PeerId::random();
        let manager = ElectionManager::new(local, ElectionConfig::default(), event_tx);
        (manager, local, event_rx)
    }

    /// Starting an election moves to `Campaigning`, bumps the session to 1 and
    /// publishes a `Started` event for that session.
    #[tokio::test]
    async fn test_election_start() {
        let (manager, _local, mut events) = fixture();

        manager.start_election().await.unwrap();

        assert_eq!(manager.state(), ElectionState::Campaigning);
        assert_eq!(manager.session_id(), 1);

        // Should receive Started event
        let first = events.try_recv().unwrap();
        assert!(matches!(first, ElectionEvent::Started { session: 1 }));
    }

    /// Forcing mastership makes the local node master and publishes `ElectedMaster`.
    #[tokio::test]
    async fn test_force_master() {
        let (manager, local, mut events) = fixture();

        manager.force_master().await.unwrap();

        assert!(manager.is_master());
        assert_eq!(manager.current_master(), Some(local));

        let first = events.try_recv().unwrap();
        assert!(matches!(first, ElectionEvent::ElectedMaster { .. }));
    }

    /// Seniority grows with wall-clock time since the manager was created.
    #[test]
    fn test_seniority() {
        let (manager, _local, _events) = fixture();

        // Seniority should be non-zero after creation
        let pause = Duration::from_millis(10);
        std::thread::sleep(pause);
        assert!(manager.seniority() >= pause);
    }

    /// A freshly created message carries the local sender, a ticked clock and a
    /// positive seniority.
    #[test]
    fn test_create_message() {
        let (manager, local, _events) = fixture();

        let message = manager.create_message();

        assert_eq!(message.sender, local);
        assert!(message.clock > 0);
        assert!(message.seniority > Duration::ZERO);
    }
}