Skip to main content

sqlx_postgres/
advisory_lock.rs

1use crate::error::Result;
2use crate::Either;
3use crate::PgConnection;
4use hkdf::Hkdf;
5use sha2::Sha256;
6use sqlx_core::executor::Executor;
7use sqlx_core::sql_str::SqlSafeStr;
8use std::ops::{Deref, DerefMut};
9use std::sync::Arc;
10use std::sync::OnceLock;
11
/// A mutex-like type utilizing [Postgres advisory locks].
///
/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
/// locks tracked in the database with application-defined semantics, as opposed to the standard
/// row-level or table-level locks which may not fit all use-cases.
///
/// This API provides a convenient wrapper for generating and storing the integer keys that
/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
/// of scope.
///
/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
/// automatically released when a connection is closed).
///
/// Transaction-scoped locks are not covered by this API. They can still be used by beginning a
/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
/// manually. Such locks cannot be explicitly released, but are automatically released when the
/// transaction ends (is committed or rolled back).
///
/// Session-level locks can be acquired either inside or outside a transaction and are not
/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
/// transaction is committed or rolled back, until explicitly released or the connection is closed.
///
/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
/// exclusive lock prevents any other lock being taken for a given key until it is released.
///
/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
#[derive(Debug, Clone)]
pub struct PgAdvisoryLock {
    /// The key identifying this lock; see [`PgAdvisoryLockKey`] for the available keyspaces.
    key: PgAdvisoryLockKey,
    /// The query to execute to release this lock.
    ///
    /// Built lazily the first time it is needed and, because it lives behind an `Arc`,
    /// shared by every clone of this lock.
    release_query: Arc<OnceLock<String>>,
}
45
/// A key type natively used by Postgres advisory locks.
///
/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
/// specify that these key spaces "do not overlap":
///
/// <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS>
///
/// The documentation for the `pg_locks` system view explains further how advisory locks
/// are treated in Postgres:
///
/// <https://www.postgresql.org/docs/current/view-pg-locks.html>
#[derive(Debug, Clone, PartialEq, Eq)]
#[non_exhaustive]
pub enum PgAdvisoryLockKey {
    /// The keyspace designated by a single 64-bit integer.
    ///
    /// When [`PgAdvisoryLock`] is constructed with [`::new()`][PgAdvisoryLock::new()],
    /// this is the keyspace used.
    BigInt(i64),
    /// The keyspace designated by two 32-bit integers.
    ///
    /// A lock in this keyspace can be constructed by passing this variant to
    /// [`PgAdvisoryLock::with_key()`].
    IntPair(i32, i32),
}
69
70/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
71///
72/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
73/// Released on-drop or via [`Self::release_now()`].
74///
75/// ### Note: Release-on-drop is not immediate!
76/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
77/// flushed to the server the next time it is used, or when it is returned to
78/// a [`PgPool`][crate::PgPool] in the case of
79/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
80///
81/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
82/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
83pub struct PgAdvisoryLockGuard<C: AsMut<PgConnection>> {
84    lock: PgAdvisoryLock,
85    conn: Option<C>,
86}
87
88impl PgAdvisoryLock {
89    /// Construct a `PgAdvisoryLock` using the given string as a key.
90    ///
91    /// This is intended to make it easier to use an advisory lock by using a human-readable string
92    /// for a key as opposed to manually generating a unique integer key. The generated integer key
93    /// is guaranteed to be stable and in the single 64-bit integer keyspace
94    /// (see [`PgAdvisoryLockKey`] for details).
95    ///
96    /// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
97    /// to the bytes of the input string, but in a way that the calculated integer is unlikely
98    /// to collide with any similar implementations (although we don't currently know of any).
99    /// See the source of this method for details.
100    ///
101    /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
102    /// ### Example
103    /// ```rust
104    /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
105    ///
106    /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
107    /// // Negative values are fine because of how Postgres treats advisory lock keys.
108    /// // See the documentation for the `pg_locks` system view for details.
109    /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
110    /// ```
111    pub fn new(key_string: impl AsRef<str>) -> Self {
112        let input_key_material = key_string.as_ref();
113
114        // HKDF was chosen because it is designed to concentrate the entropy in a variable-length
115        // input key and produce a higher quality but reduced-length output key with a
116        // well-specified and reproducible algorithm.
117        //
118        // Granted, the input key is usually meant to be pseudorandom and not human readable,
119        // but we're not trying to produce an unguessable value by any means; just one that's as
120        // unlikely to already be in use as possible, but still deterministic.
121        //
122        // SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
123        // which should save on codegen and optimization.
124
125        // We don't supply a salt as that is intended to be random, but we want a deterministic key.
126        let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());
127
128        let mut output_key_material = [0u8; 8];
129
130        // The first string is the "info" string of the HKDF which is intended to tie the output
131        // exclusively to SQLx. This should avoid collisions with implementations using a similar
132        // strategy. If you _want_ this to match some other implementation then you should get
133        // the calculated integer key from it and use that directly.
134        //
135        // Do *not* change this string as it will affect the output!
136        hkdf.expand(
137            b"SQLx (Rust) Postgres advisory lock",
138            &mut output_key_material,
139        )
140        // `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
141        // This is specified by RFC 5869 but not elaborated upon:
142        // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
143        // Since we're only asking for 8 bytes, this error shouldn't be returned.
144        .expect("BUG: `output_key_material` should be of acceptable length");
145
146        // For ease of use, this method assumes the user doesn't care which keyspace is used.
147        //
148        // It doesn't seem likely that someone would care about using the `(int, int)` keyspace
149        // specifically unless they already had keys to use, in which case they wouldn't
150        // care about this method. That's why we also provide `with_key()`.
151        //
152        // The choice of `from_le_bytes()` is mostly due to x86 being the most popular
153        // architecture for server software, so it should be a no-op there.
154        let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));
155
156        tracing::trace!(
157            ?key,
158            key_string = ?input_key_material,
159            "generated key from key string",
160        );
161
162        Self::with_key(key)
163    }
164
165    /// Construct a `PgAdvisoryLock` with a manually supplied key.
166    pub fn with_key(key: PgAdvisoryLockKey) -> Self {
167        Self {
168            key,
169            release_query: Arc::new(OnceLock::new()),
170        }
171    }
172
173    /// Returns the current key.
174    pub fn key(&self) -> &PgAdvisoryLockKey {
175        &self.key
176    }
177
178    // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
179    // about "cannot project lifetimes to parent scope".
180    //
181    // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
182    // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
183    // or lazy normalization.
184
185    /// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
186    ///
187    /// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`].
188    ///
189    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
190    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
191    /// any of these.
192    ///
193    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
194    /// which will be executed the next time the connection is used, or when returned to a
195    /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
196    ///
197    /// Postgres allows a single connection to acquire a given lock more than once without releasing
198    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
199    /// must match the number of lock operations for the lock to actually be released.
200    ///
201    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
202    ///
203    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
204    ///
205    /// # Cancel Safety
206    ///
207    /// This method is cancel safe. If the future is dropped before the query completes, a
208    /// `pg_advisory_unlock()` call is queued and run the next time the connection is used.
209    pub async fn acquire<C: AsMut<PgConnection>>(
210        &self,
211        mut conn: C,
212    ) -> Result<PgAdvisoryLockGuard<C>> {
213        let query = match &self.key {
214            PgAdvisoryLockKey::BigInt(_) => "SELECT pg_advisory_lock($1)",
215            PgAdvisoryLockKey::IntPair(_, _) => "SELECT pg_advisory_lock($1, $2)",
216        };
217
218        let stmt = conn.as_mut().prepare(query.into_sql_str()).await?;
219        let query = crate::query::query_statement(&stmt);
220
221        // We're wrapping the connection in a `PgAdvisoryLockGuard` early here on purpose. If this
222        // future is dropped, the lock will be released in the drop impl.
223        let mut guard = PgAdvisoryLockGuard::new(self.clone(), conn);
224        let conn = guard.conn.as_mut().unwrap();
225
226        match &self.key {
227            PgAdvisoryLockKey::BigInt(key) => query.bind(key),
228            PgAdvisoryLockKey::IntPair(key1, key2) => query.bind(key1).bind(key2),
229        }
230        .execute(conn.as_mut())
231        .await?;
232
233        Ok(guard)
234    }
235
236    /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
237    /// if the lock could not be acquired.
238    ///
239    /// For a version that waits until the lock is acquired, see [`Self::acquire()`].
240    ///
241    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
242    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
243    /// any of these. The connection is returned if the lock could not be acquired.
244    ///
245    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
246    /// which will be executed the next time the connection is used, or when returned to a
247    /// [`PgPool`][crate::PgPool] in the case of `PoolConnection<Postgres>`.
248    ///
249    /// Postgres allows a single connection to acquire a given lock more than once without releasing
250    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
251    /// must match the number of lock operations for the lock to actually be released.
252    ///
253    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
254    ///
255    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
256    ///
257    /// # Cancel Safety
258    ///
259    /// This method is **not** cancel safe. If the future is dropped while the query is in-flight,
260    /// it is not possible to know whether the lock was acquired, so it cannot be safely released.
261    /// The lock may remain held until the connection is closed.
262    pub async fn try_acquire<C: AsMut<PgConnection>>(
263        &self,
264        mut conn: C,
265    ) -> Result<Either<PgAdvisoryLockGuard<C>, C>> {
266        let locked: bool = match &self.key {
267            PgAdvisoryLockKey::BigInt(key) => {
268                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
269                    .bind(key)
270                    .fetch_one(conn.as_mut())
271                    .await?
272            }
273            PgAdvisoryLockKey::IntPair(key1, key2) => {
274                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
275                    .bind(key1)
276                    .bind(key2)
277                    .fetch_one(conn.as_mut())
278                    .await?
279            }
280        };
281
282        if locked {
283            Ok(Either::Left(PgAdvisoryLockGuard::new(self.clone(), conn)))
284        } else {
285            Ok(Either::Right(conn))
286        }
287    }
288
289    /// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
290    ///
291    /// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
292    /// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
293    ///
294    /// An error should only be returned if there is something wrong with the connection,
295    /// in which case the lock will be automatically released by the connection closing anyway.
296    ///
297    /// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it
298    /// indicates that the lock was not actually held by the given connection and that a warning
299    /// has been logged by the Postgres server.
300    pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
301        let released: bool = match &self.key {
302            PgAdvisoryLockKey::BigInt(key) => {
303                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
304                    .bind(key)
305                    .fetch_one(conn.as_mut())
306                    .await?
307            }
308            PgAdvisoryLockKey::IntPair(key1, key2) => {
309                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
310                    .bind(key1)
311                    .bind(key2)
312                    .fetch_one(conn.as_mut())
313                    .await?
314            }
315        };
316
317        Ok((conn, released))
318    }
319
320    fn get_release_query(&self) -> &str {
321        self.release_query.get_or_init(|| match &self.key {
322            PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({key})"),
323            PgAdvisoryLockKey::IntPair(key1, key2) => {
324                format!("SELECT pg_advisory_unlock({key1}, {key2})")
325            }
326        })
327    }
328}
329
330impl PgAdvisoryLockKey {
331    /// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
332    pub fn as_bigint(&self) -> Option<i64> {
333        if let Self::BigInt(bigint) = self {
334            Some(*bigint)
335        } else {
336            None
337        }
338    }
339}
340
// Panic message used whenever a guard's `conn` is unexpectedly `None`. The connection is only
// taken out by methods that consume the guard (or by `Drop`), so hitting this indicates a bug.
const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
342
343impl<C: AsMut<PgConnection>> PgAdvisoryLockGuard<C> {
344    fn new(lock: PgAdvisoryLock, conn: C) -> Self {
345        PgAdvisoryLockGuard {
346            lock,
347            conn: Some(conn),
348        }
349    }
350
351    /// Immediately release the held advisory lock instead of when the connection is next used.
352    ///
353    /// An error should only be returned if there is something wrong with the connection,
354    /// in which case the lock will be automatically released by the connection closing anyway.
355    ///
356    /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
357    /// well as the Postgres server. This would only happen if the lock was released without
358    /// using this guard, or the connection was swapped using [`std::mem::replace()`].
359    pub async fn release_now(mut self) -> Result<C> {
360        let (conn, released) = self
361            .lock
362            .force_release(self.conn.take().expect(NONE_ERR))
363            .await?;
364
365        if !released {
366            tracing::warn!(
367                lock = ?self.lock.key,
368                "PgAdvisoryLockGuard: advisory lock was not held by the contained connection",
369            );
370        }
371
372        Ok(conn)
373    }
374
375    /// Cancel the release of the advisory lock, keeping it held until the connection is closed.
376    ///
377    /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
378    pub fn leak(mut self) -> C {
379        self.conn.take().expect(NONE_ERR)
380    }
381}
382
383impl<C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<C> {
384    type Target = PgConnection;
385
386    fn deref(&self) -> &Self::Target {
387        self.conn.as_ref().expect(NONE_ERR).as_ref()
388    }
389}
390
391/// Mutable access to the underlying connection is provided so it can still be used like normal,
392/// even allowing locks to be taken recursively.
393///
394/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
395/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
396/// guard attempts to release the lock.
397impl<C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut for PgAdvisoryLockGuard<C> {
398    fn deref_mut(&mut self) -> &mut Self::Target {
399        self.conn.as_mut().expect(NONE_ERR).as_mut()
400    }
401}
402
403impl<C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection> for PgAdvisoryLockGuard<C> {
404    fn as_ref(&self) -> &PgConnection {
405        self.conn.as_ref().expect(NONE_ERR).as_ref()
406    }
407}
408
409/// Mutable access to the underlying connection is provided so it can still be used like normal,
410/// even allowing locks to be taken recursively.
411///
412/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
413/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
414/// guard attempts to release the lock.
415impl<C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<C> {
416    fn as_mut(&mut self) -> &mut PgConnection {
417        self.conn.as_mut().expect(NONE_ERR).as_mut()
418    }
419}
420
421/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
422/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::PgPool]
423/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
424impl<C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<C> {
425    fn drop(&mut self) {
426        if let Some(mut conn) = self.conn.take() {
427            // Queue a simple query message to execute next time the connection is used.
428            // The `async fn` versions can safely use the prepared statement protocol,
429            // but this is the safest way to queue a query to execute on the next opportunity.
430            conn.as_mut()
431                .queue_simple_query(self.lock.get_release_query())
432                .expect("BUG: PgAdvisoryLock::get_release_query() somehow too long for protocol");
433        }
434    }
435}