dynamo_runtime/transports/etcd/
lock.rs1use std::time::Duration;
7
8use etcd_client::{Compare, CompareOp, PutOptions, Txn, TxnOp};
9
10use anyhow::Result;
11
12use super::Client;
13
/// Number of seconds `DistributedRWLock::read_lock_with_wait` keeps retrying
/// when the caller passes `None` as the timeout.
const DEFAULT_READ_LOCK_TIMEOUT_SECS: u64 = 30;
16
/// Handle to a reader/writer lock whose state is stored as keys in etcd.
///
/// All keys live under the prefix given at construction time:
/// the writer marker is `v1/{lock_prefix}/writer` and each reader registers
/// itself at `v1/{lock_prefix}/readers/{reader_id}`.
///
/// The handle holds only the prefix, so cloning it is cheap and every clone
/// refers to the same lock.
#[derive(Clone, Debug)]
pub struct DistributedRWLock {
    /// Key-space prefix that identifies this lock inside etcd.
    lock_prefix: String,
}
28
29pub struct WriteLockGuard<'a> {
30 rwlock: &'a DistributedRWLock,
31 etcd_client: &'a Client,
32}
33
34impl Drop for WriteLockGuard<'_> {
35 fn drop(&mut self) {
36 match tokio::runtime::Handle::try_current() {
37 Ok(handle) => {
38 let rwlock = self.rwlock.clone();
39 let etcd_client = self.etcd_client.clone();
40 handle.spawn(async move {
41 let write_key = format!("v1/{}/writer", rwlock.lock_prefix);
42 if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
43 tracing::warn!("Failed to release write lock in drop: {e:?}");
44 }
45 });
46 }
47 Err(_) => {
48 tracing::error!(
49 "WriteLockGuard dropped outside tokio runtime - lock not released! \
50 Lock will be cleaned up when etcd lease expires."
51 );
52 }
53 }
54 }
55}
56
57pub struct ReadLockGuard<'a> {
58 rwlock: &'a DistributedRWLock,
59 etcd_client: &'a Client,
60 reader_id: String,
61}
62
63impl Drop for ReadLockGuard<'_> {
64 fn drop(&mut self) {
65 match tokio::runtime::Handle::try_current() {
66 Ok(handle) => {
67 let rwlock = self.rwlock.clone();
68 let etcd_client = self.etcd_client.clone();
69 let reader_id = self.reader_id.clone();
70 handle.spawn(async move {
71 let reader_key = format!("v1/{}/readers/{reader_id}", rwlock.lock_prefix);
72 if let Err(e) = etcd_client.kv_delete(reader_key.as_str(), None).await {
73 tracing::warn!("Failed to release read lock in drop: {e:?}");
74 }
75 });
76 }
77 Err(_) => {
78 tracing::error!(
79 "ReadLockGuard dropped outside tokio runtime - lock not released! \
80 Lock will be cleaned up when etcd lease expires."
81 );
82 }
83 }
84 }
85}
86
87impl DistributedRWLock {
88 pub fn new(lock_prefix: String) -> Self {
94 Self { lock_prefix }
95 }
96
97 pub async fn try_write_lock<'a>(
110 &'a self,
111 etcd_client: &'a Client,
112 ) -> Option<WriteLockGuard<'a>> {
113 let write_key = format!("v1/{}/writer", self.lock_prefix);
114 let lease_id = etcd_client.lease_id();
115 let put_options = PutOptions::new().with_lease(lease_id as i64);
116
117 let txn = Txn::new()
119 .when(vec![Compare::version(
120 write_key.as_str(),
121 CompareOp::Equal,
122 0,
123 )])
124 .and_then(vec![TxnOp::put(
125 write_key.as_str(),
126 b"writing",
127 Some(put_options),
128 )]);
129
130 match etcd_client.etcd_client().kv_client().txn(txn).await {
132 Ok(response) if response.succeeded() => {
133 let reader_prefix = format!("v1/{}/readers/", self.lock_prefix);
135 match etcd_client.kv_get_prefix(&reader_prefix).await {
136 Ok(readers) if !readers.is_empty() => {
137 tracing::debug!(
139 "Found {} reader(s) after acquiring write lock, rolling back",
140 readers.len()
141 );
142 if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
143 tracing::warn!("Failed to rollback write lock: {e:?}");
144 }
145 None
146 }
147 Ok(_) => {
148 tracing::debug!("Successfully acquired write lock with no readers");
150 Some(WriteLockGuard {
151 rwlock: self,
152 etcd_client,
153 })
154 }
155 Err(e) => {
156 tracing::warn!(
158 "Failed to check for readers, rolling back write lock: {e:?}"
159 );
160 let _ = etcd_client.kv_delete(write_key.as_str(), None).await;
161 None
162 }
163 }
164 }
165 Ok(_) => {
166 tracing::debug!("Write lock already exists, transaction failed");
167 None
168 }
169 Err(e) => {
170 tracing::warn!("Failed to execute write lock transaction: {e:?}");
171 None
172 }
173 }
174 }
175
176 pub async fn read_lock_with_wait<'a>(
188 &'a self,
189 etcd_client: &'a Client,
190 reader_id: &str,
191 timeout: Option<Duration>,
192 ) -> Result<ReadLockGuard<'a>> {
193 let timeout = timeout.unwrap_or(Duration::from_secs(DEFAULT_READ_LOCK_TIMEOUT_SECS));
194 let write_key = format!("v1/{}/writer", self.lock_prefix);
195 let reader_key = format!("v1/{}/readers/{reader_id}", self.lock_prefix);
196 let deadline = tokio::time::Instant::now() + timeout;
197 let lease_id = etcd_client.lease_id();
198
199 loop {
200 if tokio::time::Instant::now() > deadline {
202 anyhow::bail!("Timeout waiting for read lock after {:?}", timeout);
203 }
204
205 let put_options = PutOptions::new().with_lease(lease_id as i64);
208
209 let txn = Txn::new()
211 .when(vec![Compare::version(
212 write_key.as_str(),
213 CompareOp::Equal,
214 0,
215 )])
216 .and_then(vec![TxnOp::put(
217 reader_key.as_str(),
218 b"reading",
219 Some(put_options),
220 )]);
221
222 match etcd_client.etcd_client().kv_client().txn(txn).await {
224 Ok(response) if response.succeeded() => {
225 tracing::debug!("Acquired read lock for reader {}", reader_id);
226 return Ok(ReadLockGuard {
227 rwlock: self,
228 etcd_client,
229 reader_id: reader_id.to_string(),
230 });
231 }
232 Ok(_) => {
233 tracing::trace!("Write lock exists or was created, retrying after delay");
234 }
235 Err(e) => {
236 tracing::warn!("Failed to execute read lock transaction: {e:?}");
237 }
238 }
239
240 tokio::time::sleep(Duration::from_millis(100)).await;
242 }
243 }
244}
245
#[cfg(all(test, feature = "testing-etcd"))]
mod tests {
    use super::*;
    use crate::Runtime;
    use std::sync::Arc;
    use tokio::sync::Barrier;

    /// Sleeps for `millis` milliseconds; used to let the asynchronous lock
    /// releases triggered by dropping a guard reach etcd.
    async fn pause(millis: u64) {
        tokio::time::sleep(Duration::from_millis(millis)).await;
    }

    /// End-to-end scenario against an etcd instance at `localhost:2379`:
    /// readers block writers, a writer blocks other writers and readers, and
    /// no keys are left behind once every guard is dropped.
    #[tokio::test]
    async fn test_distributed_rwlock() {
        // --- Setup -------------------------------------------------------
        let runtime = Runtime::from_settings().unwrap();
        let options = Client::builder()
            .etcd_url(vec![String::from("http://localhost:2379")])
            .build()
            .unwrap();
        let connected = Client::new(options, runtime).await.unwrap();

        // The client is intentionally never dropped for the test's duration.
        let etcd_client = std::mem::ManuallyDrop::new(connected);

        // A random prefix keeps concurrent test runs from colliding.
        let lock_prefix = format!("/test/rwlock/{}", uuid::Uuid::new_v4());
        let rwlock = DistributedRWLock::new(lock_prefix.clone());

        // --- Two readers may hold the lock together -----------------------
        let first_reader = rwlock
            .read_lock_with_wait(&etcd_client, "reader1", Some(Duration::from_secs(5)))
            .await
            .expect("First read lock should succeed");
        println!("✓ Acquired first read lock");

        let second_reader = rwlock
            .read_lock_with_wait(&etcd_client, "reader2", Some(Duration::from_secs(5)))
            .await
            .expect("Second read lock should succeed");
        println!("✓ Acquired second read lock");

        // --- Writers are refused while any reader is active ---------------
        assert!(
            rwlock.try_write_lock(&etcd_client).await.is_none(),
            "Write lock should fail when readers are active"
        );
        println!("✓ Write lock correctly failed with active readers");

        drop(first_reader);
        pause(50).await;
        println!("✓ Released first read lock");

        assert!(
            rwlock.try_write_lock(&etcd_client).await.is_none(),
            "Write lock should still fail when one reader is active"
        );
        println!("✓ Write lock correctly failed with one reader still active");

        drop(second_reader);
        pause(50).await;
        println!("✓ Released second read lock");

        pause(100).await;

        // --- With no readers the writer gets in; a second writer does not --
        let writer = rwlock
            .try_write_lock(&etcd_client)
            .await
            .expect("Write lock should succeed with no readers");
        println!("✓ Acquired write lock");

        assert!(
            rwlock.try_write_lock(&etcd_client).await.is_none(),
            "Write lock should fail when another write lock is already held"
        );
        println!("✓ Write lock correctly failed when already held");

        // --- A reader has to wait until the writer releases ---------------
        let barrier = Arc::new(Barrier::new(2));
        let task_barrier = Arc::clone(&barrier);
        let task_lock = rwlock.clone();
        let task_client = etcd_client.clone();

        let waiting_reader = tokio::spawn(async move {
            println!("→ Background: Attempting to acquire read lock (should wait)...");
            task_barrier.wait().await;
            let started = std::time::Instant::now();

            // Named binding so the guard lives until the task ends.
            let _reader3_guard = task_lock
                .read_lock_with_wait(&task_client, "reader3", Some(Duration::from_secs(10)))
                .await
                .expect("Read lock should eventually succeed");

            let waited = started.elapsed();
            println!("✓ Background: Acquired read lock after {:?}", waited);

            assert!(
                waited > Duration::from_millis(50),
                "Read lock should have waited for write lock to be released"
            );
        });

        barrier.wait().await;

        // Hold the write lock a little so the reader is really forced to wait.
        pause(200).await;

        println!("→ Releasing write lock...");
        drop(writer);
        pause(50).await;
        println!("✓ Released write lock");

        waiting_reader
            .await
            .expect("Background task should complete successfully");

        // --- Nothing may be left under the lock prefix --------------------
        pause(100).await;
        let lock_root = format!("v1/{lock_prefix}");
        let remaining_locks = etcd_client
            .kv_get_prefix(&lock_root)
            .await
            .expect("Should be able to check remaining locks");
        assert!(
            remaining_locks.is_empty(),
            "All locks should be released at end of test"
        );
        println!("✓ All locks cleaned up successfully");

        println!("\n🎉 All DistributedRWLock tests passed!");
    }
}