forge_runtime/cluster/
leader.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::Utc;
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
/// Timing parameters for `LeaderElection`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaderConfig {
    /// Delay between iterations of the election loop in `LeaderElection::run`.
    pub check_interval: Duration,
    /// How far into the future the lease (`forge_leaders.lease_until`) is set
    /// whenever leadership is acquired or renewed.
    pub lease_duration: Duration,
    /// Intended interval between lease renewals.
    ///
    /// NOTE(review): not read by `LeaderElection` in this file, which renews the
    /// lease on every `check_interval` tick — confirm whether other code uses it.
    pub refresh_interval: Duration,
}

impl Default for LeaderConfig {
    /// Checks every 5 s, leases for 60 s and targets a 30 s refresh interval.
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(5),
            lease_duration: Duration::from_secs(60),
            refresh_interval: Duration::from_secs(30),
        }
    }
}
29
30pub struct LeaderElection {
47 pool: sqlx::PgPool,
48 node_id: NodeId,
49 role: LeaderRole,
50 config: LeaderConfig,
51 is_leader: Arc<AtomicBool>,
53 lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
54 shutdown_tx: watch::Sender<bool>,
55 shutdown_rx: watch::Receiver<bool>,
56}
57
58impl LeaderElection {
59 pub fn new(
61 pool: sqlx::PgPool,
62 node_id: NodeId,
63 role: LeaderRole,
64 config: LeaderConfig,
65 ) -> Self {
66 let (shutdown_tx, shutdown_rx) = watch::channel(false);
67 Self {
68 pool,
69 node_id,
70 role,
71 config,
72 is_leader: Arc::new(AtomicBool::new(false)),
73 lock_connection: Arc::new(Mutex::new(None)),
74 shutdown_tx,
75 shutdown_rx,
76 }
77 }
78
79 pub fn is_leader(&self) -> bool {
81 self.is_leader.load(Ordering::SeqCst)
82 }
83
84 pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86 self.shutdown_rx.clone()
87 }
88
89 pub fn stop(&self) {
91 let _ = self.shutdown_tx.send(true);
92 }
93
94 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96 if self.is_leader() {
97 return Ok(true);
98 }
99
100 let mut conn = self
101 .pool
102 .acquire()
103 .await
104 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106 let acquired = sqlx::query_scalar!(
108 r#"SELECT pg_try_advisory_lock($1) as "acquired!""#,
109 self.role.lock_id()
110 )
111 .fetch_one(&mut *conn)
112 .await
113 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
114
115 super::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
116
117 if acquired {
118 let lease_until =
120 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
121
122 sqlx::query!(
123 r#"
124 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
125 VALUES ($1, $2, NOW(), $3)
126 ON CONFLICT (role) DO UPDATE SET
127 node_id = EXCLUDED.node_id,
128 acquired_at = NOW(),
129 lease_until = EXCLUDED.lease_until
130 "#,
131 self.role.as_str(),
132 self.node_id.as_uuid(),
133 lease_until,
134 )
135 .execute(&self.pool)
136 .await
137 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139 self.is_leader.store(true, Ordering::SeqCst);
140 super::metrics::set_is_leader(self.role.as_str(), true);
141 *self.lock_connection.lock().await = Some(conn);
142 tracing::info!(role = self.role.as_str(), "Acquired leadership");
143 }
144
145 Ok(acquired)
146 }
147
148 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
150 if !self.is_leader() {
151 return Ok(());
152 }
153
154 let lease_until =
155 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
156
157 sqlx::query!(
158 r#"
159 UPDATE forge_leaders
160 SET lease_until = $3
161 WHERE role = $1 AND node_id = $2
162 "#,
163 self.role.as_str(),
164 self.node_id.as_uuid(),
165 lease_until,
166 )
167 .execute(&self.pool)
168 .await
169 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
170
171 Ok(())
172 }
173
174 pub async fn release_leadership(&self) -> forge_core::Result<()> {
176 if !self.is_leader() {
177 return Ok(());
178 }
179
180 let mut lock_connection = self.lock_connection.lock().await;
182 if let Some(mut conn) = lock_connection.take() {
183 sqlx::query_scalar!("SELECT pg_advisory_unlock($1)", self.role.lock_id())
184 .fetch_one(&mut *conn)
185 .await
186 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
187 } else {
188 tracing::warn!(
189 role = self.role.as_str(),
190 "Leader lock connection missing during release"
191 );
192 }
193 drop(lock_connection);
194
195 sqlx::query!(
197 r#"
198 DELETE FROM forge_leaders
199 WHERE role = $1 AND node_id = $2
200 "#,
201 self.role.as_str(),
202 self.node_id.as_uuid(),
203 )
204 .execute(&self.pool)
205 .await
206 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
207
208 self.is_leader.store(false, Ordering::SeqCst);
209 super::metrics::set_is_leader(self.role.as_str(), false);
210 tracing::info!(role = self.role.as_str(), "Released leadership");
211
212 Ok(())
213 }
214
215 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
217 let result = sqlx::query_scalar!(
218 "SELECT lease_until FROM forge_leaders WHERE role = $1",
219 self.role.as_str()
220 )
221 .fetch_optional(&self.pool)
222 .await
223 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
224
225 match result {
226 Some(lease_until) => Ok(lease_until > Utc::now()),
227 None => Ok(false), }
229 }
230
231 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
233 let row = sqlx::query!(
234 r#"
235 SELECT role, node_id, acquired_at, lease_until
236 FROM forge_leaders
237 WHERE role = $1
238 "#,
239 self.role.as_str(),
240 )
241 .fetch_optional(&self.pool)
242 .await
243 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
244
245 match row {
246 Some(row) => {
247 let role = row.role.parse().unwrap_or(LeaderRole::Scheduler);
248
249 Ok(Some(LeaderInfo {
250 role,
251 node_id: NodeId::from_uuid(row.node_id),
252 acquired_at: row.acquired_at,
253 lease_until: row.lease_until,
254 }))
255 }
256 None => Ok(None),
257 }
258 }
259
260 pub async fn run(&self) {
262 let mut shutdown_rx = self.shutdown_rx.clone();
263
264 loop {
265 tokio::select! {
266 _ = tokio::time::sleep(self.config.check_interval) => {
267 if self.is_leader() {
268 if let Err(e) = self.refresh_lease().await {
270 tracing::debug!(error = %e, "Failed to refresh lease");
271 }
272 } else {
273 match self.check_leader_health().await {
275 Ok(false) => {
276 if let Err(e) = self.try_become_leader().await {
278 tracing::debug!(error = %e, "Failed to acquire leadership");
279 }
280 }
281 Ok(true) => {
282 }
284 Err(e) => {
285 tracing::debug!(error = %e, "Failed to check leader health");
286 }
287 }
288 }
289 }
290 _ = shutdown_rx.changed() => {
291 if *shutdown_rx.borrow() {
292 tracing::debug!("Leader election shutting down");
293 if let Err(e) = self.release_leadership().await {
294 tracing::debug!(error = %e, "Failed to release leadership");
295 }
296 break;
297 }
298 }
299 }
300 }
301 }
302}
303
304pub struct LeaderGuard<'a> {
306 election: &'a LeaderElection,
307}
308
309impl<'a> LeaderGuard<'a> {
310 pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
313 if election.is_leader() {
314 Some(Self { election })
315 } else {
316 None
317 }
318 }
319
320 pub fn is_leader(&self) -> bool {
322 self.election.is_leader()
323 }
324}
325
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the values produced by `LeaderConfig::default()`.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}