azoth_core/lock_manager.rs
1//! Stripe-based lock manager with deadlock prevention
2//!
3//! Provides per-key locking with automatic sorting to prevent deadlocks.
4//! Keys are hashed to stripes, and locks are acquired in stripe index order.
5
6use crate::error::{AzothError, Result};
7use parking_lot::{Mutex, MutexGuard};
8use std::collections::BTreeSet;
9use std::time::Duration;
10use xxhash_rust::xxh3::xxh3_64;
11
/// Lock-acquisition timeout, in milliseconds, used when the caller does not
/// supply one (5 seconds).
pub const DEFAULT_LOCK_TIMEOUT_MS: u64 = 5_000;
14
15/// Lock manager for stripe locking during preflight phase
16///
17/// Provides per-key locking with configurable number of stripes.
18/// Non-conflicting keys map to different stripes and can be locked concurrently.
19///
20/// # Deadlock Prevention
21///
22/// This implementation prevents deadlocks through two mechanisms:
23///
24/// 1. **Automatic Sorting**: When acquiring multiple locks via `acquire_keys()`,
25/// stripes are always acquired in ascending index order. This prevents the
26/// classic A-B/B-A deadlock scenario.
27///
28/// 2. **Timeout-Based Acquisition**: Uses `try_lock_for()` with a configurable
29/// timeout instead of blocking indefinitely. If a lock cannot be acquired
30/// within the timeout, returns `LockTimeout` error.
31///
32/// # Example
33///
34/// ```ignore
35/// let lm = LockManager::new(256, Duration::from_secs(5));
36///
37/// // Acquire locks on multiple keys - automatically sorted to prevent deadlock
38/// let _guard = lm.acquire_keys(&[b"key_b", b"key_a"])?;
39/// // Locks acquired in stripe order, not key order
40/// ```
41pub struct LockManager {
42 stripes: Vec<Mutex<()>>,
43 num_stripes: usize,
44 default_timeout: Duration,
45}
46
47/// Guard that holds multiple stripe locks
48///
49/// Locks are held in sorted stripe order and released in reverse order
50/// when dropped (via RAII).
51pub struct MultiLockGuard<'a> {
52 // Stored in sorted stripe order; dropped in reverse order automatically
53 _guards: Vec<MutexGuard<'a, ()>>,
54}
55
56impl LockManager {
57 /// Create a new lock manager with the specified number of stripes
58 ///
59 /// # Arguments
60 ///
61 /// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024).
62 /// More stripes = less contention but more memory.
63 /// * `default_timeout` - Default timeout for lock acquisition.
64 ///
65 /// # Panics
66 ///
67 /// Panics if `num_stripes` is 0.
68 pub fn new(num_stripes: usize, default_timeout: Duration) -> Self {
69 assert!(num_stripes > 0, "num_stripes must be positive");
70 let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect();
71
72 Self {
73 stripes,
74 num_stripes,
75 default_timeout,
76 }
77 }
78
79 /// Create a lock manager with default timeout
80 pub fn with_stripes(num_stripes: usize) -> Self {
81 Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS))
82 }
83
84 /// Hash a key to determine its stripe index
85 fn stripe_index(&self, key: &[u8]) -> usize {
86 let hash = xxh3_64(key);
87 (hash as usize) % self.num_stripes
88 }
89
90 /// Acquire exclusive locks on all keys in a deadlock-free manner
91 ///
92 /// This method:
93 /// 1. Computes stripe indices for all keys
94 /// 2. Deduplicates stripes (multiple keys may map to same stripe)
95 /// 3. Sorts stripe indices for consistent global ordering
96 /// 4. Acquires locks in sorted order with timeout
97 ///
98 /// # Arguments
99 ///
100 /// * `keys` - Keys to acquire locks for
101 ///
102 /// # Returns
103 ///
104 /// A `MultiLockGuard` that holds all acquired locks. Locks are released
105 /// when the guard is dropped.
106 ///
107 /// # Errors
108 ///
109 /// Returns `LockTimeout` if any lock cannot be acquired within the timeout.
110 ///
111 /// # Deadlock Safety
112 ///
113 /// Because locks are always acquired in sorted stripe order, two threads
114 /// acquiring locks on keys [A, B] and [B, A] will both attempt to acquire
115 /// locks in the same order, preventing deadlock.
116 pub fn acquire_keys<K: AsRef<[u8]>>(&self, keys: &[K]) -> Result<MultiLockGuard<'_>> {
117 self.acquire_keys_with_timeout(keys, self.default_timeout)
118 }
119
120 /// Acquire exclusive locks with a custom timeout
121 pub fn acquire_keys_with_timeout<K: AsRef<[u8]>>(
122 &self,
123 keys: &[K],
124 timeout: Duration,
125 ) -> Result<MultiLockGuard<'_>> {
126 // Early return for empty keys
127 if keys.is_empty() {
128 return Ok(MultiLockGuard {
129 _guards: Vec::new(),
130 });
131 }
132
133 // Compute stripe indices and deduplicate using BTreeSet (automatically sorted)
134 let stripe_indices: BTreeSet<usize> =
135 keys.iter().map(|k| self.stripe_index(k.as_ref())).collect();
136
137 // Acquire locks in sorted order
138 let mut guards = Vec::with_capacity(stripe_indices.len());
139
140 for stripe_idx in stripe_indices {
141 match self.stripes[stripe_idx].try_lock_for(timeout) {
142 Some(guard) => guards.push(guard),
143 None => {
144 // Failed to acquire lock - drop all acquired locks and return error
145 // Guards are dropped automatically when `guards` goes out of scope
146 return Err(AzothError::LockTimeout {
147 timeout_ms: timeout.as_millis() as u64,
148 });
149 }
150 }
151 }
152
153 Ok(MultiLockGuard { _guards: guards })
154 }
155
156 /// Acquire a single lock (convenience method)
157 ///
158 /// Prefer `acquire_keys()` for multiple keys to ensure deadlock safety.
159 pub fn lock(&self, key: &[u8]) -> Result<MutexGuard<'_, ()>> {
160 let idx = self.stripe_index(key);
161 self.stripes[idx]
162 .try_lock_for(self.default_timeout)
163 .ok_or(AzothError::LockTimeout {
164 timeout_ms: self.default_timeout.as_millis() as u64,
165 })
166 }
167
168 /// Get the number of stripes
169 pub fn num_stripes(&self) -> usize {
170 self.num_stripes
171 }
172
173 /// Get the default timeout
174 pub fn default_timeout(&self) -> Duration {
175 self.default_timeout
176 }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Timeout for tests in which a regression shows up as a lock timeout, so
    /// that a failure is reported quickly instead of after the 5 s default.
    const SHORT_TIMEOUT: Duration = Duration::from_millis(100);

    #[test]
    fn test_lock_manager_basic() {
        let lm = LockManager::with_stripes(256);
        assert_eq!(lm.num_stripes(), 256);
        assert_eq!(
            lm.default_timeout(),
            Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS)
        );

        // Should be able to acquire locks
        let _guard = lm.acquire_keys(&[b"key1", b"key2"]).unwrap();
    }

    #[test]
    fn test_stripe_distribution() {
        let lm = LockManager::with_stripes(256);

        // Every index must fall inside the stripe table
        let idx1 = lm.stripe_index(b"key1");
        let idx2 = lm.stripe_index(b"key2");
        let idx3 = lm.stripe_index(b"key3");

        assert!(idx1 < 256);
        assert!(idx2 < 256);
        assert!(idx3 < 256);

        // Same key should always map to same stripe
        assert_eq!(idx1, lm.stripe_index(b"key1"));
    }

    #[test]
    fn test_empty_keys() {
        let lm = LockManager::with_stripes(256);
        let _guard = lm.acquire_keys::<&[u8]>(&[]).unwrap();
    }

    #[test]
    fn test_duplicate_keys_deduplicated() {
        let lm = LockManager::new(256, SHORT_TIMEOUT);

        // If the repeated key were locked once per occurrence, the second
        // attempt would wait on a stripe this call already holds and time out.
        let _guard = lm.acquire_keys(&[b"key1", b"key1", b"key1"]).unwrap();
    }

    #[test]
    fn test_colliding_keys_deduplicated() {
        // With a single stripe every key collides, so this only succeeds if
        // stripes (not just keys) are deduplicated.
        let lm = LockManager::new(1, SHORT_TIMEOUT);
        let _guard = lm.acquire_keys(&[b"key1", b"key2", b"key3"]).unwrap();
    }

    #[test]
    fn test_sorted_acquisition_prevents_deadlock() {
        // This test verifies that unsorted key order doesn't cause deadlock
        let lm = Arc::new(LockManager::with_stripes(256));

        let lm1 = lm.clone();
        let lm2 = lm.clone();

        // Thread 1: keys in order [a, b]
        let h1 = thread::spawn(move || {
            for _ in 0..100 {
                let _guard = lm1.acquire_keys(&[b"key_a", b"key_b"]).unwrap();
                // Hold briefly
                thread::sleep(Duration::from_micros(10));
            }
        });

        // Thread 2: keys in REVERSE order [b, a]
        // Without sorted acquisition, this would deadlock
        let h2 = thread::spawn(move || {
            for _ in 0..100 {
                let _guard = lm2.acquire_keys(&[b"key_b", b"key_a"]).unwrap();
                thread::sleep(Duration::from_micros(10));
            }
        });

        // Both should complete without deadlock
        h1.join().unwrap();
        h2.join().unwrap();
    }

    #[test]
    fn test_timeout_works() {
        let lm = Arc::new(LockManager::new(1, Duration::from_millis(50)));

        // Acquire the only stripe
        let guard = lm.acquire_keys(&[b"any_key"]).unwrap();

        // Try to acquire from another thread - should time out and report
        // the timeout that was in effect
        let lm2 = lm.clone();
        let handle = thread::spawn(move || {
            // Check if result is a timeout error (don't return guard across threads)
            matches!(
                lm2.acquire_keys(&[b"another_key"]),
                Err(AzothError::LockTimeout { timeout_ms: 50, .. })
            )
        });

        let timed_out = handle.join().unwrap();
        assert!(timed_out, "Should have timed out");

        // Once the guard is gone the stripe must be available again
        drop(guard);
        let _guard = lm.acquire_keys(&[b"any_key"]).unwrap();
    }

    #[test]
    fn test_concurrent_different_stripes() {
        // Several threads locking (mostly) unrelated keys must all finish
        let lm = Arc::new(LockManager::with_stripes(256));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let lm = lm.clone();
                thread::spawn(move || {
                    let key = format!("unique_key_{}", i);
                    let _guard = lm.acquire_keys(&[key.as_bytes()]).unwrap();
                    thread::sleep(Duration::from_millis(10));
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }
}
297}