azoth_core/lock_manager.rs
1//! Stripe-based lock manager with deadlock prevention
2//!
3//! Provides per-key locking with automatic sorting to prevent deadlocks.
4//! Keys are hashed to stripes, and locks are acquired in stripe index order.
5
6use crate::error::{AzothError, Result};
7use parking_lot::{Mutex, MutexGuard};
8use std::collections::BTreeSet;
9use std::time::Duration;
10use xxhash_rust::xxh3::xxh3_64;
11
/// Lock-acquisition timeout in milliseconds used by [`LockManager::with_stripes`]
/// when no explicit timeout is given (5 seconds).
pub const DEFAULT_LOCK_TIMEOUT_MS: u64 = 5_000;
14
15/// Lock manager for stripe locking during preflight phase
16///
17/// Provides per-key locking with configurable number of stripes.
18/// Non-conflicting keys map to different stripes and can be locked concurrently.
19///
20/// # Deadlock Prevention
21///
22/// This implementation prevents deadlocks through two mechanisms:
23///
24/// 1. **Automatic Sorting**: When acquiring multiple locks via `acquire_keys()`,
25/// stripes are always acquired in ascending index order. This prevents the
26/// classic A-B/B-A deadlock scenario.
27///
28/// 2. **Timeout-Based Acquisition**: Uses `try_lock_for()` with a configurable
29/// timeout instead of blocking indefinitely. If a lock cannot be acquired
30/// within the timeout, returns `LockTimeout` error.
31///
32/// # Example
33///
34/// ```ignore
35/// let lm = LockManager::new(256, Duration::from_secs(5));
36///
37/// // Acquire locks on multiple keys - automatically sorted to prevent deadlock
38/// let _guard = lm.acquire_keys(&[b"key_b", b"key_a"])?;
39/// // Locks acquired in stripe order, not key order
40/// ```
41pub struct LockManager {
42 stripes: Vec<Mutex<()>>,
43 num_stripes: usize,
44 default_timeout: Duration,
45}
46
47/// Guard that holds multiple stripe locks
48///
49/// Locks are held in sorted stripe order and released in reverse order
50/// when dropped (via RAII).
51pub struct MultiLockGuard<'a> {
52 // Stored in sorted stripe order; dropped in reverse order automatically
53 _guards: Vec<MutexGuard<'a, ()>>,
54}
55
56impl LockManager {
57 /// Create a new lock manager with the specified number of stripes.
58 ///
59 /// # Arguments
60 ///
61 /// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024).
62 /// More stripes = less contention but more memory. Must be > 0.
63 /// * `default_timeout` - Default timeout for lock acquisition.
64 ///
65 /// # Returns
66 ///
67 /// `Err(AzothError::Config)` if `num_stripes` is 0.
68 pub fn new(num_stripes: usize, default_timeout: Duration) -> Self {
69 // Clamp to 1 instead of panicking — a single stripe still works (just no parallelism).
70 let num_stripes = if num_stripes == 0 {
71 tracing::warn!(
72 "LockManager created with num_stripes=0, defaulting to 1. \
73 This is a configuration error — set ARCANA_KEY_LOCK_STRIPES > 0."
74 );
75 1
76 } else {
77 num_stripes
78 };
79 let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect();
80
81 Self {
82 stripes,
83 num_stripes,
84 default_timeout,
85 }
86 }
87
88 /// Create a lock manager with default timeout.
89 pub fn with_stripes(num_stripes: usize) -> Self {
90 Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS))
91 }
92
93 /// Hash a key to determine its stripe index
94 fn stripe_index(&self, key: &[u8]) -> usize {
95 let hash = xxh3_64(key);
96 (hash as usize) % self.num_stripes
97 }
98
99 /// Acquire exclusive locks on all keys in a deadlock-free manner
100 ///
101 /// This method:
102 /// 1. Computes stripe indices for all keys
103 /// 2. Deduplicates stripes (multiple keys may map to same stripe)
104 /// 3. Sorts stripe indices for consistent global ordering
105 /// 4. Acquires locks in sorted order with timeout
106 ///
107 /// # Arguments
108 ///
109 /// * `keys` - Keys to acquire locks for
110 ///
111 /// # Returns
112 ///
113 /// A `MultiLockGuard` that holds all acquired locks. Locks are released
114 /// when the guard is dropped.
115 ///
116 /// # Errors
117 ///
118 /// Returns `LockTimeout` if any lock cannot be acquired within the timeout.
119 ///
120 /// # Deadlock Safety
121 ///
122 /// Because locks are always acquired in sorted stripe order, two threads
123 /// acquiring locks on keys [A, B] and [B, A] will both attempt to acquire
124 /// locks in the same order, preventing deadlock.
125 pub fn acquire_keys<K: AsRef<[u8]>>(&self, keys: &[K]) -> Result<MultiLockGuard<'_>> {
126 self.acquire_keys_with_timeout(keys, self.default_timeout)
127 }
128
129 /// Acquire exclusive locks with a custom timeout
130 pub fn acquire_keys_with_timeout<K: AsRef<[u8]>>(
131 &self,
132 keys: &[K],
133 timeout: Duration,
134 ) -> Result<MultiLockGuard<'_>> {
135 // Early return for empty keys
136 if keys.is_empty() {
137 return Ok(MultiLockGuard {
138 _guards: Vec::new(),
139 });
140 }
141
142 // Compute stripe indices and deduplicate using BTreeSet (automatically sorted)
143 let stripe_indices: BTreeSet<usize> =
144 keys.iter().map(|k| self.stripe_index(k.as_ref())).collect();
145
146 // Acquire locks in sorted order
147 let mut guards = Vec::with_capacity(stripe_indices.len());
148
149 for stripe_idx in stripe_indices {
150 match self.stripes[stripe_idx].try_lock_for(timeout) {
151 Some(guard) => guards.push(guard),
152 None => {
153 // Failed to acquire lock - drop all acquired locks and return error
154 // Guards are dropped automatically when `guards` goes out of scope
155 return Err(AzothError::LockTimeout {
156 timeout_ms: timeout.as_millis() as u64,
157 });
158 }
159 }
160 }
161
162 Ok(MultiLockGuard { _guards: guards })
163 }
164
165 /// Acquire a single lock (convenience method)
166 ///
167 /// Prefer `acquire_keys()` for multiple keys to ensure deadlock safety.
168 pub fn lock(&self, key: &[u8]) -> Result<MutexGuard<'_, ()>> {
169 let idx = self.stripe_index(key);
170 self.stripes[idx]
171 .try_lock_for(self.default_timeout)
172 .ok_or(AzothError::LockTimeout {
173 timeout_ms: self.default_timeout.as_millis() as u64,
174 })
175 }
176
177 /// Get the number of stripes
178 pub fn num_stripes(&self) -> usize {
179 self.num_stripes
180 }
181
182 /// Get the default timeout
183 pub fn default_timeout(&self) -> Duration {
184 self.default_timeout
185 }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Return two keys that hash to different stripes of `lm`.
    ///
    /// Requires at least two stripes; otherwise no such pair can exist.
    fn keys_on_distinct_stripes(lm: &LockManager) -> (Vec<u8>, Vec<u8>) {
        assert!(lm.num_stripes() >= 2, "need at least two stripes");
        let first = b"key_0".to_vec();
        let first_idx = lm.stripe_index(&first);
        let second = (1u64..)
            .map(|i| format!("key_{}", i).into_bytes())
            .find(|k| lm.stripe_index(k) != first_idx)
            .expect("some key must hash to a different stripe");
        (first, second)
    }

    #[test]
    fn test_lock_manager_basic() {
        let lm = LockManager::with_stripes(256);
        assert_eq!(lm.num_stripes(), 256);

        // Should be able to acquire locks
        let _guard = lm.acquire_keys(&[b"key1", b"key2"]).unwrap();
    }

    #[test]
    fn test_zero_stripes_clamped_to_one() {
        let lm = LockManager::with_stripes(0);
        assert_eq!(lm.num_stripes(), 1);

        // The single fallback stripe must still be usable.
        let _guard = lm.acquire_keys(&[b"key"]).unwrap();
    }

    #[test]
    fn test_stripe_distribution() {
        let lm = LockManager::with_stripes(256);

        let idx1 = lm.stripe_index(b"key1");
        let idx2 = lm.stripe_index(b"key2");
        let idx3 = lm.stripe_index(b"key3");

        assert!(idx1 < 256);
        assert!(idx2 < 256);
        assert!(idx3 < 256);

        // Same key should always map to same stripe
        assert_eq!(idx1, lm.stripe_index(b"key1"));
    }

    #[test]
    fn test_empty_keys() {
        let lm = LockManager::with_stripes(256);
        let _guard = lm.acquire_keys::<&[u8]>(&[]).unwrap();
    }

    #[test]
    fn test_duplicate_keys_deduplicated() {
        let lm = LockManager::with_stripes(256);

        // Same key multiple times should only lock once (the mutexes are not
        // reentrant, so locking the stripe twice would time out).
        let _guard = lm.acquire_keys(&[b"key1", b"key1", b"key1"]).unwrap();
    }

    #[test]
    fn test_guard_drop_releases_locks() {
        let lm = LockManager::new(1, Duration::from_millis(50));
        drop(lm.acquire_keys(&[b"a", b"b"]).unwrap());

        // Reacquiring must succeed now that the first guard is gone.
        let _guard = lm.acquire_keys(&[b"a"]).unwrap();
    }

    #[test]
    fn test_sorted_acquisition_prevents_deadlock() {
        // Two threads lock the same two stripes with the keys listed in opposite
        // order. Without sorted acquisition this is the classic A-B/B-A deadlock.
        let lm = Arc::new(LockManager::with_stripes(256));
        let (key_a, key_b) = keys_on_distinct_stripes(&lm);
        let forward = vec![key_a.clone(), key_b.clone()];
        let reverse = vec![key_b, key_a];

        let spawn = |keys: Vec<Vec<u8>>| {
            let lm = Arc::clone(&lm);
            thread::spawn(move || {
                for _ in 0..100 {
                    let _guard = lm.acquire_keys(keys.as_slice()).unwrap();
                    thread::sleep(Duration::from_micros(10));
                }
            })
        };

        let h1 = spawn(forward);
        let h2 = spawn(reverse);

        // Both should complete without deadlock
        h1.join().unwrap();
        h2.join().unwrap();
    }

    #[test]
    fn test_timeout_works() {
        let lm = Arc::new(LockManager::new(1, Duration::from_millis(50)));

        // Acquire the only stripe
        let _guard = lm.acquire_keys(&[b"any_key"]).unwrap();

        // Try to acquire from another thread - should timeout
        let lm2 = Arc::clone(&lm);
        let handle = thread::spawn(move || {
            // Only report whether it timed out (don't return a guard across threads)
            matches!(
                lm2.acquire_keys(&[b"another_key"]),
                Err(AzothError::LockTimeout { .. })
            )
        });

        let timed_out = handle.join().unwrap();
        assert!(timed_out, "Should have timed out");
    }

    #[test]
    fn test_failed_acquisition_releases_partial_locks() {
        let lm = LockManager::new(2, Duration::from_millis(20));
        let (key_a, key_b) = keys_on_distinct_stripes(&lm);

        // Hold the higher stripe so the batch takes the lower one first and then
        // times out on the held one.
        let (low, high) = if lm.stripe_index(&key_a) < lm.stripe_index(&key_b) {
            (key_a, key_b)
        } else {
            (key_b, key_a)
        };
        let _held = lm.lock(&high).unwrap();

        let result = lm.acquire_keys(&[low.as_slice(), high.as_slice()]);
        assert!(matches!(result, Err(AzothError::LockTimeout { .. })));

        // The low stripe taken during the failed attempt must have been released.
        let low_idx = lm.stripe_index(&low);
        assert!(lm.stripes[low_idx].try_lock().is_some());
    }

    #[test]
    fn test_single_key_lock_times_out_when_held() {
        let lm = LockManager::new(1, Duration::from_millis(20));
        let _held = lm.acquire_keys(&[b"x"]).unwrap();

        assert!(matches!(
            lm.lock(b"y"),
            Err(AzothError::LockTimeout { timeout_ms: 20 })
        ));
    }

    #[test]
    fn test_concurrent_different_stripes() {
        // Threads locking unrelated keys should all complete.
        let lm = Arc::new(LockManager::with_stripes(256));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let lm = Arc::clone(&lm);
                thread::spawn(move || {
                    let key = format!("unique_key_{}", i);
                    let _guard = lm.acquire_keys(&[key.as_bytes()]).unwrap();
                    thread::sleep(Duration::from_millis(10));
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }
}