Struct LeaderElection 

pub struct LeaderElection { /* private fields */ }

Leader election using PostgreSQL advisory locks.

Advisory locks provide a simple, reliable way to elect a leader without external coordination services. Key properties:

  1. Mutual exclusion: Only one session can hold a given lock ID at a time.
  2. Automatic release: If the connection dies, PostgreSQL releases the lock.
  3. Non-blocking try: pg_try_advisory_lock returns immediately with true (lock acquired) or false (lock held by another session) instead of waiting.
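
These three properties can be illustrated with a small in-memory model. This is only a sketch: AdvisoryLockTable and Session are hypothetical types that stand in for PostgreSQL's lock table and a backend session. They are not part of this crate or of sqlx. PostgreSQL also counts repeated acquisitions by the same session, and the model does not.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for PostgreSQL's advisory-lock table:
/// lock ID -> ID of the session currently holding that lock.
#[derive(Default)]
pub struct AdvisoryLockTable {
    held: Mutex<HashMap<i64, u64>>,
}

/// Hypothetical stand-in for one database session (one backend connection).
pub struct Session {
    id: u64,
    table: Arc<AdvisoryLockTable>,
}

impl Session {
    pub fn new(id: u64, table: Arc<AdvisoryLockTable>) -> Self {
        Session { id, table }
    }

    /// Like pg_try_advisory_lock: never waits. Returns true if this session
    /// now holds `lock_id` (or already did), false if another session holds it.
    pub fn try_advisory_lock(&self, lock_id: i64) -> bool {
        let mut held = self.table.held.lock().unwrap();
        // Vacant: this session takes the lock. Occupied: only the owner gets true.
        let owner = *held.entry(lock_id).or_insert(self.id);
        owner == self.id
    }
}

/// Automatic release: when the session ends, every lock it held is freed.
impl Drop for Session {
    fn drop(&mut self) {
        let mut held = self.table.held.lock().unwrap();
        held.retain(|_, owner| *owner != self.id);
    }
}
```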

Each LeaderRole maps to a unique lock ID, so several independent leader elections can run at once (for example, separate leaders for the cron scheduler and the workflow timers).
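
Here is a minimal sketch of a role-to-lock-ID mapping. It assumes each role simply owns a fixed, distinct i64 key, since pg_try_advisory_lock accepts a single bigint key. The variants and constants below are hypothetical; this page does not list the real LeaderRole variants or their IDs.

```rust
/// Hypothetical roles; the real LeaderRole variants are not shown on this page.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum LeaderRole {
    CronScheduler,
    WorkflowTimers,
}

impl LeaderRole {
    /// Assumed scheme: one fixed advisory-lock key per role, so each role
    /// runs an independent election over its own lock.
    pub const fn lock_id(self) -> i64 {
        match self {
            // Arbitrary illustrative keys; any distinct i64 values would do.
            LeaderRole::CronScheduler => 7_001,
            LeaderRole::WorkflowTimers => 7_002,
        }
    }
}
```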

The is_leader flag uses SeqCst ordering because:

  • Multiple threads read this flag to decide whether to execute leader-only code
  • We need cross-thread visibility guarantees immediately after leadership is acquired or released
  • The performance cost is negligible (leadership changes are rare)
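
Here is a minimal sketch of such a flag. It assumes an AtomicBool behind a small wrapper; LeaderFlag and run_if_leader are hypothetical names. Every store and load uses Ordering::SeqCst, following the reasoning above.

```rust
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical wrapper around the shared is_leader flag.
#[derive(Default)]
pub struct LeaderFlag {
    is_leader: AtomicBool,
}

impl LeaderFlag {
    pub fn new() -> Self {
        Self::default() // starts as a standby (false)
    }

    /// Called by the election task when leadership is gained or lost.
    pub fn set(&self, leader: bool) {
        // SeqCst puts this store in the single total order shared by all SeqCst operations.
        self.is_leader.store(leader, Ordering::SeqCst);
    }

    /// Called by any thread before running leader-only code.
    pub fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::SeqCst)
    }
}

/// Runs `work` only if this node currently believes it is leader; returns whether it ran.
pub fn run_if_leader(flag: &LeaderFlag, work: impl FnOnce()) -> bool {
    if flag.is_leader() {
        work();
        true
    } else {
        false
    }
}
```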

Implementations

impl LeaderElection

pub fn new(pool: PgPool, node_id: NodeId, role: LeaderRole, config: LeaderConfig) -> Self

pub fn with_notify_bus(self, bus: Arc<PgNotifyBus>) -> Self

Attach a PgNotifyBus. With a bus attached, this election emits a NOTIFY on the forge_leader_released channel when it voluntarily releases leadership. It also subscribes to the same channel, so standbys wake up immediately instead of waiting for the next check_interval tick.
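
The standby wake-up can be sketched with a std channel standing in for the forge_leader_released subscription. This is an assumption; the real PgNotifyBus API is not shown here. The standby waits up to one check_interval and wakes early if a release notification arrives.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why a standby woke up.
#[derive(Debug, PartialEq, Eq)]
pub enum Wake {
    /// A release notification arrived (stand-in for NOTIFY on forge_leader_released).
    Released,
    /// No notification arrived within one check_interval.
    Tick,
    /// The notification source went away.
    Disconnected,
}

/// Block until a release notification arrives or `check_interval` elapses.
/// The channel is a hypothetical stand-in for the notify-bus subscription.
pub fn wait_for_wake(notifications: &Receiver<()>, check_interval: Duration) -> Wake {
    match notifications.recv_timeout(check_interval) {
        Ok(()) => Wake::Released,
        Err(RecvTimeoutError::Timeout) => Wake::Tick,
        Err(RecvTimeoutError::Disconnected) => Wake::Disconnected,
    }
}
```

Once the sender is gone, recv_timeout returns Disconnected immediately. A real loop would therefore treat that result as a cue to fall back to plain sleeping between ticks.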

pub fn is_leader(&self) -> bool

pub fn lock_validate_interval(&self) -> Duration

How often the leader verifies that it still holds the advisory lock.

pub fn check_interval(&self) -> Duration

How often the leader refreshes its lease row in forge_leaders.

Daemon runners use this cadence to call refresh_lease() so that standbys see a live lease and can distinguish a running leader from a zombie whose lease has simply expired.
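
Here is a minimal sketch of why the refresh cadence matters. It assumes the lease is a single lease_until timestamp that each refresh extends by a fixed TTL. Lease is a hypothetical in-memory type, and the 60 s TTL and 15 s refresh interval used in the test are illustrative values. The real comparison presumably happens against database time in forge_leaders rather than a local Instant.

```rust
use std::time::{Duration, Instant};

/// Hypothetical in-memory model of a forge_leaders row's lease_until column.
pub struct Lease {
    lease_until: Instant,
    ttl: Duration,
}

impl Lease {
    pub fn new(now: Instant, ttl: Duration) -> Self {
        Lease { lease_until: now + ttl, ttl }
    }

    /// What a refresh conceptually does: push lease_until one TTL past `now`.
    pub fn refresh(&mut self, now: Instant) {
        self.lease_until = now + self.ttl;
    }

    /// What a standby checks: a lease in the past means the leader stopped refreshing.
    pub fn is_expired(&self, now: Instant) -> bool {
        now > self.lease_until
    }
}
```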

pub fn stop(&self)

pub async fn try_become_leader(&self) -> Result<bool>

Try to acquire leadership.

The advisory lock and the forge_leaders INSERT run on the same connection. If that connection dies between acquiring the lock and the INSERT, PostgreSQL releases the lock and the INSERT fails along with it. No torn leader row can be left pointing at a node that holds no lock.

Zombie leader preemption: if pg_try_advisory_lock fails but the current leader’s lease is stale (expired), the application process that owned the lock is presumed dead. A connection pooler may still be keeping its PostgreSQL backend alive, which prevents the automatic lock release. In that case we locate the lock-holding backend via pg_locks, call pg_terminate_backend() to evict it, and retry the lock acquisition once. pg_terminate_backend requires superuser privileges or membership in the pg_signal_backend role. If the call is refused or the backend is already gone, we log and return false instead of returning an error; the election is retried on the next check_interval tick.
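
The preemption flow can be sketched as a decision function over a hypothetical ElectionDb trait. Its methods stand in for the SQL steps: the advisory lock, the lease check, the pg_locks lookup plus pg_terminate_backend, and the leader-row insert. The sketch models the control flow only; it is not the crate's implementation.

```rust
/// Hypothetical abstraction over the database calls made while electing.
pub trait ElectionDb {
    /// pg_try_advisory_lock on the lock-owning connection.
    fn try_advisory_lock(&mut self) -> bool;
    /// Is the current leader's lease_until in the past?
    fn current_lease_expired(&mut self) -> bool;
    /// Find the holder via pg_locks and pg_terminate_backend it.
    /// Err models "permission denied" or "backend already gone".
    fn terminate_lock_holder(&mut self) -> Result<(), String>;
    /// Write the forge_leaders row on the same connection.
    fn record_leadership(&mut self);
}

/// Sketch of the flow described above; returns whether we became leader.
pub fn try_become_leader(db: &mut impl ElectionDb) -> bool {
    if db.try_advisory_lock() {
        db.record_leadership();
        return true;
    }
    // Lock is held elsewhere; only a stale lease justifies preemption.
    if !db.current_lease_expired() {
        return false;
    }
    if let Err(reason) = db.terminate_lock_holder() {
        // Refused or already gone: log and let the next tick retry.
        eprintln!("zombie preemption skipped: {reason}");
        return false;
    }
    // Retry exactly once after evicting the zombie's backend.
    if db.try_advisory_lock() {
        db.record_leadership();
        true
    } else {
        false
    }
}
```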

pub async fn validate_lock_held(&self) -> Result<()>

Confirm the advisory lock is still held on the lock-owning connection.

Runs on its own cadence (lock_validate_interval, default 1s), so even with a long lease (60s) an out-of-band lock loss is detected promptly. If PostgreSQL released the lock (backend terminated, sqlx reconnected, etc.), we drop leadership locally and surface an error. Keeping the lease alive without the underlying lock would risk split brain.
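
Here is a sketch of this fail-closed behavior. It assumes the pg_locks probe has already produced a boolean and that the local flag is an AtomicBool. LockLost and this free-function signature are hypothetical.

```rust
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};

/// Hypothetical error surfaced when the advisory lock has disappeared.
#[derive(Debug, PartialEq, Eq)]
pub struct LockLost;

impl fmt::Display for LockLost {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "advisory lock no longer held; leadership dropped")
    }
}

impl std::error::Error for LockLost {}

/// `lock_still_held` stands in for the result of the pg_locks probe.
/// On loss, clear the local flag before returning the error, so leader-only
/// code that checks the flag afterwards sees `false`.
pub fn validate_lock_held(lock_still_held: bool, is_leader: &AtomicBool) -> Result<(), LockLost> {
    if lock_still_held {
        Ok(())
    } else {
        is_leader.store(false, Ordering::SeqCst);
        Err(LockLost)
    }
}
```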

pub async fn keepalive(&self) -> Result<()>

Send a lightweight keepalive ping on the lock-owning connection.

Firewalls and load balancers silently drop idle TCP connections once their idle timeout passes (commonly 5–10 minutes). PostgreSQL can also end the session on its side, for example when TCP keepalive probes (governed by tcp_keepalives_idle and related settings) go unanswered. Either way, the advisory lock is released without the process knowing, which leads to silent leadership loss between validate_lock_held intervals.

Issuing SELECT 1 every 30 s keeps the connection active at the TCP level, so neither the network path nor PostgreSQL treats it as idle. This is a no-op for standbys, which have no lock connection. It is also distinct from validate_lock_held: that method verifies the lock is still held, while this method prevents the connection from going idle in the first place.
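
Here is a minimal sketch of the leader/standby asymmetry. It uses a hypothetical LockConnection that records the statements sent to it instead of talking to PostgreSQL. A standby has no lock connection, so the call does nothing.

```rust
/// Hypothetical stand-in for the dedicated lock-owning connection.
#[derive(Default)]
pub struct LockConnection {
    /// Statements sent over this connection, recorded for illustration.
    pub sent: Vec<&'static str>,
}

impl LockConnection {
    /// A real connection could fail here (e.g. the socket was already dropped).
    fn execute(&mut self, sql: &'static str) -> Result<(), String> {
        self.sent.push(sql);
        Ok(())
    }
}

/// Leaders ping their lock connection; standbys pass `None` and do nothing.
pub fn keepalive(lock_conn: Option<&mut LockConnection>) -> Result<(), String> {
    match lock_conn {
        Some(conn) => conn.execute("SELECT 1"),
        None => Ok(()),
    }
}
```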

pub async fn refresh_lease(&self) -> Result<()>

Refresh the leadership lease.

Validates the advisory lock and extends forge_leaders.lease_until in a single critical section: the lock_connection Mutex is held across both the pg_locks probe and the UPDATE. This guarantees that a concurrent try_become_leader cannot repopulate the slot with a different backend’s connection between the validation and the refresh. Without that guarantee, we could extend the lease on a connection that no longer holds the lock we just checked.
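
The critical section can be sketched with a std Mutex guarding the connection slot. Election, LockConn, and the holds_lock field are hypothetical stand-ins for the lock_connection slot and the pg_locks probe. The point is that one guard is held from the probe through the lease update.

```rust
use std::sync::Mutex;
use std::time::{Duration, Instant};

/// Hypothetical stand-in for the dedicated lock-owning connection.
pub struct LockConn {
    /// Stand-in for the pg_locks probe result on this backend.
    pub holds_lock: bool,
}

/// Hypothetical, heavily simplified election state.
pub struct Election {
    /// The lock_connection slot; `None` while standby.
    lock_connection: Mutex<Option<LockConn>>,
    /// Stand-in for forge_leaders.lease_until.
    lease_until: Mutex<Option<Instant>>,
    lease_ttl: Duration,
}

impl Election {
    pub fn new(conn: Option<LockConn>, lease_ttl: Duration) -> Self {
        Election { lock_connection: Mutex::new(conn), lease_until: Mutex::new(None), lease_ttl }
    }

    pub fn lease_until(&self) -> Option<Instant> {
        *self.lease_until.lock().unwrap()
    }

    /// Validate and extend as one critical section.
    pub fn refresh_lease(&self, now: Instant) -> Result<(), String> {
        // Taken once and held until return, so no concurrent caller can
        // swap the connection between the probe and the update.
        let guard = self.lock_connection.lock().unwrap();
        let conn = match &*guard {
            Some(conn) => conn,
            None => return Err("not leader: no lock connection".to_string()),
        };
        if !conn.holds_lock {
            return Err("advisory lock no longer held".to_string());
        }
        // Still under `guard`: the lease UPDATE would run here, on `conn`.
        *self.lease_until.lock().unwrap() = Some(now + self.lease_ttl);
        Ok(())
    }
}
```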

pub async fn release_leadership(&self) -> Result<()>

pub async fn check_leader_health(&self) -> Result<bool>

pub async fn get_leader(&self) -> Result<Option<LeaderInfo>>

pub async fn run(&self)

Run the leader election loop.

Three independent cadences:

  • lock_validate_interval (leader only): re-check pg_locks to confirm the advisory lock is still held. This interval is shorter than check_interval, so even with a long lease an out-of-band lock loss is detected within seconds.
  • check_interval (leader): refresh the lease row. refresh_lease validates the lock first, so this check harmlessly overlaps with the faster timer above.
  • check_interval (standby): check whether the current leader’s lease is healthy and try to take over if not.
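
The three cadences can be sketched as a pure function that reports which actions are due at a given elapsed time. It assumes each cadence fires on whole multiples of its interval. Action and due_actions are hypothetical, and the 15 s check_interval in the test is an assumed value. An async implementation would more likely drive separate timers than compute a modulo.

```rust
use std::time::Duration;

/// Work the election loop might do on a given tick.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Action {
    ValidateLock,
    RefreshLease,
    CheckLeaderHealth,
}

/// Which cadences fire at `elapsed` since the loop started.
pub fn due_actions(
    elapsed: Duration,
    is_leader: bool,
    lock_validate_interval: Duration,
    check_interval: Duration,
) -> Vec<Action> {
    // A cadence fires when elapsed is a whole multiple of its interval
    // (sub-millisecond intervals are treated as disabled to avoid % 0).
    let fires = |every: Duration| {
        let ms = every.as_millis();
        ms > 0 && elapsed.as_millis() % ms == 0
    };
    let mut due = Vec::new();
    if is_leader {
        if fires(lock_validate_interval) {
            due.push(Action::ValidateLock);
        }
        if fires(check_interval) {
            due.push(Action::RefreshLease);
        }
    } else if fires(check_interval) {
        due.push(Action::CheckLeaderHealth);
    }
    due
}
```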

Trait Implementations

impl Debug for LeaderElection

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoEither for T

fn into_either(self, into_left: bool) -> Either<Self, Self>

Converts self into a Left variant of Either<Self, Self> if into_left is true. Converts self into a Right variant of Either<Self, Self> otherwise.

fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
where F: FnOnce(&Self) -> bool,

Converts self into a Left variant of Either<Self, Self> if into_left(&self) returns true. Converts self into a Right variant of Either<Self, Self> otherwise.

impl<T> PolicyExt for T
where T: ?Sized,

fn and<P, B, E>(self, other: P) -> And<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow only if self and other return Action::Follow.

fn or<P, B, E>(self, other: P) -> Or<T, P>
where T: Policy<B, E>, P: Policy<B, E>,

Create a new Policy that returns Action::Follow if either self or other returns Action::Follow.

impl<T> Same for T

type Output = T

Should always be Self

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.