Skip to main content

azoth_core/
lock_manager.rs

1//! Stripe-based lock manager with deadlock prevention
2//!
3//! Provides per-key locking with automatic sorting to prevent deadlocks.
4//! Keys are hashed to stripes, and locks are acquired in stripe index order.
5
6use crate::error::{AzothError, Result};
7use parking_lot::{Mutex, MutexGuard};
8use std::collections::BTreeSet;
9use std::time::Duration;
10use xxhash_rust::xxh3::xxh3_64;
11
/// Lock acquisition timeout, in milliseconds, used when the caller does not
/// supply one (5 seconds). `LockManager::with_stripes` builds its timeout
/// from this value.
pub const DEFAULT_LOCK_TIMEOUT_MS: u64 = 5_000;
14
15/// Lock manager for stripe locking during preflight phase
16///
17/// Provides per-key locking with configurable number of stripes.
18/// Non-conflicting keys map to different stripes and can be locked concurrently.
19///
20/// # Deadlock Prevention
21///
22/// This implementation prevents deadlocks through two mechanisms:
23///
24/// 1. **Automatic Sorting**: When acquiring multiple locks via `acquire_keys()`,
25///    stripes are always acquired in ascending index order. This prevents the
26///    classic A-B/B-A deadlock scenario.
27///
28/// 2. **Timeout-Based Acquisition**: Uses `try_lock_for()` with a configurable
29///    timeout instead of blocking indefinitely. If a lock cannot be acquired
30///    within the timeout, returns `LockTimeout` error.
31///
32/// # Example
33///
34/// ```ignore
35/// let lm = LockManager::new(256, Duration::from_secs(5));
36///
37/// // Acquire locks on multiple keys - automatically sorted to prevent deadlock
38/// let _guard = lm.acquire_keys(&[b"key_b", b"key_a"])?;
39/// // Locks acquired in stripe order, not key order
40/// ```
41pub struct LockManager {
42    stripes: Vec<Mutex<()>>,
43    num_stripes: usize,
44    default_timeout: Duration,
45}
46
47/// Guard that holds multiple stripe locks
48///
49/// Locks are held in sorted stripe order and released in reverse order
50/// when dropped (via RAII).
51pub struct MultiLockGuard<'a> {
52    // Stored in sorted stripe order; dropped in reverse order automatically
53    _guards: Vec<MutexGuard<'a, ()>>,
54}
55
56impl LockManager {
57    /// Create a new lock manager with the specified number of stripes.
58    ///
59    /// # Arguments
60    ///
61    /// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024).
62    ///   More stripes = less contention but more memory. Must be > 0.
63    /// * `default_timeout` - Default timeout for lock acquisition.
64    ///
65    /// # Returns
66    ///
67    /// `Err(AzothError::Config)` if `num_stripes` is 0.
68    pub fn new(num_stripes: usize, default_timeout: Duration) -> Self {
69        // Clamp to 1 instead of panicking — a single stripe still works (just no parallelism).
70        let num_stripes = if num_stripes == 0 {
71            tracing::warn!(
72                "LockManager created with num_stripes=0, defaulting to 1. \
73                 This is a configuration error — set ARCANA_KEY_LOCK_STRIPES > 0."
74            );
75            1
76        } else {
77            num_stripes
78        };
79        let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect();
80
81        Self {
82            stripes,
83            num_stripes,
84            default_timeout,
85        }
86    }
87
88    /// Create a lock manager with default timeout.
89    pub fn with_stripes(num_stripes: usize) -> Self {
90        Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS))
91    }
92
93    /// Hash a key to determine its stripe index
94    fn stripe_index(&self, key: &[u8]) -> usize {
95        let hash = xxh3_64(key);
96        (hash as usize) % self.num_stripes
97    }
98
99    /// Acquire exclusive locks on all keys in a deadlock-free manner
100    ///
101    /// This method:
102    /// 1. Computes stripe indices for all keys
103    /// 2. Deduplicates stripes (multiple keys may map to same stripe)
104    /// 3. Sorts stripe indices for consistent global ordering
105    /// 4. Acquires locks in sorted order with timeout
106    ///
107    /// # Arguments
108    ///
109    /// * `keys` - Keys to acquire locks for
110    ///
111    /// # Returns
112    ///
113    /// A `MultiLockGuard` that holds all acquired locks. Locks are released
114    /// when the guard is dropped.
115    ///
116    /// # Errors
117    ///
118    /// Returns `LockTimeout` if any lock cannot be acquired within the timeout.
119    ///
120    /// # Deadlock Safety
121    ///
122    /// Because locks are always acquired in sorted stripe order, two threads
123    /// acquiring locks on keys [A, B] and [B, A] will both attempt to acquire
124    /// locks in the same order, preventing deadlock.
125    pub fn acquire_keys<K: AsRef<[u8]>>(&self, keys: &[K]) -> Result<MultiLockGuard<'_>> {
126        self.acquire_keys_with_timeout(keys, self.default_timeout)
127    }
128
129    /// Acquire exclusive locks with a custom timeout
130    pub fn acquire_keys_with_timeout<K: AsRef<[u8]>>(
131        &self,
132        keys: &[K],
133        timeout: Duration,
134    ) -> Result<MultiLockGuard<'_>> {
135        // Early return for empty keys
136        if keys.is_empty() {
137            return Ok(MultiLockGuard {
138                _guards: Vec::new(),
139            });
140        }
141
142        // Compute stripe indices and deduplicate using BTreeSet (automatically sorted)
143        let stripe_indices: BTreeSet<usize> =
144            keys.iter().map(|k| self.stripe_index(k.as_ref())).collect();
145
146        // Acquire locks in sorted order
147        let mut guards = Vec::with_capacity(stripe_indices.len());
148
149        for stripe_idx in stripe_indices {
150            match self.stripes[stripe_idx].try_lock_for(timeout) {
151                Some(guard) => guards.push(guard),
152                None => {
153                    // Failed to acquire lock - drop all acquired locks and return error
154                    // Guards are dropped automatically when `guards` goes out of scope
155                    return Err(AzothError::LockTimeout {
156                        timeout_ms: timeout.as_millis() as u64,
157                    });
158                }
159            }
160        }
161
162        Ok(MultiLockGuard { _guards: guards })
163    }
164
165    /// Acquire a single lock (convenience method)
166    ///
167    /// Prefer `acquire_keys()` for multiple keys to ensure deadlock safety.
168    pub fn lock(&self, key: &[u8]) -> Result<MutexGuard<'_, ()>> {
169        let idx = self.stripe_index(key);
170        self.stripes[idx]
171            .try_lock_for(self.default_timeout)
172            .ok_or(AzothError::LockTimeout {
173                timeout_ms: self.default_timeout.as_millis() as u64,
174            })
175    }
176
177    /// Get the number of stripes
178    pub fn num_stripes(&self) -> usize {
179        self.num_stripes
180    }
181
182    /// Get the default timeout
183    pub fn default_timeout(&self) -> Duration {
184        self.default_timeout
185    }
186}
187
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Spawn a worker that locks `keys` (in the given order) 100 times,
    /// holding the locks briefly on each iteration.
    fn spawn_locker(
        manager: Arc<LockManager>,
        keys: [&'static [u8]; 2],
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            for _ in 0..100 {
                let _held = manager.acquire_keys(&keys).unwrap();
                // Hold briefly
                thread::sleep(Duration::from_micros(10));
            }
        })
    }

    /// A freshly built manager reports its stripe count and hands out locks.
    #[test]
    fn test_lock_manager_basic() {
        let manager = LockManager::with_stripes(256);
        assert_eq!(manager.num_stripes(), 256);

        let _held = manager.acquire_keys(&[b"key1", b"key2"]).unwrap();
    }

    /// Stripe indices stay in range and are stable for a given key.
    #[test]
    fn test_stripe_distribution() {
        let manager = LockManager::with_stripes(256);

        let samples: [&[u8]; 3] = [b"key1", b"key2", b"key3"];
        for key in samples.iter() {
            assert!(manager.stripe_index(key) < 256);
        }

        // Hashing is deterministic: the same key always lands on the same stripe.
        let first = manager.stripe_index(b"key1");
        let second = manager.stripe_index(b"key1");
        assert_eq!(first, second);
    }

    /// Locking no keys at all succeeds.
    #[test]
    fn test_empty_keys() {
        let manager = LockManager::with_stripes(256);
        let nothing: &[&[u8]] = &[];
        let _held = manager.acquire_keys(nothing).unwrap();
    }

    /// Repeating a key must not try to lock its stripe more than once.
    #[test]
    fn test_duplicate_keys_deduplicated() {
        let manager = LockManager::with_stripes(256);

        let repeated = [b"key1", b"key1", b"key1"];
        let _held = manager.acquire_keys(&repeated).unwrap();
    }

    /// Two threads locking the same pair of keys in opposite orders both finish.
    #[test]
    fn test_sorted_acquisition_prevents_deadlock() {
        let manager = Arc::new(LockManager::with_stripes(256));

        // Forward order [a, b] on one thread, reverse order [b, a] on the other.
        // Without sorted acquisition this pairing could deadlock.
        let forward = spawn_locker(Arc::clone(&manager), [b"key_a", b"key_b"]);
        let reverse = spawn_locker(Arc::clone(&manager), [b"key_b", b"key_a"]);

        forward.join().unwrap();
        reverse.join().unwrap();
    }

    /// With a single stripe held, a second acquisition fails with `LockTimeout`.
    #[test]
    fn test_timeout_works() {
        let manager = Arc::new(LockManager::new(1, Duration::from_millis(50)));

        // Only one stripe exists, so this holds the lock every key maps to.
        let _held = manager.acquire_keys(&[b"any_key"]).unwrap();

        let contender = Arc::clone(&manager);
        let timed_out = thread::spawn(move || {
            // Reduce the outcome to a bool so no guard crosses the thread boundary.
            let outcome = contender.acquire_keys(&[b"another_key"]);
            matches!(outcome, Err(AzothError::LockTimeout { .. }))
        })
        .join()
        .unwrap();

        assert!(timed_out, "Should have timed out");
    }

    /// Ten threads each lock their own key and all complete.
    #[test]
    fn test_concurrent_different_stripes() {
        let manager = Arc::new(LockManager::with_stripes(256));

        let mut workers = Vec::with_capacity(10);
        for i in 0..10 {
            let manager = Arc::clone(&manager);
            workers.push(thread::spawn(move || {
                let key = format!("unique_key_{}", i);
                let _held = manager.acquire_keys(&[key.as_bytes()]).unwrap();
                thread::sleep(Duration::from_millis(10));
            }));
        }

        workers
            .into_iter()
            .for_each(|worker| worker.join().unwrap());
    }
}