concurrent_arena/
arena.rs

1use super::{arcs::Arcs, bucket::Bucket, thread_id::get_thread_id, Arc, ArenaArc};
2
3/// * `LEN` - Number of elements stored per bucket.
4///   Must be less than or equal to `u32::MAX`, divisible by
5///   `usize::BITS` and it must not be `0`.
6/// * `BITARRAY_LEN` - Number of [`usize`] in the bitmap per bucket.
7///   Must be equal to `LEN / usize::BITS`.
8///
9///   For best performance, try to set this to number of CPUs that are going
10///   to access `Arena` concurrently.
11///
12/// `Arena` stores the elements in buckets to ensure that the address
13/// for elements are stable while improving efficiency.
14///
15/// Every bucket is of size `LEN`.
16///
17/// The larger `LEN` is, the more compact the `Arena` will be, however it might
18/// also waste space if it is unused.
19///
20/// And, allocating a large chunk of memory takes more time.
21///
22/// `Arena` internally stores the array of buckets as a `triomphe::ThinArc`
23/// and use `ArcSwapAny` to grow the array atomically, without blocking any
24/// reader.
25///
26/// # Examples
27///
28/// If you provides `Arena` with invalid `LEM` or `BITARRAY_LEN`, then your
29/// code will panic at runtime:
30///
31/// ```rust,compile_fail
32/// use concurrent_arena::*;
33/// let arena = Arena::<u32, 1, 100>::new();
34/// ```
35///
36/// To make it a compile time failure, you need to call
37/// `max_buckets`:
38///
39/// ```rust,compile_fail
40/// use concurrent_arena::*;
41/// const MAX_BUCKETS: u32 = Arena::<u32, 1, 100>::max_buckets();
42/// ```
43#[derive(Debug)]
44pub struct Arena<T, const BITARRAY_LEN: usize, const LEN: usize> {
45    buckets: Arcs<Arc<Bucket<T, BITARRAY_LEN, LEN>>>,
46}
47
48impl<T: Sync + Send, const BITARRAY_LEN: usize, const LEN: usize> Default
49    for Arena<T, BITARRAY_LEN, LEN>
50{
51    fn default() -> Self {
52        Self::new()
53    }
54}
55
/// Validates the `LEN` / `BITARRAY_LEN` const parameters of [`Arena`].
///
/// Panics with a descriptive message on the first rule that is violated.
/// The constructors and `max_buckets` call it inside `const { .. }` blocks,
/// which turns any violation into a compile-time error.
const fn check_const_generics<const BITARRAY_LEN: usize, const LEN: usize>() {
    const WORD_BITS: usize = usize::BITS as usize;

    // Slots are addressed with `u32`, so a bucket must fit in that range.
    assert!(
        LEN <= (u32::MAX as usize),
        "LEN must be less than or equal to u32::MAX"
    );
    // The per-bucket bitmap is made of whole `usize` words.
    assert!(LEN % WORD_BITS == 0, "LEN must be divisible by usize::BITS");
    assert!(LEN != 0, "LEN must not be 0");
    assert!(
        BITARRAY_LEN == LEN / WORD_BITS,
        "BITARRAY_LEN must be equal to LEN / usize::BITS"
    );
}
70
71impl<T, const BITARRAY_LEN: usize, const LEN: usize> Arena<T, BITARRAY_LEN, LEN> {
72    /// Maximum buckets `Arena` can have.
73    pub const fn max_buckets() -> u32 {
74        const { check_const_generics::<BITARRAY_LEN, LEN>() };
75
76        u32::MAX / (LEN as u32)
77    }
78}
79
80impl<T: Send + Sync, const BITARRAY_LEN: usize, const LEN: usize> Arena<T, BITARRAY_LEN, LEN> {
81    /// Would preallocate 2 buckets.
82    pub fn new() -> Self {
83        Self::with_capacity(2)
84    }
85
86    pub fn with_capacity(cap: u32) -> Self {
87        const { check_const_generics::<BITARRAY_LEN, LEN>() };
88
89        let cap = cap.min(Self::max_buckets());
90        let buckets = Arcs::new();
91
92        buckets.grow(cap as usize, Arc::default);
93
94        Self { buckets }
95    }
96
97    /// Return Ok(arc) on success, or Err((value, len)) where value is
98    /// the input param `value` and `len` is the length of the `Arena` at the time
99    /// of insertion.
100    ///
101    /// This function is lock-free.
102    pub fn try_insert(&self, mut value: T) -> Result<ArenaArc<T, BITARRAY_LEN, LEN>, (T, u32)> {
103        let slice = self.buckets.as_slice();
104        let len = slice.len();
105
106        debug_assert!(len <= Self::max_buckets() as usize);
107
108        if len == 0 {
109            return Err((value, 0));
110        }
111
112        let mut pos = get_thread_id() % len;
113
114        let slice1_iter = slice[pos..].iter();
115        let slice2_iter = slice[..pos].iter();
116
117        for bucket in slice1_iter.chain(slice2_iter) {
118            match Bucket::try_insert(bucket, pos as u32, value) {
119                Ok(arc) => return Ok(arc),
120                Err(val) => value = val,
121            }
122
123            pos = (pos + 1) % len;
124        }
125
126        Err((value, len as u32))
127    }
128
129    /// Try to reserve `min(new_len, Self::max_buckets())` buckets.
130    ///
131    /// This function is technically lock-free.
132    pub fn try_reserve(&self, new_len: u32) -> bool {
133        if new_len == 0 {
134            return true;
135        }
136
137        let new_len = new_len.min(Self::max_buckets());
138        self.buckets
139            .try_grow(new_len as usize, Arc::default)
140            .is_ok()
141    }
142
143    /// Reserve `min(new_len, Self::max_buckets())` buckets.
144    pub fn reserve(&self, new_len: u32) {
145        if new_len != 0 {
146            let new_len = new_len.min(Self::max_buckets());
147            self.buckets.grow(new_len as usize, Arc::default)
148        }
149    }
150
151    /// Insert one value.
152    ///
153    /// If there isn't enough buckets, then try to reserve one bucket and
154    /// restart the operation.
155    pub fn insert(&self, mut value: T) -> ArenaArc<T, BITARRAY_LEN, LEN> {
156        // Fast path where `try_reserve` is used to avoid locking.
157        for _ in 0..5 {
158            match self.try_insert(value) {
159                Ok(arc) => return arc,
160                Err((val, len)) => {
161                    value = val;
162
163                    // If len == Self::max_buckets(), then we would have to
164                    // wait for slots to be removed from `Arena`.
165                    if len != Self::max_buckets() {
166                        // If try_reserve succeeds, then another new bucket is available.
167                        //
168                        // If try_reserve fail, then another thread is doing the
169                        // reservation.
170                        //
171                        // We can simply restart operation, waiting for it to be done.
172                        //
173                        // Grow by 1.5 exponential to have amoritized O(1), adding +4 more in case
174                        // there's only one element (1 * 3 / 2 evaluaes to 1 in rust).
175                        self.try_reserve(len * 3 / 2 + 4);
176                    }
177                }
178            }
179        }
180
181        // Slow path where `reserve` is used.
182        loop {
183            match self.try_insert(value) {
184                Ok(arc) => break arc,
185                Err((val, len)) => {
186                    value = val;
187
188                    // If len == Self::max_buckets(), then we would have to
189                    // wait for slots to be removed from `Arena`.
190                    if len != Self::max_buckets() {
191                        self.reserve(len + 8);
192                    }
193                }
194            }
195        }
196    }
197}
198
199type AccessOp<T, const BITARRAY_LEN: usize, const LEN: usize> =
200    unsafe fn(
201        Arc<Bucket<T, BITARRAY_LEN, LEN>>,
202        u32,
203        u32,
204    ) -> Option<ArenaArc<T, BITARRAY_LEN, LEN>>;
205
206impl<T: Send + Sync, const BITARRAY_LEN: usize, const LEN: usize> Arena<T, BITARRAY_LEN, LEN> {
207    fn access_impl(
208        &self,
209        slot: u32,
210        op: AccessOp<T, BITARRAY_LEN, LEN>,
211    ) -> Option<ArenaArc<T, BITARRAY_LEN, LEN>> {
212        let bucket_index = slot / (LEN as u32);
213        let index = slot % (LEN as u32);
214
215        self.buckets
216            .as_slice()
217            .get(bucket_index as usize)
218            .cloned()
219            // Safety: index is <= LEN
220            .and_then(|bucket| unsafe { op(bucket, bucket_index, index) })
221    }
222
223    /// May enter busy loop if the slot is not fully initialized.
224    ///
225    /// This function is lock free.
226    pub fn remove(&self, slot: u32) -> Option<ArenaArc<T, BITARRAY_LEN, LEN>> {
227        self.access_impl(slot, Bucket::remove)
228    }
229
230    /// May enter busy loop if the slot is not fully initialized.
231    ///
232    /// This function is lock free.
233    pub fn get(&self, slot: u32) -> Option<ArenaArc<T, BITARRAY_LEN, LEN>> {
234        self.access_impl(slot, Bucket::get)
235    }
236
237    /// Return number of buckets allocated.
238    ///
239    /// This function is lock free.
240    pub fn len(&self) -> u32 {
241        self.buckets.len() as u32
242    }
243
244    /// This function is lock free.
245    pub fn is_empty(&self) -> bool {
246        self.buckets.is_empty()
247    }
248}
249
#[cfg(test)]
mod tests {
    use crate::*;
    const LEN: usize = usize::BITS as usize;

    #[test]
    fn test_new() {
        let arena: Arena<_, 1, { LEN }> = Arena::new();
        let slot = ArenaArc::slot(&arena.insert(()));
        assert_eq!(ArenaArc::slot(&arena.remove(slot).unwrap()), slot);
    }

    #[test]
    fn test_with_capacity() {
        let arena: Arena<_, 1, { LEN }> = Arena::with_capacity(0);
        let slot = ArenaArc::slot(&arena.insert(()));
        assert_eq!(ArenaArc::slot(&arena.remove(slot).unwrap()), slot);
    }

    /// Thread sanitizer produces false positive in this test.
    ///
    /// This has been discussed in
    /// [this issue](https://github.com/vorner/arc-swap/issues/71)
    /// and the failure can only be reproduced on x86-64-unknown-linux-gnu.
    /// It cannot be reproduced on MacOS.
    ///
    /// Since crate arc-swap is a cross platform crate with no assembly used
    /// or any x86 specific feature, this can be some bugs in the allocator
    /// or the thread sanitizer.
    #[cfg(not(feature = "thread-sanitizer"))]
    #[test]
    fn realworld_test() {
        use std::thread::sleep;
        use std::time::Duration;

        use parking_lot::Mutex;
        use rayon::prelude::*;
        use std::sync::Arc;

        let arena: Arc<Arena<Mutex<u32>, 1, { LEN }>> = Arc::new(Arena::with_capacity(0));

        // `rayon::scope` only returns after every task spawned on it has run.
        // With a detached `rayon::spawn` the test could finish, and the
        // process exit, before the removal tasks and their assertions
        // executed.
        rayon::scope(|scope| {
            (0..u16::MAX).into_par_iter().for_each(|i| {
                let i = i as u32;

                let arc = arena.insert(Mutex::new(i));

                assert_eq!(ArenaArc::strong_count(&arc), 2);
                assert_eq!(*arc.lock(), i);

                let slot = ArenaArc::slot(&arc);

                let arena = arena.clone();

                scope.spawn(move |_| {
                    sleep(Duration::from_micros(1));

                    let arc = arena.remove(slot).unwrap();

                    let mut guard = arc.lock();
                    assert_eq!(*guard, i);
                    *guard = 2000;
                });
            });
        });
    }
}