cloudpub_common/
lease.rs

1use anyhow::Result;
2use dashmap::DashMap;
3use futures::future;
4use std::sync::atomic::{AtomicU32, Ordering};
5use std::sync::Arc;
6use std::time::Duration;
7use tokio::sync::{watch, Notify};
8use tokio::time;
9use tracing::debug;
10
11/// Maximum time, in seconds, that `Renter::drop_lease` waits for a preempted
/// lease holder to signal that it has become inactive before giving up.
12const LEASE_DROP_TIMEOUT_SECS: u64 = 5;
13
/// Handle held by the current owner of the lease registered under `guid`.
///
/// Dropping the handle wakes tasks waiting on `inactive_notify` and removes the
/// matching entry from the renter's lease map (see `impl Drop for Lease`).
14pub struct Lease {
    // Generation number assigned from the renter's counter in `acquire_lease`;
    // used to make sure only this lease's own map entry is removed on drop.
15    id: u32,
    // Key under which this lease is registered in the renter's map.
16    guid: String,
    // Clone of the `Renter` that issued the lease; used on drop to unregister.
17    renter: Renter,
    // Receiving half of the lease's watch channel. The value starts as `false`
    // and `true` is sent when the lease is preempted via `Renter::drop_lease`
    // or when the lease itself is dropped.
18    pub replace_rx: watch::Receiver<bool>,
19    // Notifier to wake up tasks waiting for this lease to drop
20    inactive_notify: Arc<Notify>,
21}
22
23impl Drop for Lease {
24    fn drop(&mut self) {
25        // Notify all waiters that this lease is now inactive
26        self.inactive_notify.notify_waiters();
27
28        // Unregister the lease from the renter
29        debug!(
30            "Unregistering lease for guid: {} with id: {}",
31            self.guid, self.id
32        );
33        if let Some((_, entry)) = self
34            .renter
35            .leases
36            .remove_if(&self.guid, |_, v| v.id == self.id)
37        {
38            debug!(
39                "Removed lease entry for guid: {} with id: {}",
40                self.guid, self.id
41            );
42            entry.tx.send(true).ok();
43            drop(entry.tx);
44        } else {
45            debug!(
46                "No matching lease found for guid: {} with id: {}",
47                self.guid, self.id
48            );
49        }
50    }
51}
52
/// Bookkeeping record stored in the renter's map for each registered lease.
53#[derive(Clone)]
54pub struct LeaseEntry {
    // Generation number of the lease this entry belongs to; compared against
    // before removing the entry so a newer lease is never removed by mistake.
55    id: u32,
    // Sending half of the lease's watch channel; `true` is sent to tell the
    // holder that it is being preempted.
56    tx: watch::Sender<bool>,
57    // Notifier to wake up tasks waiting for this lease to become inactive
58    inactive_notify: Arc<Notify>,
59}
60
/// Registry of leases keyed by guid.
///
/// Both fields are behind `Arc`, so clones of a `Renter` share the same lease
/// map and the same id counter.
61#[derive(Clone)]
62pub struct Renter {
    // Currently registered leases, keyed by guid.
63    leases: Arc<DashMap<String, LeaseEntry>>,
    // Source of lease ids; incremented for every acquired lease.
64    counter: Arc<AtomicU32>,
65}
66
67impl Default for Renter {
68    fn default() -> Self {
69        Self::new()
70    }
71}
72
73impl Renter {
74    pub fn new() -> Self {
75        Self {
76            leases: Arc::new(DashMap::new()),
77            counter: Arc::new(AtomicU32::new(1)),
78        }
79    }
80
81    pub async fn acquire_lease(&self, guid: String) -> Result<Lease> {
82        // First, check if there's an existing lease and shut it down before creating new one
83        if self.leases.contains_key(&guid) {
84            debug!("Found previous lease for guid: {}", guid);
85            self.drop_lease(&guid).await?;
86        }
87
88        // Now that previous lease is gone, create and insert new lease
89        let id = self.counter.fetch_add(1, Ordering::Relaxed);
90        debug!(
91            "Attempting to acquire lease for guid: {} with id: {}",
92            guid, id
93        );
94
95        let (lease_tx, lease_rx) = watch::channel(false);
96        let inactive_notify = Arc::new(Notify::new());
97        let entry = LeaseEntry {
98            id,
99            tx: lease_tx,
100            inactive_notify: inactive_notify.clone(),
101        };
102
103        self.leases.insert(guid.clone(), entry);
104
105        debug!("Lease acquired for guid: {} with id: {}", guid, id);
106
107        Ok(Lease {
108            guid: guid.clone(),
109            id,
110            renter: self.clone(),
111            replace_rx: lease_rx,
112            inactive_notify,
113        })
114    }
115
116    pub async fn drop_lease(&self, guid: &str) -> Result<()> {
117        debug!("Dropping lease for guid: {}", guid);
118        if let Some(entry_ref) = self.leases.get(guid) {
119            let id = entry_ref.id;
120            let tx = entry_ref.tx.clone();
121            let inactive_notify = entry_ref.inactive_notify.clone();
122            drop(entry_ref);
123
124            debug!(
125                "Sending preempt signal to lease (guid: {}, id: {})",
126                guid, id
127            );
128
129            // Send preempt signal to owner
130            if tx.send(true).is_err() {
131                debug!(
132                    "Lease already inactive when sending preempt (guid: {}, id: {})",
133                    guid, id
134                );
135            } else {
136                debug!(
137                    "Waiting for lease to become inactive (guid: {}, id: {})",
138                    guid, id
139                );
140
141                // Wait for the lease holder to complete cleanup
142                let wait_result = time::timeout(
143                    Duration::from_secs(LEASE_DROP_TIMEOUT_SECS),
144                    inactive_notify.notified(),
145                )
146                .await;
147
148                match wait_result {
149                    Ok(_) => {
150                        debug!("Lease became inactive (guid: {}, id: {})", guid, id);
151                    }
152                    Err(_) => {
153                        debug!(
154                            "Timeout waiting for lease to become inactive (guid: {}, id: {})",
155                            guid, id
156                        );
157                        return Err(anyhow::anyhow!("Failed to close lease"));
158                    }
159                }
160            }
161
162            // Remove only if it's still the same generation
163            let _ = self.leases.remove_if(guid, |_, v| v.id == id);
164        }
165        Ok(())
166    }
167
168    pub async fn drop_all(&self) {
169        debug!("Dropping all leases");
170        let guids: Vec<String> = self.iter().collect();
171
172        let handles: Vec<_> = guids
173            .into_iter()
174            .map(|guid| {
175                let renter = self.clone();
176                tokio::spawn(async move {
177                    let _ = renter.drop_lease(&guid).await;
178                })
179            })
180            .collect();
181
182        // Wait for all drop operations to complete concurrently
183        let _ = future::join_all(handles).await;
184    }
185
186    pub fn iter(&self) -> impl Iterator<Item = String> {
187        self.leases
188            .iter()
189            .map(|entry| entry.key().clone())
190            .collect::<Vec<_>>()
191            .into_iter()
192    }
193
194    pub fn has_lease(&self, guid: &str) -> bool {
195        self.leases.contains_key(guid)
196    }
197
198    pub fn len(&self) -> usize {
199        self.leases.len()
200    }
201
202    pub fn is_empty(&self) -> bool {
203        self.leases.is_empty()
204    }
205}
206
// Tests for lease acquisition, preemption and `drop_all`.
207#[cfg(test)]
208mod tests {
209    use super::*;
210
    // A second acquire for the same guid must fail when the first holder
    // ignores the preempt signal and keeps its lease.
211    #[tokio::test]
212    async fn acquire_times_out_when_prev_not_dropped() {
213        let renter = Renter::new();
214
215        let l1 = renter.acquire_lease("g1".to_string()).await.unwrap();
216
217        // Start a competing acquire that will time out because l1 is not dropped while it waits.
218        let renter2 = renter.clone();
219        let handle = tokio::spawn(async move { renter2.acquire_lease("g1".to_string()).await });
220
221        // The competing acquire gives up after LEASE_DROP_TIMEOUT_SECS (5 seconds).
222        // We can check for the preempt signal first to confirm it was sent
223        let mut rx = l1.replace_rx.clone();
224        rx.changed().await.unwrap();
225        assert!(
226            *rx.borrow(),
227            "Previous holder should have received preempt signal"
228        );
229
230        // Wait for the spawned task to complete (it should timeout)
231        let res = handle.await.unwrap();
232        assert!(
233            res.is_err(),
234            "Second acquire should timeout when previous holder does not drop"
235        );
236
237        // The original lease should still be registered because it was never dropped.
238        assert!(renter.has_lease("g1"));
239
240        // After dropping l1, the lease should be unregistered
241        drop(l1);
242        assert!(
243            !renter.has_lease("g1"),
244            "Lease should be unregistered after dropping"
245        );
246    }
247
    // A second acquire for the same guid succeeds once the first holder
    // reacts to the preempt signal by dropping its lease.
248    #[tokio::test]
249    async fn acquire_succeeds_after_prev_drops() {
250        let renter = Renter::new();
251
252        let l1 = renter.acquire_lease("g2".to_string()).await.unwrap();
253
254        // Start competing acquire; it will wait for previous to close.
255        let renter2 = renter.clone();
256        let handle = tokio::spawn(async move { renter2.acquire_lease("g2".to_string()).await });
257
258        // Give the spawned task a moment to start and attempt acquisition
259        tokio::time::sleep(Duration::from_millis(100)).await;
260
261        // Wait until l1 is told to preempt, then drop it to allow the new acquire to proceed.
262        let mut rx = l1.replace_rx.clone();
263        let changed = rx.changed().await;
264        assert!(changed.is_ok(), "Should receive preempt signal");
265        assert!(*rx.borrow(), "Preempt signal should be true");
266
267        // Now drop l1, which unregisters the lease and wakes the waiting acquire
268        drop(l1);
269
270        // Give a moment for the drop to complete
271        tokio::time::sleep(Duration::from_millis(100)).await;
272
273        let l2 = handle
274            .await
275            .unwrap()
276            .expect("Second acquire should succeed after previous drops");
277
278        assert!(renter.has_lease("g2"), "New lease should be registered");
279
280        drop(l2);
281        assert!(
282            !renter.has_lease("g2"),
283            "Lease should be unregistered after dropping new lease"
284        );
285    }
286
    // `drop_all` must signal every holder and leave the registry empty once
    // all holders have dropped their leases.
287    #[tokio::test]
288    async fn drop_all_removes_all_leases() {
289        let renter = Renter::new();
290
291        // Acquire multiple leases
292        let l1 = renter.acquire_lease("g1".to_string()).await.unwrap();
293        let l2 = renter.acquire_lease("g2".to_string()).await.unwrap();
294        let l3 = renter.acquire_lease("g3".to_string()).await.unwrap();
295
296        assert_eq!(renter.len(), 3, "Should have 3 leases");
297        assert!(renter.has_lease("g1"));
298        assert!(renter.has_lease("g2"));
299        assert!(renter.has_lease("g3"));
300
301        // Clone the receivers so we can observe the preempt signals
302        let mut rx1 = l1.replace_rx.clone();
303        let mut rx2 = l2.replace_rx.clone();
304        let mut rx3 = l3.replace_rx.clone();
305
306        // Call drop_all in a separate task; it waits until the leases are dropped below
307        let renter2 = renter.clone();
308        let handle = tokio::spawn(async move {
309            renter2.drop_all().await;
310        });
311
312        // Verify all leases receive preempt signals (each within one second)
313        tokio::select! {
314            _ = rx1.changed() => assert!(*rx1.borrow(), "l1 should receive preempt signal"),
315            _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l1 preempt"),
316        }
317        tokio::select! {
318            _ = rx2.changed() => assert!(*rx2.borrow(), "l2 should receive preempt signal"),
319            _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l2 preempt"),
320        }
321        tokio::select! {
322            _ = rx3.changed() => assert!(*rx3.borrow(), "l3 should receive preempt signal"),
323            _ = tokio::time::sleep(Duration::from_secs(1)) => panic!("Timeout waiting for l3 preempt"),
324        }
325
326        // Drop the leases so drop_all can complete
327        drop(l1);
328        drop(l2);
329        drop(l3);
330
331        // Wait for drop_all to complete
332        handle.await.unwrap();
333
334        // Verify all leases are removed
335        assert_eq!(renter.len(), 0, "Should have 0 leases after drop_all");
336        assert!(!renter.has_lease("g1"));
337        assert!(!renter.has_lease("g2"));
338        assert!(!renter.has_lease("g3"));
339    }
340}