dynamodb_lease/
lease.rs

1use crate::Client;
2use std::sync::Arc;
3use tokio::sync::{Mutex, OwnedMutexGuard};
4use uuid::Uuid;
5
6/// Represents a held distributed lease & background task to
7/// continuously try to extend it until dropped.
8///
9/// On drop asynchronously releases the underlying lock.
10#[derive(Debug)]
11pub struct Lease {
12    client: Client,
13    /// Note: The extension tasks holds an **exclusive** weak references to
14    /// this (that exclusivity is used to indicate that the task is still alive).
15    key_lease_v: Arc<(String, Mutex<Uuid>)>,
16    /// A local guard to avoid db contention for leases within the same client.
17    local_guard: Option<OwnedMutexGuard<()>>,
18    release_on_drop: bool,
19}
20
21impl Lease {
22    pub(crate) fn new(client: Client, key: String, lease_v: Uuid) -> Self {
23        let lease = Self {
24            client,
25            key_lease_v: Arc::new((key, Mutex::new(lease_v))),
26            local_guard: None,
27            release_on_drop: true,
28        };
29
30        start_periodically_extending(&lease);
31
32        lease
33    }
34
35    pub(crate) fn with_local_guard(mut self, guard: OwnedMutexGuard<()>) -> Self {
36        self.local_guard = Some(guard);
37        self
38    }
39
40    /// Releases the lease returning `Ok(())` after successful deletion.
41    ///
42    /// Note: The local guard is unlocked **first** before deleting the lease.
43    /// This avoids other concurrent acquires in the same process being unfairly
44    /// advantaged in acquiring subsequent leases and potentially causing other
45    /// processes to be starved.
46    ///
47    /// If you await this method then immediately acquire a lease,
48    /// e.g. inside a loop, you are acquiring with an unfair advantage vs other process
49    /// attempts. This may lead to other process being starved of leases.
50    pub async fn release(mut self) -> anyhow::Result<()> {
51        // disable release on drop since we're doing that now
52        self.release_on_drop = false;
53
54        let (key, lease_v) = &*self.key_lease_v;
55
56        drop(self.local_guard.take());
57        self.client.try_clean_local_lock(key.clone());
58
59        let lease_v = lease_v.lock().await;
60        self.client.delete_lease(key.clone(), *lease_v).await?;
61        drop(lease_v); // hold v-lock during deletion to ensure no race with `extend_lease`
62        Ok(())
63    }
64
65    /// Returns `true` if the lease periodic extension task is still running.
66    ///
67    /// If lease extension fails, e.g. due to lost contact with the db, this
68    /// will return `false`.
69    pub fn is_healthy(&self) -> bool {
70        // The `start_periodically_extending` task holds an exclusive weak ref
71        // to this field, so if the weak count is zero we know this task has died.
72        Arc::weak_count(&self.key_lease_v) != 0
73    }
74}
75
76fn start_periodically_extending(lease: &Lease) {
77    let key_lease_v = Arc::downgrade(&lease.key_lease_v);
78    let client = lease.client.clone();
79    tokio::spawn(async move {
80        let mut extend_interval = tokio::time::interval(client.extend_period);
81        extend_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
82        extend_interval.reset();
83        loop {
84            extend_interval.tick().await;
85
86            match key_lease_v.upgrade() {
87                Some(key_lease_v) => {
88                    let mut lease_v = key_lease_v.1.lock().await;
89                    let key = key_lease_v.0.clone();
90                    match client.extend_lease(key, *lease_v).await {
91                        Ok(new_lease_v) => {
92                            *lease_v = new_lease_v;
93                        }
94                        // stop on error, TODO retries, logs?
95                        Err(_) => break,
96                    }
97                }
98                // lease dropped
99                None => break,
100            }
101        }
102    });
103}
104
105impl Drop for Lease {
106    /// Asynchronously releases the underlying lock.
107    fn drop(&mut self) {
108        if self.release_on_drop {
109            // Clone necessary data before moving self into the spawned task
110            let lease = Lease {
111                client: self.client.clone(),
112                key_lease_v: Arc::clone(&self.key_lease_v),
113                local_guard: self.local_guard.take(), // Take ownership of the guard
114                release_on_drop: false,
115            };
116            tokio::spawn(async move {
117                // TODO retries, logs?
118                _ = lease.release().await;
119            });
120        }
121    }
122}