pub struct LeaderElection { /* private fields */ }
Leader election using PostgreSQL advisory locks.
Advisory locks provide a simple, reliable way to elect a leader without external coordination services. Key properties:
- Mutual exclusion: Only one session can hold a given lock ID at a time.
- Automatic release: If the connection dies, PostgreSQL releases the lock.
- Non-blocking try: pg_try_advisory_lock returns immediately with success/failure.
Each LeaderRole maps to a unique lock ID, allowing multiple independent
leader elections (e.g., separate leaders for cron scheduler and workflow timers).
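A minimal sketch of a per-role lock-ID mapping, assuming a hypothetical Role enum in place of LeaderRole and made-up numeric IDs (neither the real variants nor the real IDs appear on this page). The only property that matters is that each role gets a distinct 64-bit key, because pg_try_advisory_lock(bigint) locks on a single key.

```rust
/// Hypothetical stand-in for LeaderRole; the real variants are not shown on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Role {
    CronScheduler,
    WorkflowTimers,
}

impl Role {
    /// Hypothetical fixed advisory-lock keys, one per role. Because the keys
    /// differ, holding one role's lock never blocks election for another role.
    const fn lock_id(self) -> i64 {
        match self {
            Role::CronScheduler => 1001,
            Role::WorkflowTimers => 1002,
        }
    }
}

fn main() {
    for role in [Role::CronScheduler, Role::WorkflowTimers] {
        // Each key would be passed to `SELECT pg_try_advisory_lock($1)`.
        println!("{:?} -> lock id {}", role, role.lock_id());
    }
}
```

Fixed constants (rather than, say, hashing a role name at runtime) keep every node in the cluster agreeing on the key without coordination.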
The is_leader flag uses SeqCst ordering because:
- Multiple threads read this flag to decide whether to execute leader-only code
- We need visibility guarantees across threads immediately after acquiring/releasing
- The performance cost is negligible (leadership changes are rare)
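A sketch of the flag pattern described above, assuming a hypothetical LeaderFlag wrapper (LeaderElection's fields are private, so this is not its actual layout). The election side stores with SeqCst and every reader loads with SeqCst before running leader-only work.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;

/// Hypothetical holder for the leadership flag; LeaderElection's fields are private.
struct LeaderFlag {
    is_leader: AtomicBool,
}

impl LeaderFlag {
    /// Called by the election loop on acquire (true) or release/loss (false).
    fn set(&self, leader: bool) {
        self.is_leader.store(leader, Ordering::SeqCst);
    }

    /// Called by any thread before running leader-only work.
    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::SeqCst)
    }
}

fn main() {
    let flag = Arc::new(LeaderFlag { is_leader: AtomicBool::new(false) });
    flag.set(true); // leadership acquired

    let workers: Vec<_> = (0..3)
        .map(|id| {
            let flag = Arc::clone(&flag);
            thread::spawn(move || {
                if flag.is_leader() {
                    println!("worker {id}: running leader-only job");
                }
            })
        })
        .collect();

    for w in workers {
        w.join().unwrap();
    }
}
```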
Implementations
impl LeaderElection
pub fn new(pool: PgPool, node_id: NodeId, role: LeaderRole, config: LeaderConfig) -> Self
pub fn with_notify_bus(self, bus: Arc<PgNotifyBus>) -> Self
Attach a PgNotifyBus so this election emits NOTIFY on
forge_leader_released during voluntary release and subscribes to
the same channel to wake standbys without waiting for the next
check_interval tick.
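A minimal sketch of the standby wake-up logic, assuming a std::sync::mpsc channel as a stand-in for PgNotifyBus (the real bus uses PostgreSQL LISTEN/NOTIFY, and its API is not shown here). The standby waits for whichever comes first: a release notification or the check_interval timeout.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};

/// Why a standby woke up in this sketch.
#[derive(Debug, PartialEq, Eq)]
enum Wake {
    Released,  // a release notification arrived before the tick
    Tick,      // check_interval elapsed with no notification
    BusClosed, // sender gone; a caller should fall back to plain sleeping
}

/// Hypothetical standby wait: block until a notification or the timeout.
fn wait_for_wake(notifications: &Receiver<()>, check_interval: Duration) -> Wake {
    match notifications.recv_timeout(check_interval) {
        Ok(()) => Wake::Released,
        Err(RecvTimeoutError::Timeout) => Wake::Tick,
        Err(RecvTimeoutError::Disconnected) => Wake::BusClosed,
    }
}

fn main() {
    let (tx, rx) = mpsc::channel::<()>();
    let check_interval = Duration::from_secs(5);

    // Simulate the leader releasing voluntarily after 50 ms.
    let notifier = thread::spawn(move || {
        thread::sleep(Duration::from_millis(50));
        tx.send(()).unwrap();
    });

    let started = Instant::now();
    let wake = wait_for_wake(&rx, check_interval);
    println!("woke by {:?} after {:?}", wake, started.elapsed());
    notifier.join().unwrap();
}
```

Without the notification the standby would sit out the full 5 s interval in this example; with it, takeover can begin almost immediately after a voluntary release.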
pub fn is_leader(&self) -> bool
pub fn lock_validate_interval(&self) -> Duration
How often the leader validates the advisory lock is still held.
pub fn check_interval(&self) -> Duration
How often the leader refreshes its lease row in forge_leaders.
Daemon runners use this cadence to call refresh_lease() so that
standbys see a live lease and can distinguish a running leader from a
zombie whose lease has simply expired.
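A sketch of how a standby might tell a live lease from an expired one, assuming a hypothetical LeaseRow struct mirroring only the lease_until column of forge_leaders, and hypothetical durations (60 s lease, 15 s check_interval). It compares against a client-side clock for simplicity; comparing against the database clock (for example NOW() in SQL) would avoid skew between nodes, but which clock the real code uses is not documented here.

```rust
use std::time::{Duration, SystemTime};

/// Hypothetical mirror of the relevant part of a forge_leaders row.
struct LeaseRow {
    lease_until: SystemTime,
}

/// A lease is live while `now` is strictly before `lease_until`.
fn lease_is_live(row: &LeaseRow, now: SystemTime) -> bool {
    now < row.lease_until
}

/// What a refresh would write: `now + lease_duration`.
fn extended_lease(now: SystemTime, lease_duration: Duration) -> SystemTime {
    now + lease_duration
}

fn main() {
    let lease_duration = Duration::from_secs(60); // hypothetical
    let check_interval = Duration::from_secs(15); // hypothetical refresh cadence

    let t0 = SystemTime::UNIX_EPOCH + Duration::from_secs(1_000);
    let row = LeaseRow { lease_until: extended_lease(t0, lease_duration) };

    // A running leader refreshes after one check_interval, well before expiry.
    println!("live at next refresh: {}", lease_is_live(&row, t0 + check_interval));
    // If refreshes stop, the lease expires and standbys treat the holder as a zombie.
    println!("live after lease: {}", lease_is_live(&row, t0 + lease_duration));
}
```

The refresh cadence has to be comfortably shorter than the lease so that one slow refresh does not make a healthy leader look like a zombie.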
pub fn stop(&self)
pub async fn try_become_leader(&self) -> Result<bool>
Try to acquire leadership.
The advisory lock and the forge_leaders INSERT run on the same
connection. If that connection dies between the lock acquisition and the
INSERT, PostgreSQL releases the lock and the INSERT fails with it, so
there are no torn leader rows pointing at a node that holds nothing.
Zombie leader preemption: if pg_try_advisory_lock fails but the
current leader’s lease is stale (expired), the application process that
owned the lock is presumed dead. A connection pooler may be keeping the
PG backend alive, preventing automatic lock release. In that case we
locate the lock-holding backend via pg_locks and call
pg_terminate_backend() to evict it, then retry the lock acquisition
once. pg_terminate_backend requires superuser or the pg_signal_backend
role; if the call is refused or the backend is already gone, we log and
return false rather than erroring out. Election will be retried on
the next check_interval tick.
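The decision flow above can be sketched as follows, assuming a hypothetical Backend trait over the lock-owning connection (the real method is async and returns Result<bool>; this version is synchronous and returns bool). A Fake backend exercises the branches: acquire, give up on a live leader, preempt a zombie and retry once, or log and return false.

```rust
/// Result of trying to evict the zombie's backend via pg_terminate_backend().
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Terminate {
    Terminated,
    Refused,     // caller lacks superuser / pg_signal_backend
    AlreadyGone, // the holding backend no longer exists
}

/// Hypothetical abstraction over the lock-owning connection.
trait Backend {
    /// pg_try_advisory_lock($1) and, on success, the forge_leaders INSERT,
    /// both on one connection.
    fn try_lock_and_insert(&mut self, lock_id: i64) -> bool;
    /// Has the current leader's lease expired?
    fn leader_lease_is_stale(&mut self) -> bool;
    /// Locate the holder in pg_locks and call pg_terminate_backend() on it.
    fn terminate_lock_holder(&mut self, lock_id: i64) -> Terminate;
}

fn try_become_leader_sketch<B: Backend>(db: &mut B, lock_id: i64) -> bool {
    if db.try_lock_and_insert(lock_id) {
        return true;
    }
    // Not acquired. Only preempt if the holder's lease has expired.
    if !db.leader_lease_is_stale() {
        return false;
    }
    match db.terminate_lock_holder(lock_id) {
        // Retry exactly once after eviction.
        Terminate::Terminated => db.try_lock_and_insert(lock_id),
        // Log and return false; the next check_interval tick retries.
        Terminate::Refused | Terminate::AlreadyGone => {
            eprintln!("zombie preemption skipped for lock {lock_id}");
            false
        }
    }
}

/// In-memory fake used to exercise the decision flow.
struct Fake {
    held_by_other: bool,
    lease_stale: bool,
    terminate: Terminate,
}

impl Backend for Fake {
    fn try_lock_and_insert(&mut self, _lock_id: i64) -> bool {
        !self.held_by_other
    }
    fn leader_lease_is_stale(&mut self) -> bool {
        self.lease_stale
    }
    fn terminate_lock_holder(&mut self, _lock_id: i64) -> Terminate {
        if self.terminate == Terminate::Terminated {
            self.held_by_other = false; // PostgreSQL frees the lock with the backend
        }
        self.terminate
    }
}

fn main() {
    let mut zombie = Fake { held_by_other: true, lease_stale: true, terminate: Terminate::Terminated };
    println!("preempted zombie: {}", try_become_leader_sketch(&mut zombie, 1001));
}
```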
pub async fn validate_lock_held(&self) -> Result<()>
Confirm the advisory lock is still held on the lock-owning connection.
Runs on its own cadence (lock_validate_interval, default 1s) so a
long lease (60s) still detects an out-of-band lock loss promptly. If
PostgreSQL released the lock (backend terminated, sqlx reconnected,
etc.) we drop leadership locally and surface an error: keeping the
lease alive without the underlying lock would risk split brain.
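A sketch of the validate step, under stated assumptions. PostgreSQL's pg_locks view reports a single bigint advisory key with its high 32 bits in classid, its low 32 bits in objid, and objsubid = 1, so the probe has to split the key. PROBE_SQL is an illustrative query, not the crate's actual SQL, and is not executed here; on_probe_result is a hypothetical helper showing the "drop leadership locally, then surface an error" behavior.

```rust
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};

/// Split a bigint advisory-lock key into the (classid, objid) pair pg_locks reports.
fn pg_locks_key(lock_id: i64) -> (u32, u32) {
    ((lock_id >> 32) as u32, lock_id as u32)
}

/// Illustrative probe; pid = pg_backend_pid() limits the match to the
/// session that runs it, i.e. the lock-owning connection.
const PROBE_SQL: &str = "SELECT EXISTS (
    SELECT 1 FROM pg_locks
    WHERE locktype = 'advisory' AND classid = $1 AND objid = $2
      AND objsubid = 1 AND pid = pg_backend_pid() AND granted)";

#[derive(Debug, PartialEq, Eq)]
struct LockLost {
    lock_id: i64,
}

impl fmt::Display for LockLost {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "advisory lock {} is no longer held", self.lock_id)
    }
}

impl std::error::Error for LockLost {}

/// Given the probe result, clear the local flag first, then report the loss.
fn on_probe_result(still_held: bool, lock_id: i64, is_leader: &AtomicBool) -> Result<(), LockLost> {
    if still_held {
        Ok(())
    } else {
        is_leader.store(false, Ordering::SeqCst);
        Err(LockLost { lock_id })
    }
}

fn main() {
    println!("{}", PROBE_SQL);
    let (classid, objid) = pg_locks_key(0x0000_0001_0000_0002);
    println!("classid={classid} objid={objid}");

    let flag = AtomicBool::new(true);
    if let Err(e) = on_probe_result(false, 42, &flag) {
        println!("{e}; is_leader={}", flag.load(Ordering::SeqCst));
    }
}
```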
pub async fn keepalive(&self) -> Result<()>
Send a lightweight keepalive ping on the lock-owning connection.
Firewalls and load-balancers silently drop idle TCP connections after
their idle-timeout (commonly 5–10 minutes). PostgreSQL may do the same
via tcp_keepalives_idle. Either way the advisory lock is released
without the process knowing, leading to silent leadership loss between
validate_lock_held intervals.
Issuing SELECT 1 every 30 s keeps the connection active at the TCP
level and ensures PostgreSQL doesn’t reclaim the backend. This is a
no-op for standbys (no lock connection) and is distinct from
validate_lock_held: that method verifies the lock is still held;
this method prevents the connection from going idle in the first place.
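A minimal sketch of the keepalive contract, assuming a hypothetical LockConn trait for the dedicated connection. The leader holds Some(connection) and sends SELECT 1; a standby holds None, so the call is a no-op.

```rust
use std::time::Duration;

/// Hypothetical minimal view of the dedicated lock connection.
trait LockConn {
    fn execute(&mut self, sql: &str) -> Result<(), String>;
}

/// 30 s cadence from the description above; well under 5-10 minute idle timeouts.
const KEEPALIVE_EVERY: Duration = Duration::from_secs(30);

/// Ping the lock connection if there is one; standbys (None) do nothing.
fn keepalive<C: LockConn>(lock_conn: Option<&mut C>) -> Result<(), String> {
    match lock_conn {
        Some(conn) => conn.execute("SELECT 1"),
        None => Ok(()),
    }
}

/// Fake connection that counts pings.
struct CountingConn {
    pings: u32,
}

impl LockConn for CountingConn {
    fn execute(&mut self, sql: &str) -> Result<(), String> {
        assert_eq!(sql, "SELECT 1");
        self.pings += 1;
        Ok(())
    }
}

fn main() {
    let mut conn = CountingConn { pings: 0 };
    keepalive(Some(&mut conn)).unwrap(); // leader: one ping
    keepalive::<CountingConn>(None).unwrap(); // standby: no-op
    println!("pings sent: {} (every {:?})", conn.pings, KEEPALIVE_EVERY);
}
```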
pub async fn refresh_lease(&self) -> Result<()>
Refresh the leadership lease.
Validates the advisory lock and extends forge_leaders.lease_until
as a single critical section: the lock_connection Mutex is held
across both the pg_locks probe and the UPDATE. That guarantees a
concurrent try_become_leader cannot repopulate the slot with a
different backend’s connection between validate and refresh, which
would otherwise leave us extending the lease against a connection
that no longer holds the lock we just checked.
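A sketch of the single critical section, assuming a hypothetical Election struct with a std::sync::Mutex (the real code is async, and which mutex type it uses is not documented here). The guard on lock_connection is taken once and held across both the lock check and the lease update, so no other caller can swap the connection in between.

```rust
use std::sync::Mutex;
use std::time::{Duration, SystemTime};

/// Hypothetical stand-in for the dedicated lock connection.
struct LockConn {
    holds_lock: bool,
}

/// Hypothetical election state: the connection slot plus the lease it maintains.
struct Election {
    lock_connection: Mutex<Option<LockConn>>,
    lease_until: Mutex<SystemTime>,
    lease_duration: Duration,
}

impl Election {
    /// Validate and extend under ONE guard on `lock_connection`.
    fn refresh_lease(&self, now: SystemTime) -> Result<(), &'static str> {
        let guard = self.lock_connection.lock().map_err(|_| "mutex poisoned")?;
        let conn = (*guard).as_ref().ok_or("not leader: no lock connection")?;
        // Stand-in for the pg_locks probe on this connection.
        if !conn.holds_lock {
            return Err("advisory lock lost");
        }
        // Stand-in for the forge_leaders UPDATE, still under `guard`.
        *self.lease_until.lock().map_err(|_| "mutex poisoned")? = now + self.lease_duration;
        Ok(())
    } // `guard` is released here, after both steps.
}

fn main() {
    let e = Election {
        lock_connection: Mutex::new(Some(LockConn { holds_lock: true })),
        lease_until: Mutex::new(SystemTime::UNIX_EPOCH),
        lease_duration: Duration::from_secs(60),
    };
    let now = SystemTime::UNIX_EPOCH + Duration::from_secs(100);
    println!("refresh: {:?}", e.refresh_lease(now));
}
```

Releasing the guard between the check and the update would reopen exactly the window the description warns about.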
pub async fn release_leadership(&self) -> Result<()>
pub async fn check_leader_health(&self) -> Result<bool>
pub async fn get_leader(&self) -> Result<Option<LeaderInfo>>
pub async fn run(&self)
Run the leader election loop.
Three independent cadences:
- lock_validate_interval (leader only): re-check pg_locks to confirm the advisory lock is still held. This is faster than check_interval, so a long lease detects an out-of-band lock loss within seconds.
- check_interval (leader): refresh the lease row. refresh_lease validates first, so the validate is idempotent with the faster timer above.
- check_interval (standby): check whether the current leader’s lease is healthy and try to take over if not.
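The three cadences can be modeled with a simulated millisecond clock, as a sketch. The actions_due helper and the 5 s check interval are hypothetical (only the 1 s validate default is stated above), and the real run() is an async loop whose timer mechanism is not documented on this page.

```rust
/// Work the loop would perform at a given instant in this model.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Action {
    ValidateLock,      // leader: probe pg_locks
    RefreshLease,      // leader: refresh_lease(), which validates again internally
    CheckLeaderHealth, // standby: inspect the lease, maybe try_become_leader()
}

/// Which actions fall due at `t_ms`, given each timer's period in milliseconds.
fn actions_due(t_ms: u64, is_leader: bool, validate_ms: u64, check_ms: u64) -> Vec<Action> {
    let mut due = Vec::new();
    if is_leader {
        if t_ms % validate_ms == 0 {
            due.push(Action::ValidateLock);
        }
        if t_ms % check_ms == 0 {
            due.push(Action::RefreshLease);
        }
    } else if t_ms % check_ms == 0 {
        due.push(Action::CheckLeaderHealth);
    }
    due
}

fn main() {
    let (validate_ms, check_ms) = (1_000, 5_000);
    for t in (0..=10_000).step_by(1_000) {
        println!(
            "t={t:>6}ms leader={:?} standby={:?}",
            actions_due(t, true, validate_ms, check_ms),
            actions_due(t, false, validate_ms, check_ms)
        );
    }
}
```

On ticks where both leader timers fire, the lock is validated twice; as noted above, that is harmless because the validate is idempotent.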
Trait Implementations
Auto Trait Implementations
impl !Freeze for LeaderElection
impl !RefUnwindSafe for LeaderElection
impl Send for LeaderElection
impl Sync for LeaderElection
impl Unpin for LeaderElection
impl UnsafeUnpin for LeaderElection
impl !UnwindSafe for LeaderElection
Blanket Implementations
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoEither for T
fn into_either(self, into_left: bool) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left is true.
Converts self into a Right variant of Either<Self, Self>
otherwise.
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
Converts self into a Left variant of Either<Self, Self>
if into_left(&self) returns true.
Converts self into a Right variant of Either<Self, Self>
otherwise.