Skip to main content

forge_runtime/pg/
leader.rs

1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::Utc;
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
9use crate::pg::notify_bus::PgNotifyBus;
10
/// Name of the PostgreSQL NOTIFY channel that an outgoing leader signals when
/// it gives up its seat voluntarily.
///
/// The notification payload is the role string, which lets each subscriber
/// ignore releases that belong to roles it is not competing for.
pub const LEADER_RELEASED_CHANNEL: &str = "forge_leader_released";
14
/// Tuning knobs for leader election.
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// Period of the main election tick: standbys check the current leader's
    /// lease and leaders refresh their `forge_leaders` lease row.
    pub check_interval: Duration,
    /// How long a freshly written lease stays valid. The leader must refresh
    /// it before it expires, otherwise standbys treat the seat as vacant, so
    /// keep this comfortably larger than `check_interval`.
    pub lease_duration: Duration,
    /// How often the leader re-checks `pg_locks` to confirm it still holds
    /// the advisory lock on its lock-owning connection. Defaults to 1 s, so
    /// even with a long lease (60 s) an out-of-band lock loss is noticed
    /// within about a second instead of at the next refresh tick.
    pub lock_validate_interval: Duration,
    /// How often a lightweight ping is sent on the lock-owning connection.
    ///
    /// The ping keeps idle-connection timeouts from closing that connection.
    /// These timeouts can come from the network path (firewalls, load
    /// balancers, NAT gateways) or from server-side idle-session limits.
    /// Losing the connection releases the advisory lock, and the process
    /// would not notice until its next validation.
    ///
    /// Keep this well below the shortest idle timeout on the path; typical
    /// firewall idle timeouts are 5–10 minutes. Defaults to 30 s.
    pub keepalive_interval: Duration,
}
38
39impl Default for LeaderConfig {
40    fn default() -> Self {
41        Self {
42            check_interval: Duration::from_secs(5),
43            lease_duration: Duration::from_secs(60),
44            lock_validate_interval: Duration::from_secs(1),
45            keepalive_interval: Duration::from_secs(30),
46        }
47    }
48}
49
50/// Leader election using PostgreSQL advisory locks.
51///
52/// Advisory locks provide a simple, reliable way to elect a leader without
53/// external coordination services. Key properties:
54///
55/// 1. **Mutual exclusion**: Only one session can hold a given lock ID at a time.
56/// 2. **Automatic release**: If the connection dies, PostgreSQL releases the lock.
57/// 3. **Non-blocking try**: `pg_try_advisory_lock` returns immediately with success/failure.
58///
59/// Each `LeaderRole` maps to a unique lock ID, allowing multiple independent
60/// leader elections (e.g., separate leaders for cron scheduler and workflow timers).
61///
62/// The `is_leader` flag uses `SeqCst` ordering because:
63/// - Multiple threads read this flag to decide whether to execute leader-only code
64/// - We need visibility guarantees across threads immediately after acquiring/releasing
65/// - The performance cost is negligible (leadership changes are rare)
66pub struct LeaderElection {
67    pool: sqlx::PgPool,
68    node_id: NodeId,
69    role: LeaderRole,
70    config: LeaderConfig,
71    /// Uses SeqCst for cross-thread visibility of leadership state changes.
72    is_leader: Arc<AtomicBool>,
73    lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
74    shutdown_tx: watch::Sender<bool>,
75    shutdown_rx: watch::Receiver<bool>,
76    /// Cached result of the last successful `pg_locks` probe. Set to `None`
77    /// when a keepalive failure invalidates the cache, forcing the next
78    /// `validate_lock_held` to actually query `pg_locks`.
79    last_lock_validated: Mutex<Option<std::time::Instant>>,
80    /// Optional notify bus used to (a) emit a NOTIFY on
81    /// `forge_leader_released` from the outgoing leader during voluntary
82    /// shutdown so standbys take over immediately instead of waiting for
83    /// `check_interval`, and (b) subscribe on standbys to wake the election
84    /// loop without polling the lease table.
85    notify_bus: Option<Arc<PgNotifyBus>>,
86}
87
88impl std::fmt::Debug for LeaderElection {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("LeaderElection")
91            .field("role", &self.role)
92            .field(
93                "is_leader",
94                &self.is_leader.load(std::sync::atomic::Ordering::Relaxed),
95            )
96            .finish_non_exhaustive()
97    }
98}
99
100impl LeaderElection {
101    pub fn new(
102        pool: sqlx::PgPool,
103        node_id: NodeId,
104        role: LeaderRole,
105        config: LeaderConfig,
106    ) -> Self {
107        let (shutdown_tx, shutdown_rx) = watch::channel(false);
108        Self {
109            pool,
110            node_id,
111            role,
112            config,
113            is_leader: Arc::new(AtomicBool::new(false)),
114            lock_connection: Arc::new(Mutex::new(None)),
115            shutdown_tx,
116            shutdown_rx,
117            last_lock_validated: Mutex::new(None),
118            notify_bus: None,
119        }
120    }
121
122    /// Attach a [`PgNotifyBus`] so this election emits NOTIFY on
123    /// `forge_leader_released` during voluntary release and subscribes to
124    /// the same channel to wake standbys without waiting for the next
125    /// `check_interval` tick.
126    pub fn with_notify_bus(mut self, bus: Arc<PgNotifyBus>) -> Self {
127        self.notify_bus = Some(bus);
128        self
129    }
130
131    pub fn is_leader(&self) -> bool {
132        self.is_leader.load(Ordering::SeqCst)
133    }
134
135    /// How often the leader validates the advisory lock is still held.
136    pub fn lock_validate_interval(&self) -> Duration {
137        self.config.lock_validate_interval
138    }
139
140    /// How often the leader refreshes its lease row in `forge_leaders`.
141    ///
142    /// Daemon runners use this cadence to call `refresh_lease()` so that
143    /// standbys see a live lease and can distinguish a running leader from a
144    /// zombie whose lease has simply expired.
145    pub fn check_interval(&self) -> Duration {
146        self.config.check_interval
147    }
148
149    pub fn stop(&self) {
150        let _ = self.shutdown_tx.send(true);
151    }
152
153    /// Try to acquire leadership.
154    ///
155    /// The advisory lock and the `forge_leaders` INSERT run on the same
156    /// connection. If that connection dies between the lock acquire and the
157    /// INSERT, PostgreSQL releases the lock and the INSERT fails together —
158    /// no torn leader rows pointing at a node that holds nothing.
159    ///
160    /// **Zombie leader preemption**: if `pg_try_advisory_lock` fails but the
161    /// current leader's lease is stale (expired), the application process that
162    /// owned the lock is presumed dead. A connection pooler may be keeping the
163    /// PG backend alive, preventing automatic lock release. In that case we
164    /// locate the lock-holding backend via `pg_locks` and call
165    /// `pg_terminate_backend()` to evict it, then retry the lock acquisition
166    /// once. `pg_terminate_backend` requires superuser or `pg_signal_backend`
167    /// role; if the call is refused or the backend is already gone, we log and
168    /// return `false` rather than erroring out — election will be retried on
169    /// the next check interval tick.
170    pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
171        if self.is_leader() {
172            return Ok(true);
173        }
174
175        let mut conn = self
176            .pool
177            .acquire()
178            .await
179            .map_err(forge_core::ForgeError::Database)?;
180
181        let mut acquired = sqlx::query_scalar!(
182            r#"SELECT pg_try_advisory_lock($1) as "acquired!""#,
183            self.role.lock_id()
184        )
185        .fetch_one(&mut *conn)
186        .await
187        .map_err(forge_core::ForgeError::Database)?;
188
189        if !acquired {
190            match self.try_preempt_zombie_leader(&mut conn).await {
191                Ok(true) => {
192                    acquired = true;
193                }
194                Ok(false) => {}
195                Err(e) => {
196                    tracing::warn!(
197                        role = self.role.as_str(),
198                        error = %e,
199                        "Zombie leader preemption attempt failed; will retry next election cycle"
200                    );
201                }
202            }
203        }
204
205        crate::cluster::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
206
207        if acquired {
208            let lease_until =
209                Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
210
211            sqlx::query!(
212                r#"
213                INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
214                VALUES ($1, $2, NOW(), $3)
215                ON CONFLICT (role) DO UPDATE SET
216                    node_id = EXCLUDED.node_id,
217                    acquired_at = NOW(),
218                    lease_until = EXCLUDED.lease_until
219                "#,
220                self.role.as_str(),
221                self.node_id.as_uuid(),
222                lease_until,
223            )
224            .execute(&mut *conn)
225            .await
226            .map_err(forge_core::ForgeError::Database)?;
227
228            self.is_leader.store(true, Ordering::SeqCst);
229            crate::cluster::metrics::set_is_leader(self.role.as_str(), true);
230            *self.lock_connection.lock().await = Some(conn);
231            tracing::info!(role = self.role.as_str(), "Acquired leadership");
232        }
233
234        Ok(acquired)
235    }
236
237    /// Attempt to preempt a zombie leader by terminating its PG backend.
238    ///
239    /// A zombie leader is one whose `forge_leaders` lease has expired but whose
240    /// PG backend is still alive (held open by a connection pooler), preventing
241    /// automatic advisory lock release.
242    ///
243    /// Steps:
244    /// 1. Check if there is a stale lease in `forge_leaders` for this role.
245    /// 2. If stale, find the backend PID holding the advisory lock via `pg_locks`.
246    /// 3. Call `pg_terminate_backend()` to evict it.
247    /// 4. Retry `pg_try_advisory_lock` once on the provided connection.
248    ///
249    /// Returns `true` only if we successfully terminated the zombie backend
250    /// **and** subsequently acquired the lock. Returns `false` if the lease is
251    /// not yet stale, no lock-holding backend is found, termination was refused
252    /// (insufficient privilege), or the retry still failed.
253    async fn try_preempt_zombie_leader(
254        &self,
255        conn: &mut sqlx::pool::PoolConnection<sqlx::Postgres>,
256    ) -> forge_core::Result<bool> {
257        let lease_expired = sqlx::query_scalar!(
258            r#"
259            SELECT EXISTS(
260                SELECT 1 FROM forge_leaders
261                WHERE role = $1
262                  AND lease_until < NOW()
263            ) AS "expired!"
264            "#,
265            self.role.as_str(),
266        )
267        .fetch_one(&mut **conn)
268        .await
269        .map_err(forge_core::ForgeError::Database)?;
270
271        if !lease_expired {
272            return Ok(false);
273        }
274
275        // pg_locks splits a single-int8 lock ID into classid (upper 32 bits)
276        // and objid (lower 32 bits); signed cast matches what PostgreSQL stores internally.
277        let lock_id = self.role.lock_id();
278        let classid = (lock_id >> 32) as i32;
279        let objid = (lock_id & 0xFFFF_FFFF) as i32;
280
281        let zombie_pid = sqlx::query_scalar!(
282            r#"
283            SELECT pid AS "pid?"
284            FROM pg_locks
285            WHERE locktype = 'advisory'
286              AND classid::int = $1
287              AND objid::int = $2
288              AND granted
289            LIMIT 1
290            "#,
291            classid,
292            objid,
293        )
294        .fetch_one(&mut **conn)
295        .await
296        .map_err(forge_core::ForgeError::Database)?;
297
298        let pid = match zombie_pid {
299            Some(p) => p,
300            None => {
301                tracing::debug!(
302                    role = self.role.as_str(),
303                    "Stale lease detected but no lock-holding backend found; \
304                     lock may have already been released"
305                );
306                return Ok(false);
307            }
308        };
309
310        // pg_terminate_backend returns false when permission is denied or the backend is already gone.
311        let terminated =
312            sqlx::query_scalar!(r#"SELECT pg_terminate_backend($1) AS "terminated!""#, pid,)
313                .fetch_one(&mut **conn)
314                .await
315                .map_err(forge_core::ForgeError::Database)?;
316
317        if !terminated {
318            tracing::warn!(
319                role = self.role.as_str(),
320                zombie_pid = pid,
321                "Could not terminate zombie leader backend; \
322                 may lack pg_signal_backend privilege or backend already exited. \
323                 Leadership acquisition blocked until the connection pooler \
324                 recycles the holding connection."
325            );
326            return Ok(false);
327        }
328
329        tracing::warn!(
330            role = self.role.as_str(),
331            zombie_pid = pid,
332            "Terminated zombie leader backend with expired lease; retrying lock acquisition"
333        );
334
335        // Yield to let PG process the termination before retrying the lock.
336        tokio::task::yield_now().await;
337
338        let acquired = sqlx::query_scalar!(
339            r#"SELECT pg_try_advisory_lock($1) AS "acquired!""#,
340            self.role.lock_id(),
341        )
342        .fetch_one(&mut **conn)
343        .await
344        .map_err(forge_core::ForgeError::Database)?;
345
346        Ok(acquired)
347    }
348
349    /// Confirm the advisory lock is still held on the lock-owning connection.
350    ///
351    /// Runs on its own cadence (`lock_validate_interval`, default 1s) so a
352    /// long lease (60s) still detects an out-of-band lock loss promptly. If
353    /// PostgreSQL released the lock (backend terminated, sqlx reconnected,
354    /// etc.) we drop leadership locally and surface an error: keeping the
355    /// lease alive without the underlying lock would risk split brain.
356    pub async fn validate_lock_held(&self) -> forge_core::Result<()> {
357        if !self.is_leader() {
358            return Ok(());
359        }
360
361        {
362            let cached = self.last_lock_validated.lock().await;
363            if let Some(last) = *cached
364                && last.elapsed() < self.config.lock_validate_interval
365            {
366                return Ok(());
367            }
368        }
369
370        let mut lock_connection = self.lock_connection.lock().await;
371        let conn = match lock_connection.as_mut() {
372            Some(conn) => conn,
373            None => {
374                drop(lock_connection);
375                self.drop_leadership_locally();
376                return Err(forge_core::ForgeError::internal(
377                    "Lock connection missing during validation; dropped leadership",
378                ));
379            }
380        };
381
382        // pg_locks splits a single-int8 advisory lock into classid (upper 32 bits)
383        // and objid (lower 32 bits), both stored as oid but exposed as int4. The
384        // signed-cast preserves the bit pattern that PostgreSQL stores internally.
385        let lock_id = self.role.lock_id();
386        let classid = (lock_id >> 32) as i32;
387        let objid = (lock_id & 0xFFFF_FFFF) as i32;
388
389        let still_held = sqlx::query_scalar!(
390            r#"
391            SELECT EXISTS(
392                SELECT 1 FROM pg_locks
393                WHERE locktype = 'advisory'
394                  AND classid::int = $1
395                  AND objid::int = $2
396                  AND pid = pg_backend_pid()
397                  AND granted
398            ) AS "held!"
399            "#,
400            classid,
401            objid,
402        )
403        .fetch_one(&mut **conn)
404        .await
405        .map_err(forge_core::ForgeError::Database)?;
406
407        if !still_held {
408            *lock_connection = None;
409            drop(lock_connection);
410            self.invalidate_lock_cache().await;
411            self.drop_leadership_locally();
412            tracing::error!(
413                role = self.role.as_str(),
414                "Advisory lock no longer held on leader connection; dropped leadership"
415            );
416            return Err(forge_core::ForgeError::internal(
417                "Advisory lock no longer held; dropped leadership",
418            ));
419        }
420
421        *self.last_lock_validated.lock().await = Some(std::time::Instant::now());
422        Ok(())
423    }
424
425    /// Send a lightweight keepalive ping on the lock-owning connection.
426    ///
427    /// Firewalls and load-balancers silently drop idle TCP connections after
428    /// their idle-timeout (commonly 5–10 minutes). PostgreSQL may do the same
429    /// via `tcp_keepalives_idle`. Either way the advisory lock is released
430    /// without the process knowing, leading to silent leadership loss between
431    /// `validate_lock_held` intervals.
432    ///
433    /// Issuing `SELECT 1` every 30 s keeps the connection active at the TCP
434    /// level and ensures PostgreSQL doesn't reclaim the backend. This is a
435    /// no-op for standbys (no lock connection) and is distinct from
436    /// `validate_lock_held`: that method verifies the lock is still held;
437    /// this method prevents the connection from going idle in the first place.
438    pub async fn keepalive(&self) -> forge_core::Result<()> {
439        if !self.is_leader() {
440            return Ok(());
441        }
442
443        let mut lock_connection = self.lock_connection.lock().await;
444        let conn = match lock_connection.as_mut() {
445            Some(conn) => conn,
446            None => return Ok(()),
447        };
448
449        use sqlx::Connection as _;
450        conn.ping()
451            .await
452            .map_err(forge_core::ForgeError::Database)?;
453
454        Ok(())
455    }
456
457    /// Refresh the leadership lease.
458    ///
459    /// Validates the advisory lock and extends `forge_leaders.lease_until`
460    /// as a single critical section: the `lock_connection` Mutex is held
461    /// across both the `pg_locks` probe and the UPDATE. That guarantees a
462    /// concurrent `try_become_leader` cannot repopulate the slot with a
463    /// different backend's connection between validate and refresh, which
464    /// would otherwise leave us extending the lease against a connection
465    /// that no longer holds the lock we just checked.
466    pub async fn refresh_lease(&self) -> forge_core::Result<()> {
467        if !self.is_leader() {
468            return Ok(());
469        }
470
471        let mut lock_connection = self.lock_connection.lock().await;
472        let conn = match lock_connection.as_mut() {
473            Some(conn) => conn,
474            None => {
475                drop(lock_connection);
476                self.drop_leadership_locally();
477                return Err(forge_core::ForgeError::internal(
478                    "Lock connection missing during lease refresh; dropped leadership",
479                ));
480            }
481        };
482
483        // Probe pg_locks on the held connection; Mutex stays locked across both
484        // the probe and the UPDATE below to prevent a concurrent try_become_leader
485        // from repopulating the slot with a different backend's connection.
486        let lock_id = self.role.lock_id();
487        let classid = (lock_id >> 32) as i32;
488        let objid = (lock_id & 0xFFFF_FFFF) as i32;
489
490        let still_held = sqlx::query_scalar!(
491            r#"
492            SELECT EXISTS(
493                SELECT 1 FROM pg_locks
494                WHERE locktype = 'advisory'
495                  AND classid::int = $1
496                  AND objid::int = $2
497                  AND pid = pg_backend_pid()
498                  AND granted
499            ) AS "held!"
500            "#,
501            classid,
502            objid,
503        )
504        .fetch_one(&mut **conn)
505        .await
506        .map_err(forge_core::ForgeError::Database)?;
507
508        if !still_held {
509            *lock_connection = None;
510            drop(lock_connection);
511            self.invalidate_lock_cache().await;
512            self.drop_leadership_locally();
513            tracing::error!(
514                role = self.role.as_str(),
515                "Advisory lock no longer held on leader connection; dropped leadership"
516            );
517            return Err(forge_core::ForgeError::internal(
518                "Advisory lock no longer held; dropped leadership",
519            ));
520        }
521
522        let lease_until =
523            Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
524
525        sqlx::query!(
526            r#"
527            UPDATE forge_leaders
528            SET lease_until = $3
529            WHERE role = $1 AND node_id = $2
530            "#,
531            self.role.as_str(),
532            self.node_id.as_uuid(),
533            lease_until,
534        )
535        .execute(&mut **conn)
536        .await
537        .map_err(forge_core::ForgeError::Database)?;
538
539        drop(lock_connection);
540        *self.last_lock_validated.lock().await = Some(std::time::Instant::now());
541
542        Ok(())
543    }
544
545    async fn invalidate_lock_cache(&self) {
546        *self.last_lock_validated.lock().await = None;
547    }
548
549    fn drop_leadership_locally(&self) {
550        self.is_leader.store(false, Ordering::SeqCst);
551        crate::cluster::metrics::set_is_leader(self.role.as_str(), false);
552    }
553
554    pub async fn release_leadership(&self) -> forge_core::Result<()> {
555        if !self.is_leader() {
556            return Ok(());
557        }
558
559        // Release the advisory lock on the same session that acquired it.
560        // pg_advisory_unlock returns true iff this session held the lock and
561        // released it. A false result means we lost the lock between acquire
562        // and release without refresh_lease catching it (PG terminated the
563        // backend, sqlx reconnected, etc.) — warn so the operator sees the
564        // miss instead of silently swallowing it. Unlike refresh_lease, this
565        // is a shutdown path: we keep going to clear the leader row and local
566        // state, since the worst case is already a no-op (split brain is
567        // resolved by the lock being gone).
568        let mut lock_connection = self.lock_connection.lock().await;
569        if let Some(mut conn) = lock_connection.take() {
570            // Emit NOTIFY before unlock so standbys wake only when the lock is
571            // genuinely about to be free. Failure is non-fatal: standbys fall
572            // back to their normal check_interval timer.
573            if let Err(e) = sqlx::query!(
574                "SELECT pg_notify($1, $2)",
575                LEADER_RELEASED_CHANNEL,
576                self.role.as_str(),
577            )
578            .execute(&mut *conn)
579            .await
580            {
581                tracing::warn!(
582                    role = self.role.as_str(),
583                    error = %e,
584                    "Failed to emit leader-released NOTIFY; standbys will wait for next check tick",
585                );
586            }
587
588            let released = sqlx::query_scalar!(
589                "SELECT pg_advisory_unlock($1) as \"released!\"",
590                self.role.lock_id()
591            )
592            .fetch_one(&mut *conn)
593            .await
594            .map_err(forge_core::ForgeError::Database)?;
595
596            if !released {
597                tracing::warn!(
598                    role = self.role.as_str(),
599                    "pg_advisory_unlock returned false during release; \
600                     lock was not held by this session"
601                );
602            }
603
604            // DELETE on the lock-owning connection so the row is gone the moment
605            // the lock is released, with no window where the lock is absent but
606            // the row still names us. WHERE node_id = $2 is safe when another node
607            // has already overwritten the row — that row is left untouched.
608            sqlx::query!(
609                r#"
610            DELETE FROM forge_leaders
611            WHERE role = $1 AND node_id = $2
612            "#,
613                self.role.as_str(),
614                self.node_id.as_uuid(),
615            )
616            .execute(&mut *conn)
617            .await
618            .map_err(forge_core::ForgeError::Database)?;
619        } else {
620            tracing::warn!(
621                role = self.role.as_str(),
622                "Leader lock connection missing during release"
623            );
624        }
625        drop(lock_connection);
626
627        self.is_leader.store(false, Ordering::SeqCst);
628        crate::cluster::metrics::set_is_leader(self.role.as_str(), false);
629        tracing::info!(role = self.role.as_str(), "Released leadership");
630
631        Ok(())
632    }
633
634    pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
635        let result = sqlx::query_scalar!(
636            "SELECT lease_until FROM forge_leaders WHERE role = $1",
637            self.role.as_str()
638        )
639        .fetch_optional(&self.pool)
640        .await
641        .map_err(forge_core::ForgeError::Database)?;
642
643        match result {
644            Some(lease_until) => Ok(lease_until > Utc::now()),
645            None => Ok(false),
646        }
647    }
648
649    pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
650        let row = sqlx::query!(
651            r#"
652            SELECT role, node_id, acquired_at, lease_until
653            FROM forge_leaders
654            WHERE role = $1
655            "#,
656            self.role.as_str(),
657        )
658        .fetch_optional(&self.pool)
659        .await
660        .map_err(forge_core::ForgeError::Database)?;
661
662        match row {
663            Some(row) => {
664                let role = row.role.parse::<LeaderRole>().map_err(|_| {
665                    forge_core::ForgeError::internal(format!(
666                        "forge_leaders row has unrecognised role string: {:?}",
667                        row.role
668                    ))
669                })?;
670
671                Ok(Some(LeaderInfo {
672                    role,
673                    node_id: NodeId::from_uuid(row.node_id),
674                    acquired_at: row.acquired_at,
675                    lease_until: row.lease_until,
676                }))
677            }
678            None => Ok(None),
679        }
680    }
681
682    /// Run the leader election loop.
683    ///
684    /// Three independent cadences:
685    /// - `lock_validate_interval` (leader only): re-check `pg_locks` to confirm
686    ///   the advisory lock is still held. Faster than `check_interval` so a
687    ///   long lease detects an out-of-band lock loss within seconds.
688    /// - `check_interval` (leader): refresh the lease row. Validates first
689    ///   inside `refresh_lease`, so the validate is idempotent with the
690    ///   faster timer above.
691    /// - `check_interval` (standby): check whether the current leader's
692    ///   lease is healthy and try to take over if not.
693    pub async fn run(&self) {
694        let mut shutdown_rx = self.shutdown_rx.clone();
695        let mut validate_timer = tokio::time::interval(self.config.lock_validate_interval);
696        validate_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
697        let mut check_timer = tokio::time::interval(self.config.check_interval);
698        check_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
699        let mut keepalive_timer = tokio::time::interval(self.config.keepalive_interval);
700        keepalive_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
701
702        // Collapses any number of NOTIFY messages into a single notify_one so
703        // a backlog doesn't queue up multiple acquisition attempts.
704        let release_wakeup = Arc::new(tokio::sync::Notify::new());
705        let release_forwarder = if let Some(bus) = self.notify_bus.as_ref()
706            && let Some(mut rx) = bus.subscribe(LEADER_RELEASED_CHANNEL)
707        {
708            let wakeup = release_wakeup.clone();
709            let role = self.role.as_str().to_string();
710            let mut forwarder_shutdown = self.shutdown_rx.clone();
711            Some(tokio::spawn(async move {
712                loop {
713                    tokio::select! {
714                        _ = forwarder_shutdown.changed() => {
715                            if *forwarder_shutdown.borrow() {
716                                return;
717                            }
718                        }
719                        result = rx.recv() => {
720                            match result {
721                                Ok(payload) => {
722                                    if payload == role {
723                                        wakeup.notify_one();
724                                    }
725                                }
726                                Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
727                                    tracing::debug!(
728                                        missed = n,
729                                        "Leader-released wakeup receiver lagged"
730                                    );
731                                    wakeup.notify_one();
732                                }
733                                Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
734                            }
735                        }
736                    }
737                }
738            }))
739        } else {
740            None
741        };
742
743        loop {
744            tokio::select! {
745                _ = validate_timer.tick() => {
746                    if let Err(e) = self.validate_lock_held().await {
747                        tracing::debug!(error = %e, "Lock validation failed");
748                    }
749                }
750                _ = keepalive_timer.tick() => {
751                    if let Err(e) = self.keepalive().await {
752                        tracing::warn!(error = %e, "Leader connection keepalive failed; validating lock");
753                        self.invalidate_lock_cache().await;
754                        if let Err(ve) = self.validate_lock_held().await {
755                            tracing::warn!(error = %ve, "Lock validation after keepalive failure dropped leadership");
756                        }
757                    }
758                }
759                _ = check_timer.tick() => {
760                    if self.is_leader() {
761                        if let Err(e) = self.refresh_lease().await {
762                            tracing::debug!(error = %e, "Failed to refresh lease");
763                        }
764                    } else {
765                        match self.check_leader_health().await {
766                            Ok(false) => {
767                                if let Err(e) = self.try_become_leader().await {
768                                    tracing::debug!(error = %e, "Failed to acquire leadership");
769                                }
770                            }
771                            Ok(true) => {}
772                            Err(e) => {
773                                tracing::debug!(error = %e, "Failed to check leader health");
774                            }
775                        }
776                    }
777                }
778                _ = release_wakeup.notified() => {
779                    if !self.is_leader()
780                        && let Err(e) = self.try_become_leader().await
781                    {
782                        tracing::debug!(error = %e, "Failed to acquire leadership after release NOTIFY");
783                    }
784                }
785                _ = shutdown_rx.changed() => {
786                    if *shutdown_rx.borrow() {
787                        tracing::debug!("Leader election shutting down");
788                        if let Err(e) = self.release_leadership().await {
789                            tracing::debug!(error = %e, "Failed to release leadership");
790                        }
791                        break;
792                    }
793                }
794            }
795        }
796
797        if let Some(handle) = release_forwarder {
798            handle.abort();
799        }
800    }
801}
802
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the shipped `LeaderConfig` defaults together with the ordering
    /// invariants between them: lock validation must tick faster than the
    /// health/lease check, and the connection keepalive must fire well
    /// inside a typical firewall idle timeout.
    #[test]
    fn test_leader_config_default() {
        let defaults = LeaderConfig::default();
        // Lower end of common firewall / load-balancer idle timeouts.
        let firewall_idle_floor = Duration::from_secs(5 * 60);

        // Exact default values.
        assert_eq!(defaults.lease_duration, Duration::from_secs(60));
        assert_eq!(defaults.check_interval, Duration::from_secs(5));
        assert_eq!(defaults.keepalive_interval, Duration::from_secs(30));
        assert_eq!(defaults.lock_validate_interval, Duration::from_secs(1));

        // Relationships the run loop relies on.
        assert!(
            defaults.lock_validate_interval < defaults.check_interval,
            "validate must run faster than check or it serves no purpose",
        );
        assert!(
            defaults.keepalive_interval < firewall_idle_floor,
            "keepalive must fire well before typical firewall idle timeout (5 min)",
        );
    }
}
824
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::panic,
    clippy::disallowed_methods
)]
mod integration_tests {
    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};

    /// Creates an isolated database named after `test_name` and applies the
    /// full forge system schema to it.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        let db = base
            .isolated(test_name)
            .await
            .expect("Failed to create isolated db");
        let system_sql = crate::pg::migration::get_all_system_sql();
        db.run_sql(&system_sql)
            .await
            .expect("Failed to apply system schema");
        db
    }

    /// Releases the Scheduler advisory lock directly on `election`'s
    /// lock-owning connection, bypassing `release_leadership`.
    ///
    /// This simulates an out-of-band lock loss (PG terminated the backend,
    /// sqlx reconnected, etc.) while the election's local state still claims
    /// leadership. Returns what `pg_advisory_unlock` reported: `true` when
    /// the lock was held on that connection and has now been released.
    ///
    /// Panics if the election has no lock connection (i.e. it never
    /// acquired leadership) or if the query fails.
    async fn unlock_out_of_band(election: &LeaderElection) -> bool {
        let mut conn_guard = election.lock_connection.lock().await;
        let conn = conn_guard.as_mut().expect("lock connection present");
        sqlx::query_scalar!(
            "SELECT pg_advisory_unlock($1) as \"released!\"",
            LeaderRole::Scheduler.lock_id()
        )
        .fetch_one(&mut **conn)
        .await
        .unwrap()
    }

    #[tokio::test]
    async fn refresh_lease_drops_leadership_when_lock_lost() {
        let db = setup_db("leader_refresh_lock_lost").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());
        assert!(election.is_leader());

        // Simulate a connection-level loss of the advisory lock by unlocking
        // on the same connection that holds it. This mirrors a backend being
        // terminated or reset underneath a node that still believes it leads.
        assert!(
            unlock_out_of_band(&election).await,
            "preflight unlock must succeed"
        );

        let err = election.refresh_lease().await.unwrap_err();
        assert!(matches!(err, forge_core::ForgeError::Internal { .. }));
        assert!(!election.is_leader());
    }

    #[tokio::test]
    async fn refresh_lease_succeeds_while_lock_held() {
        let db = setup_db("leader_refresh_lock_held").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());
        for _ in 0..3 {
            election.refresh_lease().await.expect("refresh succeeds");
            assert!(election.is_leader());
        }
    }

    #[tokio::test]
    async fn try_become_leader_records_row_on_lock_connection() {
        let db = setup_db("leader_row_atomic").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        let info = election
            .get_leader()
            .await
            .unwrap()
            .expect("leader row exists after acquire");
        assert_eq!(info.role, LeaderRole::Scheduler);
        assert_eq!(info.node_id, election.node_id);
    }

    /// release_leadership tolerates the lock having already gone away on
    /// the held connection (e.g., a PG-side backend reset). It must still
    /// clear local state and remove the leader row instead of erroring out
    /// halfway through cleanup.
    #[tokio::test]
    async fn release_leadership_handles_lock_already_gone() {
        let db = setup_db("leader_release_lock_gone").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        // Drop the lock on the held connection without going through
        // release_leadership, simulating an out-of-band loss.
        assert!(
            unlock_out_of_band(&election).await,
            "preflight unlock must succeed"
        );

        // release_leadership should not error on the second unlock returning
        // false; it should still clear local state and the leader row.
        election
            .release_leadership()
            .await
            .expect("release path must tolerate pg_advisory_unlock returning false");
        assert!(!election.is_leader());
        assert!(
            election.get_leader().await.unwrap().is_none(),
            "leader row removed even when unlock returned false"
        );
    }

    /// validate_lock_held detects an out-of-band lock loss and drops
    /// leadership without touching the lease row. The separate validate
    /// path is what lets the run loop catch a lost lock within
    /// `lock_validate_interval` even when `check_interval` is much larger.
    #[tokio::test]
    async fn validate_lock_held_drops_leadership_when_lock_lost() {
        let db = setup_db("leader_validate_lock_lost").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        assert!(
            unlock_out_of_band(&election).await,
            "preflight unlock must succeed"
        );

        let err = election.validate_lock_held().await.unwrap_err();
        assert!(matches!(err, forge_core::ForgeError::Internal { .. }));
        assert!(!election.is_leader());
    }

    /// validate_lock_held is a no-op for standbys and succeeds for a leader
    /// that still holds its lock. Calling it many times in a row must not
    /// require a lease refresh.
    #[tokio::test]
    async fn validate_lock_held_is_idempotent_when_held() {
        let db = setup_db("leader_validate_idempotent").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        // Standby case: no error, no state change.
        election
            .validate_lock_held()
            .await
            .expect("standby validate must be a no-op");
        assert!(!election.is_leader());

        // Leader case: many validates between lease refreshes.
        assert!(election.try_become_leader().await.unwrap());
        for _ in 0..5 {
            election
                .validate_lock_held()
                .await
                .expect("validate must succeed while lock held");
            assert!(election.is_leader());
        }
    }

    /// try_become_leader skips zombie preemption when the lease is still valid.
    ///
    /// If another node holds the lock and its lease is current (not expired),
    /// we must not attempt termination — that would be a hostile preemption of
    /// a healthy leader. `try_become_leader` must return `false`, and the
    /// incumbent's lock-owning backend must survive the attempt.
    #[tokio::test]
    async fn try_become_leader_does_not_preempt_healthy_leader() {
        let db = setup_db("leader_no_preempt_healthy").await;

        // Node A acquires leadership.
        let leader = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        assert!(leader.try_become_leader().await.unwrap());

        // Node B tries to acquire but must not succeed — leader A is healthy.
        let standby = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        let got = standby.try_become_leader().await.unwrap();
        assert!(!got, "standby must not preempt a healthy leader");
        assert!(!standby.is_leader());

        // A is still leader. The local flag alone would stay set even if B
        // had terminated A's backend, so also re-check the advisory lock on
        // A's lock-owning connection and confirm the lease row still names A.
        assert!(leader.is_leader());
        leader
            .validate_lock_held()
            .await
            .expect("healthy leader's lock must survive a standby acquire attempt");
        assert!(leader.is_leader());
        let info = leader
            .get_leader()
            .await
            .unwrap()
            .expect("leader row still present");
        assert_eq!(info.node_id, leader.node_id);
    }

    /// try_become_leader acquires leadership after preempting a zombie.
    ///
    /// Simulates a zombie leader: the `forge_leaders` lease is expired (the
    /// application process died without refreshing), but the PG backend that
    /// holds the advisory lock is still alive (connection pooler scenario).
    /// After the standby calls `try_become_leader`, it must terminate the
    /// zombie backend and take over.
    #[tokio::test]
    async fn try_become_leader_preempts_zombie_with_expired_lease() {
        let db = setup_db("leader_preempt_zombie").await;

        // Acquire leadership normally.
        let zombie = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        assert!(zombie.try_become_leader().await.unwrap());
        assert!(zombie.is_leader());

        // Artificially expire the lease so standbys see a stale leader.
        // (`clippy::disallowed_methods` is already allowed module-wide.)
        sqlx::query(
            "UPDATE forge_leaders SET lease_until = NOW() - INTERVAL '1 second' WHERE role = $1",
        )
        .bind(LeaderRole::Scheduler.as_str())
        .execute(db.pool())
        .await
        .unwrap();

        // The zombie's lock-holding connection is still alive (we haven't
        // dropped it), simulating a connection-pooler-kept backend.
        //
        // A standby now tries to acquire. It should detect the expired lease,
        // find the lock-holding PID, terminate it, and acquire the lock.
        let standby = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        let got = standby.try_become_leader().await.unwrap();
        assert!(
            got,
            "standby must take over after terminating zombie backend"
        );
        assert!(standby.is_leader());

        // The lease row must now name the standby, not the zombie.
        let info = standby
            .get_leader()
            .await
            .unwrap()
            .expect("leader row exists after takeover");
        assert_eq!(info.node_id, standby.node_id);
    }
}