ranvier_runtime/
cluster.rs1use async_trait::async_trait;
2use std::sync::Arc;
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::time::Duration;
5use tokio::time::sleep;
6use tracing::{info, warn};
7
8use crate::distributed::{DistributedError, DistributedLock, LockOptions};
10
/// Contract for a node that can compete for, observe, and relinquish
/// leadership of a cluster.
///
/// Implementors must be `Send + Sync` so an election can be shared across
/// tasks (for example behind an `Arc`).
#[async_trait]
pub trait LeaderElection: Send + Sync {
    /// Attempts to become (or remain) the leader.
    ///
    /// Returns `Ok(true)` when this node is the leader after the attempt and
    /// `Ok(false)` when it is not.
    ///
    /// # Errors
    ///
    /// Returns a [`DistributedError`] when the attempt itself could not be
    /// carried out.
    async fn try_become_leader(&self) -> Result<bool, DistributedError>;

    /// Reports whether this node currently considers itself the leader.
    ///
    /// This is a synchronous, local query.
    fn is_leader(&self) -> bool;

    /// Voluntarily gives up leadership.
    ///
    /// # Errors
    ///
    /// Returns a [`DistributedError`] if stepping down fails.
    async fn step_down(&self) -> Result<(), DistributedError>;
}
23
/// Leader election built on top of a [`DistributedLock`]: the node that
/// manages to acquire the lock on `resource_key` is treated as the leader.
pub struct LockBasedElection<L: DistributedLock> {
    /// Lock backend used to acquire `resource_key`.
    lock: Arc<L>,
    /// Identifier of this node; used in the leadership log messages.
    node_id: String,
    /// Key passed to the lock backend on every election attempt.
    resource_key: String,
    /// Local view of whether this node currently holds leadership.
    is_leader: Arc<AtomicBool>,
    /// Value forwarded as `LockOptions::ttl_ms` when acquiring the lock.
    ttl_ms: u64,
}
32
33impl<L: DistributedLock> LockBasedElection<L> {
34 pub fn new(lock: Arc<L>, node_id: String, resource_key: String, ttl_ms: u64) -> Self {
35 Self {
36 lock,
37 node_id,
38 resource_key,
39 is_leader: Arc::new(AtomicBool::new(false)),
40 ttl_ms,
41 }
42 }
43}
44
45#[async_trait]
46impl<L: DistributedLock> LeaderElection for LockBasedElection<L> {
47 async fn try_become_leader(&self) -> Result<bool, DistributedError> {
48 let opts = LockOptions {
49 ttl_ms: self.ttl_ms,
50 retry_count: 0,
51 retry_delay_ms: 0,
52 };
53
54 match self.lock.acquire(&self.resource_key, opts).await {
55 Ok(_guard) => {
56 if !self.is_leader.load(Ordering::Relaxed) {
57 info!("Node {} became the cluster leader", self.node_id);
58 self.is_leader.store(true, Ordering::SeqCst);
59 }
60 Ok(true)
63 }
64 Err(DistributedError::LockError(_)) => {
65 if self.is_leader.load(Ordering::Relaxed) {
66 warn!("Node {} lost leadership!", self.node_id);
67 self.is_leader.store(false, Ordering::SeqCst);
68 }
69 Ok(false)
70 }
71 Err(e) => Err(e),
72 }
73 }
74
75 fn is_leader(&self) -> bool {
76 self.is_leader.load(Ordering::Relaxed)
77 }
78
79 async fn step_down(&self) -> Result<(), DistributedError> {
80 if self.is_leader.swap(false, Ordering::SeqCst) {
81 info!("Node {} stepping down from leadership", self.node_id);
82 }
84 Ok(())
85 }
86}
87
/// Entry point for cluster-level background tasks such as the leader
/// election loop. It currently carries no state of its own.
pub struct ClusterManager {}
92
93impl ClusterManager {
94 pub fn start_election_loop<E: LeaderElection + 'static>(
96 election: Arc<E>,
97 interval: Duration,
98 ) -> tokio::task::JoinHandle<()> {
99 tokio::spawn(async move {
100 loop {
101 match election.try_become_leader().await {
102 Ok(true) => {
103 }
105 Ok(false) => {
106 }
108 Err(err) => {
109 warn!("Error in leader election: {}", err);
110 }
111 }
112 sleep(interval).await;
113 }
114 })
115 }
116}