
// -*- coding: utf-8 -*-
//
// Copyright 2021-2023 Michael Büsch <m@bues.ch>
//
// Licensed under the Apache License version 2.0
// or the MIT license, at your option.
// SPDX-License-Identifier: Apache-2.0 OR MIT
//

use std::{
    cell::UnsafeCell,
    hint::unreachable_unchecked,
    marker::PhantomData,
    ops::{Index, IndexMut},
    rc::Rc,
    sync::{
        atomic::{AtomicU32, Ordering},
        TryLockError, TryLockResult,
    },
};

/// Interleaved multi-thread range lock for [std::vec::Vec].
///
/// Each thread can lock a set of repeating slices of the data.
/// The slices are interleaved with each other
/// and the slice pattern repeats cyclically every `cycle_len` slices.
///
/// Offsets are not bound to one specific thread:
/// any thread may lock any offset that is currently unlocked.
///
/// Please see the example below.
///
/// # Example
///
/// ```
/// use range_lock::RepVecRangeLock;
/// use std::{sync::Arc, thread};
///
/// let data = vec![1, 2,  3, 4,   5,  6,   // <- cycle 0
///                 7, 8,  9, 10,  11, 12]; // <- cycle 1
/// //              ^--^   ^---^   ^----^
/// //                |      |      |
/// //          offset-0  offset-1  offset-2
///
/// let lock = Arc::new(RepVecRangeLock::new(data,
///                                          2,    // slice_len: Each slice has 2 elements.
///                                          3));  // cycle_len: Each cycle has 3 slices (offsets).
/// let lock0 = Arc::clone(&lock);
/// let lock1 = Arc::clone(&lock);
/// let lock2 = Arc::clone(&lock);
///
/// thread::scope(|s| {
///     s.spawn(move || {
///         // Lock slice offset 0:
///         let mut guard = lock0.try_lock(0).expect("Failed to lock offset.");
///
///         // Read:
///         assert_eq!(guard[0][0], 1);     // Cycle 0, Slice element 0
///         assert_eq!(guard[0][1], 2);     // Cycle 0, Slice element 1
///         // let _ = guard[0][2];         // Would panic: Slice len is only 2.
///         assert_eq!(guard[1][0], 7);     // Cycle 1, Slice element 0
///         assert_eq!(guard[1][1], 8);     // Cycle 1, Slice element 1
///         // let _ = guard[2][0];         // Would panic: The data vec is only 2 repeat cycles long.
///
///         // Write:
///         guard[0][0] = 10;               // Cycle 0, Slice element 0
///         guard[0][1] = 20;               // Cycle 0, Slice element 1
///         // guard[0][2] = 42;            // Would panic: Slice len is only 2.
///         guard[1][0] = 30;               // Cycle 1, Slice element 0
///         guard[1][1] = 40;               // Cycle 1, Slice element 1
///         // guard[2][0] = 42;            // Would panic: The data vec is only 2 repeat cycles long.
///     });
///
///     s.spawn(move || {
///         // Lock slice offset 1:
///         let mut guard = lock1.try_lock(1).expect("Failed to lock offset.");
///
///         guard[0][0] = 100;              // Cycle 0, Slice element 0
///         guard[0][1] = 200;              // Cycle 0, Slice element 1
///         guard[1][0] = 300;              // Cycle 1, Slice element 0
///         guard[1][1] = 400;              // Cycle 1, Slice element 1
///     });
///
///     s.spawn(move || {
///         // Lock slice offset 2:
///         let mut guard = lock2.try_lock(2).expect("Failed to lock offset.");
///
///         guard[0][0] = 1000;             // Cycle 0, Slice element 0
///         guard[0][1] = 2000;             // Cycle 0, Slice element 1
///         guard[1][0] = 3000;             // Cycle 1, Slice element 0
///         guard[1][1] = 4000;             // Cycle 1, Slice element 1
///     });
/// });
///
/// // Get the data that has been modified by the threads.
/// let data = Arc::try_unwrap(lock).expect("Thread is still using data.").into_inner();
///
/// assert_eq!(data,
///            vec![10, 20, 100, 200, 1000, 2000,
///                 30, 40, 300, 400, 3000, 4000]);
/// ```
#[derive(Debug)]
pub struct RepVecRangeLock<T> {
    /// Slice length, in number of data elements.
    slice_len: usize,
    /// Cycle length, in number of slices.
    cycle_len: usize,
    /// Cycle length, in number of data elements.
    cycle_num_elems: usize,
    /// Bitmask of locked cycle offsets. One bit per offset; a set bit means locked.
    locked_offsets: Vec<AtomicU32>,
    /// The protected data.
    data: UnsafeCell<Vec<T>>,
}

// SAFETY:
// It is safe to access RepVecRangeLock and the contained data (via RepVecRangeLockGuard)
// from multiple threads simultaneously.
// The lock ensures that access to any given slice offset is serialized
// and that the regions reachable from concurrently held guards never overlap.
// T must be Send-able to other threads.
unsafe impl<T> Sync for RepVecRangeLock<T> where T: Send {}

impl<'a, T> RepVecRangeLock<T> {
    /// Construct a new [RepVecRangeLock].
    ///
    /// * `data`: The data [Vec] to protect.
    /// * `slice_len`: The length of the slices, in number of elements. Must be >0.
    /// * `cycle_len`: The length of the repeat cycle, in number of slices.
    ///   Must be >0 and <=usize::MAX-31, so that the internal lock bitmap
    ///   size calculation cannot overflow.
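    ///
    /// # Example
    ///
    /// A minimal construction sketch: 6 elements split into slices of 2 elements,
    /// with 3 slices (offsets) per repeat cycle.
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![0_u32; 6], 2, 3);
    /// assert_eq!(lock.data_len(), 6);
    /// ```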
    pub fn new(data: Vec<T>, slice_len: usize, cycle_len: usize) -> RepVecRangeLock<T> {
        if slice_len == 0 {
            panic!("slice_len must not be 0.");
        }
        if cycle_len == 0 || cycle_len > usize::MAX - 31 {
            panic!("cycle_len out of range.");
        }
        let Some(cycle_num_elems) = cycle_len.checked_mul(slice_len) else {
            panic!("Repeat cycle overflow.");
        };
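
        // One lock bit per cycle offset, rounded up to whole u32 words.
        // The addition cannot overflow: cycle_len <= usize::MAX - 31 was checked above.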
        let num = (cycle_len + 31) / 32;
        let mut locked_offsets = Vec::with_capacity(num);
        locked_offsets.resize_with(num, || AtomicU32::new(0));

        let data = UnsafeCell::new(data);

        RepVecRangeLock {
            slice_len,
            cycle_len,
            cycle_num_elems,
            locked_offsets,
            data,
        }
    }

    /// Get the length (in number of elements) of the embedded [Vec].
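    ///
    /// For example:
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![0_u8; 8], 2, 2);
    /// assert_eq!(lock.data_len(), 8);
    /// ```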
    #[inline]
    pub fn data_len(&self) -> usize {
        // SAFETY: Multithreaded access is safe. The length cannot change,
        //         because guards only ever hand out element slices, never the Vec itself.
        unsafe { (*self.data.get()).len() }
    }

    /// Unwrap this [RepVecRangeLock] into the contained data.
    /// This method consumes self.
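    ///
    /// # Example
    ///
    /// A short sketch, constructing a lock and immediately taking the data back out:
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![1, 2, 3, 4], 2, 2);
    /// // No guards are held here, so the data can be taken back out.
    /// assert_eq!(lock.into_inner(), vec![1, 2, 3, 4]);
    /// ```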
    #[inline]
    pub fn into_inner(self) -> Vec<T> {
        debug_assert!(self
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
        self.data.into_inner()
    }

    /// Try to lock the data slices at 'cycle_offset' (one slice per repeat cycle).
    ///
    /// * On success: Returns a [RepVecRangeLockGuard] that can be used to access the locked region.
    ///               Indexing the [RepVecRangeLockGuard] yields a slice of the `data`.
    /// * On failure: Returns [TryLockError::WouldBlock], if the offset is already locked.
    ///               The locking attempt may be retried by the caller upon contention.
    ///               [TryLockError::Poisoned] is never returned, because this lock
    ///               does not implement poisoning.
    ///
    /// # Panics
    ///
    /// Panics, if `cycle_offset` is not less than `cycle_len`.
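    ///
    /// # Example
    ///
    /// A single-threaded sketch of the lock/contention behavior:
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![0_u32; 4], 1, 2);
    ///
    /// let guard = lock.try_lock(0).expect("offset 0 is free");
    /// // Offset 0 is held, so a second attempt fails with WouldBlock.
    /// assert!(lock.try_lock(0).is_err());
    /// // A different offset can still be locked.
    /// let _guard1 = lock.try_lock(1).expect("offset 1 is free");
    /// // Dropping a guard releases its offset again.
    /// drop(guard);
    /// assert!(lock.try_lock(0).is_ok());
    /// ```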
    #[inline]
    pub fn try_lock(&'a self, cycle_offset: usize) -> TryLockResult<RepVecRangeLockGuard<'a, T>> {
        if cycle_offset >= self.cycle_len {
            panic!("Invalid cycle_offset. It must be 0 <= cycle_offset < cycle_len.");
        }
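        // Compute the word index and the bit mask of this offset in the lock bitmap.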
        let idx = cycle_offset / 32;
        let mask = 1 << (cycle_offset % 32);
        // SAFETY: cycle_offset has been checked against cycle_len.
        let prev =
            unsafe { self.locked_offsets.get_unchecked(idx) }.fetch_or(mask, Ordering::AcqRel);
        if prev & mask == 0 {
            // Multiply cannot overflow due to slice_len, cycle_len and cycle_offset checks.
            let cycle_offset_slices = self.slice_len * cycle_offset;
            // Successfully acquired the lock.
            TryLockResult::Ok(RepVecRangeLockGuard::new(
                self,
                cycle_offset,
                cycle_offset_slices,
            ))
        } else {
            // Already locked by another thread.
            TryLockResult::Err(TryLockError::WouldBlock)
        }
    }

    /// Unlock a slice at 'cycle_offset'.
    #[inline]
    fn unlock(&self, cycle_offset: usize) {
        let idx = cycle_offset / 32;
        let mask = 1 << (cycle_offset % 32);
        // SAFETY: cycle_offset has been checked against cycle_len in try_lock().
        let prev =
            unsafe { self.locked_offsets.get_unchecked(idx) }.fetch_xor(mask, Ordering::Release);
        debug_assert!(prev & mask != 0);
    }

    /// Get an immutable slice at 'cycle' / 'cycle_offset'.
    ///
    /// # SAFETY
    ///
    /// See get_mut_slice().
    #[inline]
    unsafe fn get_slice(&self, cycle_offset_slices: usize, cycle: usize) -> &[T] {
        if let Some(cycle_elemidx) = self.cycle_num_elems.checked_mul(cycle) {
            if let Some(begin) = cycle_elemidx.checked_add(cycle_offset_slices) {
                if let Some(end) = begin.checked_add(self.slice_len) {
                    let dataptr = self.data.get();
                    if end <= (*dataptr).len() {
                        // SAFETY: We trust the slicing machinery of Vec to work correctly.
                        //         It must return the slice range that we requested.
                        //         Otherwise our non-overlap guarantees are gone.
                        return &(*dataptr)[begin..end];
                    }
                }
            }
        }
        panic!("RepVecRangeLock cycle index out of range.");
    }

    /// Get a mutable slice at 'cycle' / 'cycle_offset'.
    ///
    /// # SAFETY
    ///
    /// The caller must ensure that:
    /// * Overlapping mutable slices never coexist, whether on one thread or many.
    /// * Immutable slices to overlapping ranges may only coexist on a single thread.
    /// * Immutable and mutable slices to overlapping ranges must not coexist.
    #[inline]
    #[allow(clippy::mut_from_ref)] // Slices won't overlap. See SAFETY.
    unsafe fn get_mut_slice(&self, cycle_offset_slices: usize, cycle: usize) -> &mut [T] {
        let cptr = self.get_slice(cycle_offset_slices, cycle) as *const [T];
        let mut_slice = (cptr as *mut [T]).as_mut();
        // SAFETY: The pointer is never null, because it has been cast from a slice reference.
        mut_slice.unwrap_or_else(|| unreachable_unchecked())
    }
}

/// Lock guard for [RepVecRangeLock].
///
/// The [Index] and [IndexMut] traits are implemented for this struct.
/// See the documentation of [RepVecRangeLock] for usage examples of [RepVecRangeLockGuard].
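///
/// A guard is indexed as `guard[cycle][element_within_slice]`; a short sketch:
///
/// ```
/// use range_lock::RepVecRangeLock;
///
/// let lock = RepVecRangeLock::new(vec![1, 2, 3, 4], 1, 2);
/// let mut guard = lock.try_lock(1).expect("offset 1 is free");
/// assert_eq!(guard[0][0], 2); // Cycle 0, offset 1.
/// guard[1][0] = 40;           // Cycle 1, offset 1.
/// drop(guard);
/// assert_eq!(lock.into_inner(), vec![1, 2, 3, 40]);
/// ```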
#[derive(Debug)]
pub struct RepVecRangeLockGuard<'a, T> {
    /// Reference to the underlying lock.
    lock: &'a RepVecRangeLock<T>,
    /// The locked cycle offset.
    cycle_offset: usize,
    /// The locked slice's start offset (in elements) within each cycle.
    cycle_offset_slices: usize,
    /// Suppresses the Send and Sync autotraits for RepVecRangeLockGuard.
    /// The &mut suppresses Sync and the Rc suppresses Send.
    #[allow(clippy::redundant_allocation)]
    _p: PhantomData<Rc<&'a mut T>>,
}

impl<'a, T> RepVecRangeLockGuard<'a, T> {
    #[inline]
    fn new(
        lock: &'a RepVecRangeLock<T>,
        cycle_offset: usize,
        cycle_offset_slices: usize,
    ) -> RepVecRangeLockGuard<'a, T> {
        RepVecRangeLockGuard {
            lock,
            cycle_offset,
            cycle_offset_slices,
            _p: PhantomData,
        }
    }
}

impl<'a, T> Drop for RepVecRangeLockGuard<'a, T> {
    #[inline]
    fn drop(&mut self) {
        self.lock.unlock(self.cycle_offset);
    }
}

impl<'a, T> Index<usize> for RepVecRangeLockGuard<'a, T> {
    type Output = [T];

    #[inline]
    fn index(&self, cycle: usize) -> &Self::Output {
        // SAFETY: See index_mut().
        unsafe { self.lock.get_slice(self.cycle_offset_slices, cycle) }
    }
}

impl<'a, T> IndexMut<usize> for RepVecRangeLockGuard<'a, T> {
    #[inline]
    fn index_mut(&mut self, cycle: usize) -> &mut Self::Output {
        // SAFETY:
        // The lifetime of the slice is bounded by the lifetime of the guard.
        // The lifetime of the guard is bounded by the lifetime of the range lock.
        // The underlying data is owned by the range lock.
        // Therefore the slice cannot outlive the data.
        // The range lock ensures that no overlapping/conflicting guards
        // can be constructed.
        // The compiler ensures that the index_mut() result cannot be used
        // while an index() result is also alive.
        unsafe { self.lock.get_mut_slice(self.cycle_offset_slices, cycle) }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::sync::{Arc, Barrier};
    use std::thread;

    #[test]
    #[should_panic(expected = "cycle_len out of range")]
    fn test_oob_cycle_len0() {
        let _ = RepVecRangeLock::new(vec![0; 100], 1, 0);
    }

    #[test]
    #[should_panic(expected = "cycle_len out of range")]
    fn test_oob_cycle_len1() {
        let _ = RepVecRangeLock::new(vec![0; 100], 1, usize::MAX - 30);
    }

    #[test]
    #[should_panic(expected = "slice_len must not be 0")]
    fn test_oob_slice_len() {
        let _ = RepVecRangeLock::new(vec![0; 100], 0, 1);
    }

    #[test]
    #[should_panic(expected = "cycle overflow")]
    fn test_cycle_overflow() {
        let _ = RepVecRangeLock::new(vec![0; 100], usize::MAX, 2);
    }

    #[test]
    #[should_panic(expected = "must be 0 <= cycle_offset < cycle_len")]
    fn test_oob_lock_offset() {
        let a = RepVecRangeLock::new(vec![0; 100], 2, 10);
        let _ = a.try_lock(10);
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_base_oob_read() {
        let a = RepVecRangeLock::new(vec![0; 100], 1, 2);
        let g = a.try_lock(0).unwrap();
        let _ = g[0][1];
    }

    #[test]
    #[should_panic(expected = "Failed to lock guard 1")]
    fn test_overlap0() {
        let a = RepVecRangeLock::new(vec![1_i32, 2, 3, 4, 5, 6], 1, 3);
        let _g0 = a.try_lock(0).expect("Failed to lock guard 0.");
        let _g1 = a.try_lock(0).expect("Failed to lock guard 1.");
    }

    #[test]
    #[should_panic(expected = "Failed to lock guard 1")]
    fn test_overlap1() {
        let a = RepVecRangeLock::new(vec![1_i32, 2, 3, 4, 5, 6], 1, 3);
        let _g0 = a.try_lock(1).expect("Failed to lock guard 0.");
        let _g1 = a.try_lock(1).expect("Failed to lock guard 1.");
    }

    #[test]
    fn test_big_cycle() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32; 256],
            2,   // slice_len
            128, // cycle_len
        ));
        assert!(a.locked_offsets.len() == 4);
        {
            let _g = a.try_lock(0);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 1);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(1);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 2);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(32);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 1);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(33);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 2);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(69);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 32);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(127);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0x80000000);
        }
    }

    #[test]
    #[should_panic(expected = "Invalid cycle_offset")]
    fn test_cycle_offset_out_of_range() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32; 256],
            2,   // slice_len
            128, // cycle_len
        ));
        let _g = a.try_lock(128);
    }

    #[test]
    fn test_thread_no_overlap() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32, 2, 3, 4],
            1, // slice_len
            2, // cycle_len
        ));
        let b = Arc::clone(&a);
        let c = Arc::clone(&a);
        let ba0 = Arc::new(Barrier::new(2));
        let ba1 = Arc::clone(&ba0);
        let j0 = thread::spawn(move || {
            {
                let mut g = b.try_lock(0).unwrap();
                assert!(b.locked_offsets[0].load(Ordering::Acquire) & 1 != 0);
                assert_eq!(g[0][0], 1);
                assert_eq!(g[1][0], 3);
                g[0][0] = 10;
                g[1][0] = 30;
            }
            ba0.wait();
        });
        let j1 = thread::spawn(move || {
            {
                let g = c.try_lock(1).unwrap();
                assert!(c.locked_offsets[0].load(Ordering::Acquire) & 2 != 0);
                assert_eq!(g[0][0], 2);
                assert_eq!(g[1][0], 4);
            }
            ba1.wait();
            let g = c.try_lock(0).unwrap();
            assert_eq!(g[0][0], 10);
            assert_eq!(g[1][0], 30);
        });
        j1.join().expect("Thread 1 panicked.");
        j0.join().expect("Thread 0 panicked.");
        assert!(a
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
    }

    struct NoSyncStruct(RefCell<u32>); // No Sync auto-trait.

    #[test]
    fn test_nosync() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![
                NoSyncStruct(RefCell::new(1)),
                NoSyncStruct(RefCell::new(2)),
                NoSyncStruct(RefCell::new(3)),
                NoSyncStruct(RefCell::new(4)),
            ],
            1, // slice_len
            2, // cycle_len
        ));
        let b = Arc::clone(&a);
        let c = Arc::clone(&a);
        let ba0 = Arc::new(Barrier::new(2));
        let ba1 = Arc::clone(&ba0);
        let j0 = thread::spawn(move || {
            let _g = b.try_lock(0).unwrap();
            assert!(b.locked_offsets[0].load(Ordering::Acquire) & 1 != 0);
            ba0.wait();
        });
        let j1 = thread::spawn(move || {
            let _g = c.try_lock(1).unwrap();
            assert!(c.locked_offsets[0].load(Ordering::Acquire) & 2 != 0);
            ba1.wait();
        });
        j1.join().expect("Thread 1 panicked.");
        j0.join().expect("Thread 0 panicked.");
        assert!(a
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
    }
}

// vim: ts=4 sw=4 expandtab