forge_runtime/cluster/
leader.rs1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::{DateTime, Utc};
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::watch;
8
/// Timing parameters for [`LeaderElection`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LeaderConfig {
    /// How often the election loop wakes up to look after its lease or check on
    /// the current leader.
    pub check_interval: Duration,
    /// How long a lease stays valid after it is written; a leader that has not
    /// written its lease for longer than this is treated as gone.
    pub lease_duration: Duration,
    /// Target spacing between lease refreshes while leader. Keep it well below
    /// `lease_duration` so the lease cannot lapse between two refreshes.
    pub refresh_interval: Duration,
}

impl Default for LeaderConfig {
    /// Checks every 5 s, leases last 60 s, and are refreshed every 30 s.
    fn default() -> Self {
        Self {
            check_interval: Duration::from_secs(5),
            lease_duration: Duration::from_secs(60),
            refresh_interval: Duration::from_secs(30),
        }
    }
}
29
30pub struct LeaderElection {
32 pool: sqlx::PgPool,
33 node_id: NodeId,
34 role: LeaderRole,
35 config: LeaderConfig,
36 is_leader: Arc<AtomicBool>,
37 shutdown_tx: watch::Sender<bool>,
38 shutdown_rx: watch::Receiver<bool>,
39}
40
41impl LeaderElection {
42 pub fn new(
44 pool: sqlx::PgPool,
45 node_id: NodeId,
46 role: LeaderRole,
47 config: LeaderConfig,
48 ) -> Self {
49 let (shutdown_tx, shutdown_rx) = watch::channel(false);
50 Self {
51 pool,
52 node_id,
53 role,
54 config,
55 is_leader: Arc::new(AtomicBool::new(false)),
56 shutdown_tx,
57 shutdown_rx,
58 }
59 }
60
61 pub fn is_leader(&self) -> bool {
63 self.is_leader.load(Ordering::SeqCst)
64 }
65
66 pub fn shutdown_receiver(&self) -> watch::Receiver<bool> {
68 self.shutdown_rx.clone()
69 }
70
71 pub fn stop(&self) {
73 let _ = self.shutdown_tx.send(true);
74 }
75
76 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
78 let result: Option<(bool,)> = sqlx::query_as("SELECT pg_try_advisory_lock($1) as acquired")
80 .bind(self.role.lock_id())
81 .fetch_optional(&self.pool)
82 .await
83 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
84
85 let acquired = result.map(|(v,)| v).unwrap_or(false);
86
87 if acquired {
88 let lease_until =
90 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
91
92 sqlx::query(
93 r#"
94 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
95 VALUES ($1, $2, NOW(), $3)
96 ON CONFLICT (role) DO UPDATE SET
97 node_id = EXCLUDED.node_id,
98 acquired_at = NOW(),
99 lease_until = EXCLUDED.lease_until
100 "#,
101 )
102 .bind(self.role.as_str())
103 .bind(self.node_id.as_uuid())
104 .bind(lease_until)
105 .execute(&self.pool)
106 .await
107 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
108
109 self.is_leader.store(true, Ordering::SeqCst);
110 tracing::info!("Became {} leader", self.role.as_str());
111 }
112
113 Ok(acquired)
114 }
115
116 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
118 if !self.is_leader() {
119 return Ok(());
120 }
121
122 let lease_until =
123 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
124
125 sqlx::query(
126 r#"
127 UPDATE forge_leaders
128 SET lease_until = $3
129 WHERE role = $1 AND node_id = $2
130 "#,
131 )
132 .bind(self.role.as_str())
133 .bind(self.node_id.as_uuid())
134 .bind(lease_until)
135 .execute(&self.pool)
136 .await
137 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
138
139 Ok(())
140 }
141
142 pub async fn release_leadership(&self) -> forge_core::Result<()> {
144 if !self.is_leader() {
145 return Ok(());
146 }
147
148 sqlx::query("SELECT pg_advisory_unlock($1)")
150 .bind(self.role.lock_id())
151 .execute(&self.pool)
152 .await
153 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
154
155 sqlx::query(
157 r#"
158 DELETE FROM forge_leaders
159 WHERE role = $1 AND node_id = $2
160 "#,
161 )
162 .bind(self.role.as_str())
163 .bind(self.node_id.as_uuid())
164 .execute(&self.pool)
165 .await
166 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
167
168 self.is_leader.store(false, Ordering::SeqCst);
169 tracing::info!("Released {} leadership", self.role.as_str());
170
171 Ok(())
172 }
173
174 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
176 let result: Option<(DateTime<Utc>,)> =
177 sqlx::query_as("SELECT lease_until FROM forge_leaders WHERE role = $1")
178 .bind(self.role.as_str())
179 .fetch_optional(&self.pool)
180 .await
181 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
182
183 match result {
184 Some((lease_until,)) => Ok(lease_until > Utc::now()),
185 None => Ok(false), }
187 }
188
189 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
191 let row = sqlx::query(
192 r#"
193 SELECT role, node_id, acquired_at, lease_until
194 FROM forge_leaders
195 WHERE role = $1
196 "#,
197 )
198 .bind(self.role.as_str())
199 .fetch_optional(&self.pool)
200 .await
201 .map_err(|e| forge_core::ForgeError::Database(e.to_string()))?;
202
203 match row {
204 Some(row) => {
205 use sqlx::Row;
206 let role_str: String = row.get("role");
207 let role = role_str.parse().unwrap_or(LeaderRole::Scheduler);
208
209 Ok(Some(LeaderInfo {
210 role,
211 node_id: NodeId::from_uuid(row.get("node_id")),
212 acquired_at: row.get("acquired_at"),
213 lease_until: row.get("lease_until"),
214 }))
215 }
216 None => Ok(None),
217 }
218 }
219
220 pub async fn run(&self) {
222 let mut shutdown_rx = self.shutdown_rx.clone();
223
224 loop {
225 tokio::select! {
226 _ = tokio::time::sleep(self.config.check_interval) => {
227 if self.is_leader() {
228 if let Err(e) = self.refresh_lease().await {
230 tracing::warn!("Failed to refresh lease: {}", e);
231 }
232 } else {
233 match self.check_leader_health().await {
235 Ok(false) => {
236 if let Err(e) = self.try_become_leader().await {
238 tracing::warn!("Failed to acquire leadership: {}", e);
239 }
240 }
241 Ok(true) => {
242 }
244 Err(e) => {
245 tracing::warn!("Failed to check leader health: {}", e);
246 }
247 }
248 }
249 }
250 _ = shutdown_rx.changed() => {
251 if *shutdown_rx.borrow() {
252 tracing::info!("Leader election shutting down");
253 if let Err(e) = self.release_leadership().await {
254 tracing::warn!("Failed to release leadership: {}", e);
255 }
256 break;
257 }
258 }
259 }
260 }
261 }
262}
263
264pub struct LeaderGuard<'a> {
266 election: &'a LeaderElection,
267}
268
269impl<'a> LeaderGuard<'a> {
270 pub fn try_new(election: &'a LeaderElection) -> Option<Self> {
273 if election.is_leader() {
274 Some(Self { election })
275 } else {
276 None
277 }
278 }
279
280 pub fn is_leader(&self) -> bool {
282 self.election.is_leader()
283 }
284}
285
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_leader_config_default() {
        let config = LeaderConfig::default();
        // (field name, actual value, expected whole seconds)
        let cases = [
            ("check_interval", config.check_interval, 5),
            ("lease_duration", config.lease_duration, 60),
            ("refresh_interval", config.refresh_interval, 30),
        ];
        for (field, actual, secs) in cases {
            assert_eq!(actual, Duration::from_secs(secs), "default {}", field);
        }
    }
}