1use std::sync::Arc;
2use std::sync::atomic::{AtomicBool, Ordering};
3use std::time::Duration;
4
5use chrono::Utc;
6use forge_core::cluster::{LeaderInfo, LeaderRole, NodeId};
7use tokio::sync::{Mutex, watch};
8
9use crate::pg::notify_bus::PgNotifyBus;
10
/// Postgres `NOTIFY` channel on which a leader announces that it is releasing
/// its role. The payload is the role name (`LeaderRole::as_str`), and standbys
/// that subscribe through a `PgNotifyBus` use it to attempt a takeover without
/// waiting for their next check tick.
pub const LEADER_RELEASED_CHANNEL: &str = "forge_leader_released";
14
/// Timing configuration for a [`LeaderElection`].
#[derive(Debug, Clone)]
pub struct LeaderConfig {
    /// Period of the main election tick in [`LeaderElection::run`]: a leader
    /// refreshes its lease, a standby checks the current lease and may try to
    /// take over.
    pub check_interval: Duration,
    /// How far into the future `lease_until` is set each time the lease row is
    /// written. Only whole seconds are used (`as_secs`).
    pub lease_duration: Duration,
    /// Period of the advisory-lock validation tick. Also the window during
    /// which a successful validation is cached and not repeated.
    pub lock_validate_interval: Duration,
    /// Period at which the leader pings the connection that holds the lock.
    pub keepalive_interval: Duration,
}
38
39impl Default for LeaderConfig {
40 fn default() -> Self {
41 Self {
42 check_interval: Duration::from_secs(5),
43 lease_duration: Duration::from_secs(60),
44 lock_validate_interval: Duration::from_secs(1),
45 keepalive_interval: Duration::from_secs(30),
46 }
47 }
48}
49
/// One node's participation in the election for a single [`LeaderRole`].
///
/// Leadership is represented by a Postgres advisory lock (keyed by
/// `role.lock_id()`) held on a dedicated pooled connection, plus a row in
/// `forge_leaders` carrying the holder's node id and a lease expiry.
pub struct LeaderElection {
    /// Pool used to acquire the lock connection and for read-only lookups of
    /// the `forge_leaders` row.
    pool: sqlx::PgPool,
    /// Identity written to `forge_leaders.node_id` when this node leads.
    node_id: NodeId,
    /// Role being contested; supplies both the advisory lock id and the
    /// `forge_leaders.role` key.
    role: LeaderRole,
    /// Timer periods and lease length.
    config: LeaderConfig,
    /// Local view of whether this node currently leads.
    is_leader: Arc<AtomicBool>,
    /// Connection on which the advisory lock was taken; `None` while standby.
    lock_connection: Arc<Mutex<Option<sqlx::pool::PoolConnection<sqlx::Postgres>>>>,
    /// Sender half of the shutdown signal; `stop()` sends `true`.
    shutdown_tx: watch::Sender<bool>,
    /// Receiver half, cloned by `run()` and by its notification forwarder.
    shutdown_rx: watch::Receiver<bool>,
    /// Instant of the last successful lock validation, used to skip repeated
    /// checks within `lock_validate_interval`.
    last_lock_validated: Mutex<Option<std::time::Instant>>,
    /// Optional notification bus; when present, `run()` subscribes to
    /// [`LEADER_RELEASED_CHANNEL`] on it.
    notify_bus: Option<Arc<PgNotifyBus>>,
}
87
88impl std::fmt::Debug for LeaderElection {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("LeaderElection")
91 .field("role", &self.role)
92 .field(
93 "is_leader",
94 &self.is_leader.load(std::sync::atomic::Ordering::Relaxed),
95 )
96 .finish_non_exhaustive()
97 }
98}
99
100impl LeaderElection {
101 pub fn new(
102 pool: sqlx::PgPool,
103 node_id: NodeId,
104 role: LeaderRole,
105 config: LeaderConfig,
106 ) -> Self {
107 let (shutdown_tx, shutdown_rx) = watch::channel(false);
108 Self {
109 pool,
110 node_id,
111 role,
112 config,
113 is_leader: Arc::new(AtomicBool::new(false)),
114 lock_connection: Arc::new(Mutex::new(None)),
115 shutdown_tx,
116 shutdown_rx,
117 last_lock_validated: Mutex::new(None),
118 notify_bus: None,
119 }
120 }
121
122 pub fn with_notify_bus(mut self, bus: Arc<PgNotifyBus>) -> Self {
127 self.notify_bus = Some(bus);
128 self
129 }
130
131 pub fn is_leader(&self) -> bool {
132 self.is_leader.load(Ordering::SeqCst)
133 }
134
135 pub fn lock_validate_interval(&self) -> Duration {
137 self.config.lock_validate_interval
138 }
139
140 pub fn check_interval(&self) -> Duration {
146 self.config.check_interval
147 }
148
149 pub fn stop(&self) {
150 let _ = self.shutdown_tx.send(true);
151 }
152
153 pub async fn try_become_leader(&self) -> forge_core::Result<bool> {
171 if self.is_leader() {
172 return Ok(true);
173 }
174
175 let mut conn = self
176 .pool
177 .acquire()
178 .await
179 .map_err(forge_core::ForgeError::Database)?;
180
181 let mut acquired = sqlx::query_scalar!(
182 r#"SELECT pg_try_advisory_lock($1) as "acquired!""#,
183 self.role.lock_id()
184 )
185 .fetch_one(&mut *conn)
186 .await
187 .map_err(forge_core::ForgeError::Database)?;
188
189 if !acquired {
190 match self.try_preempt_zombie_leader(&mut conn).await {
191 Ok(true) => {
192 acquired = true;
193 }
194 Ok(false) => {}
195 Err(e) => {
196 tracing::warn!(
197 role = self.role.as_str(),
198 error = %e,
199 "Zombie leader preemption attempt failed; will retry next election cycle"
200 );
201 }
202 }
203 }
204
205 crate::cluster::metrics::record_leader_election_attempt(self.role.as_str(), acquired);
206
207 if acquired {
208 let lease_until =
209 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
210
211 sqlx::query!(
212 r#"
213 INSERT INTO forge_leaders (role, node_id, acquired_at, lease_until)
214 VALUES ($1, $2, NOW(), $3)
215 ON CONFLICT (role) DO UPDATE SET
216 node_id = EXCLUDED.node_id,
217 acquired_at = NOW(),
218 lease_until = EXCLUDED.lease_until
219 "#,
220 self.role.as_str(),
221 self.node_id.as_uuid(),
222 lease_until,
223 )
224 .execute(&mut *conn)
225 .await
226 .map_err(forge_core::ForgeError::Database)?;
227
228 self.is_leader.store(true, Ordering::SeqCst);
229 crate::cluster::metrics::set_is_leader(self.role.as_str(), true);
230 *self.lock_connection.lock().await = Some(conn);
231 tracing::info!(role = self.role.as_str(), "Acquired leadership");
232 }
233
234 Ok(acquired)
235 }
236
237 async fn try_preempt_zombie_leader(
254 &self,
255 conn: &mut sqlx::pool::PoolConnection<sqlx::Postgres>,
256 ) -> forge_core::Result<bool> {
257 let lease_expired = sqlx::query_scalar!(
258 r#"
259 SELECT EXISTS(
260 SELECT 1 FROM forge_leaders
261 WHERE role = $1
262 AND lease_until < NOW()
263 ) AS "expired!"
264 "#,
265 self.role.as_str(),
266 )
267 .fetch_one(&mut **conn)
268 .await
269 .map_err(forge_core::ForgeError::Database)?;
270
271 if !lease_expired {
272 return Ok(false);
273 }
274
275 let lock_id = self.role.lock_id();
278 let classid = (lock_id >> 32) as i32;
279 let objid = (lock_id & 0xFFFF_FFFF) as i32;
280
281 let zombie_pid = sqlx::query_scalar!(
282 r#"
283 SELECT pid AS "pid?"
284 FROM pg_locks
285 WHERE locktype = 'advisory'
286 AND classid::int = $1
287 AND objid::int = $2
288 AND granted
289 LIMIT 1
290 "#,
291 classid,
292 objid,
293 )
294 .fetch_one(&mut **conn)
295 .await
296 .map_err(forge_core::ForgeError::Database)?;
297
298 let pid = match zombie_pid {
299 Some(p) => p,
300 None => {
301 tracing::debug!(
302 role = self.role.as_str(),
303 "Stale lease detected but no lock-holding backend found; \
304 lock may have already been released"
305 );
306 return Ok(false);
307 }
308 };
309
310 let terminated =
312 sqlx::query_scalar!(r#"SELECT pg_terminate_backend($1) AS "terminated!""#, pid,)
313 .fetch_one(&mut **conn)
314 .await
315 .map_err(forge_core::ForgeError::Database)?;
316
317 if !terminated {
318 tracing::warn!(
319 role = self.role.as_str(),
320 zombie_pid = pid,
321 "Could not terminate zombie leader backend; \
322 may lack pg_signal_backend privilege or backend already exited. \
323 Leadership acquisition blocked until the connection pooler \
324 recycles the holding connection."
325 );
326 return Ok(false);
327 }
328
329 tracing::warn!(
330 role = self.role.as_str(),
331 zombie_pid = pid,
332 "Terminated zombie leader backend with expired lease; retrying lock acquisition"
333 );
334
335 tokio::task::yield_now().await;
337
338 let acquired = sqlx::query_scalar!(
339 r#"SELECT pg_try_advisory_lock($1) AS "acquired!""#,
340 self.role.lock_id(),
341 )
342 .fetch_one(&mut **conn)
343 .await
344 .map_err(forge_core::ForgeError::Database)?;
345
346 Ok(acquired)
347 }
348
349 pub async fn validate_lock_held(&self) -> forge_core::Result<()> {
357 if !self.is_leader() {
358 return Ok(());
359 }
360
361 {
362 let cached = self.last_lock_validated.lock().await;
363 if let Some(last) = *cached
364 && last.elapsed() < self.config.lock_validate_interval
365 {
366 return Ok(());
367 }
368 }
369
370 let mut lock_connection = self.lock_connection.lock().await;
371 let conn = match lock_connection.as_mut() {
372 Some(conn) => conn,
373 None => {
374 drop(lock_connection);
375 self.drop_leadership_locally();
376 return Err(forge_core::ForgeError::internal(
377 "Lock connection missing during validation; dropped leadership",
378 ));
379 }
380 };
381
382 let lock_id = self.role.lock_id();
386 let classid = (lock_id >> 32) as i32;
387 let objid = (lock_id & 0xFFFF_FFFF) as i32;
388
389 let still_held = sqlx::query_scalar!(
390 r#"
391 SELECT EXISTS(
392 SELECT 1 FROM pg_locks
393 WHERE locktype = 'advisory'
394 AND classid::int = $1
395 AND objid::int = $2
396 AND pid = pg_backend_pid()
397 AND granted
398 ) AS "held!"
399 "#,
400 classid,
401 objid,
402 )
403 .fetch_one(&mut **conn)
404 .await
405 .map_err(forge_core::ForgeError::Database)?;
406
407 if !still_held {
408 *lock_connection = None;
409 drop(lock_connection);
410 self.invalidate_lock_cache().await;
411 self.drop_leadership_locally();
412 tracing::error!(
413 role = self.role.as_str(),
414 "Advisory lock no longer held on leader connection; dropped leadership"
415 );
416 return Err(forge_core::ForgeError::internal(
417 "Advisory lock no longer held; dropped leadership",
418 ));
419 }
420
421 *self.last_lock_validated.lock().await = Some(std::time::Instant::now());
422 Ok(())
423 }
424
425 pub async fn keepalive(&self) -> forge_core::Result<()> {
439 if !self.is_leader() {
440 return Ok(());
441 }
442
443 let mut lock_connection = self.lock_connection.lock().await;
444 let conn = match lock_connection.as_mut() {
445 Some(conn) => conn,
446 None => return Ok(()),
447 };
448
449 use sqlx::Connection as _;
450 conn.ping()
451 .await
452 .map_err(forge_core::ForgeError::Database)?;
453
454 Ok(())
455 }
456
457 pub async fn refresh_lease(&self) -> forge_core::Result<()> {
467 if !self.is_leader() {
468 return Ok(());
469 }
470
471 let mut lock_connection = self.lock_connection.lock().await;
472 let conn = match lock_connection.as_mut() {
473 Some(conn) => conn,
474 None => {
475 drop(lock_connection);
476 self.drop_leadership_locally();
477 return Err(forge_core::ForgeError::internal(
478 "Lock connection missing during lease refresh; dropped leadership",
479 ));
480 }
481 };
482
483 let lock_id = self.role.lock_id();
487 let classid = (lock_id >> 32) as i32;
488 let objid = (lock_id & 0xFFFF_FFFF) as i32;
489
490 let still_held = sqlx::query_scalar!(
491 r#"
492 SELECT EXISTS(
493 SELECT 1 FROM pg_locks
494 WHERE locktype = 'advisory'
495 AND classid::int = $1
496 AND objid::int = $2
497 AND pid = pg_backend_pid()
498 AND granted
499 ) AS "held!"
500 "#,
501 classid,
502 objid,
503 )
504 .fetch_one(&mut **conn)
505 .await
506 .map_err(forge_core::ForgeError::Database)?;
507
508 if !still_held {
509 *lock_connection = None;
510 drop(lock_connection);
511 self.invalidate_lock_cache().await;
512 self.drop_leadership_locally();
513 tracing::error!(
514 role = self.role.as_str(),
515 "Advisory lock no longer held on leader connection; dropped leadership"
516 );
517 return Err(forge_core::ForgeError::internal(
518 "Advisory lock no longer held; dropped leadership",
519 ));
520 }
521
522 let lease_until =
523 Utc::now() + chrono::Duration::seconds(self.config.lease_duration.as_secs() as i64);
524
525 sqlx::query!(
526 r#"
527 UPDATE forge_leaders
528 SET lease_until = $3
529 WHERE role = $1 AND node_id = $2
530 "#,
531 self.role.as_str(),
532 self.node_id.as_uuid(),
533 lease_until,
534 )
535 .execute(&mut **conn)
536 .await
537 .map_err(forge_core::ForgeError::Database)?;
538
539 drop(lock_connection);
540 *self.last_lock_validated.lock().await = Some(std::time::Instant::now());
541
542 Ok(())
543 }
544
545 async fn invalidate_lock_cache(&self) {
546 *self.last_lock_validated.lock().await = None;
547 }
548
549 fn drop_leadership_locally(&self) {
550 self.is_leader.store(false, Ordering::SeqCst);
551 crate::cluster::metrics::set_is_leader(self.role.as_str(), false);
552 }
553
554 pub async fn release_leadership(&self) -> forge_core::Result<()> {
555 if !self.is_leader() {
556 return Ok(());
557 }
558
559 let mut lock_connection = self.lock_connection.lock().await;
569 if let Some(mut conn) = lock_connection.take() {
570 if let Err(e) = sqlx::query!(
574 "SELECT pg_notify($1, $2)",
575 LEADER_RELEASED_CHANNEL,
576 self.role.as_str(),
577 )
578 .execute(&mut *conn)
579 .await
580 {
581 tracing::warn!(
582 role = self.role.as_str(),
583 error = %e,
584 "Failed to emit leader-released NOTIFY; standbys will wait for next check tick",
585 );
586 }
587
588 let released = sqlx::query_scalar!(
589 "SELECT pg_advisory_unlock($1) as \"released!\"",
590 self.role.lock_id()
591 )
592 .fetch_one(&mut *conn)
593 .await
594 .map_err(forge_core::ForgeError::Database)?;
595
596 if !released {
597 tracing::warn!(
598 role = self.role.as_str(),
599 "pg_advisory_unlock returned false during release; \
600 lock was not held by this session"
601 );
602 }
603
604 sqlx::query!(
609 r#"
610 DELETE FROM forge_leaders
611 WHERE role = $1 AND node_id = $2
612 "#,
613 self.role.as_str(),
614 self.node_id.as_uuid(),
615 )
616 .execute(&mut *conn)
617 .await
618 .map_err(forge_core::ForgeError::Database)?;
619 } else {
620 tracing::warn!(
621 role = self.role.as_str(),
622 "Leader lock connection missing during release"
623 );
624 }
625 drop(lock_connection);
626
627 self.is_leader.store(false, Ordering::SeqCst);
628 crate::cluster::metrics::set_is_leader(self.role.as_str(), false);
629 tracing::info!(role = self.role.as_str(), "Released leadership");
630
631 Ok(())
632 }
633
634 pub async fn check_leader_health(&self) -> forge_core::Result<bool> {
635 let result = sqlx::query_scalar!(
636 "SELECT lease_until FROM forge_leaders WHERE role = $1",
637 self.role.as_str()
638 )
639 .fetch_optional(&self.pool)
640 .await
641 .map_err(forge_core::ForgeError::Database)?;
642
643 match result {
644 Some(lease_until) => Ok(lease_until > Utc::now()),
645 None => Ok(false),
646 }
647 }
648
649 pub async fn get_leader(&self) -> forge_core::Result<Option<LeaderInfo>> {
650 let row = sqlx::query!(
651 r#"
652 SELECT role, node_id, acquired_at, lease_until
653 FROM forge_leaders
654 WHERE role = $1
655 "#,
656 self.role.as_str(),
657 )
658 .fetch_optional(&self.pool)
659 .await
660 .map_err(forge_core::ForgeError::Database)?;
661
662 match row {
663 Some(row) => {
664 let role = row.role.parse::<LeaderRole>().map_err(|_| {
665 forge_core::ForgeError::internal(format!(
666 "forge_leaders row has unrecognised role string: {:?}",
667 row.role
668 ))
669 })?;
670
671 Ok(Some(LeaderInfo {
672 role,
673 node_id: NodeId::from_uuid(row.node_id),
674 acquired_at: row.acquired_at,
675 lease_until: row.lease_until,
676 }))
677 }
678 None => Ok(None),
679 }
680 }
681
682 pub async fn run(&self) {
694 let mut shutdown_rx = self.shutdown_rx.clone();
695 let mut validate_timer = tokio::time::interval(self.config.lock_validate_interval);
696 validate_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
697 let mut check_timer = tokio::time::interval(self.config.check_interval);
698 check_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
699 let mut keepalive_timer = tokio::time::interval(self.config.keepalive_interval);
700 keepalive_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
701
702 let release_wakeup = Arc::new(tokio::sync::Notify::new());
705 let release_forwarder = if let Some(bus) = self.notify_bus.as_ref()
706 && let Some(mut rx) = bus.subscribe(LEADER_RELEASED_CHANNEL)
707 {
708 let wakeup = release_wakeup.clone();
709 let role = self.role.as_str().to_string();
710 let mut forwarder_shutdown = self.shutdown_rx.clone();
711 Some(tokio::spawn(async move {
712 loop {
713 tokio::select! {
714 _ = forwarder_shutdown.changed() => {
715 if *forwarder_shutdown.borrow() {
716 return;
717 }
718 }
719 result = rx.recv() => {
720 match result {
721 Ok(payload) => {
722 if payload == role {
723 wakeup.notify_one();
724 }
725 }
726 Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
727 tracing::debug!(
728 missed = n,
729 "Leader-released wakeup receiver lagged"
730 );
731 wakeup.notify_one();
732 }
733 Err(tokio::sync::broadcast::error::RecvError::Closed) => return,
734 }
735 }
736 }
737 }
738 }))
739 } else {
740 None
741 };
742
743 loop {
744 tokio::select! {
745 _ = validate_timer.tick() => {
746 if let Err(e) = self.validate_lock_held().await {
747 tracing::debug!(error = %e, "Lock validation failed");
748 }
749 }
750 _ = keepalive_timer.tick() => {
751 if let Err(e) = self.keepalive().await {
752 tracing::warn!(error = %e, "Leader connection keepalive failed; validating lock");
753 self.invalidate_lock_cache().await;
754 if let Err(ve) = self.validate_lock_held().await {
755 tracing::warn!(error = %ve, "Lock validation after keepalive failure dropped leadership");
756 }
757 }
758 }
759 _ = check_timer.tick() => {
760 if self.is_leader() {
761 if let Err(e) = self.refresh_lease().await {
762 tracing::debug!(error = %e, "Failed to refresh lease");
763 }
764 } else {
765 match self.check_leader_health().await {
766 Ok(false) => {
767 if let Err(e) = self.try_become_leader().await {
768 tracing::debug!(error = %e, "Failed to acquire leadership");
769 }
770 }
771 Ok(true) => {}
772 Err(e) => {
773 tracing::debug!(error = %e, "Failed to check leader health");
774 }
775 }
776 }
777 }
778 _ = release_wakeup.notified() => {
779 if !self.is_leader()
780 && let Err(e) = self.try_become_leader().await
781 {
782 tracing::debug!(error = %e, "Failed to acquire leadership after release NOTIFY");
783 }
784 }
785 _ = shutdown_rx.changed() => {
786 if *shutdown_rx.borrow() {
787 tracing::debug!("Leader election shutting down");
788 if let Err(e) = self.release_leadership().await {
789 tracing::debug!(error = %e, "Failed to release leadership");
790 }
791 break;
792 }
793 }
794 }
795 }
796
797 if let Some(handle) = release_forwarder {
798 handle.abort();
799 }
800 }
801}
802
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the default timings and the ordering constraints between them.
    #[test]
    fn test_leader_config_default() {
        let LeaderConfig {
            check_interval,
            lease_duration,
            lock_validate_interval,
            keepalive_interval,
        } = LeaderConfig::default();

        assert_eq!(check_interval, Duration::from_secs(5));
        assert_eq!(lease_duration, Duration::from_secs(60));
        assert_eq!(lock_validate_interval, Duration::from_secs(1));
        assert_eq!(keepalive_interval, Duration::from_secs(30));

        let firewall_idle_timeout = Duration::from_secs(5 * 60);
        assert!(
            lock_validate_interval < check_interval,
            "validate must run faster than check or it serves no purpose",
        );
        assert!(
            keepalive_interval < firewall_idle_timeout,
            "keepalive must fire well before typical firewall idle timeout (5 min)",
        );
    }
}
824
// Database-backed tests; compiled only with the `testcontainers` feature.
// Each test runs against its own isolated database with the system schema
// applied.
#[cfg(all(test, feature = "testcontainers"))]
#[allow(
    clippy::unwrap_used,
    clippy::indexing_slicing,
    clippy::panic,
    clippy::disallowed_methods
)]
mod integration_tests {
    use super::*;
    use forge_core::testing::{IsolatedTestDb, TestDatabase};

    /// Creates an isolated database named after the test and applies the
    /// system SQL to it.
    async fn setup_db(test_name: &str) -> IsolatedTestDb {
        let base = TestDatabase::from_env()
            .await
            .expect("Failed to create test database");
        let db = base
            .isolated(test_name)
            .await
            .expect("Failed to create isolated db");
        let system_sql = crate::pg::migration::get_all_system_sql();
        db.run_sql(&system_sql)
            .await
            .expect("Failed to apply system schema");
        db
    }

    /// `refresh_lease` must notice that the advisory lock is gone and drop
    /// leadership with an internal error.
    #[tokio::test]
    async fn refresh_lease_drops_leadership_when_lock_lost() {
        let db = setup_db("leader_refresh_lock_lost").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());
        assert!(election.is_leader());

        {
            // Simulate lock loss by unlocking behind the election's back, on
            // the very connection that holds the lock.
            let mut conn_guard = election.lock_connection.lock().await;
            let conn = conn_guard.as_mut().expect("lock connection present");
            sqlx::query_scalar!(
                "SELECT pg_advisory_unlock($1) as \"released!\"",
                LeaderRole::Scheduler.lock_id()
            )
            .fetch_one(&mut **conn)
            .await
            .unwrap();
        }

        let err = election.refresh_lease().await.unwrap_err();
        assert!(matches!(err, forge_core::ForgeError::Internal { .. }));
        assert!(!election.is_leader());
    }

    /// Repeated refreshes succeed and keep leadership while the lock is held.
    #[tokio::test]
    async fn refresh_lease_succeeds_while_lock_held() {
        let db = setup_db("leader_refresh_lock_held").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());
        for _ in 0..3 {
            election.refresh_lease().await.expect("refresh succeeds");
            assert!(election.is_leader());
        }
    }

    /// Acquiring leadership writes a `forge_leaders` row carrying this node's
    /// role and id.
    #[tokio::test]
    async fn try_become_leader_records_row_on_lock_connection() {
        let db = setup_db("leader_row_atomic").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        let info = election
            .get_leader()
            .await
            .unwrap()
            .expect("leader row exists after acquire");
        assert_eq!(info.role, LeaderRole::Scheduler);
        assert_eq!(info.node_id, election.node_id);
    }

    /// Releasing must succeed, clear the flag, and delete the row even when
    /// `pg_advisory_unlock` reports the lock was not held.
    #[tokio::test]
    async fn release_leadership_handles_lock_already_gone() {
        let db = setup_db("leader_release_lock_gone").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        {
            // Unlock once up front so the unlock inside `release_leadership`
            // returns false.
            let mut conn_guard = election.lock_connection.lock().await;
            let conn = conn_guard.as_mut().expect("lock connection present");
            let released = sqlx::query_scalar!(
                "SELECT pg_advisory_unlock($1) as \"released!\"",
                LeaderRole::Scheduler.lock_id()
            )
            .fetch_one(&mut **conn)
            .await
            .unwrap();
            assert!(released, "preflight unlock must succeed");
        }

        election
            .release_leadership()
            .await
            .expect("release path must tolerate pg_advisory_unlock returning false");
        assert!(!election.is_leader());
        assert!(
            election.get_leader().await.unwrap().is_none(),
            "leader row removed even when unlock returned false"
        );
    }

    /// `validate_lock_held` must notice that the advisory lock is gone and
    /// drop leadership with an internal error.
    #[tokio::test]
    async fn validate_lock_held_drops_leadership_when_lock_lost() {
        let db = setup_db("leader_validate_lock_lost").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        assert!(election.try_become_leader().await.unwrap());

        {
            // Simulate lock loss on the leader connection.
            let mut conn_guard = election.lock_connection.lock().await;
            let conn = conn_guard.as_mut().expect("lock connection present");
            sqlx::query_scalar!(
                "SELECT pg_advisory_unlock($1) as \"released!\"",
                LeaderRole::Scheduler.lock_id()
            )
            .fetch_one(&mut **conn)
            .await
            .unwrap();
        }

        let err = election.validate_lock_held().await.unwrap_err();
        assert!(matches!(err, forge_core::ForgeError::Internal { .. }));
        assert!(!election.is_leader());
    }

    /// Validation is a no-op for a standby and can be repeated safely by a
    /// leader that still holds the lock.
    #[tokio::test]
    async fn validate_lock_held_is_idempotent_when_held() {
        let db = setup_db("leader_validate_idempotent").await;
        let election = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );

        election
            .validate_lock_held()
            .await
            .expect("standby validate must be a no-op");
        assert!(!election.is_leader());

        assert!(election.try_become_leader().await.unwrap());
        for _ in 0..5 {
            election
                .validate_lock_held()
                .await
                .expect("validate must succeed while lock held");
            assert!(election.is_leader());
        }
    }

    /// A standby must not take over while the leader's lease is still valid.
    #[tokio::test]
    async fn try_become_leader_does_not_preempt_healthy_leader() {
        let db = setup_db("leader_no_preempt_healthy").await;

        let leader = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        assert!(leader.try_become_leader().await.unwrap());

        let standby = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        let got = standby.try_become_leader().await.unwrap();
        assert!(!got, "standby must not preempt a healthy leader");
        assert!(!standby.is_leader());

        assert!(leader.is_leader());
    }

    /// A standby takes over from a leader that still holds the lock but whose
    /// lease has expired.
    #[tokio::test]
    async fn try_become_leader_preempts_zombie_with_expired_lease() {
        let db = setup_db("leader_preempt_zombie").await;

        let zombie = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        assert!(zombie.try_become_leader().await.unwrap());
        assert!(zombie.is_leader());

        // Force the lease into the past so the holder looks like a zombie.
        #[allow(clippy::disallowed_methods)]
        sqlx::query(
            "UPDATE forge_leaders SET lease_until = NOW() - INTERVAL '1 second' WHERE role = $1",
        )
        .bind(LeaderRole::Scheduler.as_str())
        .execute(db.pool())
        .await
        .unwrap();

        let standby = LeaderElection::new(
            db.pool().clone(),
            NodeId::new(),
            LeaderRole::Scheduler,
            LeaderConfig::default(),
        );
        let got = standby.try_become_leader().await.unwrap();
        assert!(
            got,
            "standby must take over after terminating zombie backend"
        );
        assert!(standby.is_leader());
    }
}