1use std::{
5 collections::HashMap,
6 time::{Duration, Instant},
7};
8
9use aws_sdk_dynamodb::{
10 Client,
11 error::SdkError,
12 operation::{
13 delete_item::DeleteItemError,
14 put_item::{PutItemError, PutItemOutput},
15 update_item::{UpdateItemError, UpdateItemOutput},
16 },
17 types::{AttributeValue, ReturnValue, ReturnValuesOnConditionCheckFailure},
18};
19use bon::Builder;
20use serde::{Deserialize, Serialize};
21use serde_dynamo::{aws_sdk_dynamodb_1::from_item, to_item};
22use tracing::debug;
23use uuid::Uuid;
24
25use crate::{
26 error::DLockError,
27 providers::{Lease, Provider},
28};
29
30#[derive(Builder, Debug, Clone)]
32pub struct DynamodbProvider {
33 client: Client,
34 table_name: String,
35}
36
37impl DynamodbProvider {
38 pub const NAME_ATTRIBUTE: &str = "lock_name";
39 pub const LEASE_ATTRIBUTE: &str = "lease";
40 pub const TOKEN_ATTRIBUTE: &str = "token";
41
42 async fn acquire_non_existing(
43 &self,
44 name: &str,
45 owner: &str,
46 duration: &Duration,
47 ) -> Result<DynamodbLease, DLockError<DynamodbRetry>> {
48 let lock = DynamodbLockItem {
49 lease: Uuid::new_v4(),
50 owner: owner.to_string(),
51 duration: *duration,
52 name: name.to_string(),
53 token: 0,
54 };
55
56 let item = to_item(lock.clone()).unwrap();
57 self.client
58 .put_item()
59 .table_name(self.table_name.clone())
60 .set_item(Some(item))
61 .set_return_values_on_condition_check_failure(Some(
62 ReturnValuesOnConditionCheckFailure::AllOld,
63 ))
64 .condition_expression(format!("attribute_not_exists({})", Self::NAME_ATTRIBUTE))
65 .send()
66 .await
67 .map_err(|sdk_error| match &sdk_error {
68 SdkError::ServiceError(e) => match e.err() {
69 PutItemError::ConditionalCheckFailedException(e) => {
70 debug!("Acquiring non-existing lock failed {:?}", lock);
71 let item: DynamodbLockItem = e.item().unwrap().into();
72
73 DLockError::AlreadyAcquired(DynamodbRetry {
74 lease: item.lease,
75 duration: item.duration,
76 start: Instant::now(),
77 })
78 }
79 _ => DLockError::ProviderError(sdk_error.into()),
80 },
81 _ => DLockError::ProviderError(sdk_error.into()),
82 })?;
83 Ok(DynamodbLease {
84 item: lock,
85 client: self.client.clone(),
86 table: self.table_name.clone(),
87 })
88 }
89
90 async fn acquire_dead_lease(
91 &self,
92 name: &str,
93 owner: &str,
94 duration: &Duration,
95 retry: DynamodbRetry,
96 ) -> Result<DynamodbLease, DLockError<DynamodbRetry>> {
97 let lock = DynamodbLockItem {
98 lease: Uuid::new_v4(),
99 duration: *duration,
100 name: name.to_string(),
101 owner: owner.to_string(),
102 token: 0,
103 };
104 let item = self
105 .client
106 .update_item()
107 .table_name(self.table_name.clone())
108 .key(
109 DynamodbProvider::NAME_ATTRIBUTE,
110 AttributeValue::S(lock.name.to_string()),
111 )
112 .condition_expression("#lease = :prev_lease")
113 .update_expression("SET #lease = :new_lease, #token = #token + :one")
114 .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
115 .expression_attribute_names("#token", DynamodbProvider::TOKEN_ATTRIBUTE)
116 .expression_attribute_values(":prev_lease", AttributeValue::S(retry.lease.to_string()))
117 .expression_attribute_values(":new_lease", AttributeValue::S(lock.lease.to_string()))
118 .expression_attribute_values(":one", AttributeValue::N(1.to_string()))
119 .return_values(ReturnValue::AllNew)
120 .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
121 .send()
122 .await
123 .map_err(|sdk_error| match &sdk_error {
124 SdkError::ServiceError(e) => match e.err() {
125 UpdateItemError::ConditionalCheckFailedException(_) => {
126 DLockError::AlreadyReleased
127 }
128 _ => DLockError::ProviderError(sdk_error.into()),
129 },
130 _ => DLockError::ProviderError(sdk_error.into()),
131 })?
132 .into();
133
134 Ok(DynamodbLease {
135 item,
136 client: self.client.clone(),
137 table: self.table_name.clone(),
138 })
139 }
140}
141
142impl Provider for DynamodbProvider {
143 type T = u64;
144 type L = DynamodbLease;
145 type R = DynamodbRetry;
146
147 async fn acquire(
148 &self,
149 name: &str,
150 owner: &str,
151 duration: &Duration,
152 retry: Option<DynamodbRetry>,
153 ) -> Result<Self::L, DLockError<Self::R>> {
154 if let Some(retry) = retry {
155 if retry.start.elapsed() < retry.duration {
156 self.acquire_non_existing(name, owner, duration)
158 .await
159 .map_err(|e| match e {
160 DLockError::AlreadyAcquired(_) => DLockError::AlreadyAcquired(retry),
161 _ => e,
162 })
163 } else {
164 self.acquire_dead_lease(name, owner, duration, retry).await
168 }
169 } else {
170 self.acquire_non_existing(name, owner, duration).await
172 }
173 }
174}
175
176#[derive(Serialize, Deserialize, Clone, Debug)]
177struct DynamodbLockItem {
178 #[serde(rename(serialize = "lock_name", deserialize = "lock_name"))]
179 name: String,
180 owner: String,
181 lease: Uuid,
182 duration: Duration,
183 token: u64,
184}
185
186#[derive(Debug, Clone)]
187pub struct DynamodbLease {
188 item: DynamodbLockItem,
189 client: Client,
190 table: String,
191}
192
193impl Lease<Self, u64> for DynamodbLease {
194 async fn release(&self) -> Result<(), DLockError> {
195 self.client
196 .delete_item()
197 .table_name(self.table.to_string())
198 .key(
199 DynamodbProvider::NAME_ATTRIBUTE,
200 AttributeValue::S(self.item.name.to_string()),
201 )
202 .condition_expression("#lease = :lease")
203 .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
204 .expression_attribute_values(":lease", AttributeValue::S(self.item.lease.to_string()))
205 .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
206 .send()
207 .await
208 .map_err(|sdk_error| match &sdk_error {
209 SdkError::ServiceError(e) => match e.err() {
210 DeleteItemError::ConditionalCheckFailedException(_) => {
211 DLockError::AlreadyReleased
212 }
213 _ => DLockError::ProviderError(sdk_error.into()),
214 },
215 _ => DLockError::ProviderError(sdk_error.into()),
216 })?;
217 Ok(())
218 }
219
220 async fn renew(&self) -> Result<Self, DLockError> {
221 let new_lease = Uuid::new_v4();
222 let item = self
223 .client
224 .update_item()
225 .table_name(self.table.to_string())
226 .key(
227 DynamodbProvider::NAME_ATTRIBUTE,
228 AttributeValue::S(self.item.name.to_string()),
229 )
230 .condition_expression("#lease = :prev_lease")
231 .update_expression("SET #lease = :new_lease")
232 .expression_attribute_names("#lease", DynamodbProvider::LEASE_ATTRIBUTE)
233 .expression_attribute_values(
234 ":prev_lease",
235 AttributeValue::S(self.item.lease.to_string()),
236 )
237 .expression_attribute_values(":new_lease", AttributeValue::S(new_lease.to_string()))
238 .return_values(ReturnValue::AllNew)
239 .return_values_on_condition_check_failure(ReturnValuesOnConditionCheckFailure::AllOld)
240 .send()
241 .await
242 .map_err(|sdk_error| match &sdk_error {
243 SdkError::ServiceError(e) => match e.err() {
244 UpdateItemError::ConditionalCheckFailedException(_) => {
245 DLockError::AlreadyReleased
246 }
247 _ => DLockError::ProviderError(sdk_error.into()),
248 },
249 _ => DLockError::ProviderError(sdk_error.into()),
250 })?
251 .into();
252
253 Ok(DynamodbLease {
254 item,
255 client: self.client.clone(),
256 table: self.table.clone(),
257 })
258 }
259
260 fn token(&self) -> u64 {
261 self.item.token
262 }
263}
264
265#[derive(Debug, Clone)]
266pub struct DynamodbRetry {
267 lease: Uuid,
268 duration: Duration,
269 start: Instant,
270}
271
272impl From<PutItemOutput> for DynamodbLockItem {
273 fn from(value: PutItemOutput) -> Self {
274 from_item(value.attributes().unwrap().clone()).unwrap()
275 }
276}
277
278impl From<UpdateItemOutput> for DynamodbLockItem {
279 fn from(value: UpdateItemOutput) -> Self {
280 from_item(value.attributes().unwrap().clone()).unwrap()
281 }
282}
283
284impl From<&HashMap<String, AttributeValue>> for DynamodbLockItem {
285 fn from(value: &HashMap<String, AttributeValue>) -> Self {
286 from_item(value.to_owned()).unwrap()
287 }
288}
289
290#[cfg(test)]
291mod tests {
292 use aws_config::Region;
293 use aws_sdk_dynamodb::{
294 Client,
295 config::Credentials,
296 types::{AttributeDefinition, BillingMode, KeySchemaElement, KeyType, ScalarAttributeType},
297 };
298 use testcontainers_modules::{
299 dynamodb_local::DynamoDb,
300 testcontainers::{ContainerAsync, ImageExt, runners::AsyncRunner},
301 };
302
303 use super::*;
304
305 const TABLE_NAME: &str = "table_name";
306 async fn setup() -> (ContainerAsync<DynamoDb>, Client) {
307 let db = DynamoDb::default().with_tag("3.1.0").start().await.unwrap();
308
309 let credentials = Credentials::for_tests();
310 let config = aws_sdk_dynamodb::config::Builder::new()
311 .behavior_version_latest()
312 .endpoint_url(format!(
313 "http://{}:{}",
314 db.get_host().await.unwrap(),
315 db.get_host_port_ipv4(8000).await.unwrap()
316 ))
317 .region(Region::new("test"))
318 .credentials_provider(credentials)
319 .build();
320 let client = Client::from_conf(config);
321
322 client
323 .create_table()
324 .billing_mode(BillingMode::PayPerRequest)
325 .table_name(TABLE_NAME)
326 .attribute_definitions(
327 AttributeDefinition::builder()
328 .attribute_name(DynamodbProvider::NAME_ATTRIBUTE)
329 .attribute_type(ScalarAttributeType::S)
330 .build()
331 .expect("should be able to build partition key attribute"),
332 )
333 .key_schema(
334 KeySchemaElement::builder()
335 .attribute_name(DynamodbProvider::NAME_ATTRIBUTE)
336 .key_type(KeyType::Hash)
337 .build()
338 .expect("should be able to build hash key"),
339 )
340 .send()
341 .await
342 .expect("should be able to create table");
343 (db, client)
344 }
345
346 #[tokio::test]
347 async fn acquire_fresh_lock() {
348 let (_db, client) = setup().await;
349
350 let provider = DynamodbProvider::builder()
351 .client(client)
352 .table_name(TABLE_NAME.to_string())
353 .build();
354
355 let lease = provider
356 .acquire("test_lock", "owner", &Duration::from_secs(5), None)
357 .await
358 .expect("should be able to acquire lock");
359
360 assert_eq!(lease.item.duration, Duration::from_secs(5));
361 assert_eq!(lease.item.name, "test_lock");
362 }
363
364 #[tokio::test]
365 async fn renew_lock() {
366 let (_db, client) = setup().await;
367
368 let provider = DynamodbProvider::builder()
369 .client(client)
370 .table_name(TABLE_NAME.to_string())
371 .build();
372
373 let lease = provider
374 .acquire("test_lock", "owner", &Duration::from_secs(5), None)
375 .await
376 .expect("should be able to acquire lock");
377
378 assert_eq!(lease.item.duration, Duration::from_secs(5));
379 assert_eq!(lease.item.name, "test_lock");
380
381 let new_lease = lease.renew().await.expect("renew should work");
382
383 assert_eq!(lease.item.duration, new_lease.item.duration);
384 assert_eq!(lease.item.name, new_lease.item.name);
385 assert_eq!(lease.item.token, new_lease.item.token);
386 assert!(lease.item.lease != new_lease.item.lease);
387 }
388
389 #[tokio::test]
390 async fn reacquire_already_locked() {
391 let (_db, client) = setup().await;
392
393 let provider = DynamodbProvider::builder()
394 .client(client)
395 .table_name(TABLE_NAME.to_string())
396 .build();
397
398 let lock_name = "test_lock";
399 let owner = "owner";
400 let duration = Duration::from_hours(1);
401
402 let lease = provider
403 .acquire(lock_name, owner, &duration, None)
404 .await
405 .expect("should be able to acquire lock");
406
407 assert_eq!(lease.item.duration, duration);
408 assert_eq!(lease.item.name, lock_name);
409
410 let result = provider
411 .acquire(lock_name, owner, &duration, None)
412 .await
413 .err()
414 .unwrap();
415
416 match result {
417 DLockError::AlreadyAcquired(retry) => {
418 assert_eq!(retry.lease, lease.item.lease);
419 assert!(retry.start <= Instant::now());
420 }
421 _ => panic!("unexpected error: {}", result),
422 };
423 }
424
425 #[tokio::test]
426 async fn reacquire_dropped_lock() {
427 let (_db, client) = setup().await;
428
429 let provider = DynamodbProvider::builder()
430 .client(client)
431 .table_name(TABLE_NAME.to_string())
432 .build();
433
434 let lock_name = "test_lock";
435 let owner = "owner";
436 let duration = Duration::from_hours(1);
437
438 {
439 let lease = provider
440 .acquire(lock_name, owner, &duration, None)
441 .await
442 .expect("should be able to acquire lock");
443
444 assert_eq!(lease.item.duration, duration);
445 assert_eq!(lease.item.name, lock_name);
446
447 lease.release().await.unwrap();
448 }
449
450 {
451 let lease = provider
452 .acquire(lock_name, owner, &duration, None)
453 .await
454 .expect("should be able to acquire lock");
455
456 assert_eq!(lease.item.duration, duration);
457 assert_eq!(lease.item.name, lock_name);
458 }
459 }
460}