distributed_lock_redis/
handle.rs

1//! Redis lock handle implementation.
2
3use std::sync::Arc;
4use std::time::Duration;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8use fred::prelude::*;
9use tokio::sync::watch;
10use tracing::instrument;
11
12use crate::lock::RedisLockState;
13use crate::redlock::{extend::extend_redlock, release::release_redlock};
14
15/// Handle for a held Redis lock.
16///
17/// The lock is automatically extended in the background while this handle exists.
18/// Dropping or releasing the handle stops extension and releases the lock.
19pub struct RedisLockHandle {
20    /// Lock state.
21    state: Arc<RedisLockState>,
22    /// Acquire results indexed by client position.
23    acquire_results: Arc<Vec<bool>>,
24    /// Redis clients.
25    clients: Arc<Vec<RedisClient>>,
26    /// Extension cadence.
27    #[allow(dead_code)]
28    extension_cadence: Duration,
29    /// Lock expiry duration.
30    #[allow(dead_code)]
31    expiry: Duration,
32    /// Watch channel for lock lost detection.
33    lost_receiver: watch::Receiver<bool>,
34    /// Background extension task handle.
35    extension_task: tokio::task::JoinHandle<()>,
36}
37
38impl RedisLockHandle {
39    /// Creates a new lock handle.
40    pub(crate) fn new(
41        state: RedisLockState,
42        acquire_results: Vec<bool>,
43        clients: Vec<RedisClient>,
44        extension_cadence: Duration,
45        expiry: Duration,
46    ) -> Self {
47        let state = Arc::new(state);
48        let acquire_results = Arc::new(acquire_results);
49        let clients = Arc::new(clients);
50        let (lost_sender, lost_receiver) = watch::channel(false);
51
52        // Clone for background task
53        let state_clone = state.clone();
54        let acquire_results_clone = acquire_results.clone();
55        let clients_clone = clients.clone();
56        let extension_cadence_clone = extension_cadence;
57
58        // Spawn background task for lock extension
59        let extension_task = tokio::spawn(async move {
60            let mut interval = tokio::time::interval(extension_cadence_clone);
61            interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63            loop {
64                interval.tick().await;
65
66                // Check if we should stop (receiver closed)
67                if lost_sender.is_closed() {
68                    break;
69                }
70
71                // Create cancellation token (not used for extend, but required by API)
72                let (_cancel_sender, cancel_receiver) = watch::channel(false);
73
74                // Extend the lock
75                let state_for_extend = state_clone.clone();
76                match extend_redlock(
77                    move |client| {
78                        let state = state_for_extend.clone();
79                        let client = client.clone();
80                        async move { state.try_extend(&client).await }
81                    },
82                    &clients_clone,
83                    &acquire_results_clone,
84                    &state_clone.timeouts,
85                    &cancel_receiver,
86                )
87                .await
88                {
89                    Ok(Some(true)) => {
90                        // Successfully extended
91                        continue;
92                    }
93                    Ok(Some(false)) => {
94                        // Failed to extend - lock lost
95                        let _ = lost_sender.send(true);
96                        break;
97                    }
98                    Ok(None) => {
99                        // Inconclusive - continue trying
100                        continue;
101                    }
102                    Err(_) => {
103                        // Error extending - lock lost
104                        let _ = lost_sender.send(true);
105                        break;
106                    }
107                }
108            }
109        });
110
111        Self {
112            state,
113            acquire_results,
114            clients,
115            extension_cadence,
116            expiry,
117            lost_receiver,
118            extension_task,
119        }
120    }
121}
122
123impl LockHandle for RedisLockHandle {
124    fn lost_token(&self) -> &watch::Receiver<bool> {
125        &self.lost_receiver
126    }
127
128    #[instrument(skip(self), fields(lock.key = %self.state.key, backend = "redis"))]
129    async fn release(self) -> LockResult<()> {
130        // Abort the extension task
131        self.extension_task.abort();
132        // Don't await - just abort and continue
133
134        // Release the lock on all clients
135        let state = self.state.clone();
136        let clients = self.clients.clone();
137        let acquire_results = self.acquire_results.clone();
138        release_redlock(
139            move |client| {
140                let state = state.clone();
141                let client = client.clone();
142                async move { state.try_release(&client).await }
143            },
144            &clients,
145            &acquire_results,
146        )
147        .await
148    }
149}
150
151impl Drop for RedisLockHandle {
152    fn drop(&mut self) {
153        // Abort extension task
154        self.extension_task.abort();
155        // Note: We can't async release in Drop, so the lock will expire naturally
156        // For proper cleanup, users should call release() explicitly
157    }
158}