// forge_runtime/cluster/leader.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
/// Leader election configuration.
///
/// Timing invariant to keep in mind when tuning: `refresh_interval` should be
/// comfortably below `lease_duration` so transient database errors don't let
/// the lease lapse while this node still holds the advisory lock.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// How often standbys check leader health (and attempt a takeover when
    /// the recorded lease has expired).
    pub check_interval: Duration,
    /// Lease duration (leader must refresh before expiry).
    pub lease_duration: Duration,
    /// Lease refresh interval — the cadence at which a sitting leader
    /// extends its lease.
    pub refresh_interval: Duration,
}
19
20impl Default for LeaderConfig {
21    fn default() -> Self {
22        Self {
23            check_interval: Duration::from_secs(5),
24            lease_duration: Duration::from_secs(60),
25            refresh_interval: Duration::from_secs(30),
26        }
27    }
28}
29
/// Leader election using PostgreSQL advisory locks.
///
/// Advisory locks provide a simple, reliable way to elect a leader without
/// external coordination services. Key properties:
///
/// 1. **Mutual exclusion**: Only one session can hold a given lock ID at a time.
/// 2. **Automatic release**: If the connection dies, PostgreSQL releases the lock.
/// 3. **Non-blocking try**: `pg_try_advisory_lock` returns immediately with success/failure.
///
/// Each `LeaderRole` maps to a unique lock ID, allowing multiple independent
/// leader elections (e.g., separate leaders for cron scheduler and workflow timers).
///
/// The `is_leader` flag uses `SeqCst` ordering because:
/// - Multiple threads read this flag to decide whether to execute leader-only code
/// - We need visibility guarantees across threads immediately after acquiring/releasing
/// - The performance cost is negligible (leadership changes are rare)
pub struct LeaderElection {
    /// Pool used for bookkeeping queries against `forge_leaders`.
    pool: sqlx::PgPool,
    /// Identity of this node, recorded in `forge_leaders` while leading.
    node_id: NodeId,
    /// Which election this instance participates in; supplies the advisory lock ID.
    role: LeaderRole,
    /// Timing knobs for the election loop.
    config: LeaderConfig,
    /// Uses SeqCst for cross-thread visibility of leadership state changes.
    is_leader: Arc<AtomicBool>,
    /// Dedicated connection that holds the advisory lock while this node is
    /// leader. Parked here (not returned to the pool) so the lock-owning
    /// session stays alive for the duration of leadership.
    lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
    /// Broadcasts the shutdown request to the election loop.
    shutdown_tx: watch::Sender<bool>,
    /// Prototype receiver; cloned by `run()` and `shutdown_receiver()`.
    shutdown_rx: watch::Receiver<bool>,
}
57
58impl LeaderElection {
59    /// Create a new leader election instance.
60    pub fn new(
61        pool: sqlx::PgPool,
62        node_id: NodeId,
63        role: LeaderRole,
64        config: LeaderConfig,
65    ) -> Self {
66        let (shutdown_tx, shutdown_rx) = watch::channel(false);
67        Self {
68            pool,
69            node_id,
70            role,
71            config,
72            is_leader: Arc::new(AtomicBool::new(false)),
73            lock_connection: Arc::new(Mutex::new(None)),
74            shutdown_tx,
75            shutdown_rx,
76        }
77    }
78
79    /// Check if this node is the leader.
80    pub fn is_leader(&self) -> bool {
81        self.is_leader.load(Ordering::SeqCst)
82    }
83
84    /// Get a shutdown receiver.
85    pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
86        self.shutdown_rx.clone()
87    }
88
89    /// Stop the leader election.
90    pub fn stop(&self) {
91        let _ = self.shutdown_tx.send(true);
92    }
93
94    /// Try to acquire leadership.
95    pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
96        if self.is_leader() {
97            return Ok(true);
98        }
99
100        let mut conn = self
101            .pool
102            .acquire()
103            .await
104            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
105
106        // Try to acquire advisory lock (non-blocking)
107        let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
108            .bind(self.role.lock_id())
109            .fetch_optional(&mut *conn)
110            .await
111            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
112
113        let acquired = result.map(|(v,)| v).unwrap_or(false);
114
115        super::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
116
117        if acquired {
118            // Record leadership in database for visibility
119            let lease_until =
120                Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
121
122            sqlx::query(
123                r#"
124                INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
125                VALUES ($1, $2, NOW(), $3)
126                ON CONFLICT (role) DO UPDATE SET
127                    node_id = EXCLUDED.node_id,
128                    acquired_at = NOW(),
129                    lease_until = EXCLUDED.lease_until
130                "#,
131            )
132            .bind(self.role.as_str())
133            .bind(self.node_id.as_uuid())
134            .bind(lease_until)
135            .execute(&self.pool)
136            .await
137            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139            self.is_leader.store(true, Ordering::SeqCst);
140            super::metrics::set_is_leader(self.role.as_str(), true);
141            *self.lock_connection.lock().await = Some(conn);
142            tracing::info!(role = self.role.as_str(), "Acquired leadership");
143        }
144
145        Ok(acquired)
146    }
147
148    /// Refresh the leadership lease.
149    pub async fn refresh_lease(&self) -> forge_core::Result<()> {
150        if !self.is_leader() {
151            return Ok(());
152        }
153
154        let lease_until =
155            Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
156
157        sqlx::query(
158            r#"
159            UPDATE forge_leaders
160            SET lease_until = $3
161            WHERE role = $1 AND node_id = $2
162            "#,
163        )
164        .bind(self.role.as_str())
165        .bind(self.node_id.as_uuid())
166        .bind(lease_until)
167        .execute(&self.pool)
168        .await
169        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
170
171        Ok(())
172    }
173
174    /// Release leadership.
175    pub async fn release_leadership(&self) -> forge_core::Result<()> {
176        if !self.is_leader() {
177            return Ok(());
178        }
179
180        // Release the advisory lock on the same session that acquired it.
181        let mut lock_connection = self.lock_connection.lock().await;
182        if let Some(mut conn) = lock_connection.take() {
183            sqlx::query("SELECT pg_advisory_unlock($1)")
184                .bind(self.role.lock_id())
185                .execute(&mut *conn)
186                .await
187                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
188        } else {
189            tracing::warn!(
190                role = self.role.as_str(),
191                "Leader lock connection missing during release"
192            );
193        }
194        drop(lock_connection);
195
196        // Clear leadership record
197        sqlx::query(
198            r#"
199            DELETE FROM forge_leaders
200            WHERE role = $1 AND node_id = $2
201            "#,
202        )
203        .bind(self.role.as_str())
204        .bind(self.node_id.as_uuid())
205        .execute(&self.pool)
206        .await
207        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
208
209        self.is_leader.store(false, Ordering::SeqCst);
210        super::metrics::set_is_leader(self.role.as_str(), false);
211        tracing::info!(role = self.role.as_str(), "Released leadership");
212
213        Ok(())
214    }
215
216    /// Check if the current leader is healthy.
217    pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
218        let result: Option<(DateTime<Utc>,)> =
219            sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
220                .bind(self.role.as_str())
221                .fetch_optional(&self.pool)
222                .await
223                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
224
225        match result {
226            Some((lease_until,)) => Ok(lease_until > Utc::now()),
227            None => Ok(false), // No leader
228        }
229    }
230
231    /// Get current leader info.
232    pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
233        let row = sqlx::query(
234            r#"
235            SELECT role, node_id, acquired_at, lease_until
236            FROM forge_leaders
237            WHERE role = $1
238            "#,
239        )
240        .bind(self.role.as_str())
241        .fetch_optional(&self.pool)
242        .await
243        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
244
245        match row {
246            Some(row) => {
247                use sqlx::Row;
248                let role_str: String = row.get("role");
249                let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
250
251                Ok(Some(LeaderInfo {
252                    role,
253                    node_id: NodeId::from_uuid(row.get("node_id")),
254                    acquired_at: row.get("acquired_at"),
255                    lease_until: row.get("lease_until"),
256                }))
257            }
258            None => Ok(None),
259        }
260    }
261
262    /// Run the leader election loop.
263    pub async fn run(&self) {
264        let mut shutdown_rx = self.shutdown_rx.clone();
265
266        loop {
267            tokio::select! {
268                _ = tokio::time::sleep(self.config.check_interval) => {
269                    if self.is_leader() {
270                        // We're the leader, refresh lease
271                        if let Err(e) = self.refresh_lease().await {
272                            tracing::debug!(error = %e, "Failed to refresh lease");
273                        }
274                    } else {
275                        // We're a standby, check if we should try to become leader
276                        match self.check_leader_health().await {
277                            Ok(false) => {
278                                // No healthy leader, try to become one
279                                if let Err(e) = self.try_become_leader().await {
280                                    tracing::debug!(error = %e, "Failed to acquire leadership");
281                                }
282                            }
283                            Ok(true) => {
284                                // Leader is healthy, stay as standby
285                            }
286                            Err(e) => {
287                                tracing::debug!(error = %e, "Failed to check leader health");
288                            }
289                        }
290                    }
291                }
292                _ = shutdown_rx.changed() => {
293                    if *shutdown_rx.borrow() {
294                        tracing::debug!("Leader election shutting down");
295                        if let Err(e) = self.release_leadership().await {
296                            tracing::debug!(error = %e, "Failed to release leadership");
297                        }
298                        break;
299                    }
300                }
301            }
302        }
303    }
304}
305
/// RAII guard for leader-only operations.
///
/// NOTE(review): this guard has no `Drop` impl and does not pin leadership —
/// the advisory lock can still be lost while the guard is alive. Callers
/// should re-check `is_leader()` before each leader-only step; confirm this
/// matches call-site expectations.
pub struct LeaderGuard<'a> {
    // Election whose leadership status this guard reflects.
    election: &'a LeaderElection,
}
310
311impl<'a> LeaderGuard<'a> {
312    /// Try to create a leader guard.
313    /// Returns None if not the leader.
314    pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
315        if election.is_leader() {
316            Some(Self { election })
317        } else {
318            None
319        }
320    }
321
322    /// Check if still leader.
323    pub fn is_leader(&self) -> bool {
324        self.election.is_leader()
325    }
326}
327
#[cfg(test)]
mod tests {
    use super::*;

    /// The documented defaults must hold: 5s standby checks, 60s leases,
    /// 30s refresh cadence.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}