forge_runtime/cluster/
leader.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
/// Timing knobs for [`LeaderElection`].
#[derive(Clone, Debug)]
pub struct LeaderConfig {
    /// How often the election loop wakes up to look at the current leader's
    /// lease and, if it has lapsed, try to take over.
    pub check_interval: Duration,
    /// How long a lease recorded in `forge_leaders` stays valid; once it has
    /// passed, other nodes treat the leader as gone.
    pub lease_duration: Duration,
    /// Intended cadence for renewing the lease while leader; keep it well
    /// below `lease_duration`.
    pub refresh_interval: Duration,
}
19
20impl Default for LeaderConfig {
21 fn default() -> Self {
22 Self {
23 check_interval: Duration::from_secs(5),
24 lease_duration: Duration::from_secs(60),
25 refresh_interval: Duration::from_secs(30),
26 }
27 }
28}
29
30pub struct LeaderElection {
47 pool: sqlx::PgPool,
48 node_id: NodeId,
49 role: LeaderRole,
50 config: LeaderConfig,
51 is_leader: Arc<AtomicBool>,
53 lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
54 shutdown_tx: watch::Sender<bool>,
55 shutdown_rx: watch::Receiver<bool>,
56}
57
58impl LeaderElection {
59 pub fn new(
61 pool: sqlx::PgPool,
62 node_id: NodeId,
63 role: LeaderRole,
64 config: LeaderConfig,
65 ) -> Self {
66 let (shutdown_tx, shutdown_rx) = watch::channel(false);
67 Self {
68 pool,
69 node_id,
70 role,
71 config,
72 is_leader: Arc::new(AtomicBool::new(false)),
73 lock_connection: Arc::new(Mutex::new(None)),
74 shutdown_tx,
75 shutdown_rx,
76 }
77 }
78
79 pub fn is_leader(&self) -> bool {
81 self.is_leader.load(Ordering::SeqCst)
82 }
83
84 pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86 self.shutdown_rx.clone()
87 }
88
89 pub fn stop(&self) {
91 let _ = self.shutdown_tx.send(true);
92 }
93
94 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96 if self.is_leader() {
97 return Ok(true);
98 }
99
100 let mut conn = self
101 .pool
102 .acquire()
103 .await
104 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106 let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
108 .bind(self.role.lock_id())
109 .fetch_optional(&mut *conn)
110 .await
111 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
112
113 let acquired = result.map(|(v,)| v).unwrap_or(false);
114
115 super::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
116
117 if acquired {
118 let lease_until =
120 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
121
122 sqlx::query(
123 r#"
124 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
125 VALUES ($1, $2, NOW(), $3)
126 ON CONFLICT (role) DO UPDATE SET
127 node_id = EXCLUDED.node_id,
128 acquired_at = NOW(),
129 lease_until = EXCLUDED.lease_until
130 "#,
131 )
132 .bind(self.role.as_str())
133 .bind(self.node_id.as_uuid())
134 .bind(lease_until)
135 .execute(&self.pool)
136 .await
137 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139 self.is_leader.store(true, Ordering::SeqCst);
140 super::metrics::set_is_leader(self.role.as_str(), true);
141 *self.lock_connection.lock().await = Some(conn);
142 tracing::info!(role = self.role.as_str(), "Acquired leadership");
143 }
144
145 Ok(acquired)
146 }
147
148 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
150 if !self.is_leader() {
151 return Ok(());
152 }
153
154 let lease_until =
155 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
156
157 sqlx::query(
158 r#"
159 UPDATE forge_leaders
160 SET lease_until = $3
161 WHERE role = $1 AND node_id = $2
162 "#,
163 )
164 .bind(self.role.as_str())
165 .bind(self.node_id.as_uuid())
166 .bind(lease_until)
167 .execute(&self.pool)
168 .await
169 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
170
171 Ok(())
172 }
173
174 pub async fn release_leadership(&self) -> forge_core::Result<()> {
176 if !self.is_leader() {
177 return Ok(());
178 }
179
180 let mut lock_connection = self.lock_connection.lock().await;
182 if let Some(mut conn) = lock_connection.take() {
183 sqlx::query("SELECT pg_advisory_unlock($1)")
184 .bind(self.role.lock_id())
185 .execute(&mut *conn)
186 .await
187 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
188 } else {
189 tracing::warn!(
190 role = self.role.as_str(),
191 "Leader lock connection missing during release"
192 );
193 }
194 drop(lock_connection);
195
196 sqlx::query(
198 r#"
199 DELETE FROM forge_leaders
200 WHERE role = $1 AND node_id = $2
201 "#,
202 )
203 .bind(self.role.as_str())
204 .bind(self.node_id.as_uuid())
205 .execute(&self.pool)
206 .await
207 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
208
209 self.is_leader.store(false, Ordering::SeqCst);
210 super::metrics::set_is_leader(self.role.as_str(), false);
211 tracing::info!(role = self.role.as_str(), "Released leadership");
212
213 Ok(())
214 }
215
216 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
218 let result: Option<(DateTime<Utc>,)> =
219 sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
220 .bind(self.role.as_str())
221 .fetch_optional(&self.pool)
222 .await
223 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
224
225 match result {
226 Some((lease_until,)) => Ok(lease_until > Utc::now()),
227 None => Ok(false), }
229 }
230
231 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
233 let row = sqlx::query(
234 r#"
235 SELECT role, node_id, acquired_at, lease_until
236 FROM forge_leaders
237 WHERE role = $1
238 "#,
239 )
240 .bind(self.role.as_str())
241 .fetch_optional(&self.pool)
242 .await
243 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
244
245 match row {
246 Some(row) => {
247 use sqlx::Row;
248 let role_str: String = row.get("role");
249 let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
250
251 Ok(Some(LeaderInfo {
252 role,
253 node_id: NodeId::from_uuid(row.get("node_id")),
254 acquired_at: row.get("acquired_at"),
255 lease_until: row.get("lease_until"),
256 }))
257 }
258 None => Ok(None),
259 }
260 }
261
262 pub async fn run(&self) {
264 let mut shutdown_rx = self.shutdown_rx.clone();
265
266 loop {
267 tokio::select! {
268 _ = tokio::time::sleep(self.config.check_interval) => {
269 if self.is_leader() {
270 if let Err(e) = self.refresh_lease().await {
272 tracing::debug!(error = %e, "Failed to refresh lease");
273 }
274 } else {
275 match self.check_leader_health().await {
277 Ok(false) => {
278 if let Err(e) = self.try_become_leader().await {
280 tracing::debug!(error = %e, "Failed to acquire leadership");
281 }
282 }
283 Ok(true) => {
284 }
286 Err(e) => {
287 tracing::debug!(error = %e, "Failed to check leader health");
288 }
289 }
290 }
291 }
292 _ = shutdown_rx.changed() => {
293 if *shutdown_rx.borrow() {
294 tracing::debug!("Leader election shutting down");
295 if let Err(e) = self.release_leadership().await {
296 tracing::debug!(error = %e, "Failed to release leadership");
297 }
298 break;
299 }
300 }
301 }
302 }
303 }
304}
305
306pub struct LeaderGuard<'a> {
308 election: &'a LeaderElection,
309}
310
311impl<'a> LeaderGuard<'a> {
312 pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
315 if election.is_leader() {
316 Some(Self { election })
317 } else {
318 None
319 }
320 }
321
322 pub fn is_leader(&self) -> bool {
324 self.election.is_leader()
325 }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// `LeaderConfig::default()` yields a 5 s check, 60 s lease, and 30 s refresh.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}