range_lock/
reprangelock.rs

// -*- coding: utf-8 -*-
//
// Copyright 2021-2025 Michael Büsch <m@bues.ch>
//
// Licensed under the Apache License version 2.0
// or the MIT license, at your option.
// SPDX-License-Identifier: Apache-2.0 OR MIT
//

use crate::vecparts::VecParts;
use std::{
    cell::UnsafeCell,
    marker::PhantomData,
    mem::size_of,
    ops::{Index, IndexMut, Range},
    rc::Rc,
    slice,
    sync::{
        atomic::{AtomicU32, Ordering},
        TryLockError, TryLockResult,
    },
};

/// Interleaved multi-thread range lock for [std::vec::Vec].
///
/// Each thread can lock a set of repeating slices of the data.
/// The slices are interleaved with each other,
/// and the slice pattern repeats cyclically every `cycle_len` slices.
///
/// Offsets are not bound to one specific thread:
/// any thread may lock any offset that is currently unlocked.
///
/// Please see the example below.
///
/// # Example
///
/// ```
/// use range_lock::RepVecRangeLock;
/// use std::{sync::Arc, thread};
///
/// let data = vec![1, 2,  3, 4,   5,  6,   // <- cycle 0
///                 7, 8,  9, 10,  11, 12]; // <- cycle 1
/// //              ^--^   ^---^   ^----^
/// //                |      |      |
/// //          offset-0  offset-1  offset-2
///
/// let lock = Arc::new(RepVecRangeLock::new(data,
///                                          2,    // slice_len: Each slice has 2 elements.
///                                          3));  // cycle_len: Each cycle has 3 slices (offsets).
/// let lock0 = Arc::clone(&lock);
/// let lock1 = Arc::clone(&lock);
/// let lock2 = Arc::clone(&lock);
///
/// thread::scope(|s| {
///     s.spawn(move || {
///         // Lock slice offset 0:
///         let mut guard = lock0.try_lock(0).expect("Failed to lock offset.");
///
///         // Read:
///         assert_eq!(guard[0][0], 1);     // Cycle 0, Slice element 0
///         assert_eq!(guard[0][1], 2);     // Cycle 0, Slice element 1
///         // let _ = guard[0][2];         // Would panic: Slice len is only 2.
///         assert_eq!(guard[1][0], 7);     // Cycle 1, Slice element 0
///         assert_eq!(guard[1][1], 8);     // Cycle 1, Slice element 1
///         // let _ = guard[2][0];         // Would panic: The data vec is only 2 repeat cycles long.
///
///         // Write:
///         guard[0][0] = 10;               // Cycle 0, Slice element 0
///         guard[0][1] = 20;               // Cycle 0, Slice element 1
///         // guard[0][2] = 42;            // Would panic: Slice len is only 2.
///         guard[1][0] = 30;               // Cycle 1, Slice element 0
///         guard[1][1] = 40;               // Cycle 1, Slice element 1
///         // guard[2][0] = 42;            // Would panic: The data vec is only 2 repeat cycles long.
///     });
///
///     s.spawn(move || {
///         // Lock slice offset 1:
///         let mut guard = lock1.try_lock(1).expect("Failed to lock offset.");
///
///         guard[0][0] = 100;              // Cycle 0, Slice element 0
///         guard[0][1] = 200;              // Cycle 0, Slice element 1
///         guard[1][0] = 300;              // Cycle 1, Slice element 0
///         guard[1][1] = 400;              // Cycle 1, Slice element 1
///     });
///
///     s.spawn(move || {
///         // Lock slice offset 2:
///         let mut guard = lock2.try_lock(2).expect("Failed to lock offset.");
///
///         guard[0][0] = 1000;             // Cycle 0, Slice element 0
///         guard[0][1] = 2000;             // Cycle 0, Slice element 1
///         guard[1][0] = 3000;             // Cycle 1, Slice element 0
///         guard[1][1] = 4000;             // Cycle 1, Slice element 1
///     });
/// });
///
/// // Get the data that has been modified by the threads.
/// let data = Arc::try_unwrap(lock).expect("Thread is still using data.").into_inner();
///
/// assert_eq!(data,
///            vec![10, 20, 100, 200, 1000, 2000,
///                 30, 40, 300, 400, 3000, 4000]);
/// ```
#[derive(Debug)]
pub struct RepVecRangeLock<T> {
    /// Slice length, in number of data elements.
    slice_len: usize,
    /// Cycle length, in number of slices.
    cycle_len: usize,
    /// Cycle length, in number of data elements.
    cycle_num_elems: usize,
    /// Bitmask of locked cycle offsets.
    locked_offsets: Vec<AtomicU32>,
    /// The protected data.
    data: UnsafeCell<VecParts<T>>,
}

// SAFETY:
// It is safe to access RepVecRangeLock and the contained data (via RepVecRangeLockGuard)
// from multiple threads simultaneously.
// The lock ensures that access to any given element is strictly serialized,
// while slices at different cycle offsets may be accessed in parallel.
// T must be Send-able to other threads.
unsafe impl<T> Sync for RepVecRangeLock<T> where T: Send {}

impl<'a, T> RepVecRangeLock<T> {
    /// Construct a new [RepVecRangeLock].
    ///
    /// * `data`: The data [Vec] to protect.
    /// * `slice_len`: The length of the slices, in number of elements. Must be >0.
    /// * `cycle_len`: The length of the repeat cycle, in number of slices. Must be >0 and <=usize::MAX-31.
    pub fn new(data: Vec<T>, slice_len: usize, cycle_len: usize) -> RepVecRangeLock<T> {
        if slice_len == 0 {
            panic!("slice_len must not be 0.");
        }
        if cycle_len == 0 || cycle_len > usize::MAX - 31 {
            panic!("cycle_len out of range.");
        }
        let Some(cycle_num_elems) = cycle_len.checked_mul(slice_len) else {
            panic!("Repeat cycle overflow.");
        };

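        // One lock bit per cycle offset, packed into 32-bit words:
        // word index = cycle_offset / 32, bit index = cycle_offset % 32.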
        let num = cycle_len.div_ceil(32);
        let mut locked_offsets = Vec::with_capacity(num);
        locked_offsets.resize_with(num, || AtomicU32::new(0));

        let data = UnsafeCell::new(data.into());

        RepVecRangeLock {
            slice_len,
            cycle_len,
            cycle_num_elems,
            locked_offsets,
            data,
        }
    }

    /// Get the length (in number of elements) of the embedded [Vec].
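    ///
    /// A minimal sketch (the data shape here is illustrative):
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![0_u8; 6], 2, 3);
    /// assert_eq!(lock.data_len(), 6);
    /// ```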
    #[inline]
    pub fn data_len(&self) -> usize {
        // SAFETY: The UnsafeCell content is always valid.
        unsafe { (*self.data.get()).len() }
    }

    /// Unwrap this [RepVecRangeLock] into the contained data.
    /// This method consumes self.
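    ///
    /// A minimal usage sketch (the data shape here is illustrative):
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    ///
    /// let lock = RepVecRangeLock::new(vec![1, 2, 3, 4], 2, 2);
    /// assert_eq!(lock.into_inner(), vec![1, 2, 3, 4]);
    /// ```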
    #[inline]
    pub fn into_inner(self) -> Vec<T> {
        debug_assert!(self
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
        self.data.into_inner().into()
    }

    /// Try to lock the given data slice at `cycle_offset`.
    ///
    /// * On success: Returns a [RepVecRangeLockGuard] that can be used to access the locked region.
    ///   Indexing [RepVecRangeLockGuard] yields a slice of the `data`.
    /// * On failure: Returns [TryLockError::WouldBlock], if the slice is contended.
    ///   The locking attempt may be retried by the caller upon contention.
    ///   Returns [TryLockError::Poisoned], if the lock is poisoned.
    ///   (This implementation does not currently poison the lock.)
    ///
    /// Panics, if `cycle_offset` is not less than `cycle_len`.
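    ///
    /// # Example
    ///
    /// A minimal sketch of retrying on contention. The spin-retry strategy
    /// and the data shape here are illustrative, not part of the API:
    ///
    /// ```
    /// use range_lock::RepVecRangeLock;
    /// use std::sync::TryLockError;
    ///
    /// let lock = RepVecRangeLock::new(vec![0_u32; 4], 1, 2);
    /// let guard = loop {
    ///     match lock.try_lock(0) {
    ///         Ok(guard) => break guard,
    ///         Err(TryLockError::WouldBlock) => std::hint::spin_loop(),
    ///         Err(e) => panic!("lock failed: {e}"),
    ///     }
    /// };
    /// assert_eq!(guard[0][0], 0);
    /// ```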
    #[inline]
    pub fn try_lock(&'a self, cycle_offset: usize) -> TryLockResult<RepVecRangeLockGuard<'a, T>> {
        if cycle_offset >= self.cycle_len {
            panic!("Invalid cycle_offset. It must be 0 <= cycle_offset < cycle_len.");
        }
        let idx = cycle_offset / 32;
        let mask = 1 << (cycle_offset % 32);
        // SAFETY: cycle_offset has been checked against cycle_len.
        let locked_offsets = unsafe { self.locked_offsets.get_unchecked(idx) };
        let prev = locked_offsets.fetch_or(mask, Ordering::AcqRel);
        if prev & mask == 0 {
            // Multiply cannot overflow due to slice_len, cycle_len and cycle_offset checks.
            let cycle_offset_slices = self.slice_len * cycle_offset;
            // Successfully acquired the lock.
            TryLockResult::Ok(RepVecRangeLockGuard::new(
                self,
                cycle_offset,
                cycle_offset_slices,
            ))
        } else {
            // Already locked by another thread.
            TryLockResult::Err(TryLockError::WouldBlock)
        }
    }

    /// Unlock a slice at `cycle_offset`.
    #[inline]
    fn unlock(&self, cycle_offset: usize) {
        let idx = cycle_offset / 32;
        let mask = 1 << (cycle_offset % 32);
        // SAFETY: cycle_offset has been checked against cycle_len in try_lock().
        let locked_offsets = unsafe { self.locked_offsets.get_unchecked(idx) };
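        // XOR clears our lock bit. The bit is guaranteed to be set here,
        // because unlock() is only called from the guard's drop handler.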
        let prev = locked_offsets.fetch_xor(mask, Ordering::Release);
        debug_assert!(prev & mask != 0);
    }

    /// Convert a `cycle` / `cycle_offset` into an element [Range].
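    ///
    /// For example, with `slice_len == 2` and `cycle_len == 3` (so
    /// `cycle_num_elems == 6`), locking `cycle_offset == 1` gives
    /// `cycle_offset_slices == 2`, and `cycle == 1` maps to elements `8..10`:
    /// `start = 6 * 1 + 2 = 8`, `end = 8 + 2 = 10`.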
    #[inline]
    fn calc_range(&self, cycle_offset_slices: usize, cycle: usize) -> Range<usize> {
        if let Some(cycle_elemidx) = self.cycle_num_elems.checked_mul(cycle) {
            if let Some(start) = cycle_elemidx.checked_add(cycle_offset_slices) {
                if let Some(end) = start.checked_add(self.slice_len) {
                    return Range { start, end };
                }
            }
        }
        panic!("RepVecRangeLock cycle index out of range.");
    }

    /// Get an immutable slice at `cycle` / `cycle_offset`.
    ///
    /// # SAFETY
    ///
    /// See get_mut_slice().
    #[inline]
    unsafe fn get_slice(&self, cycle_offset_slices: usize, cycle: usize) -> &[T] {
        let data = (*self.data.get()).ptr();
        let range = self.calc_range(cycle_offset_slices, cycle);
        // Panic instead of handing out a slice that extends past the end of the data.
        assert!(range.end <= self.data_len());
        // Avoid division by zero for zero sized types. ZSTs cannot overflow isize here.
        assert!(size_of::<T>() == 0 || range.start <= isize::MAX as usize / size_of::<T>());
        // SAFETY: Aliasing is the caller's responsibility (see get_mut_slice()).
        // The range has been bounds checked against the data length
        // and isize overflow has been checked above.
        unsafe { slice::from_raw_parts(data.add(range.start) as _, range.end - range.start) }
    }

    /// Get a mutable slice at `cycle` / `cycle_offset`.
    ///
    /// # SAFETY
    ///
    /// The caller must ensure that:
    /// * Overlapping slices must not coexist on multiple threads.
    /// * Immutable slices to overlapping ranges may only coexist within a single thread.
    /// * Immutable and mutable slices to overlapping ranges must not coexist.
    #[inline]
    #[allow(clippy::mut_from_ref)]
    unsafe fn get_mut_slice(&self, cycle_offset_slices: usize, cycle: usize) -> &mut [T] {
        let data = (*self.data.get()).ptr();
        let range = self.calc_range(cycle_offset_slices, cycle);
        // Panic instead of handing out a slice that extends past the end of the data.
        assert!(range.end <= self.data_len());
        // Avoid division by zero for zero sized types. ZSTs cannot overflow isize here.
        assert!(size_of::<T>() == 0 || range.start <= isize::MAX as usize / size_of::<T>());
        // SAFETY: Aliasing is the caller's responsibility (see above).
        // The range has been bounds checked against the data length
        // and isize overflow has been checked above.
        unsafe { slice::from_raw_parts_mut(data.add(range.start) as _, range.end - range.start) }
    }
}

/// Lock guard variable type for [RepVecRangeLock].
///
/// The [Index] and [IndexMut] traits are implemented for this struct.
/// See the documentation of [RepVecRangeLock] for usage examples of [RepVecRangeLockGuard].
#[derive(Debug)]
pub struct RepVecRangeLockGuard<'a, T> {
    /// Reference to the underlying lock.
    lock: &'a RepVecRangeLock<T>,
    /// The locked cycle offset.
    cycle_offset: usize,
    /// The locked slice start offset, in number of elements within a cycle.
    cycle_offset_slices: usize,
    /// Suppresses Send and Sync autotraits for RepVecRangeLockGuard.
    /// The &mut suppresses Sync and the Rc suppresses Send.
    #[allow(clippy::redundant_allocation)]
    _p: PhantomData<Rc<&'a mut T>>,
}

impl<'a, T> RepVecRangeLockGuard<'a, T> {
    #[inline]
    fn new(
        lock: &'a RepVecRangeLock<T>,
        cycle_offset: usize,
        cycle_offset_slices: usize,
    ) -> RepVecRangeLockGuard<'a, T> {
        RepVecRangeLockGuard {
            lock,
            cycle_offset,
            cycle_offset_slices,
            _p: PhantomData,
        }
    }
}

impl<T> Drop for RepVecRangeLockGuard<'_, T> {
    #[inline]
    fn drop(&mut self) {
        self.lock.unlock(self.cycle_offset);
    }
}

impl<T> Index<usize> for RepVecRangeLockGuard<'_, T> {
    type Output = [T];

    #[inline]
    fn index(&self, cycle: usize) -> &Self::Output {
        // SAFETY: See index_mut().
        unsafe { self.lock.get_slice(self.cycle_offset_slices, cycle) }
    }
}

impl<T> IndexMut<usize> for RepVecRangeLockGuard<'_, T> {
    #[inline]
    fn index_mut(&mut self, cycle: usize) -> &mut Self::Output {
        // SAFETY:
        // The lifetime of the slice is bounded by the lifetime of the guard.
        // The lifetime of the guard is bounded by the lifetime of the range lock.
        // The underlying data is owned by the range lock.
        // Therefore the slice cannot outlive the data.
        // The range lock ensures that no overlapping/conflicting guards
        // can be constructed.
        // The borrow checker ensures that the IndexMut result cannot be used
        // while an Index result from the same guard is still alive.
        unsafe { self.lock.get_mut_slice(self.cycle_offset_slices, cycle) }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::sync::{Arc, Barrier};
    use std::thread;

    #[test]
    #[should_panic(expected = "cycle_len out of range")]
    fn test_oob_cycle_len0() {
        let _ = RepVecRangeLock::new(vec![0; 100], 1, 0);
    }

    #[test]
    #[should_panic(expected = "cycle_len out of range")]
    fn test_oob_cycle_len1() {
        let _ = RepVecRangeLock::new(vec![0; 100], 1, usize::MAX - 30);
    }

    #[test]
    #[should_panic(expected = "slice_len must not be 0")]
    fn test_oob_slice_len() {
        let _ = RepVecRangeLock::new(vec![0; 100], 0, 1);
    }

    #[test]
    #[should_panic(expected = "cycle overflow")]
    fn test_oob_cycle_overflow() {
        let _ = RepVecRangeLock::new(vec![0; 100], usize::MAX, 2);
    }

    #[test]
    #[should_panic(expected = "must be 0 <= cycle_offset < cycle_len")]
    fn test_oob_lock_offset() {
        let a = RepVecRangeLock::new(vec![0; 100], 2, 10);
        let _ = a.try_lock(10);
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_base_oob_read() {
        let a = RepVecRangeLock::new(vec![0; 100], 1, 2);
        let g = a.try_lock(0).unwrap();
        let _ = g[0][1];
    }

    #[test]
    #[should_panic(expected = "Failed to lock guard 1")]
    fn test_overlap0() {
        let a = RepVecRangeLock::new(vec![1_i32, 2, 3, 4, 5, 6], 1, 3);
        let _g0 = a.try_lock(0).expect("Failed to lock guard 0");
        let _g1 = a.try_lock(0).expect("Failed to lock guard 1");
    }

    #[test]
    #[should_panic(expected = "Failed to lock guard 1")]
    fn test_overlap1() {
        let a = RepVecRangeLock::new(vec![1_i32, 2, 3, 4, 5, 6], 1, 3);
        let _g0 = a.try_lock(1).expect("Failed to lock guard 0");
        let _g1 = a.try_lock(1).expect("Failed to lock guard 1");
    }

    #[test]
    fn test_big_cycle() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32; 256],
            2,   // slice_len
            128, // cycle_len
        ));
        assert!(a.locked_offsets.len() == 4);
        {
            let _g = a.try_lock(0);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 1);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(1);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 2);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(32);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 1);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(33);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 2);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(69);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 32);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0);
        }
        {
            let _g = a.try_lock(127);
            assert!(a.locked_offsets[0].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[1].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[2].load(Ordering::Acquire) == 0);
            assert!(a.locked_offsets[3].load(Ordering::Acquire) == 0x80000000);
        }
    }

    #[test]
    #[should_panic(expected = "Invalid cycle_offset")]
    fn test_cycle_offset_out_of_range() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32; 256],
            2,   // slice_len
            128, // cycle_len
        ));
        let _g = a.try_lock(128);
    }

    #[test]
    fn test_thread_no_overlap() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![1_i32, 2, 3, 4],
            1, // slice_len
            2, // cycle_len
        ));
        let b = Arc::clone(&a);
        let c = Arc::clone(&a);
        let ba0 = Arc::new(Barrier::new(2));
        let ba1 = Arc::clone(&ba0);
        let j0 = thread::spawn(move || {
            {
                let mut g = b.try_lock(0).unwrap();
                assert!(b.locked_offsets[0].load(Ordering::Acquire) & 1 != 0);
                assert_eq!(g[0][0], 1);
                assert_eq!(g[1][0], 3);
                g[0][0] = 10;
                g[1][0] = 30;
            }
            ba0.wait();
        });
        let j1 = thread::spawn(move || {
            {
                let g = c.try_lock(1).unwrap();
                assert!(c.locked_offsets[0].load(Ordering::Acquire) & 2 != 0);
                assert_eq!(g[0][0], 2);
                assert_eq!(g[1][0], 4);
            }
            ba1.wait();
            let g = c.try_lock(0).unwrap();
            assert_eq!(g[0][0], 10);
            assert_eq!(g[1][0], 30);
        });
        j1.join().expect("Thread 1 panicked.");
        j0.join().expect("Thread 0 panicked.");
        assert!(a
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
    }

    #[allow(dead_code)]
    struct NoSyncStruct(RefCell<u32>); // No Sync auto-trait.

    #[test]
    fn test_nosync() {
        let a = Arc::new(RepVecRangeLock::new(
            vec![
                NoSyncStruct(RefCell::new(1)),
                NoSyncStruct(RefCell::new(2)),
                NoSyncStruct(RefCell::new(3)),
                NoSyncStruct(RefCell::new(4)),
            ],
            1, // slice_len
            2, // cycle_len
        ));
        let b = Arc::clone(&a);
        let c = Arc::clone(&a);
        let ba0 = Arc::new(Barrier::new(2));
        let ba1 = Arc::clone(&ba0);
        let j0 = thread::spawn(move || {
            let _g = b.try_lock(0).unwrap();
            assert!(b.locked_offsets[0].load(Ordering::Acquire) & 1 != 0);
            ba0.wait();
        });
        let j1 = thread::spawn(move || {
            let _g = c.try_lock(1).unwrap();
            assert!(c.locked_offsets[0].load(Ordering::Acquire) & 2 != 0);
            ba1.wait();
        });
        j1.join().expect("Thread 1 panicked.");
        j0.join().expect("Thread 0 panicked.");
        assert!(a
            .locked_offsets
            .iter()
            .all(|x| x.load(Ordering::Acquire) == 0));
    }
}

// vim: ts=4 sw=4 expandtab