1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
//! Leader election using PostgreSQL advisory locks for multi-instance deployments.
use crate::config;
use crate::metrics::errors::component::LEADER_ELECTION;
use sqlx::PgPool;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, instrument};
/// Background task for leader election.
///
/// Runs every 30 seconds to either maintain leadership or attempt to acquire it, until
/// `shutdown_token` is cancelled.
///
/// We use leadership election for figuring out who runs background tasks like sending probes to
/// the endpoints. At some point, we may want to expand this to other tasks as well.
///
/// PostgreSQL advisory locks are session-based, so we need to maintain a dedicated connection
/// for the entire duration we want to hold the lock. For the same reason the lock is released
/// explicitly on shutdown: merely dropping the pooled connection hands the still-open session
/// (and therefore the lock) back to the pool, which would keep other instances from taking over
/// until the pool itself closes that connection.
///
/// # Parameters
///
/// * `pool` - connection pool used both for the dedicated lock connection and as the first
///   argument handed to the callbacks.
/// * `config` - cloned and passed through to the callbacks.
/// * `is_leader` - shared flag updated by this task; `true` while this instance believes it
///   holds the lock.
/// * `lock_id` - key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
/// * `shutdown_token` - cancelling it stops the task after a final lose-leadership callback
///   (if we were leader) and a lock release.
/// * `on_gain_leadership` - awaited right after the lock is acquired. A failure is reported
///   but leadership is kept.
/// * `on_lose_leadership` - awaited when the leader connection dies or on shutdown while
///   leader. A failure is reported and otherwise ignored.
#[instrument(skip(pool, config, lock_id, shutdown_token, on_gain_leadership, on_lose_leadership))]
pub async fn leader_election_task<F1, F2, Fut1, Fut2>(
    pool: PgPool,
    config: config::Config,
    is_leader: Arc<AtomicBool>,
    lock_id: i64,
    shutdown_token: CancellationToken,
    on_gain_leadership: F1,
    on_lose_leadership: F2,
) where
    F1: Fn(PgPool, config::Config) -> Fut1 + Send + 'static,
    F2: Fn(PgPool, config::Config) -> Fut2 + Send + 'static,
    Fut1: std::future::Future<Output = Result<(), anyhow::Error>> + Send + 'static,
    Fut2: std::future::Future<Output = Result<(), anyhow::Error>> + Send + 'static,
{
    let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
    // The session that owns the advisory lock; `Some` only while we are leader.
    let mut leader_conn: Option<sqlx::pool::PoolConnection<sqlx::Postgres>> = None;

    loop {
        tokio::select! {
            _ = interval.tick() => {}
            _ = shutdown_token.cancelled() => {
                info!("Shutdown signal received, stopping leader election");
                // If we're currently the leader, call the lose leadership callback
                if is_leader.load(Ordering::Relaxed) {
                    is_leader.store(false, Ordering::Relaxed);
                    if let Err(e) = on_lose_leadership(pool.clone(), config.clone()).await {
                        crate::background_error!(LEADER_ELECTION, "lose_callback", Error, "Failed to execute on_lose_leadership callback during shutdown: {}", e);
                    }
                }
                // Release the lock only after the callback has run, so that a new leader
                // cannot start while our leader-only work is still being torn down.
                if let Some(mut conn) = leader_conn.take() {
                    match sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock($1)")
                        .bind(lock_id)
                        .fetch_one(&mut *conn)
                        .await
                    {
                        Ok(true) => debug!("Released leader advisory lock"),
                        Ok(false) => {
                            crate::background_error!(
                                LEADER_ELECTION,
                                "lock_release",
                                Warning,
                                "Leader advisory lock was not held by our connection at shutdown"
                            );
                        }
                        Err(e) => {
                            crate::background_error!(
                                LEADER_ELECTION,
                                "lock_release",
                                Warning,
                                "Failed to release leader advisory lock during shutdown: {}",
                                e
                            );
                        }
                    }
                }
                break;
            }
        }

        let current_status = is_leader.load(Ordering::Relaxed);

        // If we're not leader, try to acquire the lock
        if !current_status {
            // Try to acquire a connection and the lock
            match pool.acquire().await {
                Ok(mut conn) => {
                    match sqlx::query_scalar::<_, bool>("SELECT pg_try_advisory_lock($1)")
                        .bind(lock_id)
                        .fetch_one(&mut *conn)
                        .await
                    {
                        Ok(true) => {
                            // Successfully acquired lock!
                            info!("Gained leadership");
                            is_leader.store(true, Ordering::Relaxed);
                            leader_conn = Some(conn); // Keep connection alive
                            if let Err(e) = on_gain_leadership(pool.clone(), config.clone()).await {
                                crate::background_error!(
                                    LEADER_ELECTION,
                                    "gain_callback",
                                    Error,
                                    "Failed to execute on_gain_leadership callback: {}",
                                    e
                                );
                            }
                        }
                        Ok(false) => {
                            // Someone else has the lock; `conn` goes back to the pool here
                            debug!("Following - will retry");
                        }
                        Err(e) => {
                            crate::background_error!(LEADER_ELECTION, "lock_check", Warning, "Failed to check leader lock: {}", e);
                        }
                    }
                }
                Err(e) => {
                    crate::background_error!(
                        LEADER_ELECTION,
                        "db_acquire",
                        Warning,
                        "Failed to acquire connection for leader election: {}",
                        e
                    );
                }
            }
        } else {
            // We think we're leader - verify we still hold the lock
            // by checking if our connection is still valid
            if let Some(ref mut conn) = leader_conn {
                // Ping the connection to keep it alive
                match sqlx::query("SELECT 1").execute(&mut **conn).await {
                    Ok(_) => {
                        debug!(" Leadership renewed (connection alive)");
                    }
                    Err(e) => {
                        // Connection died, which will drop the advisory lock, we lost leadership
                        // NOTE(review): any ping error is treated as a dead session; if the
                        // session were in fact still alive it would return to the pool with
                        // the lock held - confirm this cannot happen in practice.
                        tracing::warn!("Lost leadership (connection died): {}", e);
                        info!("Lost leadership");
                        is_leader.store(false, Ordering::Relaxed);
                        leader_conn = None;
                        if let Err(e) = on_lose_leadership(pool.clone(), config.clone()).await {
                            crate::background_error!(
                                LEADER_ELECTION,
                                "lose_callback",
                                Error,
                                "Failed to execute on_lose_leadership callback: {}",
                                e
                            );
                        }
                    }
                }
            } else {
                // We think we're leader but have no connection, this can't happen.
                // Reset the flag so the next tick goes through a normal acquisition attempt.
                // NOTE(review): `on_lose_leadership` is not invoked on this path.
                crate::background_error!(
                    LEADER_ELECTION,
                    "invariant_violation",
                    Critical,
                    "Inconsistent state: is_leader=true but no connection"
                );
                is_leader.store(false, Ordering::Relaxed);
            }
        }
    }
}