dlock/providers/
dynamodb.rs

1// SPDX-FileCopyrightText: 2025 Abe Kohandel <abe@kodebooth.com>
2// SPDX-License-Identifier: MIT
3
4use std::{
5    collections::HashMap,
6    time::{Duration, Instant},
7};
8
9use aws_sdk_dynamodb::{
10    Client,
11    error::SdkError,
12    operation::{
13        delete_item::DeleteItemError,
14        put_item::{PutItemError, PutItemOutput},
15        update_item::{UpdateItemError, UpdateItemOutput},
16    },
17    types::{AttributeValue, ReturnValue, ReturnValuesOnConditionCheckFailure},
18};
19use bon::Builder;
20use serde::{Deserialize, Serialize};
21use serde_dynamo::{aws_sdk_dynamodb_1::from_item, to_item};
22use tracing::debug;
23use uuid::Uuid;
24
25use crate::{
26    error::DLockError,
27    providers::{Lease, Provider},
28};
29
30/// [DynamoDB](aws_sdk_dynamodb) provider for [DLock](crate::DLock) implementation
31#[derive(Builder, Debug, Clone)]
32pub struct DynamodbProvider {
33    client: Client,
34    table_name: String,
35}
36
37impl DynamodbProvider {
38    pub const NAME_ATTRIBUTE: &str = "lock_name";
39    pub const LEASE_ATTRIBUTE: &str = "lease";
40    pub const TOKEN_ATTRIBUTE: &str = "token";
41
42    async fn acquire_non_existing(
43        &self,
44        name: &str,
45        owner: &str,
46        duration: &Duration,
47    ) -> Result<DynamodbLease, DLockError<DynamodbRetry>> {
48        let lock = DynamodbLockItem {
49            lease: Uuid::new_v4(),
50            owner: owner.to_string(),
51            duration: *duration,
52            name: name.to_string(),
53            token: 0,
54        };
55
56        let item = to_item(lock.clone()).unwrap();
57        self.client
58            .put_item()
59            .table_name(self.table_name.clone())
60            .set_item(Some(item))
61            .set_return_values_on_condition_check_failure(Some(
62                ReturnValuesOnConditionCheckFailure::AllOld,
63            ))
64            .condition_expression(format!("attribute_not_exists({})", Self::NAME_ATTRIBUTE))
65            .send()
66            .await
67            .map_err(|sdk_error| match &sdk_error {
68                SdkError::ServiceError(e) => match e.err() {
69                    PutItemError::ConditionalCheckFailedException(e) => {
70                        debug!("Acquiring non-existing lock failed {:?}", lock);
71                        let item: DynamodbLockItem = e.item().unwrap().into();
72
73                        DLockError::AlreadyAcquired(DynamodbRetry {
74                            lease: item.lease,
75                            duration: item.duration,
76                            start: Instant::now(),
77                        })
78                    }
79                    _ => DLockError::ProviderError(sdk_error.into()),
80                },
81                _ => DLockError::ProviderError(sdk_error.into()),
82            })?;
83        Ok(DynamodbLease {
84            item: lock,
85            client: self.client.clone(),
86            table: self.table_name.clone(),
87        })
88    }
89
90    async fn acquire_dead_lease(
91        &self,
92        name: &str,
93        owner: &str,
94        duration: &Duration,
95        retry: DynamodbRetry,
96    ) -> Result<DynamodbLease, DLockError<DynamodbRetry>> {
97        let lock = DynamodbLockItem {
98            lease: Uuid::new_v4(),
99            duration: *duration,
100            name: name.to_string(),
101            owner: owner.to_string(),
102            token: 0,
103        };
104        let item = self
105            .client
106            .update_item()
107            .table_name(self.table_name.clone())
108            .key(
109                DynamodbProvider::NAME_ATTRIBUTE,
110                AttributeValue::S(lock.name.to_string()),
111            )
112            .condition_expression("#lease = :prev_lease")
113            .update_expression("SET #lease = :new_lease, #token = #token + :one")
114            .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
115            .expression_attribute_names("#token", DynamodbProvider::TOKEN_ATTRIBUTE)
116            .expression_attribute_values(":prev_lease", AttributeValue::S(retry.lease.to_string()))
117            .expression_attribute_values(":new_lease", AttributeValue::S(lock.lease.to_string()))
118            .expression_attribute_values(":one", AttributeValue::N(1.to_string()))
119            .return_values(ReturnValue::AllNew)
120            .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
121            .send()
122            .await
123            .map_err(|sdk_error| match &sdk_error {
124                SdkError::ServiceError(e) => match e.err() {
125                    UpdateItemError::ConditionalCheckFailedException(_) => {
126                        DLockError::AlreadyReleased
127                    }
128                    _ => DLockError::ProviderError(sdk_error.into()),
129                },
130                _ => DLockError::ProviderError(sdk_error.into()),
131            })?
132            .into();
133
134        Ok(DynamodbLease {
135            item,
136            client: self.client.clone(),
137            table: self.table_name.clone(),
138        })
139    }
140}
141
142impl Provider for DynamodbProvider {
143    type T = u64;
144    type L = DynamodbLease;
145    type R = DynamodbRetry;
146
147    async fn acquire(
148        &self,
149        name: &str,
150        owner: &str,
151        duration: &Duration,
152        retry: Option<DynamodbRetry>,
153    ) -> Result<Self::L, DLockError<Self::R>> {
154        if let Some(retry) = retry {
155            if retry.start.elapsed() < retry.duration {
156                // Too early to retry, last duration read for the given lease has not expired yet
157                self.acquire_non_existing(name, owner, duration)
158                    .await
159                    .map_err(|e| match e {
160                        DLockError::AlreadyAcquired(_) => DLockError::AlreadyAcquired(retry),
161                        _ => e,
162                    })
163            } else {
164                // Duration has passed:
165                // 1. if the previous lease has not been updated the lock owner is dead so take over the lock
166                // 2. if the previous lease has been updated, someone renewed their lease so start over
167                self.acquire_dead_lease(name, owner, duration, retry).await
168            }
169        } else {
170            // No retry context, the lock can only be acquired if it doesn't exist
171            self.acquire_non_existing(name, owner, duration).await
172        }
173    }
174}
175
176#[derive(Serialize, Deserialize, Clone, Debug)]
177struct DynamodbLockItem {
178    #[serde(rename(serialize = "lock_name", deserialize = "lock_name"))]
179    name: String,
180    owner: String,
181    lease: Uuid,
182    duration: Duration,
183    token: u64,
184}
185
186#[derive(Debug, Clone)]
187pub struct DynamodbLease {
188    item: DynamodbLockItem,
189    client: Client,
190    table: String,
191}
192
193impl Lease<Self, u64> for DynamodbLease {
194    async fn release(&self) -> Result<(), DLockError> {
195        self.client
196            .delete_item()
197            .table_name(self.table.to_string())
198            .key(
199                DynamodbProvider::NAME_ATTRIBUTE,
200                AttributeValue::S(self.item.name.to_string()),
201            )
202            .condition_expression("#lease = :lease")
203            .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
204            .expression_attribute_values(":lease", AttributeValue::S(self.item.lease.to_string()))
205            .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
206            .send()
207            .await
208            .map_err(|sdk_error| match &sdk_error {
209                SdkError::ServiceError(e) => match e.err() {
210                    DeleteItemError::ConditionalCheckFailedException(_) => {
211                        DLockError::AlreadyReleased
212                    }
213                    _ => DLockError::ProviderError(sdk_error.into()),
214                },
215                _ => DLockError::ProviderError(sdk_error.into()),
216            })?;
217        Ok(())
218    }
219
220    async fn renew(&self) -> Result<Self, DLockError> {
221        let new_lease = Uuid::new_v4();
222        let item = self
223            .client
224            .update_item()
225            .table_name(self.table.to_string())
226            .key(
227                DynamodbProvider::NAME_ATTRIBUTE,
228                AttributeValue::S(self.item.name.to_string()),
229            )
230            .condition_expression("#lease = :prev_lease")
231            .update_expression("SET #lease = :new_lease")
232            .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
233            .expression_attribute_values(
234                ":prev_lease",
235                AttributeValue::S(self.item.lease.to_string()),
236            )
237            .expression_attribute_values(":new_lease", AttributeValue::S(new_lease.to_string()))
238            .return_values(ReturnValue::AllNew)
239            .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
240            .send()
241            .await
242            .map_err(|sdk_error| match &sdk_error {
243                SdkError::ServiceError(e) => match e.err() {
244                    UpdateItemError::ConditionalCheckFailedException(_) => {
245                        DLockError::AlreadyReleased
246                    }
247                    _ => DLockError::ProviderError(sdk_error.into()),
248                },
249                _ => DLockError::ProviderError(sdk_error.into()),
250            })?
251            .into();
252
253        Ok(DynamodbLease {
254            item,
255            client: self.client.clone(),
256            table: self.table.clone(),
257        })
258    }
259
260    fn token(&self) -> u64 {
261        self.item.token
262    }
263}
264
265#[derive(Debug, Clone)]
266pub struct DynamodbRetry {
267    lease: Uuid,
268    duration: Duration,
269    start: Instant,
270}
271
272impl From<PutItemOutput> for DynamodbLockItem {
273    fn from(value: PutItemOutput) -> Self {
274        from_item(value.attributes().unwrap().clone()).unwrap()
275    }
276}
277
278impl From<UpdateItemOutput> for DynamodbLockItem {
279    fn from(value: UpdateItemOutput) -> Self {
280        from_item(value.attributes().unwrap().clone()).unwrap()
281    }
282}
283
284impl From<&HashMap<String, AttributeValue>> for DynamodbLockItem {
285    fn from(value: &HashMap<String, AttributeValue>) -> Self {
286        from_item(value.to_owned()).unwrap()
287    }
288}
289
#[cfg(test)]
mod tests {
    use aws_config::Region;
    use aws_sdk_dynamodb::{
        Client,
        config::Credentials,
        types::{AttributeDefinition, BillingMode, KeySchemaElement, KeyType, ScalarAttributeType},
    };
    use testcontainers_modules::{
        dynamodb_local::DynamoDb,
        testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner},
    };

    use super::*;

    const TABLE_NAME: &str = "table_name";

    /// Starts a DynamoDB container, creates the lock table in it and returns the container
    /// handle (which must be kept alive for the duration of the test) with a connected client.
    async fn setup() -> (ContainerAsync<DynamoDb>, Client) {
        let db = DynamoDb::default().with_tag("3.1.0").start().await.unwrap();

        let host = db.get_host().await.unwrap();
        let port = db.get_host_port_ipv4(8000).await.unwrap();
        let config = aws_sdk_dynamodb::config::Builder::new()
            .behavior_version_latest()
            .endpoint_url(format!("http://{}:{}", host, port))
            .region(Region::new("test"))
            .credentials_provider(Credentials::for_tests())
            .build();
        let client = Client::from_conf(config);

        let name_definition = AttributeDefinition::builder()
            .attribute_name(DynamodbProvider::NAME_ATTRIBUTE)
            .attribute_type(ScalarAttributeType::S)
            .build()
            .expect("should be able to build partition key attribute");
        let hash_key = KeySchemaElement::builder()
            .attribute_name(DynamodbProvider::NAME_ATTRIBUTE)
            .key_type(KeyType::Hash)
            .build()
            .expect("should be able to build hash key");

        client
            .create_table()
            .billing_mode(BillingMode::PayPerRequest)
            .table_name(TABLE_NAME)
            .attribute_definitions(name_definition)
            .key_schema(hash_key)
            .send()
            .await
            .expect("should be able to create table");
        (db, client)
    }

    /// Builds a provider bound to the test table.
    fn provider_for(client: Client) -> DynamodbProvider {
        DynamodbProvider::builder()
            .client(client)
            .table_name(TABLE_NAME.to_string())
            .build()
    }

    /// Acquires `name` without a retry context and checks the lease echoes the request.
    async fn acquire_checked(
        provider: &DynamodbProvider,
        name: &str,
        duration: Duration,
    ) -> DynamodbLease {
        let lease = provider
            .acquire(name, "owner", &duration, None)
            .await
            .expect("should be able to acquire lock");

        assert_eq!(lease.item.duration, duration);
        assert_eq!(lease.item.name, name);
        lease
    }

    #[tokio::test]
    async fn acquire_fresh_lock() {
        let (_db, client) = setup().await;
        let provider = provider_for(client);

        acquire_checked(&provider, "test_lock", Duration::from_secs(5)).await;
    }

    #[tokio::test]
    async fn renew_lock() {
        let (_db, client) = setup().await;
        let provider = provider_for(client);

        let lease = acquire_checked(&provider, "test_lock", Duration::from_secs(5)).await;

        let new_lease = lease.renew().await.expect("renew should work");

        // Renewing only rotates the lease identifier.
        assert_eq!(lease.item.duration, new_lease.item.duration);
        assert_eq!(lease.item.name, new_lease.item.name);
        assert_eq!(lease.item.token, new_lease.item.token);
        assert!(lease.item.lease != new_lease.item.lease);
    }

    #[tokio::test]
    async fn reacquire_already_locked() {
        let (_db, client) = setup().await;
        let provider = provider_for(client);

        let lock_name = "test_lock";
        let duration = Duration::from_hours(1);

        let lease = acquire_checked(&provider, lock_name, duration).await;

        let failure = provider
            .acquire(lock_name, "owner", &duration, None)
            .await
            .err()
            .unwrap();

        match failure {
            DLockError::AlreadyAcquired(retry) => {
                assert_eq!(retry.lease, lease.item.lease);
                assert!(retry.start <= Instant::now());
            }
            other => panic!("unexpected error: {}", other),
        };
    }

    #[tokio::test]
    async fn reacquire_dropped_lock() {
        let (_db, client) = setup().await;
        let provider = provider_for(client);

        let lock_name = "test_lock";
        let duration = Duration::from_hours(1);

        {
            let lease = acquire_checked(&provider, lock_name, duration).await;
            lease.release().await.unwrap();
        }

        {
            // Once released, the lock can be created again from scratch.
            acquire_checked(&provider, lock_name, duration).await;
        }
    }
}