distributed_lock_postgres/
handle.rs1use tokio::sync::watch;
4use tracing::instrument;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8
9use crate::key::PostgresAdvisoryLockKey;
10use sqlx::pool::PoolConnection;
11use sqlx::{Executor, Postgres};
12
13pub struct PostgresLockHandle {
15 conn: Option<PoolConnection<Postgres>>,
18 is_transaction: bool,
20 key: PostgresAdvisoryLockKey,
22 lost_receiver: watch::Receiver<bool>,
24 _monitor_task: tokio::task::JoinHandle<()>,
26}
27
28impl PostgresLockHandle {
29 pub(crate) fn new(
30 conn: PoolConnection<Postgres>,
31 is_transaction: bool,
32 key: PostgresAdvisoryLockKey,
33 _lost_sender: watch::Sender<bool>,
34 lost_receiver: watch::Receiver<bool>,
35 _keepalive_cadence: Option<std::time::Duration>,
36 ) -> Self {
37 let monitor_task = tokio::spawn(async move {});
39
40 Self {
41 conn: Some(conn),
42 is_transaction,
43 key,
44 lost_receiver,
45 _monitor_task: monitor_task,
46 }
47 }
48}
49
50impl LockHandle for PostgresLockHandle {
51 fn lost_token(&self) -> &watch::Receiver<bool> {
52 &self.lost_receiver
53 }
54
55 #[instrument(skip(self), fields(backend = "postgres"))]
56 async fn release(mut self) -> LockResult<()> {
57 if let Some(mut conn) = self.conn.take() {
58 if self.is_transaction {
59 match conn.execute("ROLLBACK").await {
61 Ok(_) => tracing::debug!("Transaction rolled back successfully"),
62 Err(e) => tracing::warn!("Failed to rollback transaction: {}", e),
63 }
64 } else {
65 let sql = format!("SELECT pg_advisory_unlock({})", self.key.to_sql_args());
67 if let Err(e) = conn.execute(sql.as_str()).await {
68 tracing::warn!("Failed to release lock explicitly: {}", e);
69 }
70 }
71 }
72 Ok(())
73 }
74}
75
76impl Drop for PostgresLockHandle {
77 fn drop(&mut self) {
78 self._monitor_task.abort();
79 }
80}