pgroles_operator/advisory.rs
1//! PostgreSQL advisory locking for cross-replica reconciliation safety.
2//!
3//! Uses `pg_try_advisory_lock` / `pg_advisory_unlock` to prevent concurrent
4//! inspect/diff/apply cycles against the same database, even when multiple
5//! operator replicas are running.
6//!
7//! Session-level advisory locks are bound to the connection that acquired them,
8//! so this module checks out a dedicated [`PoolConnection`] and holds it for
9//! the lifetime of the lock. Both acquire and release execute on the same
10//! underlying database session.
11
12use sqlx::pool::PoolConnection;
13use sqlx::{PgPool, Postgres};
14
15/// A held advisory lock that must be explicitly released.
16///
17/// Holds a dedicated [`PoolConnection`] so that the lock acquire and release
18/// always run on the same PostgreSQL session (advisory locks are
19/// session-scoped).
20pub struct AdvisoryLock {
21 key: i64,
22 conn: PoolConnection<Postgres>,
23}
24
25impl AdvisoryLock {
26 /// Release the advisory lock. Logs a warning on failure.
27 ///
28 /// The unlock runs on the same connection that acquired the lock, ensuring
29 /// `pg_advisory_unlock` targets the correct session.
30 pub async fn release(mut self) {
31 match sqlx::query_scalar::<_, bool>("SELECT pg_advisory_unlock($1)")
32 .bind(self.key)
33 .fetch_one(&mut *self.conn)
34 .await
35 {
36 Ok(true) => {
37 tracing::debug!(key = self.key, "released advisory lock");
38 }
39 Ok(false) => {
40 tracing::warn!(
41 key = self.key,
42 "advisory unlock returned false (lock was not held)"
43 );
44 }
45 Err(err) => {
46 tracing::warn!(key = self.key, %err, "failed to release advisory lock");
47 }
48 }
49 // `self.conn` is returned to the pool on drop.
50 }
51}
52
53/// Attempt to acquire a session-level advisory lock for the given database identity.
54///
55/// Checks out a dedicated connection from the pool and executes
56/// `pg_try_advisory_lock` on it. If the lock is acquired, the connection is
57/// kept inside the returned [`AdvisoryLock`] so that both acquire and release
58/// run on the same session.
59///
60/// Returns `Ok(Some(AdvisoryLock))` if the lock was acquired, `Ok(None)` if it
61/// is already held by another session, or `Err` on query failure.
62pub async fn try_acquire(
63 pool: &PgPool,
64 database_identity: &str,
65) -> Result<Option<AdvisoryLock>, sqlx::Error> {
66 let key = advisory_lock_key(database_identity);
67
68 let mut conn = pool.acquire().await?;
69 let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock($1)")
70 .bind(key)
71 .fetch_one(&mut *conn)
72 .await?;
73
74 if acquired {
75 tracing::info!(key, database_identity, "acquired advisory lock");
76 Ok(Some(AdvisoryLock { key, conn }))
77 } else {
78 tracing::info!(
79 key,
80 database_identity,
81 "advisory lock contention — another session holds the lock"
82 );
83 // `conn` is returned to the pool on drop — no lock was acquired.
84 Ok(None)
85 }
86}
87
/// Map a database identity string to a stable, non-negative `i64` lock key.
///
/// The key is the 64-bit FNV-1a hash of the bytes `"pgroles:"` followed by the
/// identity's UTF-8 bytes, shifted right by one bit so it fits the
/// non-negative half of `i64`. The function is pure: the same identity always
/// yields the same key.
fn advisory_lock_key(identity: &str) -> i64 {
    // Namespace prefix so pgroles keys are unlikely to coincide with advisory
    // lock keys chosen by applications sharing the database.
    const NAMESPACE: &[u8] = b"pgroles:";
    const FNV_OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const FNV_PRIME: u64 = 0x0100_0000_01b3;

    let digest = NAMESPACE
        .iter()
        .chain(identity.as_bytes())
        .fold(FNV_OFFSET_BASIS, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(FNV_PRIME)
        });

    // Dropping the lowest bit clears the sign bit after the cast, so the
    // result is always >= 0.
    (digest >> 1) as i64
}
108
109// ---------------------------------------------------------------------------
110// Tests
111// ---------------------------------------------------------------------------
112
#[cfg(test)]
mod tests {
    use super::*;

    /// Identity used by several tests below.
    const PROD_IDENTITY: &str = "prod/db-creds/DATABASE_URL";

    /// 64-bit FNV-1a over `bytes` with NO namespace prefix, folded to `i64`
    /// the same way `advisory_lock_key` folds its hash. Used to check that the
    /// namespace prefix actually changes the resulting key.
    fn unprefixed_key(bytes: &[u8]) -> i64 {
        const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
        const PRIME: u64 = 0x0100_0000_01b3;

        let hash = bytes.iter().fold(OFFSET_BASIS, |state, &byte| {
            (state ^ u64::from(byte)).wrapping_mul(PRIME)
        });
        (hash >> 1) as i64
    }

    #[test]
    fn advisory_lock_key_deterministic() {
        let a = advisory_lock_key(PROD_IDENTITY);
        let b = advisory_lock_key(PROD_IDENTITY);
        assert_eq!(a, b, "same identity must produce same key");
    }

    #[test]
    fn advisory_lock_key_different_for_different_identities() {
        let a = advisory_lock_key(PROD_IDENTITY);
        let b = advisory_lock_key("staging/db-creds/DATABASE_URL");
        assert_ne!(a, b, "different identities must produce different keys");
    }

    #[test]
    fn advisory_lock_key_different_secret_keys() {
        let a = advisory_lock_key(PROD_IDENTITY);
        let b = advisory_lock_key("prod/db-creds/CUSTOM_URL");
        assert_ne!(a, b, "different secret keys must produce different keys");
    }

    #[test]
    fn advisory_lock_key_is_positive() {
        // The `hash >> 1` conversion guarantees a non-negative i64.
        let key = advisory_lock_key(PROD_IDENTITY);
        assert!(key >= 0, "advisory lock key should be non-negative");
    }

    #[test]
    fn advisory_lock_key_namespace_prefix_avoids_collision() {
        // Distinct identities still map to distinct keys under the prefix.
        assert_ne!(advisory_lock_key("x"), advisory_lock_key("y"));

        // The "pgroles:" prefix must take part in the hash: the key for an
        // identity has to differ from the same hash computed over the bare
        // identity bytes.
        for identity in ["x", "y", PROD_IDENTITY].iter().copied() {
            assert_ne!(
                advisory_lock_key(identity),
                unprefixed_key(identity.as_bytes()),
                "namespace prefix must change the key for {identity:?}"
            );
        }
    }

    #[test]
    fn advisory_lock_key_empty_identity() {
        // Should not panic on empty identity.
        let key = advisory_lock_key("");
        assert!(key >= 0);
    }
}