// forge_runtime/cluster/leader.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::Utc;
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
/// Timing parameters for leader election.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// How often standbys check leader health (and attempt takeover when the
    /// lease has lapsed).
    pub check_interval: Duration,
    /// Lease duration (leader must refresh before expiry). The advertised
    /// `lease_until` timestamp is computed as now + this duration.
    pub lease_duration: Duration,
    /// Lease refresh interval; should be comfortably shorter than
    /// `lease_duration` so the lease never lapses while the leader is healthy.
    pub refresh_interval: Duration,
}
19
20impl Default for LeaderConfig {
21    fn default() -> Self {
22        Self {
23            check_interval: Duration::from_secs(5),
24            lease_duration: Duration::from_secs(60),
25            refresh_interval: Duration::from_secs(30),
26        }
27    }
28}
29
/// Leader election using PostgreSQL advisory locks.
///
/// Advisory locks provide a simple, reliable way to elect a leader without
/// external coordination services. Key properties:
///
/// 1. **Mutual exclusion**: Only one session can hold a given lock ID at a time.
/// 2. **Automatic release**: If the connection dies, PostgreSQL releases the lock.
/// 3. **Non-blocking try**: `pg_try_advisory_lock` returns immediately with success/failure.
///
/// Each `LeaderRole` maps to a unique lock ID, allowing multiple independent
/// leader elections (e.g., separate leaders for cron scheduler and workflow timers).
///
/// The `is_leader` flag uses `SeqCst` ordering because:
/// - Multiple threads read this flag to decide whether to execute leader-only code
/// - We need visibility guarantees across threads immediately after acquiring/releasing
/// - The performance cost is negligible (leadership changes are rare)
pub struct LeaderElection {
    /// Pool used for bookkeeping queries against `forge_leaders`.
    pool: sqlx::PgPool,
    /// This node's identity, recorded in `forge_leaders` while leading.
    node_id: NodeId,
    /// The role being contested; its `lock_id()` selects the advisory lock.
    role: LeaderRole,
    /// Timing parameters for the election loop.
    config: LeaderConfig,
    /// Uses SeqCst for cross-thread visibility of leadership state changes.
    is_leader: Arc<AtomicBool>,
    /// The pooled connection holding the advisory lock while we are leader.
    /// Kept checked out so the lock's session stays alive; `None` as standby.
    lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
    /// Sender half of the shutdown signal consumed by the `run` loop.
    shutdown_tx: watch::Sender<bool>,
    /// Receiver kept so `shutdown_tx.send` always has at least one receiver.
    shutdown_rx: watch::Receiver<bool>,
}
57
58impl LeaderElection {
59    /// Create a new leader election instance.
60    pub fn new(
61        pool: sqlx::PgPool,
62        node_id: NodeId,
63        role: LeaderRole,
64        config: LeaderConfig,
65    ) -> Self {
66        let (shutdown_tx, shutdown_rx) = watch::channel(false);
67        Self {
68            pool,
69            node_id,
70            role,
71            config,
72            is_leader: Arc::new(AtomicBool::new(false)),
73            lock_connection: Arc::new(Mutex::new(None)),
74            shutdown_tx,
75            shutdown_rx,
76        }
77    }
78
79    /// Check if this node is the leader.
80    pub fn is_leader(&self) -> bool {
81        self.is_leader.load(Ordering::SeqCst)
82    }
83
84    /// Get a shutdown receiver.
85    pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86        self.shutdown_rx.clone()
87    }
88
89    /// Stop the leader election.
90    pub fn stop(&self) {
91        let _ = self.shutdown_tx.send(true);
92    }
93
94    /// Try to acquire leadership.
95    pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96        if self.is_leader() {
97            return Ok(true);
98        }
99
100        let mut conn = self
101            .pool
102            .acquire()
103            .await
104            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106        // Try to acquire advisory lock (non-blocking)
107        let acquired = sqlx::query_scalar!(
108            r#"SELECT pg_try_advisory_lock($1) as "acquired!""#,
109            self.role.lock_id()
110        )
111        .fetch_one(&mut *conn)
112        .await
113        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
114
115        super::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
116
117        if acquired {
118            // Record leadership in database for visibility
119            let lease_until =
120                Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
121
122            sqlx::query(
123                r#"
124                INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
125                VALUES ($1, $2, NOW(), $3)
126                ON CONFLICT (role) DO UPDATE SET
127                    node_id = EXCLUDED.node_id,
128                    acquired_at = NOW(),
129                    lease_until = EXCLUDED.lease_until
130                "#,
131            )
132            .bind(self.role.as_str())
133            .bind(self.node_id.as_uuid())
134            .bind(lease_until)
135            .execute(&self.pool)
136            .await
137            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139            self.is_leader.store(true, Ordering::SeqCst);
140            super::metrics::set_is_leader(self.role.as_str(), true);
141            *self.lock_connection.lock().await = Some(conn);
142            tracing::info!(role = self.role.as_str(), "Acquired leadership");
143        }
144
145        Ok(acquired)
146    }
147
148    /// Refresh the leadership lease.
149    pub async fn refresh_lease(&self) -> forge_core::Result<()> {
150        if !self.is_leader() {
151            return Ok(());
152        }
153
154        let lease_until =
155            Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
156
157        sqlx::query(
158            r#"
159            UPDATE forge_leaders
160            SET lease_until = $3
161            WHERE role = $1 AND node_id = $2
162            "#,
163        )
164        .bind(self.role.as_str())
165        .bind(self.node_id.as_uuid())
166        .bind(lease_until)
167        .execute(&self.pool)
168        .await
169        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
170
171        Ok(())
172    }
173
174    /// Release leadership.
175    pub async fn release_leadership(&self) -> forge_core::Result<()> {
176        if !self.is_leader() {
177            return Ok(());
178        }
179
180        // Release the advisory lock on the same session that acquired it.
181        let mut lock_connection = self.lock_connection.lock().await;
182        if let Some(mut conn) = lock_connection.take() {
183            sqlx::query("SELECT pg_advisory_unlock($1)")
184                .bind(self.role.lock_id())
185                .execute(&mut *conn)
186                .await
187                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
188        } else {
189            tracing::warn!(
190                role = self.role.as_str(),
191                "Leader lock connection missing during release"
192            );
193        }
194        drop(lock_connection);
195
196        // Clear leadership record
197        sqlx::query(
198            r#"
199            DELETE FROM forge_leaders
200            WHERE role = $1 AND node_id = $2
201            "#,
202        )
203        .bind(self.role.as_str())
204        .bind(self.node_id.as_uuid())
205        .execute(&self.pool)
206        .await
207        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
208
209        self.is_leader.store(false, Ordering::SeqCst);
210        super::metrics::set_is_leader(self.role.as_str(), false);
211        tracing::info!(role = self.role.as_str(), "Released leadership");
212
213        Ok(())
214    }
215
216    /// Check if the current leader is healthy.
217    pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
218        let result = sqlx::query_scalar!(
219            "SELECT lease_until FROM forge_leaders WHERE role = $1",
220            self.role.as_str()
221        )
222        .fetch_optional(&self.pool)
223        .await
224        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
225
226        match result {
227            Some(lease_until) => Ok(lease_until > Utc::now()),
228            None => Ok(false), // No leader
229        }
230    }
231
232    /// Get current leader info.
233    pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
234        let row = sqlx::query(
235            r#"
236            SELECT role, node_id, acquired_at, lease_until
237            FROM forge_leaders
238            WHERE role = $1
239            "#,
240        )
241        .bind(self.role.as_str())
242        .fetch_optional(&self.pool)
243        .await
244        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
245
246        match row {
247            Some(row) => {
248                use sqlx::Row;
249                let role_str: String = row.get("role");
250                let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
251
252                Ok(Some(LeaderInfo {
253                    role,
254                    node_id: NodeId::from_uuid(row.get("node_id")),
255                    acquired_at: row.get("acquired_at"),
256                    lease_until: row.get("lease_until"),
257                }))
258            }
259            None => Ok(None),
260        }
261    }
262
263    /// Run the leader election loop.
264    pub async fn run(&self) {
265        let mut shutdown_rx = self.shutdown_rx.clone();
266
267        loop {
268            tokio::select! {
269                _ = tokio::time::sleep(self.config.check_interval) => {
270                    if self.is_leader() {
271                        // We're the leader, refresh lease
272                        if let Err(e) = self.refresh_lease().await {
273                            tracing::debug!(error = %e, "Failed to refresh lease");
274                        }
275                    } else {
276                        // We're a standby, check if we should try to become leader
277                        match self.check_leader_health().await {
278                            Ok(false) => {
279                                // No healthy leader, try to become one
280                                if let Err(e) = self.try_become_leader().await {
281                                    tracing::debug!(error = %e, "Failed to acquire leadership");
282                                }
283                            }
284                            Ok(true) => {
285                                // Leader is healthy, stay as standby
286                            }
287                            Err(e) => {
288                                tracing::debug!(error = %e, "Failed to check leader health");
289                            }
290                        }
291                    }
292                }
293                _ = shutdown_rx.changed() => {
294                    if *shutdown_rx.borrow() {
295                        tracing::debug!("Leader election shutting down");
296                        if let Err(e) = self.release_leadership().await {
297                            tracing::debug!(error = %e, "Failed to release leadership");
298                        }
299                        break;
300                    }
301                }
302            }
303        }
304    }
305}
306
/// RAII-style guard for leader-only operations.
///
/// NOTE(review): no `Drop` impl is visible in this file, so the guard neither
/// pins nor releases leadership — it is a point-in-time witness that the node
/// was leader when the guard was created. Leadership can still be lost while
/// the guard is alive; re-check with `is_leader()` during long operations.
pub struct LeaderGuard<'a> {
    // The election whose leadership status this guard reflects.
    election: &'a LeaderElection,
}
311
312impl<'a> LeaderGuard<'a> {
313    /// Try to create a leader guard.
314    /// Returns None if not the leader.
315    pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
316        if election.is_leader() {
317            Some(Self { election })
318        } else {
319            None
320        }
321    }
322
323    /// Check if still leader.
324    pub fn is_leader(&self) -> bool {
325        self.election.is_leader()
326    }
327}
328
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration must match its documented timings.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}