1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
use crate::sync::CoordinationBackend;
use async_trait::async_trait;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::watch;
use tokio::time::sleep;
use tracing::debug;
use uuid::Uuid;
/// Decides whether this process may currently act as the leader.
///
/// Implementors must be `Send + Sync`; `start` takes `self` by `Arc`, so a
/// gate is expected to be shared behind an `Arc`.
#[async_trait]
pub trait LeadershipGate: Send + Sync {
    /// Reports whether this process holds leadership right now.
    fn is_leader(&self) -> bool;

    /// Begins participating in leadership.
    ///
    /// Depending on the implementation this may return immediately or keep
    /// running for as long as leadership is being contended.
    async fn start(self: Arc<Self>);
}
/// Leadership gate for single-node operation: this process is always the leader.
#[derive(Default)]
pub struct LocalLeadershipGate;

#[async_trait]
impl LeadershipGate for LocalLeadershipGate {
    fn is_leader(&self) -> bool {
        // With no peers there is nobody to compete against.
        true
    }

    async fn start(self: Arc<Self>) {
        // Nothing to coordinate; the `Arc` is released when this returns.
    }
}
/// Leader election built on a lock held in a [`CoordinationBackend`].
///
/// Each elector competes for the lock stored under `key` and publishes its
/// current leadership status on a `watch` channel.
pub struct LeaderElector {
    /// Lock backend; `None` selects local mode, where the elector is always leader.
    backend: Option<Arc<dyn CoordinationBackend>>,
    /// Unique identity of this elector (a random UUID string); used as the lock value.
    id: String,
    /// Key of the lock being contended for.
    key: String,
    /// Lock time-to-live, in milliseconds.
    ttl_ms: u64,
    /// Publishes `true` while this elector is leader and `false` otherwise.
    leader_signal: watch::Sender<bool>,
    /// Receiver held only so the channel always has at least one receiver;
    /// this keeps `send()` on `leader_signal` from failing, and the stored
    /// value updating, even after every external receiver has been dropped.
    _keep_alive: watch::Receiver<bool>,
}
impl LeaderElector {
pub fn new(
backend: Option<Arc<dyn CoordinationBackend>>,
key: String,
ttl_ms: u64,
) -> (Arc<Self>, watch::Receiver<bool>) {
let (tx, rx) = watch::channel(false);
let elector = Arc::new(Self {
backend,
key,
id: Uuid::new_v4().to_string(),
ttl_ms,
leader_signal: tx,
_keep_alive: rx.clone(),
});
(elector, rx)
}
pub fn is_leader(&self) -> bool {
let val = *self.leader_signal.borrow();
debug!("LeaderElector[{}]::is_leader -> {}", self.id, val);
val
}
pub async fn start(self: Arc<Self>) {
debug!("LeaderElector[{}]::start for key: {}", self.id, self.key);
if self.backend.is_none() {
// Local mode: Always leader
debug!("LeaderElector: No backend, enabling local mode (always leader)");
let _ = self.leader_signal.send(true);
return;
}
let backend = self.backend.as_ref().unwrap();
let value = self.id.as_bytes();
loop {
// 1. Try to acquire lock
// eprintln!("LeaderElector: Trying to acquire lock for key: {}", self.key);
match backend.acquire_lock(&self.key, value, self.ttl_ms).await {
Ok(true) => {
// Acquired!
debug!(
"LeaderElector[{}]: Lock ACQUIRED for key: {}",
self.id, self.key
);
if !*self.leader_signal.borrow() {
let _ = self.leader_signal.send(true);
debug!("LeaderElector[{}]: Signal set to TRUE", self.id);
} else {
debug!("LeaderElector[{}]: Signal ALREADY TRUE", self.id);
}
// Maintain leadership (renew lock)
// We need to renew before TTL expires. Let's renew at 1/3 of TTL.
let renew_interval = Duration::from_millis(self.ttl_ms / 3);
let mut fail_count = 0;
loop {
let sleep_duration = if fail_count > 0 {
Duration::from_millis(std::cmp::min(self.ttl_ms / 10, 1000))
} else {
renew_interval
};
let sleep_duration =
sleep_duration + Duration::from_millis(Self::jitter_ms(100));
sleep(sleep_duration).await;
match backend.renew_lock(&self.key, value, self.ttl_ms).await {
Ok(true) => {
// Renewal successful, continue
// eprintln!("LeaderElector: Lock renewed");
fail_count = 0;
}
Ok(false) => {
// Lost lock!
debug!("LeaderElector[{}]: Lock LOST during renewal!", self.id);
let _ = self.leader_signal.send(false);
break;
}
Err(e) => {
debug!("Leader renewal error: {}", e);
fail_count += 1;
// If we fail too many times or total time exceeds TTL, we must assume lost.
// renew_interval * 3 = TTL. So if we fail 3 times in a row, we are close to expiration.
if fail_count >= 3 {
debug!(
"LeaderElector[{}]: Too many renewal errors, stepping down",
self.id
);
let _ = self.leader_signal.send(false);
break;
}
// Retry quickly in next loop iteration
}
}
}
}
Ok(false) => {
// Failed to acquire. Not leader.
if *self.leader_signal.borrow() {
debug!("LeaderElector[{}]: Stepped down from leadership", self.id);
let _ = self.leader_signal.send(false);
} else {
// eprintln!("LeaderElector: Failed to acquire lock, retrying...");
}
// Wait before retrying.
// Optimization: Check more frequently to pick up dropped leadership faster.
// But not too fast to spam backend.
// 1/10 of TTL or 1s, whichever is smaller?
let retry_wait = std::cmp::min(self.ttl_ms / 10, 1000) + Self::jitter_ms(50);
sleep(Duration::from_millis(retry_wait)).await;
}
Err(e) => {
eprintln!("Leader election error: {}", e);
sleep(Duration::from_millis(1000 + Self::jitter_ms(250))).await;
}
}
}
}
fn jitter_ms(max: u64) -> u64 {
if max == 0 {
return 0;
}
let nanos = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.subsec_nanos() as u64;
nanos % max
}
}
/// Exposes [`LeaderElector`] through the [`LeadershipGate`] abstraction.
///
/// Both methods forward to the inherent `LeaderElector` methods of the same
/// name. In a `Type::method` path, inherent associated items take precedence
/// over trait items, so these calls do not recurse into this impl.
#[async_trait]
impl LeadershipGate for LeaderElector {
    // Delegates to the inherent `LeaderElector::start` election loop.
    async fn start(self: Arc<Self>) {
        LeaderElector::start(self).await;
    }
    // Delegates to the inherent `LeaderElector::is_leader`.
    fn is_leader(&self) -> bool {
        LeaderElector::is_leader(self)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A local gate must report leadership once `start` has completed.
    #[tokio::test]
    async fn local_leadership_gate_is_always_leader() {
        let gate: Arc<LocalLeadershipGate> = Arc::default();
        Arc::clone(&gate).start().await;
        assert!(gate.is_leader());
    }
}