distributed_lock_postgres/
handle.rs

1//! PostgreSQL lock handle implementation.
2
3use tokio::sync::watch;
4use tracing::instrument;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8
9use crate::key::PostgresAdvisoryLockKey;
10use sqlx::pool::PoolConnection;
11use sqlx::{Postgres, Transaction};
12
/// Handle for a held PostgreSQL lock.
///
/// Returned to the caller once the advisory lock has been acquired. Call
/// [`LockHandle::release`] to give the lock back explicitly; dropping the
/// handle aborts the background monitor task.
pub struct PostgresLockHandle {
    /// The database transaction (if transaction-scoped) or connection.
    /// When dropped, the lock is released.
    ///
    /// Wrapped in `Option` so that `release` can move the connection out of
    /// `self` with `take()`; it is `None` once `release` has run.
    _connection: Option<PostgresConnectionInner>,
    /// The lock key for explicit unlock.
    ///
    /// Used by `release` to build the `pg_advisory_unlock` statement for
    /// session-scoped locks.
    key: PostgresAdvisoryLockKey,
    /// Watch channel for lock lost detection.
    ///
    /// This is the receiver handed out by `lost_token`.
    lost_receiver: watch::Receiver<bool>,
    /// Background task handle for connection monitoring.
    ///
    /// Spawned in `new` and aborted when the handle is dropped.
    _monitor_task: tokio::task::JoinHandle<()>,
}
25
/// The database resource that keeps an advisory lock alive.
///
/// Which variant is used depends on whether the lock is scoped to a
/// transaction or to a whole session (connection).
pub(crate) enum PostgresConnectionInner {
    /// Transaction-scoped lock: stores transaction which keeps connection alive
    /// Using raw pointer to avoid lifetime issues - we manage the transaction manually
    ///
    /// `release` turns this pointer back into a `Box` with `Box::from_raw`, so
    /// it must be a uniquely-owned pointer obtained from `Box::into_raw`.
    /// NOTE(review): the construction site is not in this file — confirm it
    /// upholds that contract.
    Transaction(*mut Transaction<'static, Postgres>),
    /// Session-scoped lock: stores pool connection
    ///
    /// `release` runs `pg_advisory_unlock` on this connection before letting
    /// it go.
    Connection(Box<PoolConnection<Postgres>>),
}
33
// The raw pointer in `PostgresConnectionInner::Transaction` removes the
// automatic `Send`/`Sync` implementations, so they are asserted manually here.
//
// SAFETY (review): this is only sound if the pointer is uniquely owned by the
// enum value (it is treated like a `Box` in `release`) and the pointee is
// itself safe to move/share across threads — TODO confirm for
// `Transaction<'static, Postgres>`.
unsafe impl Send for PostgresConnectionInner {}
// SAFETY (review): nothing in this file reads through the pointer via a shared
// reference; confirm no other module does before relying on `Sync`.
unsafe impl Sync for PostgresConnectionInner {}
36
37impl PostgresLockHandle {
38    pub(crate) fn new(
39        connection: PostgresConnectionInner,
40        key: PostgresAdvisoryLockKey,
41        _lost_sender: watch::Sender<bool>,
42        lost_receiver: watch::Receiver<bool>,
43        _keepalive_cadence: Option<std::time::Duration>,
44    ) -> Self {
45        // For session-scoped locks (Connection), the lock is tied to the connection
46        // and will be released when the connection is dropped. We don't need
47        // to monitor separately - the connection drop will handle cleanup.
48        // For transaction-scoped locks, connection loss will be detected when
49        // transaction operations fail, so we skip monitoring.
50        let monitor_task = tokio::spawn(async move {
51            // No monitoring needed - locks are released when connection/transaction is dropped
52        });
53
54        Self {
55            _connection: Some(connection),
56            key,
57            lost_receiver,
58            _monitor_task: monitor_task,
59        }
60    }
61}
62
63impl LockHandle for PostgresLockHandle {
64    fn lost_token(&self) -> &watch::Receiver<bool> {
65        &self.lost_receiver
66    }
67
68    #[instrument(skip(self), fields(backend = "postgres"))]
69    async fn release(mut self) -> LockResult<()> {
70        let key = self.key;
71
72        // Explicitly release the lock before dropping the connection
73        // This is important for connection pooling - we need to unlock before
74        // the connection is returned to the pool
75        if let Some(connection) = self._connection.take() {
76            match connection {
77                PostgresConnectionInner::Connection(mut conn) => {
78                    // Release session-scoped lock explicitly
79                    let sql = format!("SELECT pg_advisory_unlock({})", key.to_sql_args());
80                    let _ = sqlx::query(&sql).execute(&mut **conn).await;
81                }
82                PostgresConnectionInner::Transaction(transaction_ptr) => {
83                    // SAFETY: We created this pointer and it's still valid
84                    let transaction = unsafe { Box::from_raw(transaction_ptr) };
85                    match transaction.rollback().await {
86                        Ok(_) => {
87                            tracing::debug!("Transaction rolled back successfully");
88                        }
89                        Err(e) => {
90                            tracing::warn!("Failed to rollback transaction: {}", e);
91                        }
92                    }
93                    // Transaction is consumed by rollback(), so no need to drop it
94                }
95            }
96        }
97
98        // Lock is released, connection/transaction has been dropped
99        Ok(())
100    }
101}
102
103impl Drop for PostgresLockHandle {
104    fn drop(&mut self) {
105        // Abort monitoring task on drop
106        self._monitor_task.abort();
107    }
108}