1use anyhow::Result;
2use dashmap::DashMap;
3use futures::future;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{watch, Notify};
8use tokio::time;
9use tracing::debug;
10
11const LEASE_DROP_TIMEOUT_SECS: u64 = 5;
13
14pub struct Lease {
15 id: u32,
16 guid: String,
17 renter: Renter,
18 pub replace_rx: watch::Receiver<bool>,
19 inactive_notify: Arc<Notify>,
21}
22
23impl Drop for Lease {
24 fn drop(&mut self) {
25 self.inactive_notify.notify_waiters();
27
28 debug!(
30 "Unregistering lease for guid: {} with id: {}",
31 self.guid, self.id
32 );
33 if let Some((_, entry)) = self
34 .renter
35 .leases
36 .remove_if(&self.guid, |_, v| v.id == self.id)
37 {
38 debug!(
39 "Removed lease entry for guid: {} with id: {}",
40 self.guid, self.id
41 );
42 entry.tx.send(true).ok();
43 drop(entry.tx);
44 } else {
45 debug!(
46 "No matching lease found for guid: {} with id: {}",
47 self.guid, self.id
48 );
49 }
50 }
51}
52
53#[derive(Clone)]
54pub struct LeaseEntry {
55 id: u32,
56 tx: watch::Sender<bool>,
57 inactive_notify: Arc<Notify>,
59}
60
61#[derive(Clone)]
62pub struct Renter {
63 leases: Arc<DashMap<String, LeaseEntry>>,
64 counter: Arc<AtomicU32>,
65}
66
67impl Default for Renter {
68 fn default() -> Self {
69 Self::new()
70 }
71}
72
73impl Renter {
74 pub fn new() -> Self {
75 Self {
76 leases: Arc::new(DashMap::new()),
77 counter: Arc::new(AtomicU32::new(1)),
78 }
79 }
80
81 pub async fn acquire_lease(&self, guid: String) -> Result<Lease> {
82 if self.leases.contains_key(&guid) {
84 debug!("Found previous lease for guid: {}", guid);
85 self.drop_lease(&guid).await?;
86 }
87
88 let id = self.counter.fetch_add(1, Ordering::Relaxed);
90 debug!(
91 "Attempting to acquire lease for guid: {} with id: {}",
92 guid, id
93 );
94
95 let (lease_tx, lease_rx) = watch::channel(false);
96 let inactive_notify = Arc::new(Notify::new());
97 let entry = LeaseEntry {
98 id,
99 tx: lease_tx,
100 inactive_notify: inactive_notify.clone(),
101 };
102
103 self.leases.insert(guid.clone(), entry);
104
105 debug!("Lease acquired for guid: {} with id: {}", guid, id);
106
107 Ok(Lease {
108 guid: guid.clone(),
109 id,
110 renter: self.clone(),
111 replace_rx: lease_rx,
112 inactive_notify,
113 })
114 }
115
116 pub async fn drop_lease(&self, guid: &str) -> Result<()> {
117 debug!("Dropping lease for guid: {}", guid);
118 if let Some(entry_ref) = self.leases.get(guid) {
119 let id = entry_ref.id;
120 let tx = entry_ref.tx.clone();
121 let inactive_notify = entry_ref.inactive_notify.clone();
122 drop(entry_ref);
123
124 debug!(
125 "Sending preempt signal to lease (guid: {}, id: {})",
126 guid, id
127 );
128
129 if tx.send(true).is_err() {
131 debug!(
132 "Lease already inactive when sending preempt (guid: {}, id: {})",
133 guid, id
134 );
135 } else {
136 debug!(
137 "Waiting for lease to become inactive (guid: {}, id: {})",
138 guid, id
139 );
140
141 let wait_result = time::timeout(
143 Duration::from_secs(LEASE_DROP_TIMEOUT_SECS),
144 inactive_notify.notified(),
145 )
146 .await;
147
148 match wait_result {
149 Ok(_) => {
150 debug!("Lease became inactive (guid: {}, id: {})", guid, id);
151 }
152 Err(_) => {
153 debug!(
154 "Timeout waiting for lease to become inactive (guid: {}, id: {})",
155 guid, id
156 );
157 return Err(anyhow::anyhow!("Failed to close lease"));
158 }
159 }
160 }
161
162 let _ = self.leases.remove_if(guid, |_, v| v.id == id);
164 }
165 Ok(())
166 }
167
168 pub async fn drop_all(&self) {
169 debug!("Dropping all leases");
170 let guids: Vec<String> = self.iter().collect();
171
172 let handles: Vec<_> = guids
173 .into_iter()
174 .map(|guid| {
175 let renter = self.clone();
176 tokio::spawn(async move {
177 let _ = renter.drop_lease(&guid).await;
178 })
179 })
180 .collect();
181
182 let _ = future::join_all(handles).await;
184 }
185
186 pub fn iter(&self) -> impl Iterator<Item = String> {
187 self.leases
188 .iter()
189 .map(|entry| entry.key().clone())
190 .collect::<Vec<_>>()
191 .into_iter()
192 }
193
194 pub fn has_lease(&self, guid: &str) -> bool {
195 self.leases.contains_key(guid)
196 }
197
198 pub fn len(&self) -> usize {
199 self.leases.len()
200 }
201
202 pub fn is_empty(&self) -> bool {
203 self.leases.is_empty()
204 }
205}
206
207#[cfg(test)]
208mod tests {
209 use super::*;
210
211 #[tokio::test]
212 async fn acquire_times_out_when_prev_not_dropped() {
213 let renter = Renter::new();
214
215 let l1 = renter.acquire_lease("g1".to_string()).await.unwrap();
216
217 let renter2 = renter.clone();
219 let handle = tokio::spawn(async move { renter2.acquire_lease("g1".to_string()).await });
220
221 let mut rx = l1.replace_rx.clone();
224 rx.changed().await.unwrap();
225 assert!(
226 *rx.borrow(),
227 "Previous holder should have received preempt signal"
228 );
229
230 let res = handle.await.unwrap();
232 assert!(
233 res.is_err(),
234 "Second acquire should timeout when previous holder does not drop"
235 );
236
237 assert!(renter.has_lease("g1"));
239
240 drop(l1);
242 assert!(
243 !renter.has_lease("g1"),
244 "Lease should be unregistered after dropping"
245 );
246 }
247
248 #[tokio::test]
249 async fn acquire_succeeds_after_prev_drops() {
250 let renter = Renter::new();
251
252 let l1 = renter.acquire_lease("g2".to_string()).await.unwrap();
253
254 let renter2 = renter.clone();
256 let handle = tokio::spawn(async move { renter2.acquire_lease("g2".to_string()).await });
257
258 tokio::time::sleep(Duration::from_millis(100)).await;
260
261 let mut rx = l1.replace_rx.clone();
263 let changed = rx.changed().await;
264 assert!(changed.is_ok(), "Should receive preempt signal");
265 assert!(*rx.borrow(), "Preempt signal should be true");
266
267 drop(l1);
269
270 tokio::time::sleep(Duration::from_millis(100)).await;
272
273 let l2 = handle
274 .await
275 .unwrap()
276 .expect("Second acquire should succeed after previous drops");
277
278 assert!(renter.has_lease("g2"), "New lease should be registered");
279
280 drop(l2);
281 assert!(
282 !renter.has_lease("g2"),
283 "Lease should be unregistered after dropping new lease"
284 );
285 }
286
287 #[tokio::test]
288 async fn drop_all_removes_all_leases() {
289 let renter = Renter::new();
290
291 let l1 = renter.acquire_lease("g1".to_string()).await.unwrap();
293 let l2 = renter.acquire_lease("g2".to_string()).await.unwrap();
294 let l3 = renter.acquire_lease("g3".to_string()).await.unwrap();
295
296 assert_eq!(renter.len(), 3, "Should have 3 leases");
297 assert!(renter.has_lease("g1"));
298 assert!(renter.has_lease("g2"));
299 assert!(renter.has_lease("g3"));
300
301 let mut rx1 = l1.replace_rx.clone();
303 let mut rx2 = l2.replace_rx.clone();
304 let mut rx3 = l3.replace_rx.clone();
305
306 let renter2 = renter.clone();
308 let handle = tokio::spawn(async move {
309 renter2.drop_all().await;
310 });
311
312 tokio::select! {
314 _ = rx1.changed() => assert!(*rx1.borrow(), "l1 should receive preempt signal"),
315 _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l1 preempt"),
316 }
317 tokio::select! {
318 _ = rx2.changed() => assert!(*rx2.borrow(), "l2 should receive preempt signal"),
319 _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l2 preempt"),
320 }
321 tokio::select! {
322 _ = rx3.changed() => assert!(*rx3.borrow(), "l3 should receive preempt signal"),
323 _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l3 preempt"),
324 }
325
326 drop(l1);
328 drop(l2);
329 drop(l3);
330
331 handle.await.unwrap();
333
334 assert_eq!(renter.len(), 0, "Should have 0 leases after drop_all");
336 assert!(!renter.has_lease("g1"));
337 assert!(!renter.has_lease("g2"));
338 assert!(!renter.has_lease("g3"));
339 }
340}