distributed_lock_postgres/
handle.rs1use tokio::sync::watch;
4use tracing::instrument;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8
9use crate::key::PostgresAdvisoryLockKey;
10use sqlx::pool::PoolConnection;
11use sqlx::{Postgres, Transaction};
12
13pub struct PostgresLockHandle {
15 _connection: Option<PostgresConnectionInner>,
18 key: PostgresAdvisoryLockKey,
20 lost_receiver: watch::Receiver<bool>,
22 _monitor_task: tokio::task::JoinHandle<()>,
24}
25
26pub(crate) enum PostgresConnectionInner {
27 Transaction(*mut Transaction<'static, Postgres>),
30 Connection(Box<PoolConnection<Postgres>>),
32}
33
34unsafe impl Send for PostgresConnectionInner {}
35unsafe impl Sync for PostgresConnectionInner {}
36
37impl PostgresLockHandle {
38 pub(crate) fn new(
39 connection: PostgresConnectionInner,
40 key: PostgresAdvisoryLockKey,
41 _lost_sender: watch::Sender<bool>,
42 lost_receiver: watch::Receiver<bool>,
43 _keepalive_cadence: Option<std::time::Duration>,
44 ) -> Self {
45 let monitor_task = tokio::spawn(async move {
51 });
53
54 Self {
55 _connection: Some(connection),
56 key,
57 lost_receiver,
58 _monitor_task: monitor_task,
59 }
60 }
61}
62
63impl LockHandle for PostgresLockHandle {
64 fn lost_token(&self) -> &watch::Receiver<bool> {
65 &self.lost_receiver
66 }
67
68 #[instrument(skip(self), fields(backend = "postgres"))]
69 async fn release(mut self) -> LockResult<()> {
70 let key = self.key;
71
72 if let Some(connection) = self._connection.take() {
76 match connection {
77 PostgresConnectionInner::Connection(mut conn) => {
78 let sql = format!("SELECT pg_advisory_unlock({})", key.to_sql_args());
80 let _ = sqlx::query(&sql).execute(&mut **conn).await;
81 }
82 PostgresConnectionInner::Transaction(transaction_ptr) => {
83 let transaction = unsafe { Box::from_raw(transaction_ptr) };
85 match transaction.rollback().await {
86 Ok(_) => {
87 tracing::debug!("Transaction rolled back successfully");
88 }
89 Err(e) => {
90 tracing::warn!("Failed to rollback transaction: {}", e);
91 }
92 }
93 }
95 }
96 }
97
98 Ok(())
100 }
101}
102
103impl Drop for PostgresLockHandle {
104 fn drop(&mut self) {
105 self._monitor_task.abort();
107 }
108}