forge_runtime/cluster/
leader.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::watch;
8
/// Leader election configuration.
///
/// Groups the timing parameters consumed by [`LeaderElection`]. All three
/// values are plain [`Duration`]s; see the `Default` impl for the stock
/// settings.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// How often standbys check leader health.
    ///
    /// Used as the sleep between iterations of the election loop.
    pub check_interval: Duration,
    /// Lease duration (leader must refresh before expiry).
    ///
    /// Added to the current time (whole seconds only) to compute the
    /// `lease_until` value written to `forge_leaders`.
    pub lease_duration: Duration,
    /// Lease refresh interval.
    ///
    /// Intended cadence at which the leader renews its lease; should be
    /// shorter than `lease_duration` so the lease never lapses between
    /// refreshes.
    pub refresh_interval: Duration,
}
19
20impl Default for LeaderConfig {
21    fn default() -> Self {
22        Self {
23            check_interval: Duration::from_secs(5),
24            lease_duration: Duration::from_secs(60),
25            refresh_interval: Duration::from_secs(30),
26        }
27    }
28}
29
/// Leader election using PostgreSQL advisory locks.
///
/// A node becomes leader by winning `pg_try_advisory_lock` for its role's
/// lock id and then recording itself, together with a lease expiry, in the
/// `forge_leaders` table. Standbys watch that lease to decide when to
/// attempt a takeover.
///
/// NOTE(review): PostgreSQL advisory locks taken with
/// `pg_try_advisory_lock` are scoped to the database session, while the
/// queries in this type are issued through a connection pool. Confirm that
/// lock acquisition and release are guaranteed to hit the same connection.
pub struct LeaderElection {
    /// Connection pool used for every query issued by this type.
    pool: sqlx::PgPool,
    /// Identity of this node; stored in the `node_id` column.
    node_id: NodeId,
    /// Role being contended for; supplies the advisory lock id and the
    /// `role` column value.
    role: LeaderRole,
    /// Timing parameters for the election loop and lease.
    config: LeaderConfig,
    /// Local flag recording whether this node currently believes it is the
    /// leader.
    is_leader: Arc<AtomicBool>,
    /// Sending half of the shutdown signal; `stop` publishes `true` on it.
    shutdown_tx: watch::Sender<bool>,
    /// Receiving half of the shutdown signal; cloned for the election loop
    /// and for callers of `shutdown_receiver`.
    shutdown_rx: watch::Receiver<bool>,
}
40
41impl LeaderElection {
42    /// Create a new leader election instance.
43    pub fn new(
44        pool: sqlx::PgPool,
45        node_id: NodeId,
46        role: LeaderRole,
47        config: LeaderConfig,
48    ) -> Self {
49        let (shutdown_tx, shutdown_rx) = watch::channel(false);
50        Self {
51            pool,
52            node_id,
53            role,
54            config,
55            is_leader: Arc::new(AtomicBool::new(false)),
56            shutdown_tx,
57            shutdown_rx,
58        }
59    }
60
61    /// Check if this node is the leader.
62    pub fn is_leader(&self) -> bool {
63        self.is_leader.load(Ordering::SeqCst)
64    }
65
66    /// Get a shutdown receiver.
67    pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
68        self.shutdown_rx.clone()
69    }
70
71    /// Stop the leader election.
72    pub fn stop(&self) {
73        let _ = self.shutdown_tx.send(true);
74    }
75
76    /// Try to acquire leadership.
77    pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
78        // Try to acquire advisory lock (non-blocking)
79        let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
80            .bind(self.role.lock_id())
81            .fetch_optional(&self.pool)
82            .await
83            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
84
85        let acquired = result.map(|(v,)| v).unwrap_or(false);
86
87        if acquired {
88            // Record leadership in database for visibility
89            let lease_until =
90                Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
91
92            sqlx::query(
93                r#"
94                INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
95                VALUES ($1, $2, NOW(), $3)
96                ON CONFLICT (role) DO UPDATE SET
97                    node_id = EXCLUDED.node_id,
98                    acquired_at = NOW(),
99                    lease_until = EXCLUDED.lease_until
100                "#,
101            )
102            .bind(self.role.as_str())
103            .bind(self.node_id.as_uuid())
104            .bind(lease_until)
105            .execute(&self.pool)
106            .await
107            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
108
109            self.is_leader.store(true, Ordering::SeqCst);
110            tracing::info!("Became {} leader", self.role.as_str());
111        }
112
113        Ok(acquired)
114    }
115
116    /// Refresh the leadership lease.
117    pub async fn refresh_lease(&self) -> forge_core::Result<()> {
118        if !self.is_leader() {
119            return Ok(());
120        }
121
122        let lease_until =
123            Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
124
125        sqlx::query(
126            r#"
127            UPDATE forge_leaders
128            SET lease_until = $3
129            WHERE role = $1 AND node_id = $2
130            "#,
131        )
132        .bind(self.role.as_str())
133        .bind(self.node_id.as_uuid())
134        .bind(lease_until)
135        .execute(&self.pool)
136        .await
137        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139        Ok(())
140    }
141
142    /// Release leadership.
143    pub async fn release_leadership(&self) -> forge_core::Result<()> {
144        if !self.is_leader() {
145            return Ok(());
146        }
147
148        // Release the advisory lock
149        sqlx::query("SELECT pg_advisory_unlock($1)")
150            .bind(self.role.lock_id())
151            .execute(&self.pool)
152            .await
153            .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
154
155        // Clear leadership record
156        sqlx::query(
157            r#"
158            DELETE FROM forge_leaders
159            WHERE role = $1 AND node_id = $2
160            "#,
161        )
162        .bind(self.role.as_str())
163        .bind(self.node_id.as_uuid())
164        .execute(&self.pool)
165        .await
166        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
167
168        self.is_leader.store(false, Ordering::SeqCst);
169        tracing::info!("Released {} leadership", self.role.as_str());
170
171        Ok(())
172    }
173
174    /// Check if the current leader is healthy.
175    pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
176        let result: Option<(DateTime<Utc>,)> =
177            sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
178                .bind(self.role.as_str())
179                .fetch_optional(&self.pool)
180                .await
181                .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
182
183        match result {
184            Some((lease_until,)) => Ok(lease_until > Utc::now()),
185            None => Ok(false), // No leader
186        }
187    }
188
189    /// Get current leader info.
190    pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
191        let row = sqlx::query(
192            r#"
193            SELECT role, node_id, acquired_at, lease_until
194            FROM forge_leaders
195            WHERE role = $1
196            "#,
197        )
198        .bind(self.role.as_str())
199        .fetch_optional(&self.pool)
200        .await
201        .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
202
203        match row {
204            Some(row) => {
205                use sqlx::Row;
206                let role_str: String = row.get("role");
207                let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
208
209                Ok(Some(LeaderInfo {
210                    role,
211                    node_id: NodeId::from_uuid(row.get("node_id")),
212                    acquired_at: row.get("acquired_at"),
213                    lease_until: row.get("lease_until"),
214                }))
215            }
216            None => Ok(None),
217        }
218    }
219
220    /// Run the leader election loop.
221    pub async fn run(&self) {
222        let mut shutdown_rx = self.shutdown_rx.clone();
223
224        loop {
225            tokio::select! {
226                _ = tokio::time::sleep(self.config.check_interval) => {
227                    if self.is_leader() {
228                        // We're the leader, refresh lease
229                        if let Err(e) = self.refresh_lease().await {
230                            tracing::warn!("Failed to refresh lease: {}", e);
231                        }
232                    } else {
233                        // We're a standby, check if we should try to become leader
234                        match self.check_leader_health().await {
235                            Ok(false) => {
236                                // No healthy leader, try to become one
237                                if let Err(e) = self.try_become_leader().await {
238                                    tracing::warn!("Failed to acquire leadership: {}", e);
239                                }
240                            }
241                            Ok(true) => {
242                                // Leader is healthy, stay as standby
243                            }
244                            Err(e) => {
245                                tracing::warn!("Failed to check leader health: {}", e);
246                            }
247                        }
248                    }
249                }
250                _ = shutdown_rx.changed() => {
251                    if *shutdown_rx.borrow() {
252                        tracing::info!("Leader election shutting down");
253                        if let Err(e) = self.release_leadership().await {
254                            tracing::warn!("Failed to release leadership: {}", e);
255                        }
256                        break;
257                    }
258                }
259            }
260        }
261    }
262}
263
/// Guard for leader-only operations.
///
/// Wraps a borrow of a [`LeaderElection`] and can only be constructed (via
/// `try_new`) while that election reports this node as leader.
///
/// NOTE(review): originally described as an RAII guard, but no `Drop` impl
/// is visible in this file — confirm whether anything is meant to be
/// released when the guard goes out of scope.
pub struct LeaderGuard<'a> {
    /// Election whose leader flag this guard consults.
    election: &'a LeaderElection,
}
268
269impl<'a> LeaderGuard<'a> {
270    /// Try to create a leader guard.
271    /// Returns None if not the leader.
272    pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
273        if election.is_leader() {
274            Some(Self { election })
275        } else {
276            None
277        }
278    }
279
280    /// Check if still leader.
281    pub fn is_leader(&self) -> bool {
282        self.election.is_leader()
283    }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration is a 5s check, 60s lease and 30s refresh.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            refresh_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(refresh_interval, Duration::from_secs(30));
    }
}
297}