distributed_lock_postgres/
handle.rs

1//! PostgreSQL lock handle implementation.
2
3use tokio::sync::watch;
4use tracing::instrument;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8
9use crate::key::PostgresAdvisoryLockKey;
10use sqlx::pool::PoolConnection;
11use sqlx::{Executor, Postgres};
12
/// Handle for a held PostgreSQL advisory lock.
///
/// Bundles the pooled connection used to release the lock with the
/// bookkeeping needed to expose a "lock lost" signal to the holder.
pub struct PostgresLockHandle {
    /// The pooled database connection used to release the lock.
    ///
    /// Wrapped in `Option` so that `release` can move the connection out of
    /// the handle; it is `None` once that has happened.
    conn: Option<PoolConnection<Postgres>>,
    /// Whether this lock is held within a transaction.
    ///
    /// Selects the release strategy: a `ROLLBACK` when `true`, an explicit
    /// `pg_advisory_unlock` call when `false`.
    is_transaction: bool,
    /// The lock key; rendered into the unlock statement on release.
    key: PostgresAdvisoryLockKey,
    /// Receiving end of the watch channel handed out by `lost_token`.
    lost_receiver: watch::Receiver<bool>,
    /// Handle to the background task spawned in `new`; aborted when this
    /// handle is dropped.
    _monitor_task: tokio::task::JoinHandle<()>,
}
27
28impl PostgresLockHandle {
29    pub(crate) fn new(
30        conn: PoolConnection<Postgres>,
31        is_transaction: bool,
32        key: PostgresAdvisoryLockKey,
33        _lost_sender: watch::Sender<bool>,
34        lost_receiver: watch::Receiver<bool>,
35        _keepalive_cadence: Option<std::time::Duration>,
36    ) -> Self {
37        // Monitoring is skipped as connection drop handles release
38        let monitor_task = tokio::spawn(async move {});
39
40        Self {
41            conn: Some(conn),
42            is_transaction,
43            key,
44            lost_receiver,
45            _monitor_task: monitor_task,
46        }
47    }
48}
49
50impl LockHandle for PostgresLockHandle {
51    fn lost_token(&self) -> &watch::Receiver<bool> {
52        &self.lost_receiver
53    }
54
55    #[instrument(skip(self), fields(backend = "postgres"))]
56    async fn release(mut self) -> LockResult<()> {
57        if let Some(mut conn) = self.conn.take() {
58            if self.is_transaction {
59                // Rollback transaction to release lock
60                match conn.execute("ROLLBACK").await {
61                    Ok(_) => tracing::debug!("Transaction rolled back successfully"),
62                    Err(e) => tracing::warn!("Failed to rollback transaction: {}", e),
63                }
64            } else {
65                // Explicitly release session-scoped lock
66                let sql = format!("SELECT pg_advisory_unlock({})", self.key.to_sql_args());
67                if let Err(e) = conn.execute(sql.as_str()).await {
68                    tracing::warn!("Failed to release lock explicitly: {}", e);
69                }
70            }
71        }
72        Ok(())
73    }
74}
75
76impl Drop for PostgresLockHandle {
77    fn drop(&mut self) {
78        self._monitor_task.abort();
79    }
80}