1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//! Distributed read-write lock implementation using etcd atomic transactions
use std::time::Duration;
use etcd_client::{Compare, CompareOp, PutOptions, Txn, TxnOp};
use anyhow::Result;
use super::Client;
/// Timeout for acquiring read lock when downloading snapshots
const DEFAULT_READ_LOCK_TIMEOUT_SECS: u64 = 30;
/// Distributed read-write lock for coordinating operations across multiple processes
///
/// This implementation uses etcd atomic transactions to prevent race conditions:
/// - Write locks are exclusive (no readers or writers can coexist)
/// - Read locks are shared (multiple readers allowed, but no writers)
/// - All lock operations use atomic compare-and-set to ensure correctness
/// - Locks are bound to leases for automatic cleanup on client failure
#[derive(Clone)]
pub struct DistributedRWLock {
    // etcd key prefix; all lock keys live under `v1/{lock_prefix}/...`
    // (`.../writer` for the exclusive lock, `.../readers/{id}` for shared locks).
    lock_prefix: String,
}
/// Guard for an exclusive write lock.
///
/// Releasing happens in `Drop` by deleting the writer key in etcd
/// (best-effort, spawned on the current tokio runtime).
pub struct WriteLockGuard<'a> {
    // Lock metadata (provides the key prefix).
    rwlock: &'a DistributedRWLock,
    // Client used to delete the writer key on drop.
    etcd_client: &'a Client,
}
impl Drop for WriteLockGuard<'_> {
    /// Best-effort release of the write lock: deletes the writer key via a task
    /// spawned on the current tokio runtime. If no runtime is available, the
    /// key is left for the etcd lease to clean up on expiry.
    fn drop(&mut self) {
        // Guard clause: without a runtime we cannot run the async delete.
        let handle = match tokio::runtime::Handle::try_current() {
            Err(_) => {
                tracing::error!(
                    "WriteLockGuard dropped outside tokio runtime - lock not released! \
                     Lock will be cleaned up when etcd lease expires."
                );
                return;
            }
            Ok(h) => h,
        };
        // The spawned task outlives `self`, so clone what it needs.
        let rwlock = self.rwlock.clone();
        let etcd_client = self.etcd_client.clone();
        handle.spawn(async move {
            let write_key = format!("v1/{}/writer", rwlock.lock_prefix);
            if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
                tracing::warn!("Failed to release write lock in drop: {e:?}");
            }
        });
    }
}
/// Guard for a shared read lock.
///
/// Releasing happens in `Drop` by deleting this reader's key in etcd
/// (best-effort, spawned on the current tokio runtime).
pub struct ReadLockGuard<'a> {
    // Lock metadata (provides the key prefix).
    rwlock: &'a DistributedRWLock,
    // Client used to delete the reader key on drop.
    etcd_client: &'a Client,
    // Unique identifier for this reader; forms the key suffix
    // `v1/{prefix}/readers/{reader_id}`.
    reader_id: String,
}
impl Drop for ReadLockGuard<'_> {
    /// Best-effort release of the read lock: deletes this reader's key via a
    /// task spawned on the current tokio runtime. If no runtime is available,
    /// the key is left for the etcd lease to clean up on expiry.
    fn drop(&mut self) {
        // Guard clause: without a runtime we cannot run the async delete.
        let handle = match tokio::runtime::Handle::try_current() {
            Err(_) => {
                tracing::error!(
                    "ReadLockGuard dropped outside tokio runtime - lock not released! \
                     Lock will be cleaned up when etcd lease expires."
                );
                return;
            }
            Ok(h) => h,
        };
        // The spawned task outlives `self`, so clone what it needs.
        let rwlock = self.rwlock.clone();
        let etcd_client = self.etcd_client.clone();
        let reader_id = self.reader_id.clone();
        handle.spawn(async move {
            let reader_key = format!("v1/{}/readers/{reader_id}", rwlock.lock_prefix);
            if let Err(e) = etcd_client.kv_delete(reader_key.as_str(), None).await {
                tracing::warn!("Failed to release read lock in drop: {e:?}");
            }
        });
    }
}
impl DistributedRWLock {
    /// Create a new distributed RWLock with the given prefix
    ///
    /// The lock will create keys under:
    /// - `v1/{prefix}/writer` for the write lock
    /// - `v1/{prefix}/readers/{reader_id}` for read locks
    pub fn new(lock_prefix: String) -> Self {
        Self { lock_prefix }
    }

    /// Try to acquire exclusive write lock (non-blocking)
    ///
    /// Returns `Some(WriteLockGuard)` if acquired, `None` if readers exist or lock unavailable.
    /// The guard automatically releases the lock when dropped.
    ///
    /// Implementation strategy:
    /// 1. Atomically create writer key if it doesn't exist
    /// 2. Immediately check if any readers exist
    /// 3. If readers found, rollback (delete writer key) and return None
    ///
    /// Note: There is still a small race window (sub-millisecond) where a reader could acquire
    /// a lock between steps 2-3.
    pub async fn try_write_lock<'a>(
        &'a self,
        etcd_client: &'a Client,
    ) -> Option<WriteLockGuard<'a>> {
        let write_key = format!("v1/{}/writer", self.lock_prefix);
        // Bind the lock key to this client's lease so etcd removes it
        // automatically if the process dies without releasing.
        let lease_id = etcd_client.lease_id();
        // NOTE(review): lease_id is cast to i64 below — confirm the lease id
        // type fits i64 without truncation.
        let put_options = PutOptions::new().with_lease(lease_id as i64);
        // Step 1: Atomically create write lock only if it doesn't exist
        // (etcd key version == 0 means the key does not currently exist).
        let txn = Txn::new()
            .when(vec![Compare::version(
                write_key.as_str(),
                CompareOp::Equal,
                0,
            )])
            .and_then(vec![TxnOp::put(
                write_key.as_str(),
                b"writing",
                Some(put_options),
            )]);
        // Execute the atomic transaction
        match etcd_client.etcd_client().kv_client().txn(txn).await {
            Ok(response) if response.succeeded() => {
                // Step 2: Immediately check if any readers exist
                let reader_prefix = format!("v1/{}/readers/", self.lock_prefix);
                match etcd_client.kv_get_prefix(&reader_prefix).await {
                    Ok(readers) if !readers.is_empty() => {
                        // Readers exist! Rollback - delete our writer key
                        tracing::debug!(
                            "Found {} reader(s) after acquiring write lock, rolling back",
                            readers.len()
                        );
                        if let Err(e) = etcd_client.kv_delete(write_key.as_str(), None).await {
                            // Failed rollback is logged only; the lease will
                            // eventually expire the stale writer key.
                            tracing::warn!("Failed to rollback write lock: {e:?}");
                        }
                        None
                    }
                    Ok(_) => {
                        // No readers, we successfully hold the write lock
                        tracing::debug!("Successfully acquired write lock with no readers");
                        Some(WriteLockGuard {
                            rwlock: self,
                            etcd_client,
                        })
                    }
                    Err(e) => {
                        // Error checking for readers - rollback to be safe
                        tracing::warn!(
                            "Failed to check for readers, rolling back write lock: {e:?}"
                        );
                        let _ = etcd_client.kv_delete(write_key.as_str(), None).await;
                        None
                    }
                }
            }
            Ok(_) => {
                // Compare failed: the writer key already exists (another writer holds it).
                tracing::debug!("Write lock already exists, transaction failed");
                None
            }
            Err(e) => {
                // Transport/etcd error; treat as "not acquired".
                tracing::warn!("Failed to execute write lock transaction: {e:?}");
                None
            }
        }
    }

    /// Acquire shared read lock with polling retry
    ///
    /// Polls every 100ms until write lock is released, then atomically acquires read lock.
    /// The guard automatically releases the lock when dropped.
    /// Uses atomic transaction to prevent race with writer - the check for no write lock
    /// and creation of read lock happen in a single atomic operation.
    ///
    /// # Arguments
    /// * `etcd_client` - The etcd client
    /// * `reader_id` - Unique identifier for this reader
    /// * `timeout` - Optional timeout, defaults to `DEFAULT_READ_LOCK_TIMEOUT_SECS` (30 seconds)
    ///
    /// # Errors
    /// Returns an error if the timeout elapses before the lock is acquired.
    pub async fn read_lock_with_wait<'a>(
        &'a self,
        etcd_client: &'a Client,
        reader_id: &str,
        timeout: Option<Duration>,
    ) -> Result<ReadLockGuard<'a>> {
        let timeout = timeout.unwrap_or(Duration::from_secs(DEFAULT_READ_LOCK_TIMEOUT_SECS));
        let write_key = format!("v1/{}/writer", self.lock_prefix);
        let reader_key = format!("v1/{}/readers/{reader_id}", self.lock_prefix);
        let deadline = tokio::time::Instant::now() + timeout;
        let lease_id = etcd_client.lease_id();
        loop {
            // Check if timeout exceeded
            if tokio::time::Instant::now() > deadline {
                anyhow::bail!("Timeout waiting for read lock after {:?}", timeout);
            }
            // Try to atomically acquire read lock
            // The transaction checks that no writer exists and creates reader key atomically
            let put_options = PutOptions::new().with_lease(lease_id as i64);
            // Build atomic transaction: create reader key only if write_key doesn't exist
            // (version == 0 means the writer key is absent).
            let txn = Txn::new()
                .when(vec![Compare::version(
                    write_key.as_str(),
                    CompareOp::Equal,
                    0,
                )])
                .and_then(vec![TxnOp::put(
                    reader_key.as_str(),
                    b"reading",
                    Some(put_options),
                )]);
            // Execute the atomic transaction
            match etcd_client.etcd_client().kv_client().txn(txn).await {
                Ok(response) if response.succeeded() => {
                    tracing::debug!("Acquired read lock for reader {}", reader_id);
                    return Ok(ReadLockGuard {
                        rwlock: self,
                        etcd_client,
                        reader_id: reader_id.to_string(),
                    });
                }
                Ok(_) => {
                    // Writer holds the lock; fall through to the retry sleep.
                    tracing::trace!("Write lock exists or was created, retrying after delay");
                }
                Err(e) => {
                    // Transient etcd error; retry until the deadline.
                    tracing::warn!("Failed to execute read lock transaction: {e:?}");
                }
            }
            // Wait before next retry
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
// Integration tests: require a live etcd instance at localhost:2379,
// gated behind the `testing-etcd` feature.
#[cfg(feature = "testing-etcd")]
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Runtime;
    use std::sync::Arc;
    use tokio::sync::Barrier;
    /// Test the DistributedRWLock behavior
    ///
    /// This test verifies:
    /// 1. Multiple readers can acquire read locks simultaneously
    /// 2. Write lock fails when readers are active
    /// 3. Write lock succeeds when no locks are held
    /// 4. Read lock waits for write lock to be released
    #[tokio::test]
    async fn test_distributed_rwlock() {
        // Setup: Create etcd client
        let runtime = Runtime::from_settings().unwrap();
        // NOTE(review): `Client::builder().build()` followed by `Client::new(...)`
        // suggests the builder yields a config/inner client that `Client::new`
        // wraps — confirm this two-step construction is intentional.
        let etcd_client = Client::builder()
            .etcd_url(vec!["http://localhost:2379".to_string()])
            .build()
            .unwrap();
        let etcd_client = Client::new(etcd_client, runtime).await.unwrap();
        // Prevent runtime from being dropped in async context at end of test
        let etcd_client = std::mem::ManuallyDrop::new(etcd_client);
        // Create RWLock with unique prefix for this test so concurrent test
        // runs cannot collide on keys.
        let test_id = uuid::Uuid::new_v4();
        let lock_prefix = format!("/test/rwlock/{}", test_id);
        let rwlock = DistributedRWLock::new(lock_prefix.clone());
        // Step 1: Acquire first read lock
        let _reader1_guard = rwlock
            .read_lock_with_wait(&etcd_client, "reader1", Some(Duration::from_secs(5)))
            .await
            .expect("First read lock should succeed");
        println!("✓ Acquired first read lock");
        // Step 2: Acquire second read lock (should succeed - multiple readers allowed)
        let _reader2_guard = rwlock
            .read_lock_with_wait(&etcd_client, "reader2", Some(Duration::from_secs(5)))
            .await
            .expect("Second read lock should succeed");
        println!("✓ Acquired second read lock");
        // Step 3: Try to acquire write lock (should fail - readers are active)
        let write_result = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result.is_none(),
            "Write lock should fail when readers are active"
        );
        println!("✓ Write lock correctly failed with active readers");
        // Step 4: Drop first read lock
        drop(_reader1_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released first read lock");
        // Verify write lock still fails with one reader active
        let write_result_with_one_reader = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result_with_one_reader.is_none(),
            "Write lock should still fail when one reader is active"
        );
        println!("✓ Write lock correctly failed with one reader still active");
        drop(_reader2_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released second read lock");
        // Give etcd a moment to process the deletions
        tokio::time::sleep(Duration::from_millis(100)).await;
        // Step 5: Acquire write lock (should succeed now - no locks held)
        let _write_guard = rwlock
            .try_write_lock(&etcd_client)
            .await
            .expect("Write lock should succeed with no readers");
        println!("✓ Acquired write lock");
        // Step 5a: Try to acquire write lock again (should fail immediately - already held)
        let write_result_already_held = rwlock.try_write_lock(&etcd_client).await;
        assert!(
            write_result_already_held.is_none(),
            "Write lock should fail when another write lock is already held"
        );
        println!("✓ Write lock correctly failed when already held");
        // Step 6: Spawn background task to acquire read lock
        // It should wait because write lock is held
        let barrier = Arc::new(Barrier::new(2));
        let barrier_clone = barrier.clone();
        let rwlock_clone = rwlock.clone();
        let etcd_client_clone = etcd_client.clone();
        let read_task = tokio::spawn(async move {
            println!("→ Background: Attempting to acquire read lock (should wait)...");
            barrier_clone.wait().await; // Signal that we've started
            let start = std::time::Instant::now();
            let _guard = rwlock_clone
                .read_lock_with_wait(&etcd_client_clone, "reader3", Some(Duration::from_secs(10)))
                .await
                .expect("Read lock should eventually succeed");
            let elapsed = start.elapsed();
            println!("✓ Background: Acquired read lock after {:?}", elapsed);
            // Verify it actually waited (should be > 100ms since we sleep before releasing write lock)
            assert!(
                elapsed > Duration::from_millis(50),
                "Read lock should have waited for write lock to be released"
            );
            // Guard will be dropped here, releasing the lock
        });
        // Wait for background task to start
        barrier.wait().await;
        // Give the background task a moment to start polling
        tokio::time::sleep(Duration::from_millis(200)).await;
        // Step 7: Release write lock by dropping guard
        println!("→ Releasing write lock...");
        drop(_write_guard);
        tokio::time::sleep(Duration::from_millis(50)).await; // Give time for async drop
        println!("✓ Released write lock");
        // Step 8: Background task should now succeed
        read_task
            .await
            .expect("Background task should complete successfully");
        // Final cleanup: verify all locks are released
        tokio::time::sleep(Duration::from_millis(100)).await;
        let remaining_locks = etcd_client
            .kv_get_prefix(&format!("v1/{lock_prefix}"))
            .await
            .expect("Should be able to check remaining locks");
        assert!(
            remaining_locks.is_empty(),
            "All locks should be released at end of test"
        );
        println!("✓ All locks cleaned up successfully");
        println!("\n🎉 All DistributedRWLock tests passed!");
    }
}