// forge_runtime/cluster/leader.rs
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use chrono::{DateTime, Utc};
use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
use tokio::sync::{Mutex, watch};

/// Timing knobs for [`LeaderElection`].
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// Delay between iterations of [`LeaderElection::run`]. Each iteration
    /// either refreshes the lease (when this node is leader) or checks the
    /// current leader's lease and tries to take over if it has lapsed.
    pub check_interval: Duration,
    /// How far into the future `lease_until` is pushed whenever the lease is
    /// written. Only the whole-second part of this value is used.
    pub lease_duration: Duration,
    /// NOTE(review): not read anywhere in this file; presumably the intended
    /// cadence for lease refreshes. Confirm against callers.
    pub refresh_interval: Duration,
}

impl Default for LeaderConfig {
    /// Defaults: 5 s check interval, 60 s lease, 30 s refresh interval.
    fn default() -> Self {
        let check_interval = Duration::from_secs(5);
        let lease_duration = Duration::from_secs(60);
        let refresh_interval = Duration::from_secs(30);

        Self {
            check_interval,
            lease_duration,
            refresh_interval,
        }
    }
}

30pub struct LeaderElection {
47 pool: sqlx::PgPool,
48 node_id: NodeId,
49 role: LeaderRole,
50 config: LeaderConfig,
51 is_leader: Arc<AtomicBool>,
53 lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
54 shutdown_tx: watch::Sender<bool>,
55 shutdown_rx: watch::Receiver<bool>,
56}
57
58impl LeaderElection {
59 pub fn new(
61 pool: sqlx::PgPool,
62 node_id: NodeId,
63 role: LeaderRole,
64 config: LeaderConfig,
65 ) -> Self {
66 let (shutdown_tx, shutdown_rx) = watch::channel(false);
67 Self {
68 pool,
69 node_id,
70 role,
71 config,
72 is_leader: Arc::new(AtomicBool::new(false)),
73 lock_connection: Arc::new(Mutex::new(None)),
74 shutdown_tx,
75 shutdown_rx,
76 }
77 }
78
79 pub fn is_leader(&self) -> bool {
81 self.is_leader.load(Ordering::SeqCst)
82 }
83
84 pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86 self.shutdown_rx.clone()
87 }
88
89 pub fn stop(&self) {
91 let _ = self.shutdown_tx.send(true);
92 }
93
94 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96 if self.is_leader() {
97 return Ok(true);
98 }
99
100 let mut conn = self
101 .pool
102 .acquire()
103 .await
104 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106 let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
108 .bind(self.role.lock_id())
109 .fetch_optional(&mut *conn)
110 .await
111 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
112
113 let acquired = result.map(|(v,)| v).unwrap_or(false);
114
115 if acquired {
116 let lease_until =
118 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
119
120 sqlx::query(
121 r#"
122 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
123 VALUES ($1, $2, NOW(), $3)
124 ON CONFLICT (role) DO UPDATE SET
125 node_id = EXCLUDED.node_id,
126 acquired_at = NOW(),
127 lease_until = EXCLUDED.lease_until
128 "#,
129 )
130 .bind(self.role.as_str())
131 .bind(self.node_id.as_uuid())
132 .bind(lease_until)
133 .execute(&self.pool)
134 .await
135 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
136
137 self.is_leader.store(true, Ordering::SeqCst);
138 *self.lock_connection.lock().await = Some(conn);
139 tracing::info!(role = self.role.as_str(), "Acquired leadership");
140 }
141
142 Ok(acquired)
143 }
144
145 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
147 if !self.is_leader() {
148 return Ok(());
149 }
150
151 let lease_until =
152 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
153
154 sqlx::query(
155 r#"
156 UPDATE forge_leaders
157 SET lease_until = $3
158 WHERE role = $1 AND node_id = $2
159 "#,
160 )
161 .bind(self.role.as_str())
162 .bind(self.node_id.as_uuid())
163 .bind(lease_until)
164 .execute(&self.pool)
165 .await
166 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
167
168 Ok(())
169 }
170
171 pub async fn release_leadership(&self) -> forge_core::Result<()> {
173 if !self.is_leader() {
174 return Ok(());
175 }
176
177 let mut lock_connection = self.lock_connection.lock().await;
179 if let Some(mut conn) = lock_connection.take() {
180 sqlx::query("SELECT pg_advisory_unlock($1)")
181 .bind(self.role.lock_id())
182 .execute(&mut *conn)
183 .await
184 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
185 } else {
186 tracing::warn!(
187 role = self.role.as_str(),
188 "Leader lock connection missing during release"
189 );
190 }
191 drop(lock_connection);
192
193 sqlx::query(
195 r#"
196 DELETE FROM forge_leaders
197 WHERE role = $1 AND node_id = $2
198 "#,
199 )
200 .bind(self.role.as_str())
201 .bind(self.node_id.as_uuid())
202 .execute(&self.pool)
203 .await
204 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
205
206 self.is_leader.store(false, Ordering::SeqCst);
207 tracing::info!(role = self.role.as_str(), "Released leadership");
208
209 Ok(())
210 }
211
212 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
214 let result: Option<(DateTime<Utc>,)> =
215 sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
216 .bind(self.role.as_str())
217 .fetch_optional(&self.pool)
218 .await
219 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
220
221 match result {
222 Some((lease_until,)) => Ok(lease_until > Utc::now()),
223 None => Ok(false), }
225 }
226
227 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
229 let row = sqlx::query(
230 r#"
231 SELECT role, node_id, acquired_at, lease_until
232 FROM forge_leaders
233 WHERE role = $1
234 "#,
235 )
236 .bind(self.role.as_str())
237 .fetch_optional(&self.pool)
238 .await
239 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
240
241 match row {
242 Some(row) => {
243 use sqlx::Row;
244 let role_str: String = row.get("role");
245 let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
246
247 Ok(Some(LeaderInfo {
248 role,
249 node_id: NodeId::from_uuid(row.get("node_id")),
250 acquired_at: row.get("acquired_at"),
251 lease_until: row.get("lease_until"),
252 }))
253 }
254 None => Ok(None),
255 }
256 }
257
258 pub async fn run(&self) {
260 let mut shutdown_rx = self.shutdown_rx.clone();
261
262 loop {
263 tokio::select! {
264 _ = tokio::time::sleep(self.config.check_interval) => {
265 if self.is_leader() {
266 if let Err(e) = self.refresh_lease().await {
268 tracing::debug!(error = %e, "Failed to refresh lease");
269 }
270 } else {
271 match self.check_leader_health().await {
273 Ok(false) => {
274 if let Err(e) = self.try_become_leader().await {
276 tracing::debug!(error = %e, "Failed to acquire leadership");
277 }
278 }
279 Ok(true) => {
280 }
282 Err(e) => {
283 tracing::debug!(error = %e, "Failed to check leader health");
284 }
285 }
286 }
287 }
288 _ = shutdown_rx.changed() => {
289 if *shutdown_rx.borrow() {
290 tracing::debug!("Leader election shutting down");
291 if let Err(e) = self.release_leadership().await {
292 tracing::debug!(error = %e, "Failed to release leadership");
293 }
294 break;
295 }
296 }
297 }
298 }
299 }
300}
301
302pub struct LeaderGuard<'a> {
304 election: &'a LeaderElection,
305}
306
307impl<'a> LeaderGuard<'a> {
308 pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
311 if election.is_leader() {
312 Some(Self { election })
313 } else {
314 None
315 }
316 }
317
318 pub fn is_leader(&self) -> bool {
320 self.election.is_leader()
321 }
322}
323
#[cfg(test)]
mod tests {
    use super::*;

    /// The documented defaults are 5 s / 60 s / 30 s.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }

    /// The default refresh interval must be shorter than the default lease.
    #[test]
    fn test_default_refresh_fits_within_lease() {
        let config = LeaderConfig::default();
        assert!(config.refresh_interval < config.lease_duration);
    }
}