forge_runtime/cluster/
leader.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::Utc;
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
/// Timing knobs for a [`LeaderElection`] participant.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// Pause between iterations of the election loop in `LeaderElection::run`.
    pub check_interval: Duration,
    /// How far into the future `lease_until` is pushed whenever the lease is
    /// written or refreshed (whole seconds are used).
    pub lease_duration: Duration,
    /// Intended lease refresh cadence.
    ///
    /// NOTE(review): nothing in this file reads this field; the run loop
    /// refreshes on `check_interval`. Confirm whether another module uses it.
    pub refresh_interval: Duration,
}
19
20impl Default for LeaderConfig {
21 fn default() -> Self {
22 Self {
23 check_interval: Duration::from_secs(5),
24 lease_duration: Duration::from_secs(60),
25 refresh_interval: Duration::from_secs(30),
26 }
27 }
28}
29
30pub struct LeaderElection {
47 pool: sqlx::PgPool,
48 node_id: NodeId,
49 role: LeaderRole,
50 config: LeaderConfig,
51 is_leader: Arc<AtomicBool>,
53 lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
54 shutdown_tx: watch::Sender<bool>,
55 shutdown_rx: watch::Receiver<bool>,
56}
57
58impl LeaderElection {
59 pub fn new(
61 pool: sqlx::PgPool,
62 node_id: NodeId,
63 role: LeaderRole,
64 config: LeaderConfig,
65 ) -> Self {
66 let (shutdown_tx, shutdown_rx) = watch::channel(false);
67 Self {
68 pool,
69 node_id,
70 role,
71 config,
72 is_leader: Arc::new(AtomicBool::new(false)),
73 lock_connection: Arc::new(Mutex::new(None)),
74 shutdown_tx,
75 shutdown_rx,
76 }
77 }
78
79 pub fn is_leader(&self) -> bool {
81 self.is_leader.load(Ordering::SeqCst)
82 }
83
84 pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86 self.shutdown_rx.clone()
87 }
88
89 pub fn stop(&self) {
91 let _ = self.shutdown_tx.send(true);
92 }
93
94 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96 if self.is_leader() {
97 return Ok(true);
98 }
99
100 let mut conn = self
101 .pool
102 .acquire()
103 .await
104 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106 let acquired = sqlx::query_scalar!(
108 r#"SELECT pg_try_advisory_lock($1) as "acquired!""#,
109 self.role.lock_id()
110 )
111 .fetch_one(&mut *conn)
112 .await
113 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
114
115 super::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
116
117 if acquired {
118 let lease_until =
120 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
121
122 sqlx::query(
123 r#"
124 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
125 VALUES ($1, $2, NOW(), $3)
126 ON CONFLICT (role) DO UPDATE SET
127 node_id = EXCLUDED.node_id,
128 acquired_at = NOW(),
129 lease_until = EXCLUDED.lease_until
130 "#,
131 )
132 .bind(self.role.as_str())
133 .bind(self.node_id.as_uuid())
134 .bind(lease_until)
135 .execute(&self.pool)
136 .await
137 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139 self.is_leader.store(true, Ordering::SeqCst);
140 super::metrics::set_is_leader(self.role.as_str(), true);
141 *self.lock_connection.lock().await = Some(conn);
142 tracing::info!(role = self.role.as_str(), "Acquired leadership");
143 }
144
145 Ok(acquired)
146 }
147
148 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
150 if !self.is_leader() {
151 return Ok(());
152 }
153
154 let lease_until =
155 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
156
157 sqlx::query(
158 r#"
159 UPDATE forge_leaders
160 SET lease_until = $3
161 WHERE role = $1 AND node_id = $2
162 "#,
163 )
164 .bind(self.role.as_str())
165 .bind(self.node_id.as_uuid())
166 .bind(lease_until)
167 .execute(&self.pool)
168 .await
169 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
170
171 Ok(())
172 }
173
174 pub async fn release_leadership(&self) -> forge_core::Result<()> {
176 if !self.is_leader() {
177 return Ok(());
178 }
179
180 let mut lock_connection = self.lock_connection.lock().await;
182 if let Some(mut conn) = lock_connection.take() {
183 sqlx::query("SELECT pg_advisory_unlock($1)")
184 .bind(self.role.lock_id())
185 .execute(&mut *conn)
186 .await
187 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
188 } else {
189 tracing::warn!(
190 role = self.role.as_str(),
191 "Leader lock connection missing during release"
192 );
193 }
194 drop(lock_connection);
195
196 sqlx::query(
198 r#"
199 DELETE FROM forge_leaders
200 WHERE role = $1 AND node_id = $2
201 "#,
202 )
203 .bind(self.role.as_str())
204 .bind(self.node_id.as_uuid())
205 .execute(&self.pool)
206 .await
207 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
208
209 self.is_leader.store(false, Ordering::SeqCst);
210 super::metrics::set_is_leader(self.role.as_str(), false);
211 tracing::info!(role = self.role.as_str(), "Released leadership");
212
213 Ok(())
214 }
215
216 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
218 let result = sqlx::query_scalar!(
219 "SELECT lease_until FROM forge_leaders WHERE role = $1",
220 self.role.as_str()
221 )
222 .fetch_optional(&self.pool)
223 .await
224 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
225
226 match result {
227 Some(lease_until) => Ok(lease_until > Utc::now()),
228 None => Ok(false), }
230 }
231
232 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
234 let row = sqlx::query(
235 r#"
236 SELECT role, node_id, acquired_at, lease_until
237 FROM forge_leaders
238 WHERE role = $1
239 "#,
240 )
241 .bind(self.role.as_str())
242 .fetch_optional(&self.pool)
243 .await
244 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
245
246 match row {
247 Some(row) => {
248 use sqlx::Row;
249 let role_str: String = row.get("role");
250 let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
251
252 Ok(Some(LeaderInfo {
253 role,
254 node_id: NodeId::from_uuid(row.get("node_id")),
255 acquired_at: row.get("acquired_at"),
256 lease_until: row.get("lease_until"),
257 }))
258 }
259 None => Ok(None),
260 }
261 }
262
263 pub async fn run(&self) {
265 let mut shutdown_rx = self.shutdown_rx.clone();
266
267 loop {
268 tokio::select! {
269 _ = tokio::time::sleep(self.config.check_interval) => {
270 if self.is_leader() {
271 if let Err(e) = self.refresh_lease().await {
273 tracing::debug!(error = %e, "Failed to refresh lease");
274 }
275 } else {
276 match self.check_leader_health().await {
278 Ok(false) => {
279 if let Err(e) = self.try_become_leader().await {
281 tracing::debug!(error = %e, "Failed to acquire leadership");
282 }
283 }
284 Ok(true) => {
285 }
287 Err(e) => {
288 tracing::debug!(error = %e, "Failed to check leader health");
289 }
290 }
291 }
292 }
293 _ = shutdown_rx.changed() => {
294 if *shutdown_rx.borrow() {
295 tracing::debug!("Leader election shutting down");
296 if let Err(e) = self.release_leadership().await {
297 tracing::debug!(error = %e, "Failed to release leadership");
298 }
299 break;
300 }
301 }
302 }
303 }
304 }
305}
306
307pub struct LeaderGuard<'a> {
309 election: &'a LeaderElection,
310}
311
312impl<'a> LeaderGuard<'a> {
313 pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
316 if election.is_leader() {
317 Some(Self { election })
318 } else {
319 None
320 }
321 }
322
323 pub fn is_leader(&self) -> bool {
325 self.election.is_leader()
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration uses 5 s / 60 s / 30 s.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}