Skip to main content

dynamo_runtime/transports/etcd/
lock.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! Distributed read-write lock implementation using etcd atomic transactions
5
6use std::time::Duration;
7
8use etcd_client::{Compare, CompareOp, PutOptions, Txn, TxnOp};
9
10use anyhow::Result;
11
12use super::Client;
13
/// Default timeout (seconds) for acquiring a read lock, e.g. when downloading
/// snapshots. Used by `read_lock_with_wait` when the caller passes `None`.
const DEFAULT_READ_LOCK_TIMEOUT_SECS: u64 = 30;
16
/// Distributed read-write lock for coordinating operations across multiple processes
///
/// This implementation uses etcd atomic transactions to prevent race conditions:
/// - Write locks are exclusive (no readers or writers can coexist)
/// - Read locks are shared (multiple readers allowed, but no writers)
/// - All lock operations use atomic compare-and-set to ensure correctness
/// - Locks are bound to leases for automatic cleanup on client failure
#[derive(Clone)]
pub struct DistributedRWLock {
    // Key namespace for this lock: keys are created under
    // `v1/{lock_prefix}/writer` and `v1/{lock_prefix}/readers/{reader_id}`.
    lock_prefix: String,
}
28
/// Guard for an exclusive write lock; releases the lock when dropped.
pub struct WriteLockGuard<'a> {
    // Lock this guard belongs to (used to rebuild the writer key on drop).
    rwlock: &'a DistributedRWLock,
    // Client used to delete the writer key when the guard is dropped.
    etcd_client: &'a Client,
}
33
34impl Drop for WriteLockGuard<'_> {
35    fn drop(&mut self) {
36        match tokio::runtime::Handle::try_current() {
37            Ok(handle) => {
38                let rwlock = self.rwlock.clone();
39                let etcd_client = self.etcd_client.clone();
40                handle.spawn(async move {
41                    let write_key = format!("v1/{}/writer", rwlock.lock_prefix);
42                    if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
43                        tracing::warn!("Failed to release write lock in drop: {e:?}");
44                    }
45                });
46            }
47            Err(_) => {
48                tracing::error!(
49                    "WriteLockGuard dropped outside tokio runtime - lock not released! \
50                     Lock will be cleaned up when etcd lease expires."
51                );
52            }
53        }
54    }
55}
56
/// Guard for a shared read lock; releases the lock when dropped.
pub struct ReadLockGuard<'a> {
    // Lock this guard belongs to (used to rebuild the reader key on drop).
    rwlock: &'a DistributedRWLock,
    // Client used to delete the reader key when the guard is dropped.
    etcd_client: &'a Client,
    // Unique id of this reader; forms the key `v1/{prefix}/readers/{reader_id}`.
    reader_id: String,
}
62
63impl Drop for ReadLockGuard<'_> {
64    fn drop(&mut self) {
65        match tokio::runtime::Handle::try_current() {
66            Ok(handle) => {
67                let rwlock = self.rwlock.clone();
68                let etcd_client = self.etcd_client.clone();
69                let reader_id = self.reader_id.clone();
70                handle.spawn(async move {
71                    let reader_key = format!("v1/{}/readers/{reader_id}", rwlock.lock_prefix);
72                    if let Err(e) = etcd_client.kv_delete(reader_key.as_str(), None).await {
73                        tracing::warn!("Failed to release read lock in drop: {e:?}");
74                    }
75                });
76            }
77            Err(_) => {
78                tracing::error!(
79                    "ReadLockGuard dropped outside tokio runtime - lock not released! \
80                     Lock will be cleaned up when etcd lease expires."
81                );
82            }
83        }
84    }
85}
86
87impl DistributedRWLock {
88    /// Create a new distributed RWLock with the given prefix
89    ///
90    /// The lock will create keys under:
91    /// - `v1/{prefix}/writer` for the write lock
92    /// - `v1/{prefix}/readers/{reader_id}` for read locks
93    pub fn new(lock_prefix: String) -> Self {
94        Self { lock_prefix }
95    }
96
97    /// Try to acquire exclusive write lock (non-blocking)
98    ///
99    /// Returns `Some(WriteLockGuard)` if acquired, `None` if readers exist or lock unavailable.
100    /// The guard automatically releases the lock when dropped.
101    ///
102    /// Implementation strategy:
103    /// 1. Atomically create writer key if it doesn't exist
104    /// 2. Immediately check if any readers exist
105    /// 3. If readers found, rollback (delete writer key) and return None
106    ///
107    /// Note: There is still a small race window (sub-millisecond) where a reader could acquire
108    /// a lock between steps 2-3.
109    pub async fn try_write_lock<'a>(
110        &'a self,
111        etcd_client: &'a Client,
112    ) -> Option<WriteLockGuard<'a>> {
113        let write_key = format!("v1/{}/writer", self.lock_prefix);
114        let lease_id = etcd_client.lease_id();
115        let put_options = PutOptions::new().with_lease(lease_id as i64);
116
117        // Step 1: Atomically create write lock only if it doesn't exist
118        let txn = Txn::new()
119            .when(vec![Compare::version(
120                write_key.as_str(),
121                CompareOp::Equal,
122                0,
123            )])
124            .and_then(vec![TxnOp::put(
125                write_key.as_str(),
126                b"writing",
127                Some(put_options),
128            )]);
129
130        // Execute the atomic transaction
131        match etcd_client.etcd_client().kv_client().txn(txn).await {
132            Ok(response) if response.succeeded() => {
133                // Step 2: Immediately check if any readers exist
134                let reader_prefix = format!("v1/{}/readers/", self.lock_prefix);
135                match etcd_client.kv_get_prefix(&reader_prefix).await {
136                    Ok(readers) if !readers.is_empty() => {
137                        // Readers exist! Rollback - delete our writer key
138                        tracing::debug!(
139                            "Found {} reader(s) after acquiring write lock, rolling back",
140                            readers.len()
141                        );
142                        if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
143                            tracing::warn!("Failed to rollback write lock: {e:?}");
144                        }
145                        None
146                    }
147                    Ok(_) => {
148                        // No readers, we successfully hold the write lock
149                        tracing::debug!("Successfully acquired write lock with no readers");
150                        Some(WriteLockGuard {
151                            rwlock: self,
152                            etcd_client,
153                        })
154                    }
155                    Err(e) => {
156                        // Error checking for readers - rollback to be safe
157                        tracing::warn!(
158                            "Failed to check for readers, rolling back write lock: {e:?}"
159                        );
160                        let _ = etcd_client.kv_delete(write_key.as_str(), None).await;
161                        None
162                    }
163                }
164            }
165            Ok(_) => {
166                tracing::debug!("Write lock already exists, transaction failed");
167                None
168            }
169            Err(e) => {
170                tracing::warn!("Failed to execute write lock transaction: {e:?}");
171                None
172            }
173        }
174    }
175
176    /// Acquire shared read lock with polling retry
177    ///
178    /// Polls every 100ms until write lock is released, then atomically acquires read lock.
179    /// The guard automatically releases the lock when dropped.
180    /// Uses atomic transaction to prevent race with writer - the check for no write lock
181    /// and creation of read lock happen in a single atomic operation.
182    ///
183    /// # Arguments
184    /// * `etcd_client` - The etcd client
185    /// * `reader_id` - Unique identifier for this reader
186    /// * `timeout` - Optional timeout, defaults to 5 seconds
187    pub async fn read_lock_with_wait<'a>(
188        &'a self,
189        etcd_client: &'a Client,
190        reader_id: &str,
191        timeout: Option<Duration>,
192    ) -> Result<ReadLockGuard<'a>> {
193        let timeout = timeout.unwrap_or(Duration::from_secs(DEFAULT_READ_LOCK_TIMEOUT_SECS));
194        let write_key = format!("v1/{}/writer", self.lock_prefix);
195        let reader_key = format!("v1/{}/readers/{reader_id}", self.lock_prefix);
196        let deadline = tokio::time::Instant::now() + timeout;
197        let lease_id = etcd_client.lease_id();
198
199        loop {
200            // Check if timeout exceeded
201            if tokio::time::Instant::now() > deadline {
202                anyhow::bail!("Timeout waiting for read lock after {:?}", timeout);
203            }
204
205            // Try to atomically acquire read lock
206            // The transaction checks that no writer exists and creates reader key atomically
207            let put_options = PutOptions::new().with_lease(lease_id as i64);
208
209            // Build atomic transaction: create reader key only if write_key doesn't exist
210            let txn = Txn::new()
211                .when(vec![Compare::version(
212                    write_key.as_str(),
213                    CompareOp::Equal,
214                    0,
215                )])
216                .and_then(vec![TxnOp::put(
217                    reader_key.as_str(),
218                    b"reading",
219                    Some(put_options),
220                )]);
221
222            // Execute the atomic transaction
223            match etcd_client.etcd_client().kv_client().txn(txn).await {
224                Ok(response) if response.succeeded() => {
225                    tracing::debug!("Acquired read lock for reader {}", reader_id);
226                    return Ok(ReadLockGuard {
227                        rwlock: self,
228                        etcd_client,
229                        reader_id: reader_id.to_string(),
230                    });
231                }
232                Ok(_) => {
233                    tracing::trace!("Write lock exists or was created, retrying after delay");
234                }
235                Err(e) => {
236                    tracing::warn!("Failed to execute read lock transaction: {e:?}");
237                }
238            }
239
240            // Wait before next retry
241            tokio::time::sleep(Duration::from_millis(100)).await;
242        }
243    }
244}
245
#[cfg(feature = "testing-etcd")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Runtime;
    use std::sync::Arc;
    use tokio::sync::Barrier;

    /// Test the DistributedRWLock behavior
    ///
    /// This test verifies:
    /// 1. Multiple readers can acquire read locks simultaneously
    /// 2. Write lock fails when readers are active
    /// 3. Write lock succeeds when no locks are held
    /// 4. Read lock waits for write lock to be released
    ///
    /// NOTE(review): this is an integration test that requires a live etcd
    /// server at localhost:2379 (gated behind the `testing-etcd` feature).
    /// Timing assertions use generous sleeps and may be flaky on loaded CI.
    #[tokio::test]
    async fn test_distributed_rwlock() {
        // Setup: Create etcd client
        let runtime = Runtime::from_settings().unwrap();
        let etcd_client = Client::builder()
            .etcd_url(vec!["http://localhost:2379".to_string()])
            .build()
            .unwrap();
        let etcd_client = Client::new(etcd_client, runtime).await.unwrap();

        // Prevent runtime from being dropped in async context at end of test
        let etcd_client = std::mem::ManuallyDrop::new(etcd_client);

        // Create RWLock with unique prefix for this test, so concurrent test
        // runs against the same etcd instance cannot interfere.
        let test_id = uuid::Uuid::new_v4();
        let lock_prefix = format!("/test/rwlock/{}", test_id);
        let rwlock = DistributedRWLock::new(lock_prefix.clone());

        // Step 1: Acquire first read lock
        let _reader1_guard = rwlock
            .read_lock_with_wait(&etcd_client, "reader1", Some(Duration::from_secs(5)))
            .await
            .expect("First read lock should succeed");
        println!("✓ Acquired first read lock");

        // Step 2: Acquire second read lock (should succeed - multiple readers allowed)
        let _reader2_guard = rwlock
            .read_lock_with_wait(&etcd_client, "reader2", Some(Duration::from_secs(5)))
            .await
            .expect("Second read lock should succeed");
        println!("✓ Acquired second read lock");

        // Step 3: Try to acquire write lock (should fail - readers are active)
        let write_result = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result.is_none(),
            "Write lock should fail when readers are active"
        );
        println!("✓ Write lock correctly failed with active readers");

        // Step 4: Drop first read lock
        drop(_reader1_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released first read lock");

        // Verify write lock still fails with one reader active
        let write_result_with_one_reader = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result_with_one_reader.is_none(),
            "Write lock should still fail when one reader is active"
        );
        println!("✓ Write lock correctly failed with one reader still active");

        drop(_reader2_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released second read lock");

        // Give etcd a moment to process the deletions
        tokio::time::sleep(Duration::from_millis(100)).await;

        // Step 5: Acquire write lock (should succeed now - no locks held)
        let _write_guard = rwlock
            .try_write_lock(&etcd_client)
            .await
            .expect("Write lock should succeed with no readers");
        println!("✓ Acquired write lock");

        // Step 5a: Try to acquire write lock again (should fail immediately - already held)
        let write_result_already_held = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result_already_held.is_none(),
            "Write lock should fail when another write lock is already held"
        );
        println!("✓ Write lock correctly failed when already held");

        // Step 6: Spawn background task to acquire read lock
        // It should wait because write lock is held
        let barrier = Arc::new(Barrier::new(2));
        let barrier_clone = barrier.clone();
        let rwlock_clone = rwlock.clone();
        let etcd_client_clone = etcd_client.clone();

        let read_task = tokio::spawn(async move {
            println!("→ Background: Attempting to acquire read lock (should wait)...");
            barrier_clone.wait().await; // Signal that we've started

            let start = std::time::Instant::now();
            let _guard = rwlock_clone
                .read_lock_with_wait(&etcd_client_clone, "reader3", Some(Duration::from_secs(10)))
                .await
                .expect("Read lock should eventually succeed");

            let elapsed = start.elapsed();
            println!("✓ Background: Acquired read lock after {:?}", elapsed);

            // Verify it actually waited: the main task holds the write lock for
            // ~200ms after this task starts polling, so >50ms is a safe lower bound.
            assert!(
                elapsed > Duration::from_millis(50),
                "Read lock should have waited for write lock to be released"
            );

            // Guard will be dropped here, releasing the lock
        });

        // Wait for background task to start
        barrier.wait().await;

        // Give the background task a moment to start polling
        tokio::time::sleep(Duration::from_millis(200)).await;

        // Step 7: Release write lock by dropping guard
        println!("→ Releasing write lock...");
        drop(_write_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released write lock");

        // Step 8: Background task should now succeed
        read_task
            .await
            .expect("Background task should complete successfully");

        // Final cleanup: verify all locks are released
        tokio::time::sleep(Duration::from_millis(100)).await;
        let remaining_locks = etcd_client
            .kv_get_prefix(&format!("v1/{lock_prefix}"))
            .await
            .expect("Should be able to check remaining locks");
        assert!(
            remaining_locks.is_empty(),
            "All locks should be released at end of test"
        );
        println!("✓ All locks cleaned up successfully");

        println!("\n🎉 All DistributedRWLock tests passed!");
    }
}