dynamodb_lease/lease.rs
use crate::Client;
use std::sync::Arc;
use tokio::sync::{Mutex, OwnedMutexGuard};
use uuid::Uuid;

/// Represents a held distributed lease and a background task that
/// continuously tries to extend it until the lease is dropped.
///
/// On drop asynchronously releases the underlying lock.
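///
/// # Example
///
/// An illustrative sketch only: acquisition happens via this crate's
/// [`Client`], whose acquire API lives outside this file, so the method
/// shown here is an assumption.
///
/// ```ignore
/// // hypothetical acquisition via the crate's `Client`
/// let lease = client.acquire("important-job-lock").await?;
///
/// // ... do work while the background task keeps extending the lease ...
///
/// // explicitly release, or simply drop the lease to release asynchronously
/// lease.release().await?;
/// ```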
#[derive(Debug)]
pub struct Lease {
    client: Client,
    /// Note: The extension task holds an **exclusive** weak reference to
    /// this (that exclusivity is used to indicate that the task is still alive).
    key_lease_v: Arc<(String, Mutex<Uuid>)>,
    /// A local guard to avoid db contention for leases within the same client.
    local_guard: Option<OwnedMutexGuard<()>>,
    release_on_drop: bool,
}

impl Lease {
    pub(crate) fn new(client: Client, key: String, lease_v: Uuid) -> Self {
        let lease = Self {
            client,
            key_lease_v: Arc::new((key, Mutex::new(lease_v))),
            local_guard: None,
            release_on_drop: true,
        };

        start_periodically_extending(&lease);

        lease
    }

    /// Attaches a local guard, held until release, to avoid db contention
    /// for leases within the same client.
    pub(crate) fn with_local_guard(mut self, guard: OwnedMutexGuard<()>) -> Self {
        self.local_guard = Some(guard);
        self
    }

    /// Releases the lease, returning `Ok(())` after successful deletion.
    ///
    /// Note: The local guard is unlocked **first**, before deleting the lease.
    /// This avoids other concurrent acquires in the same process being unfairly
    /// advantaged in acquiring subsequent leases and potentially causing other
    /// processes to be starved.
    ///
    /// If you await this method and then immediately acquire a lease,
    /// e.g. inside a loop, you are acquiring with an unfair advantage over
    /// other processes' attempts. This may lead to other processes being
    /// starved of leases.
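    ///
    /// # Example
    ///
    /// A sketch of one way to respect the fairness note above; the sleep
    /// duration is an arbitrary illustrative value.
    ///
    /// ```ignore
    /// lease.release().await?;
    /// // yield for a while before re-acquiring so other processes get a fair chance
    /// tokio::time::sleep(std::time::Duration::from_millis(100)).await;
    /// let lease = client.acquire("important-job-lock").await?;
    /// ```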
    pub async fn release(mut self) -> anyhow::Result<()> {
        // disable release on drop since we're doing that now
        self.release_on_drop = false;

        let (key, lease_v) = &*self.key_lease_v;

        drop(self.local_guard.take());
        self.client.try_clean_local_lock(key.clone());

        // hold the v-lock during deletion to ensure no race with `extend_lease`
        let lease_v = lease_v.lock().await;
        self.client.delete_lease(key.clone(), *lease_v).await?;
        drop(lease_v);
        Ok(())
    }

    /// Returns `true` if the lease's periodic extension task is still running.
    ///
    /// If lease extension fails, e.g. due to lost contact with the db, this
    /// will return `false`.
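    ///
    /// # Example
    ///
    /// An illustrative check while doing leased work.
    ///
    /// ```ignore
    /// if !lease.is_healthy() {
    ///     // extension has stopped, so the lease may lapse: stop relying on it
    ///     anyhow::bail!("lease is no longer being extended");
    /// }
    /// ```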
    pub fn is_healthy(&self) -> bool {
        // The `start_periodically_extending` task holds an exclusive weak ref
        // to this field, so if the weak count is zero we know the task has died.
        Arc::weak_count(&self.key_lease_v) != 0
    }
}

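/// Spawns a background task that periodically extends the lease.
///
/// The task holds the **only** weak reference to `key_lease_v`, so
/// [`Lease::is_healthy`] can use the weak count to tell whether it is
/// still alive. The task exits when the lease is dropped (upgrade fails)
/// or when an extension attempt returns an error.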
fn start_periodically_extending(lease: &Lease) {
    let key_lease_v = Arc::downgrade(&lease.key_lease_v);
    let client = lease.client.clone();
    tokio::spawn(async move {
        let mut extend_interval = tokio::time::interval(client.extend_period);
        extend_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        // reset so the first tick happens after `extend_period`, not immediately
        extend_interval.reset();
        loop {
            extend_interval.tick().await;

            match key_lease_v.upgrade() {
                Some(key_lease_v) => {
                    let mut lease_v = key_lease_v.1.lock().await;
                    let key = key_lease_v.0.clone();
                    match client.extend_lease(key, *lease_v).await {
                        Ok(new_lease_v) => {
                            *lease_v = new_lease_v;
                        }
                        // stop on error, TODO retries, logs?
                        Err(_) => break,
                    }
                }
                // lease dropped
                None => break,
            }
        }
    });
}

impl Drop for Lease {
    /// Asynchronously releases the underlying lock.
    fn drop(&mut self) {
        if self.release_on_drop {
            // `drop` only has `&mut self`, so rebuild an owned `Lease`
            // (with release-on-drop disabled, avoiding recursion) and move
            // it into the spawned task
            let lease = Lease {
                client: self.client.clone(),
                key_lease_v: Arc::clone(&self.key_lease_v),
                local_guard: self.local_guard.take(), // take ownership of the guard
                release_on_drop: false,
            };
            tokio::spawn(async move {
                // TODO retries, logs?
                _ = lease.release().await;
            });
        }
    }
}