sqlx_core_oldapi/postgres/
advisory_lock.rs

1use crate::error::Result;
2use crate::postgres::PgConnection;
3use crate::Either;
4use hkdf::Hkdf;
5use once_cell::sync::OnceCell;
6use sha2::Sha256;
7use std::ops::{Deref, DerefMut};
8
9/// A mutex-like type utilizing [Postgres advisory locks].
10///
11/// Advisory locks are a mechanism provided by Postgres to have mutually exclusive or shared
12/// locks tracked in the database with application-defined semantics, as opposed to the standard
13/// row-level or table-level locks which may not fit all use-cases.
14///
15/// This API provides a convenient wrapper for generating and storing the integer keys that
16/// advisory locks use, as well as RAII guards for releasing advisory locks when they fall out
17/// of scope.
18///
19/// This API only handles session-scoped advisory locks (explicitly locked and unlocked, or
20/// automatically released when a connection is closed).
21///
22/// It is also possible to use transaction-scoped locks but those can be used by beginning a
23/// transaction and calling the appropriate lock functions (e.g. `SELECT pg_advisory_xact_lock()`)
24/// manually, and cannot be explicitly released, but are automatically released when a transaction
25/// ends (is committed or rolled back).
26///
27/// Session-level locks can be acquired either inside or outside a transaction and are not
28/// tied to transaction semantics; a lock acquired inside a transaction is still held when that
29/// transaction is committed or rolled back, until explicitly released or the connection is closed.
30///
31/// Locks can be acquired in either shared or exclusive modes, which can be thought of as read locks
32/// and write locks, respectively. Multiple shared locks are allowed for the same key, but a single
33/// exclusive lock prevents any other lock being taken for a given key until it is released.
34///
35/// [Postgres advisory locks]: https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
36#[derive(Debug, Clone)]
37pub struct PgAdvisoryLock {
38    key: PgAdvisoryLockKey,
39    /// The query to execute to release this lock.
40    release_query: OnceCell<String>,
41}
42
43/// A key type natively used by Postgres advisory locks.
44///
45/// Currently, Postgres advisory locks have two different key spaces: one keyed by a single
46/// 64-bit integer, and one keyed by a pair of two 32-bit integers. The Postgres docs
47/// specify that these key spaces "do not overlap":
48///
49/// https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
50///
51/// The documentation for the `pg_locks` system view explains further how advisory locks
52/// are treated in Postgres:
53///
54/// https://www.postgresql.org/docs/current/view-pg-locks.html
55#[derive(Debug, Clone, PartialEq, Eq)]
56#[non_exhaustive]
57pub enum PgAdvisoryLockKey {
58    /// The keyspace designated by a single 64-bit integer.
59    ///
60    /// When [PgAdvisoryLock] is constructed with [::new()][PgAdvisoryLock::new()],
61    /// this is the keyspace used.
62    BigInt(i64),
63    /// The keyspace designated by two 32-bit integers.
64    IntPair(i32, i32),
65}
66
67/// A wrapper for `PgConnection` (or a similar type) that represents a held Postgres advisory lock.
68///
69/// Can be acquired by [`PgAdvisoryLock::acquire()`] or [`PgAdvisoryLock::try_acquire()`].
70/// Released on-drop or via [`Self::release_now()`].
71///
72/// ### Note: Release-on-drop is not immediate!
73/// On drop, this guard queues a `pg_advisory_unlock()` call on the connection which will be
74/// flushed to the server the next time it is used, or when it is returned to
75/// a [`PgPool`][crate::postgres::PgPool] in the case of
76/// [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
77///
78/// This means the lock is not actually released as soon as the guard is dropped. To ensure the
79/// lock is eagerly released, you can call [`.release_now().await`][Self::release_now()].
80pub struct PgAdvisoryLockGuard<'lock, C: AsMut<PgConnection>> {
81    lock: &'lock PgAdvisoryLock,
82    conn: Option<C>,
83}
84
85impl PgAdvisoryLock {
86    /// Construct a `PgAdvisoryLock` using the given string as a key.
87    ///
88    /// This is intended to make it easier to use an advisory lock by using a human-readable string
89    /// for a key as opposed to manually generating a unique integer key. The generated integer key
90    /// is guaranteed to be stable and in the single 64-bit integer keyspace
91    /// (see [`PgAdvisoryLockKey`] for details).
92    ///
93    /// This is done by applying the [Hash-based Key Derivation Function (HKDF; IETF RFC 5869)][hkdf]
94    /// to the bytes of the input string, but in a way that the calculated integer is unlikely
95    /// to collide with any similar implementations (although we don't currently know of any).
96    /// See the source of this method for details.
97    ///
98    /// [hkdf]: https://datatracker.ietf.org/doc/html/rfc5869
99    /// ### Example
100    /// ```rust
101    /// # extern crate sqlx_core_oldapi as sqlx;
102    /// use sqlx::postgres::{PgAdvisoryLock, PgAdvisoryLockKey};
103    ///
104    /// let lock = PgAdvisoryLock::new("my first Postgres advisory lock!");
105    /// // Negative values are fine because of how Postgres treats advisory lock keys.
106    /// // See the documentation for the `pg_locks` system view for details.
107    /// assert_eq!(lock.key(), &PgAdvisoryLockKey::BigInt(-5560419505042474287));
108    /// ```
109    pub fn new(key_string: impl AsRef<str>) -> Self {
110        let input_key_material = key_string.as_ref();
111
112        // HKDF was chosen because it is designed to concentrate the entropy in a variable-length
113        // input key and produce a higher quality but reduced-length output key with a
114        // well-specified and reproducible algorithm.
115        //
116        // Granted, the input key is usually meant to be pseudorandom and not human readable,
117        // but we're not trying to produce an unguessable value by any means; just one that's as
118        // unlikely to already be in use as possible, but still deterministic.
119        //
120        // SHA-256 was chosen as the hash function because it's already used in the Postgres driver,
121        // which should save on codegen and optimization.
122
123        // We don't supply a salt as that is intended to be random, but we want a deterministic key.
124        let hkdf = Hkdf::<Sha256>::new(None, input_key_material.as_bytes());
125
126        let mut output_key_material = [0u8; 8];
127
128        // The first string is the "info" string of the HKDF which is intended to tie the output
129        // exclusively to SQLx. This should avoid collisions with implementations using a similar
130        // strategy. If you _want_ this to match some other implementation then you should get
131        // the calculated integer key from it and use that directly.
132        //
133        // Do *not* change this string as it will affect the output!
134        hkdf.expand(
135            b"SQLx (Rust) Postgres advisory lock",
136            &mut output_key_material,
137        )
138        // `Hkdf::expand()` only returns an error if you ask for more than 255 times the digest size.
139        // This is specified by RFC 5869 but not elaborated upon:
140        // https://datatracker.ietf.org/doc/html/rfc5869#section-2.3
141        // Since we're only asking for 8 bytes, this error shouldn't be returned.
142        .expect("BUG: `output_key_material` should be of acceptable length");
143
144        // For ease of use, this method assumes the user doesn't care which keyspace is used.
145        //
146        // It doesn't seem likely that someone would care about using the `(int, int)` keyspace
147        // specifically unless they already had keys to use, in which case they wouldn't
148        // care about this method. That's why we also provide `with_key()`.
149        //
150        // The choice of `from_le_bytes()` is mostly due to x86 being the most popular
151        // architecture for server software, so it should be a no-op there.
152        let key = PgAdvisoryLockKey::BigInt(i64::from_le_bytes(output_key_material));
153
154        log::trace!(
155            "generated {:?} from key string {:?}",
156            key,
157            input_key_material
158        );
159
160        Self::with_key(key)
161    }
162
163    /// Construct a `PgAdvisoryLock` with a manually supplied key.
164    pub fn with_key(key: PgAdvisoryLockKey) -> Self {
165        Self {
166            key,
167            release_query: OnceCell::new(),
168        }
169    }
170
171    /// Returns the current key.
172    pub fn key(&self) -> &PgAdvisoryLockKey {
173        &self.key
174    }
175
176    // Why doesn't this use `Acquire`? Well, I tried it and got really useless errors
177    // about "cannot project lifetimes to parent scope".
178    //
179    // It has something to do with how lifetimes work on the `Acquire` trait, I couldn't
180    // be bothered to figure it out. Probably another issue with a lack of `async fn` in traits
181    // or lazy normalization.
182
183    /// Acquires an exclusive lock using `pg_advisory_lock()`, waiting until the lock is acquired.
184    ///
185    /// For a version that returns immediately instead of waiting, see [`Self::try_acquire()`].
186    ///
187    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
188    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
189    /// any of these.
190    ///
191    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
192    /// which will be executed the next time the connection is used, or when returned to a
193    /// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
194    ///
195    /// Postgres allows a single connection to acquire a given lock more than once without releasing
196    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
197    /// must match the number of lock operations for the lock to actually be released.
198    ///
199    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
200    ///
201    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
202    pub async fn acquire<C: AsMut<PgConnection>>(
203        &self,
204        mut conn: C,
205    ) -> Result<PgAdvisoryLockGuard<'_, C>> {
206        match &self.key {
207            PgAdvisoryLockKey::BigInt(key) => {
208                crate::query::query("SELECT pg_advisory_lock($1)")
209                    .bind(key)
210                    .execute(conn.as_mut())
211                    .await?;
212            }
213            PgAdvisoryLockKey::IntPair(key1, key2) => {
214                crate::query::query("SELECT pg_advisory_lock($1, $2)")
215                    .bind(key1)
216                    .bind(key2)
217                    .execute(conn.as_mut())
218                    .await?;
219            }
220        }
221
222        Ok(PgAdvisoryLockGuard::new(self, conn))
223    }
224
225    /// Acquires an exclusive lock using `pg_try_advisory_lock()`, returning immediately
226    /// if the lock could not be acquired.
227    ///
228    /// For a version that waits until the lock is acquired, see [`Self::acquire()`].
229    ///
230    /// A connection-like type is required to execute the call. Allowed types include `PgConnection`,
231    /// `PoolConnection<Postgres>` and `Transaction<Postgres>`, as well as mutable references to
232    /// any of these. The connection is returned if the lock could not be acquired.
233    ///
234    /// The returned guard queues a `pg_advisory_unlock()` call on the connection when dropped,
235    /// which will be executed the next time the connection is used, or when returned to a
236    /// [`PgPool`][crate::postgres::PgPool] in the case of `PoolConnection<Postgres>`.
237    ///
238    /// Postgres allows a single connection to acquire a given lock more than once without releasing
239    /// it first, so in that sense the lock is re-entrant. However, the number of unlock operations
240    /// must match the number of lock operations for the lock to actually be released.
241    ///
242    /// See [Postgres' documentation for the Advisory Lock Functions][advisory-funcs] for details.
243    ///
244    /// [advisory-funcs]: https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS
245    pub async fn try_acquire<C: AsMut<PgConnection>>(
246        &self,
247        mut conn: C,
248    ) -> Result<Either<PgAdvisoryLockGuard<'_, C>, C>> {
249        let locked: bool = match &self.key {
250            PgAdvisoryLockKey::BigInt(key) => {
251                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1)")
252                    .bind(key)
253                    .fetch_one(conn.as_mut())
254                    .await?
255            }
256            PgAdvisoryLockKey::IntPair(key1, key2) => {
257                crate::query_scalar::query_scalar("SELECT pg_try_advisory_lock($1, $2)")
258                    .bind(key1)
259                    .bind(key2)
260                    .fetch_one(conn.as_mut())
261                    .await?
262            }
263        };
264
265        if locked {
266            Ok(Either::Left(PgAdvisoryLockGuard::new(self, conn)))
267        } else {
268            Ok(Either::Right(conn))
269        }
270    }
271
272    /// Execute `pg_advisory_unlock()` for this lock's key on the given connection.
273    ///
274    /// This is used by [`PgAdvisoryLockGuard::release_now()`] and is also provided for manually
275    /// releasing the lock from connections returned by [`PgAdvisoryLockGuard::leak()`].
276    ///
277    /// An error should only be returned if there is something wrong with the connection,
278    /// in which case the lock will be automatically released by the connection closing anyway.
279    ///
280    /// The `boolean` value is that returned by `pg_advisory_lock()`. If it is `false`, it
281    /// indicates that the lock was not actually held by the given connection and that a warning
282    /// has been logged by the Postgres server.
283    pub async fn force_release<C: AsMut<PgConnection>>(&self, mut conn: C) -> Result<(C, bool)> {
284        let released: bool = match &self.key {
285            PgAdvisoryLockKey::BigInt(key) => {
286                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1)")
287                    .bind(key)
288                    .fetch_one(conn.as_mut())
289                    .await?
290            }
291            PgAdvisoryLockKey::IntPair(key1, key2) => {
292                crate::query_scalar::query_scalar("SELECT pg_advisory_unlock($1, $2)")
293                    .bind(key1)
294                    .bind(key2)
295                    .fetch_one(conn.as_mut())
296                    .await?
297            }
298        };
299
300        Ok((conn, released))
301    }
302
303    fn get_release_query(&self) -> &str {
304        self.release_query.get_or_init(|| match &self.key {
305            PgAdvisoryLockKey::BigInt(key) => format!("SELECT pg_advisory_unlock({})", key),
306            PgAdvisoryLockKey::IntPair(key1, key2) => {
307                format!("SELECT pg_advisory_unlock({}, {})", key1, key2)
308            }
309        })
310    }
311}
312
313impl PgAdvisoryLockKey {
314    /// Converts `Self::Bigint(bigint)` to `Some(bigint)` and all else to `None`.
315    pub fn as_bigint(&self) -> Option<i64> {
316        if let Self::BigInt(bigint) = self {
317            Some(*bigint)
318        } else {
319            None
320        }
321    }
322}
323
324const NONE_ERR: &str = "BUG: PgAdvisoryLockGuard.conn taken";
325
326impl<'lock, C: AsMut<PgConnection>> PgAdvisoryLockGuard<'lock, C> {
327    fn new(lock: &'lock PgAdvisoryLock, conn: C) -> Self {
328        PgAdvisoryLockGuard {
329            lock,
330            conn: Some(conn),
331        }
332    }
333
334    /// Immediately release the held advisory lock instead of when the connection is next used.
335    ///
336    /// An error should only be returned if there is something wrong with the connection,
337    /// in which case the lock will be automatically released by the connection closing anyway.
338    ///
339    /// If `pg_advisory_unlock()` returns `false`, a warning will be logged, both by SQLx as
340    /// well as the Postgres server. This would only happen if the lock was released without
341    /// using this guard, or the connection was swapped using [`std::mem::replace()`].
342    pub async fn release_now(mut self) -> Result<C> {
343        let (conn, released) = self
344            .lock
345            .force_release(self.conn.take().expect(NONE_ERR))
346            .await?;
347
348        if !released {
349            log::warn!(
350                "PgAdvisoryLockGuard: advisory lock {:?} was not held by the contained connection",
351                self.lock.key
352            );
353        }
354
355        Ok(conn)
356    }
357
358    /// Cancel the release of the advisory lock, keeping it held until the connection is closed.
359    ///
360    /// To manually release the lock later, see [`PgAdvisoryLock::force_release()`].
361    pub fn leak(mut self) -> C {
362        self.conn.take().expect(NONE_ERR)
363    }
364}
365
366impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> Deref for PgAdvisoryLockGuard<'lock, C> {
367    type Target = PgConnection;
368
369    fn deref(&self) -> &Self::Target {
370        self.conn.as_ref().expect(NONE_ERR).as_ref()
371    }
372}
373
374/// Mutable access to the underlying connection is provided so it can still be used like normal,
375/// even allowing locks to be taken recursively.
376///
377/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
378/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
379/// guard attempts to release the lock.
380impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> DerefMut
381    for PgAdvisoryLockGuard<'lock, C>
382{
383    fn deref_mut(&mut self) -> &mut Self::Target {
384        self.conn.as_mut().expect(NONE_ERR).as_mut()
385    }
386}
387
388impl<'lock, C: AsMut<PgConnection> + AsRef<PgConnection>> AsRef<PgConnection>
389    for PgAdvisoryLockGuard<'lock, C>
390{
391    fn as_ref(&self) -> &PgConnection {
392        self.conn.as_ref().expect(NONE_ERR).as_ref()
393    }
394}
395
396/// Mutable access to the underlying connection is provided so it can still be used like normal,
397/// even allowing locks to be taken recursively.
398///
399/// However, replacing the connection with a different one using, e.g. [`std::mem::replace()`]
400/// is a logic error and will cause a warning to be logged by the PostgreSQL server when this
401/// guard attempts to release the lock.
402impl<'lock, C: AsMut<PgConnection>> AsMut<PgConnection> for PgAdvisoryLockGuard<'lock, C> {
403    fn as_mut(&mut self) -> &mut PgConnection {
404        self.conn.as_mut().expect(NONE_ERR).as_mut()
405    }
406}
407
408/// Queues a `pg_advisory_unlock()` call on the wrapped connection which will be flushed
409/// to the server the next time it is used, or when it is returned to [`PgPool`][crate::postgres::PgPool]
410/// in the case of [`PoolConnection<Postgres>`][crate::pool::PoolConnection].
411impl<'lock, C: AsMut<PgConnection>> Drop for PgAdvisoryLockGuard<'lock, C> {
412    fn drop(&mut self) {
413        if let Some(mut conn) = self.conn.take() {
414            // Queue a simple query message to execute next time the connection is used.
415            // The `async fn` versions can safely use the prepared statement protocol,
416            // but this is the safest way to queue a query to execute on the next opportunity.
417            conn.as_mut()
418                .queue_simple_query(self.lock.get_release_query());
419        }
420    }
421}