Skip to main content

pgroles_operator/
advisory.rs

1//! PostgreSQL advisory locking for cross-replica reconciliation safety.
2//!
3//! Uses `pg_try_advisory_lock` / `pg_advisory_unlock` to prevent concurrent
4//! inspect/diff/apply cycles against the same database, even when multiple
5//! operator replicas are running.
6//!
7//! Session-level advisory locks are bound to the connection that acquired them,
8//! so this module checks out a dedicated [`PoolConnection`] and holds it for
9//! the lifetime of the lock. Both acquire and release execute on the same
10//! underlying database session.
11
12use sqlx::pool::PoolConnection;
13use sqlx::{PgPool, Postgres};
14
15/// A held advisory lock that must be explicitly released.
16///
17/// Holds a dedicated [`PoolConnection`] so that the lock acquire and release
18/// always run on the same PostgreSQL session (advisory locks are
19/// session-scoped).
20pub struct AdvisoryLock {
21    key: i64,
22    conn: PoolConnection<Postgres>,
23}
24
25impl AdvisoryLock {
26    /// Release the advisory lock. Logs a warning on failure.
27    ///
28    /// The unlock runs on the same connection that acquired the lock, ensuring
29    /// `pg_advisory_unlock` targets the correct session.
30    pub async fn release(mut self) {
31        match sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock($1)")
32            .bind(self.key)
33            .fetch_one(&mut *self.conn)
34            .await
35        {
36            Ok(true) => {
37                tracing::debug!(key = self.key, "released advisory lock");
38            }
39            Ok(false) => {
40                tracing::warn!(
41                    key = self.key,
42                    "advisory unlock returned false (lock was not held)"
43                );
44            }
45            Err(err) => {
46                tracing::warn!(key = self.key, %err, "failed to release advisory lock");
47            }
48        }
49        // `self.conn` is returned to the pool on drop.
50    }
51}
52
53/// Attempt to acquire a session-level advisory lock for the given database identity.
54///
55/// Checks out a dedicated connection from the pool and executes
56/// `pg_try_advisory_lock` on it. If the lock is acquired, the connection is
57/// kept inside the returned [`AdvisoryLock`] so that both acquire and release
58/// run on the same session.
59///
60/// Returns `Ok(Some(AdvisoryLock))` if the lock was acquired, `Ok(None)` if it
61/// is already held by another session, or `Err` on query failure.
62pub async fn try_acquire(
63    pool: &PgPool,
64    database_identity: &str,
65) -> Result<Option<AdvisoryLock>, sqlx::Error> {
66    let key = advisory_lock_key(database_identity);
67
68    let mut conn = pool.acquire().await?;
69    let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
70        .bind(key)
71        .fetch_one(&mut *conn)
72        .await?;
73
74    if acquired {
75        tracing::info!(key, database_identity, "acquired advisory lock");
76        Ok(Some(AdvisoryLock { key, conn }))
77    } else {
78        tracing::info!(
79            key,
80            database_identity,
81            "advisory lock contention — another session holds the lock"
82        );
83        // `conn` is returned to the pool on drop — no lock was acquired.
84        Ok(None)
85    }
86}
87
/// Map a database identity string to a stable, non-negative `i64` lock key.
///
/// The bytes of `"pgroles:"` followed by the bytes of `identity` are run
/// through a 64-bit FNV-1a style hash (XOR the byte in, then multiply by the
/// prime with wrapping arithmetic). The namespace prefix makes collisions with
/// application-level advisory locks unlikely. The same identity always yields
/// the same key.
fn advisory_lock_key(identity: &str) -> i64 {
    const NAMESPACE: &[u8] = b"pgroles:";
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;

    let digest = NAMESPACE
        .iter()
        .chain(identity.as_bytes())
        .fold(FNV_OFFSET_BASIS, |acc, &byte| {
            (acc ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // Dropping the lowest bit leaves the top bit zero, so the cast to i64 can
    // never produce a negative value.
    (digest >> 1) as i64
}
108
109// ---------------------------------------------------------------------------
110// Tests
111// ---------------------------------------------------------------------------
112
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain 64-bit FNV-1a over `bytes`, folded to a non-negative `i64` the
    /// same way `advisory_lock_key` does, but with no namespace prefix added.
    /// Lets the tests check that the prefix is really part of the hashed input.
    fn fnv1a_key(bytes: &[u8]) -> i64 {
        let mut hash: u64 = 0xcbf2_9ce4_8422_2325;
        for byte in bytes {
            hash ^= u64::from(*byte);
            hash = hash.wrapping_mul(0x0100_0000_01b3);
        }
        (hash >> 1) as i64
    }

    #[test]
    fn advisory_lock_key_deterministic() {
        let a = advisory_lock_key("prod/db-creds/DATABASE_URL");
        let b = advisory_lock_key("prod/db-creds/DATABASE_URL");
        assert_eq!(a, b, "same identity must produce same key");
    }

    #[test]
    fn advisory_lock_key_different_for_different_identities() {
        let a = advisory_lock_key("prod/db-creds/DATABASE_URL");
        let b = advisory_lock_key("staging/db-creds/DATABASE_URL");
        assert_ne!(a, b, "different identities must produce different keys");
    }

    #[test]
    fn advisory_lock_key_different_secret_keys() {
        let a = advisory_lock_key("prod/db-creds/DATABASE_URL");
        let b = advisory_lock_key("prod/db-creds/CUSTOM_URL");
        assert_ne!(a, b, "different secret keys must produce different keys");
    }

    #[test]
    fn advisory_lock_key_is_positive() {
        // The `hash >> 1` conversion guarantees a non-negative i64 for any
        // input, so check a handful of identities rather than just one.
        for identity in ["", "x", "prod/db-creds/DATABASE_URL", "staging/db"].iter() {
            let key = advisory_lock_key(identity);
            assert!(key >= 0, "advisory lock key should be non-negative");
        }
    }

    #[test]
    fn advisory_lock_key_namespace_prefix_avoids_collision() {
        // The key must be the hash of the *prefixed* identity ...
        assert_eq!(
            advisory_lock_key("x"),
            fnv1a_key(b"pgroles:x"),
            "key must be derived from the namespaced identity"
        );
        // ... and therefore differ from a hash of the bare identity that an
        // application might use for its own advisory locks.
        assert_ne!(
            advisory_lock_key("x"),
            fnv1a_key(b"x"),
            "namespace prefix must change the resulting key"
        );
        // Distinct identities still map to distinct keys under the prefix.
        assert_ne!(advisory_lock_key("x"), advisory_lock_key("y"));
    }

    #[test]
    fn advisory_lock_key_empty_identity() {
        // Should not panic on empty identity; only the prefix is hashed.
        let key = advisory_lock_key("");
        assert!(key >= 0);
        assert_eq!(key, fnv1a_key(b"pgroles:"));
    }
}