// forge_runtime/cluster/leader.rs

use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use chrono::{DateTime, Utc};
use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
use tokio::sync::{Mutex, watch};

/// Leader election configuration.
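///
/// With the defaults below, a crashed leader's lease expires after at most
/// `lease_duration`, and a standby notices on its next health check; once the
/// old session's advisory lock is gone, failover completes within roughly
/// `lease_duration + check_interval`.
///
/// A sketch of a tighter configuration for tests (the values here are
/// illustrative, not recommendations):
///
/// ```ignore
/// let config = LeaderConfig {
///     check_interval: std::time::Duration::from_millis(500),
///     ..LeaderConfig::default()
/// };
/// ```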
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// How often standbys check leader health.
    pub check_interval: Duration,
    /// Lease duration; the leader must refresh the lease before it expires.
    pub lease_duration: Duration,
    /// How often the leader refreshes its lease.
    pub refresh_interval: Duration,
}

impl Default for LeaderConfig {
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(5),
            lease_duration: Duration::from_secs(60),
            refresh_interval: Duration::from_secs(30),
        }
    }
}

/// Leader election using PostgreSQL advisory locks.
///
/// Advisory locks provide a simple, reliable way to elect a leader without
/// external coordination services. Key properties:
///
/// 1. **Mutual exclusion**: only one session can hold a given lock ID at a time.
/// 2. **Automatic release**: if the holding connection dies, PostgreSQL releases the lock.
/// 3. **Non-blocking try**: `pg_try_advisory_lock` returns immediately with success or failure.
///
/// Each `LeaderRole` maps to a unique lock ID, allowing multiple independent
/// leader elections (e.g., separate leaders for the cron scheduler and workflow timers).
///
/// The `is_leader` flag uses `SeqCst` ordering because:
/// - multiple threads read the flag to decide whether to execute leader-only code,
/// - we need visibility guarantees across threads immediately after acquiring or releasing, and
/// - the performance cost is negligible (leadership changes are rare).
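///
/// # Example
///
/// A sketch of typical usage; the connection string and node-ID construction
/// here are illustrative:
///
/// ```ignore
/// let pool = sqlx::PgPool::connect("postgres://localhost/forge").await?;
/// let election = std::sync::Arc::new(LeaderElection::new(
///     pool,
///     NodeId::from_uuid(uuid::Uuid::new_v4()),
///     LeaderRole::Scheduler,
///     LeaderConfig::default(),
/// ));
///
/// tokio::spawn({
///     let election = election.clone();
///     async move { election.run().await }
/// });
///
/// if election.is_leader() {
///     // leader-only work
/// }
/// ```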
pub struct LeaderElection {
    pool: sqlx::PgPool,
    node_id: NodeId,
    role: LeaderRole,
    config: LeaderConfig,
    /// Uses `SeqCst` for cross-thread visibility of leadership state changes.
    is_leader: Arc<AtomicBool>,
    /// The pooled connection whose session holds the advisory lock while this node leads.
    lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
    shutdown_tx: watch::Sender<bool>,
    shutdown_rx: watch::Receiver<bool>,
}

impl LeaderElection {
    /// Create a new leader election instance.
    pub fn new(
        pool: sqlx::PgPool,
        node_id: NodeId,
        role: LeaderRole,
        config: LeaderConfig,
    ) -> Self {
        let (shutdown_tx, shutdown_rx) = watch::channel(false);
        Self {
            pool,
            node_id,
            role,
            config,
            is_leader: Arc::new(AtomicBool::new(false)),
            lock_connection: Arc::new(Mutex::new(None)),
            shutdown_tx,
            shutdown_rx,
        }
    }

    /// Check if this node currently believes it is the leader.
    pub fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::SeqCst)
    }

    /// Get a shutdown receiver.
    pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
        self.shutdown_rx.clone()
    }

    /// Stop the leader election.
    pub fn stop(&self) {
        let _ = self.shutdown_tx.send(true);
    }

    /// Try to acquire leadership.
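    ///
    /// A sketch of driving the election by hand; most callers let
    /// [`run`](Self::run) drive it instead:
    ///
    /// ```ignore
    /// if election.try_become_leader().await? {
    ///     // leader-only work; call refresh_lease() before the lease expires
    /// }
    /// ```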
    pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
        if self.is_leader() {
            return Ok(true);
        }

        let mut conn = self
            .pool
            .acquire()
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        // Try to acquire the advisory lock (non-blocking).
        let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
            .bind(self.role.lock_id())
            .fetch_optional(&mut *conn)
            .await
            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        let acquired = result.map(|(v,)| v).unwrap_or(false);

        if acquired {
            // Record leadership in the database for visibility.
            let lease_until =
                Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);

            let recorded = sqlx::query(
                r#"
                INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
                VALUES ($1, $2, NOW(), $3)
                ON CONFLICT (role) DO UPDATE SET
                    node_id = EXCLUDED.node_id,
                    acquired_at = NOW(),
                    lease_until = EXCLUDED.lease_until
                "#,
            )
            .bind(self.role.as_str())
            .bind(self.node_id.as_uuid())
            .bind(lease_until)
            .execute(&self.pool)
            .await;

            if let Err(e) = recorded {
                // Unlock on the same session before the connection returns to the
                // pool; otherwise the pooled session would keep holding the
                // advisory lock with no node acting as leader.
                let _ = sqlx::query("SELECT pg_advisory_unlock($1)")
                    .bind(self.role.lock_id())
                    .execute(&mut *conn)
                    .await;
                return Err(forge_core::ForgeError::Database(e.to_string()));
            }

            self.is_leader.store(true, Ordering::SeqCst);
            // Keep the connection alive: the advisory lock lives on its session.
            *self.lock_connection.lock().await = Some(conn);
            tracing::info!(role = self.role.as_str(), "Acquired leadership");
        }

        Ok(acquired)
    }

    /// Refresh the leadership lease.
    pub async fn refresh_lease(&self) -> forge_core::Result<()> {
        if !self.is_leader() {
            return Ok(());
        }

        let lease_until =
            Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);

        sqlx::query(
            r#"
            UPDATE forge_leaders
            SET lease_until = $3
            WHERE role = $1 AND node_id = $2
            "#,
        )
        .bind(self.role.as_str())
        .bind(self.node_id.as_uuid())
        .bind(lease_until)
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        Ok(())
    }

    /// Release leadership.
    pub async fn release_leadership(&self) -> forge_core::Result<()> {
        if !self.is_leader() {
            return Ok(());
        }

        // Release the advisory lock on the same session that acquired it.
        let mut lock_connection = self.lock_connection.lock().await;
        if let Some(mut conn) = lock_connection.take() {
            sqlx::query("SELECT pg_advisory_unlock($1)")
                .bind(self.role.lock_id())
                .execute(&mut *conn)
                .await
                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
        } else {
            tracing::warn!(
                role = self.role.as_str(),
                "Leader lock connection missing during release"
            );
        }
        drop(lock_connection);

        // Clear the leadership record.
        sqlx::query(
            r#"
            DELETE FROM forge_leaders
            WHERE role = $1 AND node_id = $2
            "#,
        )
        .bind(self.role.as_str())
        .bind(self.node_id.as_uuid())
        .execute(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        self.is_leader.store(false, Ordering::SeqCst);
        tracing::info!(role = self.role.as_str(), "Released leadership");

        Ok(())
    }

    /// Check if the current leader's lease is still valid.
    pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
        let result: Option<(DateTime<Utc>,)> =
            sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
                .bind(self.role.as_str())
                .fetch_optional(&self.pool)
                .await
                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        match result {
            Some((lease_until,)) => Ok(lease_until > Utc::now()),
            None => Ok(false), // No leader recorded.
        }
    }

    /// Get current leader info.
    pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
        let row = sqlx::query(
            r#"
            SELECT role, node_id, acquired_at, lease_until
            FROM forge_leaders
            WHERE role = $1
            "#,
        )
        .bind(self.role.as_str())
        .fetch_optional(&self.pool)
        .await
        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;

        match row {
            Some(row) => {
                use sqlx::Row;
                let role_str: String = row.get("role");
                // Unknown role strings fall back to `Scheduler` rather than failing.
                let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);

                Ok(Some(LeaderInfo {
                    role,
                    node_id: NodeId::from_uuid(row.get("node_id")),
                    acquired_at: row.get("acquired_at"),
                    lease_until: row.get("lease_until"),
                }))
            }
            None => Ok(None),
        }
    }

    /// Run the leader election loop.
    ///
    /// As leader, periodically refreshes the lease; as standby, watches the
    /// leader's lease and tries to take over once it expires. Returns after
    /// [`stop`](Self::stop) is called, releasing leadership if held.
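    ///
    /// A sketch of running the loop on a background task (assuming the election
    /// is shared via `Arc`, as in the struct-level example):
    ///
    /// ```ignore
    /// let worker = tokio::spawn({
    ///     let election = election.clone();
    ///     async move { election.run().await }
    /// });
    /// // ... on shutdown:
    /// election.stop();
    /// worker.await?;
    /// ```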
    pub async fn run(&self) {
        let mut shutdown_rx = self.shutdown_rx.clone();

        loop {
            // Leaders wake on the refresh cadence; standbys on the check cadence.
            let sleep_for = if self.is_leader() {
                self.config.refresh_interval
            } else {
                self.config.check_interval
            };

            tokio::select! {
                _ = tokio::time::sleep(sleep_for) => {
                    if self.is_leader() {
                        // We're the leader; refresh the lease.
                        if let Err(e) = self.refresh_lease().await {
                            tracing::warn!(error = %e, "Failed to refresh lease");
                        }
                    } else {
                        // We're a standby; check whether we should try to become leader.
                        match self.check_leader_health().await {
                            Ok(false) => {
                                // No healthy leader; try to become one.
                                if let Err(e) = self.try_become_leader().await {
                                    tracing::debug!(error = %e, "Failed to acquire leadership");
                                }
                            }
                            Ok(true) => {
                                // Leader is healthy; remain a standby.
                            }
                            Err(e) => {
                                tracing::debug!(error = %e, "Failed to check leader health");
                            }
                        }
                    }
                }
                _ = shutdown_rx.changed() => {
                    if *shutdown_rx.borrow() {
                        tracing::debug!("Leader election shutting down");
                        if let Err(e) = self.release_leadership().await {
                            tracing::warn!(error = %e, "Failed to release leadership");
                        }
                        break;
                    }
                }
            }
        }
    }
}

/// Guard for leader-only operations.
///
/// Holding a guard means this node was the leader when the guard was created;
/// leadership can still be lost at any moment, so long-running work should
/// re-check [`is_leader`](Self::is_leader) around critical steps.
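///
/// A sketch of guarded work (illustrative):
///
/// ```ignore
/// if let Some(guard) = LeaderGuard::try_new(&election) {
///     // leader-only work
///     if guard.is_leader() {
///         // still the leader; safe to commit side effects
///     }
/// }
/// ```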
pub struct LeaderGuard<'a> {
    election: &'a LeaderElection,
}

impl<'a> LeaderGuard<'a> {
    /// Try to create a leader guard.
    /// Returns `None` if this node is not the leader.
    pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
        if election.is_leader() {
            Some(Self { election })
        } else {
            None
        }
    }

    /// Check if this node is still the leader.
    pub fn is_leader(&self) -> bool {
        self.election.is_leader()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_leader_config_default() {
        let config = LeaderConfig::default();
        assert_eq!(config.check_interval, Duration::from_secs(5));
        assert_eq!(config.lease_duration, Duration::from_secs(60));
        assert_eq!(config.refresh_interval, Duration::from_secs(30));
    }
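
    // Sanity check: the default refresh interval must leave headroom before
    // the lease expires, or a healthy leader could be judged dead between
    // refreshes.
    #[test]
    fn test_default_refresh_fits_within_lease() {
        let config = LeaderConfig::default();
        assert!(config.refresh_interval < config.lease_duration);
    }
}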
335}