distributed_lock_redis/
handle.rs1use std::sync::Arc;
4use std::time::Duration;
5
6use distributed_lock_core::error::LockResult;
7use distributed_lock_core::traits::LockHandle;
8use fred::prelude::*;
9use tokio::sync::watch;
10use tracing::instrument;
11
12use crate::lock::RedisLockState;
13use crate::redlock::{extend::extend_redlock, release::release_redlock};
14
15pub struct RedisLockHandle {
20 state: Arc<RedisLockState>,
22 acquire_results: Arc<Vec<bool>>,
24 clients: Arc<Vec<RedisClient>>,
26 #[allow(dead_code)]
28 extension_cadence: Duration,
29 #[allow(dead_code)]
31 expiry: Duration,
32 lost_receiver: watch::Receiver<bool>,
34 extension_task: tokio::task::JoinHandle<()>,
36}
37
38impl RedisLockHandle {
39 pub(crate) fn new(
41 state: RedisLockState,
42 acquire_results: Vec<bool>,
43 clients: Vec<RedisClient>,
44 extension_cadence: Duration,
45 expiry: Duration,
46 ) -> Self {
47 let state = Arc::new(state);
48 let acquire_results = Arc::new(acquire_results);
49 let clients = Arc::new(clients);
50 let (lost_sender, lost_receiver) = watch::channel(false);
51
52 let state_clone = state.clone();
54 let acquire_results_clone = acquire_results.clone();
55 let clients_clone = clients.clone();
56 let extension_cadence_clone = extension_cadence;
57
58 let extension_task = tokio::spawn(async move {
60 let mut interval = tokio::time::interval(extension_cadence_clone);
61 interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
62
63 loop {
64 interval.tick().await;
65
66 if lost_sender.is_closed() {
68 break;
69 }
70
71 let (_cancel_sender, cancel_receiver) = watch::channel(false);
73
74 let state_for_extend = state_clone.clone();
76 match extend_redlock(
77 move |client| {
78 let state = state_for_extend.clone();
79 let client = client.clone();
80 async move { state.try_extend(&client).await }
81 },
82 &clients_clone,
83 &acquire_results_clone,
84 &state_clone.timeouts,
85 &cancel_receiver,
86 )
87 .await
88 {
89 Ok(Some(true)) => {
90 continue;
92 }
93 Ok(Some(false)) => {
94 let _ = lost_sender.send(true);
96 break;
97 }
98 Ok(None) => {
99 continue;
101 }
102 Err(_) => {
103 let _ = lost_sender.send(true);
105 break;
106 }
107 }
108 }
109 });
110
111 Self {
112 state,
113 acquire_results,
114 clients,
115 extension_cadence,
116 expiry,
117 lost_receiver,
118 extension_task,
119 }
120 }
121}
122
123impl LockHandle for RedisLockHandle {
124 fn lost_token(&self) -> &watch::Receiver<bool> {
125 &self.lost_receiver
126 }
127
128 #[instrument(skip(self), fields(lock.key = %self.state.key, backend = "redis"))]
129 async fn release(self) -> LockResult<()> {
130 self.extension_task.abort();
132 let state = self.state.clone();
136 let clients = self.clients.clone();
137 let acquire_results = self.acquire_results.clone();
138 release_redlock(
139 move |client| {
140 let state = state.clone();
141 let client = client.clone();
142 async move { state.try_release(&client).await }
143 },
144 &clients,
145 &acquire_results,
146 )
147 .await
148 }
149}
150
151impl Drop for RedisLockHandle {
152 fn drop(&mut self) {
153 self.extension_task.abort();
155 }
158}