Skip to main content

azoth_core/
lock_manager.rs

1//! Stripe-based lock manager with deadlock prevention
2//!
3//! Provides per-key locking with automatic sorting to prevent deadlocks.
4//! Keys are hashed to stripes, and locks are acquired in stripe index order.
5
6use crate::error::{AzothError, Result};
7use parking_lot::{Mutex, MutexGuard};
8use std::collections::BTreeSet;
9use std::time::Duration;
10use xxhash_rust::xxh3::xxh3_64;
11
/// Lock acquisition timeout, in milliseconds, used by `LockManager::with_stripes` (5 seconds)
pub const DEFAULT_LOCK_TIMEOUT_MS: u64 = 5_000;
14
/// Lock manager for stripe locking during preflight phase
///
/// Every key is hashed onto one of a fixed number of stripes, each guarded by
/// its own mutex. Keys that land on different stripes can be locked
/// concurrently; distinct keys that hash to the same stripe share one lock and
/// therefore serialize with each other.
///
/// # Deadlock Prevention
///
/// This implementation guards against deadlocks through two mechanisms:
///
/// 1. **Automatic Sorting**: When acquiring multiple locks via `acquire_keys()`,
///    stripes are deduplicated and always acquired in ascending index order.
///    This prevents the classic A-B/B-A deadlock scenario between callers that
///    take all of their locks through a single `acquire_keys()` call.
///
/// 2. **Timeout-Based Acquisition**: Uses `try_lock_for()` with a configurable
///    timeout instead of blocking indefinitely. If a stripe cannot be acquired
///    within the timeout, a `LockTimeout` error is returned. The timeout is
///    applied to each stripe separately, so a call that needs several stripes
///    may wait longer than one timeout in total.
///
/// # Example
///
/// ```ignore
/// let lm = LockManager::new(256, Duration::from_secs(5));
///
/// // Acquire locks on multiple keys - automatically sorted to prevent deadlock
/// let _guard = lm.acquire_keys(&[b"key_b", b"key_a"])?;
/// // Locks acquired in stripe order, not key order
/// ```
#[derive(Debug)]
pub struct LockManager {
    /// One mutex per stripe; a stripe's position in this vector is its rank
    /// in the global lock ordering.
    stripes: Vec<Mutex<()>>,
    /// Number of stripes, as passed to `new()` (never zero).
    num_stripes: usize,
    /// Timeout used by the methods that do not take an explicit timeout.
    default_timeout: Duration,
}
46
/// Guard that holds multiple stripe locks
///
/// The individual stripe guards are stored in the order they were acquired
/// (ascending stripe index). Dropping the `MultiLockGuard` releases every
/// lock it holds, most recently acquired first.
#[must_use = "dropping the guard immediately releases every stripe lock it holds"]
pub struct MultiLockGuard<'a> {
    // Stored in sorted stripe order; released back-to-front by `Drop`.
    _guards: Vec<MutexGuard<'a, ()>>,
}

impl<'a> Drop for MultiLockGuard<'a> {
    /// Release the held stripe locks in reverse acquisition order.
    fn drop(&mut self) {
        // A `Vec` drops its elements first-to-last, so pop explicitly to get
        // the documented last-acquired-first-released order.
        while let Some(guard) = self._guards.pop() {
            drop(guard);
        }
    }
}
55
56impl LockManager {
57    /// Create a new lock manager with the specified number of stripes
58    ///
59    /// # Arguments
60    ///
61    /// * `num_stripes` - Number of lock stripes (common values: 256, 512, 1024).
62    ///   More stripes = less contention but more memory.
63    /// * `default_timeout` - Default timeout for lock acquisition.
64    ///
65    /// # Panics
66    ///
67    /// Panics if `num_stripes` is 0.
68    pub fn new(num_stripes: usize, default_timeout: Duration) -> Self {
69        assert!(num_stripes > 0, "num_stripes must be positive");
70        let stripes = (0..num_stripes).map(|_| Mutex::new(())).collect();
71
72        Self {
73            stripes,
74            num_stripes,
75            default_timeout,
76        }
77    }
78
79    /// Create a lock manager with default timeout
80    pub fn with_stripes(num_stripes: usize) -> Self {
81        Self::new(num_stripes, Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS))
82    }
83
84    /// Hash a key to determine its stripe index
85    fn stripe_index(&self, key: &[u8]) -> usize {
86        let hash = xxh3_64(key);
87        (hash as usize) % self.num_stripes
88    }
89
90    /// Acquire exclusive locks on all keys in a deadlock-free manner
91    ///
92    /// This method:
93    /// 1. Computes stripe indices for all keys
94    /// 2. Deduplicates stripes (multiple keys may map to same stripe)
95    /// 3. Sorts stripe indices for consistent global ordering
96    /// 4. Acquires locks in sorted order with timeout
97    ///
98    /// # Arguments
99    ///
100    /// * `keys` - Keys to acquire locks for
101    ///
102    /// # Returns
103    ///
104    /// A `MultiLockGuard` that holds all acquired locks. Locks are released
105    /// when the guard is dropped.
106    ///
107    /// # Errors
108    ///
109    /// Returns `LockTimeout` if any lock cannot be acquired within the timeout.
110    ///
111    /// # Deadlock Safety
112    ///
113    /// Because locks are always acquired in sorted stripe order, two threads
114    /// acquiring locks on keys [A, B] and [B, A] will both attempt to acquire
115    /// locks in the same order, preventing deadlock.
116    pub fn acquire_keys<K: AsRef<[u8]>>(&self, keys: &[K]) -> Result<MultiLockGuard<'_>> {
117        self.acquire_keys_with_timeout(keys, self.default_timeout)
118    }
119
120    /// Acquire exclusive locks with a custom timeout
121    pub fn acquire_keys_with_timeout<K: AsRef<[u8]>>(
122        &self,
123        keys: &[K],
124        timeout: Duration,
125    ) -> Result<MultiLockGuard<'_>> {
126        // Early return for empty keys
127        if keys.is_empty() {
128            return Ok(MultiLockGuard {
129                _guards: Vec::new(),
130            });
131        }
132
133        // Compute stripe indices and deduplicate using BTreeSet (automatically sorted)
134        let stripe_indices: BTreeSet<usize> =
135            keys.iter().map(|k| self.stripe_index(k.as_ref())).collect();
136
137        // Acquire locks in sorted order
138        let mut guards = Vec::with_capacity(stripe_indices.len());
139
140        for stripe_idx in stripe_indices {
141            match self.stripes[stripe_idx].try_lock_for(timeout) {
142                Some(guard) => guards.push(guard),
143                None => {
144                    // Failed to acquire lock - drop all acquired locks and return error
145                    // Guards are dropped automatically when `guards` goes out of scope
146                    return Err(AzothError::LockTimeout {
147                        timeout_ms: timeout.as_millis() as u64,
148                    });
149                }
150            }
151        }
152
153        Ok(MultiLockGuard { _guards: guards })
154    }
155
156    /// Acquire a single lock (convenience method)
157    ///
158    /// Prefer `acquire_keys()` for multiple keys to ensure deadlock safety.
159    pub fn lock(&self, key: &[u8]) -> Result<MutexGuard<'_, ()>> {
160        let idx = self.stripe_index(key);
161        self.stripes[idx]
162            .try_lock_for(self.default_timeout)
163            .ok_or(AzothError::LockTimeout {
164                timeout_ms: self.default_timeout.as_millis() as u64,
165            })
166    }
167
168    /// Get the number of stripes
169    pub fn num_stripes(&self) -> usize {
170        self.num_stripes
171    }
172
173    /// Get the default timeout
174    pub fn default_timeout(&self) -> Duration {
175        self.default_timeout
176    }
177}
178
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Find a key that hashes to `stripe`, so tests can target a specific stripe.
    fn key_for_stripe(lm: &LockManager, stripe: usize) -> String {
        (0..10_000)
            .map(|i| format!("probe_key_{}", i))
            .find(|key| lm.stripe_index(key.as_bytes()) == stripe)
            .expect("no probe key hashed to the requested stripe")
    }

    #[test]
    fn test_lock_manager_basic() {
        let lm = LockManager::with_stripes(256);
        assert_eq!(lm.num_stripes(), 256);
        assert_eq!(
            lm.default_timeout(),
            Duration::from_millis(DEFAULT_LOCK_TIMEOUT_MS)
        );

        // Two keys occupy either one stripe (collision) or two.
        let guard = lm.acquire_keys(&[b"key1", b"key2"]).unwrap();
        let held = guard._guards.len();
        assert!(held == 1 || held == 2);
    }

    #[test]
    fn test_stripe_distribution() {
        let lm = LockManager::with_stripes(256);

        // Every key must land inside the stripe range, and hashing must be stable.
        for key in [&b"key1"[..], &b"key2"[..], &b"key3"[..]].iter() {
            let idx = lm.stripe_index(key);
            assert!(idx < 256);
            assert_eq!(idx, lm.stripe_index(key));
        }
    }

    #[test]
    fn test_empty_keys() {
        let lm = LockManager::with_stripes(256);
        let guard = lm.acquire_keys::<&[u8]>(&[]).unwrap();
        assert!(guard._guards.is_empty());
    }

    #[test]
    fn test_duplicate_keys_deduplicated() {
        let lm = LockManager::with_stripes(256);

        // Same key multiple times must map to exactly one held stripe.
        let guard = lm.acquire_keys(&[b"key1", b"key1", b"key1"]).unwrap();
        assert_eq!(guard._guards.len(), 1);
    }

    #[test]
    fn test_sorted_acquisition_prevents_deadlock() {
        // Two threads request the same keys in opposite orders; with sorted
        // acquisition neither may deadlock or time out.
        let lm = Arc::new(LockManager::with_stripes(256));

        let forward = {
            let lm = Arc::clone(&lm);
            thread::spawn(move || {
                for _ in 0..100 {
                    let _guard = lm.acquire_keys(&[b"key_a", b"key_b"]).unwrap();
                    // Hold briefly
                    thread::sleep(Duration::from_micros(10));
                }
            })
        };

        let reverse = {
            let lm = Arc::clone(&lm);
            thread::spawn(move || {
                for _ in 0..100 {
                    let _guard = lm.acquire_keys(&[b"key_b", b"key_a"]).unwrap();
                    thread::sleep(Duration::from_micros(10));
                }
            })
        };

        forward.join().unwrap();
        reverse.join().unwrap();
    }

    #[test]
    fn test_timeout_works() {
        let lm = Arc::new(LockManager::new(1, Duration::from_millis(50)));

        // Acquire the only stripe
        let _guard = lm.acquire_keys(&[b"any_key"]).unwrap();

        // A second thread must time out and report the configured timeout.
        let contender = Arc::clone(&lm);
        let handle = thread::spawn(move || {
            // Return only a bool so no guard crosses the thread boundary.
            matches!(
                contender.acquire_keys(&[b"another_key"]),
                Err(AzothError::LockTimeout { timeout_ms: 50 })
            )
        });

        assert!(handle.join().unwrap(), "Should have timed out");
    }

    #[test]
    fn test_locks_released_after_guard_drop() {
        let lm = LockManager::new(1, Duration::from_millis(50));

        drop(lm.acquire_keys(&[b"key_a"]).unwrap());
        // The single stripe must be free again for both entry points.
        drop(lm.lock(b"key_b").unwrap());
        let _again = lm.acquire_keys(&[b"key_b"]).unwrap();
    }

    #[test]
    fn test_partial_acquisition_released_on_timeout() {
        let lm = LockManager::new(2, Duration::from_millis(50));
        let low = key_for_stripe(&lm, 0);
        let high = key_for_stripe(&lm, 1);

        // Hold stripe 1 so a request for both stripes takes stripe 0 and then fails.
        let _blocker = lm.lock(high.as_bytes()).unwrap();
        let attempt = lm.acquire_keys(&[low.as_bytes(), high.as_bytes()]);
        assert!(matches!(attempt, Err(AzothError::LockTimeout { .. })));
        drop(attempt);

        // Stripe 0 must have been released by the failed attempt.
        let _free = lm.lock(low.as_bytes()).unwrap();
    }

    #[test]
    fn test_concurrent_different_stripes() {
        // Ten threads lock ten distinct keys at once; all must succeed.
        let lm = Arc::new(LockManager::with_stripes(256));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let lm = Arc::clone(&lm);
                thread::spawn(move || {
                    let key = format!("unique_key_{}", i);
                    let _guard = lm.acquire_keys(&[key.as_bytes()]).unwrap();
                    thread::sleep(Duration::from_millis(10));
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
    }
}