1use anyhow::Result;
15use libp2p::PeerId;
16use parking_lot::RwLock;
17use serde::{Deserialize, Serialize};
18use std::collections::HashMap;
19use std::sync::Arc;
20use std::time::{Duration, Instant};
21use tokio::sync::mpsc;
22use tracing::{info, warn};
23
/// Stock value for `ElectionConfig::election_timeout`: three seconds.
pub const DEFAULT_ELECTION_TIMEOUT: Duration = Duration::from_secs(3);

/// Stock value for `ElectionConfig::connection_delay`: 200 milliseconds.
pub const CONNECTION_ELECTION_DELAY: Duration = Duration::from_millis(200);
29
/// Tunable parameters consumed by `ElectionManager`.
#[derive(Debug, Clone)]
pub struct ElectionConfig {
    /// Maximum age of a candidate entry; `check_timeout` restarts the election once any
    /// entry has gone unrefreshed for longer than this.
    pub election_timeout: Duration,
    /// Pause that `handle_master_loss` waits out before starting a new election.
    pub connection_delay: Duration,
    /// When `true`, `handle_master_loss` starts a new election; when `false` it is a no-op.
    pub auto_elect_on_peer_loss: bool,
}
40
41impl Default for ElectionConfig {
42 fn default() -> Self {
43 Self {
44 election_timeout: DEFAULT_ELECTION_TIMEOUT,
45 connection_delay: CONNECTION_ELECTION_DELAY,
46 auto_elect_on_peer_loss: true,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Serialize, Deserialize)]
53pub struct ElectionMessage {
54 pub clock: u64,
56 pub seniority: Duration,
58 pub proposed_session: u64,
60 pub commands_seen: u64,
62 pub sender: PeerId,
64 pub vote_for: PeerId,
66}
67
/// Role of the local node with respect to the current election.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ElectionState {
    /// No election has been joined yet, or the node stepped down from master.
    Idle,
    /// An election is running and the node is taking part in it.
    Campaigning,
    /// The local node is the master.
    Master,
    /// Another peer has been declared master.
    Follower,
}
80
81#[derive(Debug, Clone)]
83#[allow(dead_code)]
84struct CandidateInfo {
85 peer_id: PeerId,
87 seniority: Duration,
89 proposed_session: u64,
91 commands_seen: u64,
93 last_seen: Instant,
95}
96
97#[derive(Debug, Clone)]
99pub enum ElectionEvent {
100 Started { session: u64 },
102 ElectedMaster { session: u64 },
104 NewMaster { master: PeerId, session: u64 },
106 Timeout { session: u64 },
108 SplitBrain { masters: Vec<PeerId> },
110}
111
112pub struct ElectionManager {
114 local_peer_id: PeerId,
116 config: ElectionConfig,
118 state: Arc<RwLock<ElectionState>>,
120 current_master: Arc<RwLock<Option<PeerId>>>,
122 session_id: Arc<RwLock<u64>>,
124 start_time: Instant,
126 candidates: Arc<RwLock<HashMap<PeerId, CandidateInfo>>>,
128 current_vote: Arc<RwLock<Option<PeerId>>>,
130 commands_seen: Arc<RwLock<u64>>,
132 clock: Arc<RwLock<u64>>,
134 event_tx: mpsc::Sender<ElectionEvent>,
136}
137
138impl ElectionManager {
139 pub fn new(
141 local_peer_id: PeerId,
142 config: ElectionConfig,
143 event_tx: mpsc::Sender<ElectionEvent>,
144 ) -> Self {
145 Self {
146 local_peer_id,
147 config,
148 state: Arc::new(RwLock::new(ElectionState::Idle)),
149 current_master: Arc::new(RwLock::new(None)),
150 session_id: Arc::new(RwLock::new(0)),
151 start_time: Instant::now(),
152 candidates: Arc::new(RwLock::new(HashMap::new())),
153 current_vote: Arc::new(RwLock::new(None)),
154 commands_seen: Arc::new(RwLock::new(0)),
155 clock: Arc::new(RwLock::new(0)),
156 event_tx,
157 }
158 }
159
160 pub fn state(&self) -> ElectionState {
162 *self.state.read()
163 }
164
165 pub fn current_master(&self) -> Option<PeerId> {
167 *self.current_master.read()
168 }
169
170 pub fn is_master(&self) -> bool {
172 *self.state.read() == ElectionState::Master
173 }
174
175 pub fn is_follower(&self) -> bool {
177 *self.state.read() == ElectionState::Follower
178 }
179
180 pub fn seniority(&self) -> Duration {
182 self.start_time.elapsed()
183 }
184
185 pub fn session_id(&self) -> u64 {
187 *self.session_id.read()
188 }
189
190 fn tick(&self) -> u64 {
192 let mut clock = self.clock.write();
193 *clock += 1;
194 *clock
195 }
196
197 fn update_clock(&self, received: u64) {
199 let mut clock = self.clock.write();
200 *clock = (*clock).max(received) + 1;
201 }
202
203 pub fn record_command(&self) {
205 *self.commands_seen.write() += 1;
206 }
207
208 pub async fn start_election(&self) -> Result<()> {
210 let new_session = {
211 let mut session = self.session_id.write();
212 *session += 1;
213 *session
214 };
215
216 {
217 let mut state = self.state.write();
218 *state = ElectionState::Campaigning;
219 }
220
221 self.candidates.write().clear();
223
224 *self.current_vote.write() = Some(self.local_peer_id);
226
227 self.candidates.write().insert(
229 self.local_peer_id,
230 CandidateInfo {
231 peer_id: self.local_peer_id,
232 seniority: self.seniority(),
233 proposed_session: new_session,
234 commands_seen: *self.commands_seen.read(),
235 last_seen: Instant::now(),
236 },
237 );
238
239 info!(
240 "Starting election campaign, session={}, seniority={:?}",
241 new_session,
242 self.seniority()
243 );
244
245 let _ = self
246 .event_tx
247 .send(ElectionEvent::Started {
248 session: new_session,
249 })
250 .await;
251
252 Ok(())
253 }
254
255 pub fn create_message(&self) -> ElectionMessage {
257 let vote = self.current_vote.read().unwrap_or(self.local_peer_id);
258 ElectionMessage {
259 clock: self.tick(),
260 seniority: self.seniority(),
261 proposed_session: *self.session_id.read(),
262 commands_seen: *self.commands_seen.read(),
263 sender: self.local_peer_id,
264 vote_for: vote,
265 }
266 }
267
268 pub async fn process_message(&self, msg: ElectionMessage) -> Result<Option<ElectionMessage>> {
270 self.update_clock(msg.clock);
271
272 {
274 let mut candidates = self.candidates.write();
275 candidates.insert(
276 msg.sender,
277 CandidateInfo {
278 peer_id: msg.sender,
279 seniority: msg.seniority,
280 proposed_session: msg.proposed_session,
281 commands_seen: msg.commands_seen,
282 last_seen: Instant::now(),
283 },
284 );
285 }
286
287 if *self.state.read() == ElectionState::Idle {
289 self.start_election().await?;
290 }
291
292 let should_respond = self.update_vote();
294
295 if should_respond {
296 Ok(Some(self.create_message()))
297 } else {
298 Ok(None)
299 }
300 }
301
302 fn update_vote(&self) -> bool {
304 let candidates = self.candidates.read();
305
306 let best = candidates.values().max_by(|a, b| {
308 match a.seniority.cmp(&b.seniority) {
310 std::cmp::Ordering::Equal => {
311 match a.commands_seen.cmp(&b.commands_seen) {
313 std::cmp::Ordering::Equal => {
314 a.peer_id.cmp(&b.peer_id)
316 }
317 other => other,
318 }
319 }
320 other => other,
321 }
322 });
323
324 if let Some(best_candidate) = best {
325 let mut current_vote = self.current_vote.write();
326 let old_vote = *current_vote;
327 *current_vote = Some(best_candidate.peer_id);
328
329 old_vote != Some(best_candidate.peer_id)
331 } else {
332 false
333 }
334 }
335
336 pub async fn check_election_complete(&self, all_peers: &[PeerId]) -> Result<bool> {
338 let winner_to_declare = {
340 let candidates = self.candidates.read();
341
342 if candidates.len() < all_peers.len() {
344 return Ok(false);
345 }
346
347 let _votes: Vec<_> = candidates.values().map(|c| c.peer_id).collect();
349
350 let our_vote = self.current_vote.read();
353
354 if let Some(winner) = our_vote.as_ref() {
355 if candidates.contains_key(winner) {
357 Some(*winner)
358 } else {
359 None
360 }
361 } else {
362 None
363 }
364 }; if let Some(winner) = winner_to_declare {
367 self.declare_winner(winner).await?;
369 return Ok(true);
370 }
371
372 Ok(false)
373 }
374
375 async fn declare_winner(&self, winner: PeerId) -> Result<()> {
377 let session = *self.session_id.read();
378
379 if winner == self.local_peer_id {
380 *self.state.write() = ElectionState::Master;
381 *self.current_master.write() = Some(winner);
382
383 info!("Elected as master, session={}", session);
384 let _ = self
385 .event_tx
386 .send(ElectionEvent::ElectedMaster { session })
387 .await;
388 } else {
389 *self.state.write() = ElectionState::Follower;
390 *self.current_master.write() = Some(winner);
391
392 info!(
393 "Following master {}, session={}",
394 winner.to_base58(),
395 session
396 );
397 let _ = self
398 .event_tx
399 .send(ElectionEvent::NewMaster {
400 master: winner,
401 session,
402 })
403 .await;
404 }
405
406 Ok(())
407 }
408
409 pub async fn handle_master_loss(&self) -> Result<()> {
411 if self.config.auto_elect_on_peer_loss {
412 warn!("Master lost, starting new election after delay");
413
414 tokio::time::sleep(self.config.connection_delay).await;
416
417 self.start_election().await?;
418 }
419
420 Ok(())
421 }
422
423 pub async fn check_timeout(&self) -> Result<bool> {
425 if *self.state.read() != ElectionState::Campaigning {
426 return Ok(false);
427 }
428
429 let timed_out_session = {
431 let candidates = self.candidates.read();
432 let now = Instant::now();
433
434 let mut timed_out = None;
435 for candidate in candidates.values() {
436 if now.duration_since(candidate.last_seen) > self.config.election_timeout {
437 timed_out = Some(*self.session_id.read());
438 break;
439 }
440 }
441 timed_out
442 }; if let Some(session) = timed_out_session {
445 warn!("Election timeout, session={}", session);
446
447 let _ = self.event_tx.send(ElectionEvent::Timeout { session }).await;
448
449 self.start_election().await?;
451 return Ok(true);
452 }
453
454 Ok(false)
455 }
456
457 pub async fn force_master(&self) -> Result<()> {
459 *self.state.write() = ElectionState::Master;
460 *self.current_master.write() = Some(self.local_peer_id);
461
462 let session = {
463 let mut s = self.session_id.write();
464 *s += 1;
465 *s
466 };
467
468 info!("Forced master election (single node), session={}", session);
469 let _ = self
470 .event_tx
471 .send(ElectionEvent::ElectedMaster { session })
472 .await;
473
474 Ok(())
475 }
476
477 pub async fn step_down(&self) -> Result<()> {
479 if *self.state.read() == ElectionState::Master {
480 *self.state.write() = ElectionState::Idle;
481 *self.current_master.write() = None;
482
483 info!("Stepped down from master role");
484 }
485
486 Ok(())
487 }
488}
489
490impl std::fmt::Debug for ElectionManager {
491 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
492 f.debug_struct("ElectionManager")
493 .field("local_peer_id", &self.local_peer_id.to_base58())
494 .field("state", &*self.state.read())
495 .field("current_master", &self.current_master())
496 .field("session_id", &*self.session_id.read())
497 .field("seniority", &self.seniority())
498 .finish()
499 }
500}
501
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::sync::mpsc;

    /// Creates a manager with the default configuration and returns it together with
    /// its peer id and the receiving half of its event channel.
    fn fixture() -> (PeerId, ElectionManager, mpsc::Receiver<ElectionEvent>) {
        let (tx, rx) = mpsc::channel(10);
        let peer_id = PeerId::random();
        let manager = ElectionManager::new(peer_id, ElectionConfig::default(), tx);
        (peer_id, manager, rx)
    }

    /// Starting an election moves to `Campaigning`, session 1, and emits `Started`.
    #[tokio::test]
    async fn test_election_start() {
        let (_peer_id, manager, mut events) = fixture();

        manager.start_election().await.unwrap();

        assert_eq!(manager.state(), ElectionState::Campaigning);
        assert_eq!(manager.session_id(), 1);

        let event = events.try_recv().unwrap();
        assert!(matches!(event, ElectionEvent::Started { session: 1 }));
    }

    /// Forcing mastership makes the local peer master and emits `ElectedMaster`.
    #[tokio::test]
    async fn test_force_master() {
        let (peer_id, manager, mut events) = fixture();

        manager.force_master().await.unwrap();

        assert!(manager.is_master());
        assert_eq!(manager.current_master(), Some(peer_id));

        let event = events.try_recv().unwrap();
        assert!(matches!(event, ElectionEvent::ElectedMaster { .. }));
    }

    /// Seniority grows with wall-clock time since construction.
    #[test]
    fn test_seniority() {
        let (_peer_id, manager, _events) = fixture();

        std::thread::sleep(Duration::from_millis(10));

        assert!(manager.seniority() >= Duration::from_millis(10));
    }

    /// A freshly built message carries the local sender, a ticked clock and a
    /// non-zero seniority.
    #[test]
    fn test_create_message() {
        let (peer_id, manager, _events) = fixture();

        let msg = manager.create_message();

        assert_eq!(msg.sender, peer_id);
        assert!(msg.clock > 0);
        assert!(msg.seniority > Duration::ZERO);
    }
}