bunch/
lib.rs

1#![deny(missing_docs)]
2//! An append-only, concurrent arena
3
4use std::cell::UnsafeCell;
5use std::mem;
6
7use arrayvec::ArrayVec;
8use parking_lot::Mutex;
9
/// Number of lanes a `Bunch` can hold; this is the fixed capacity of the
/// `ArrayVec` that stores the lanes.
const N_LANES: usize = 64;
/// Width of a `usize` in bits on the target platform.
const USIZE_BITS: usize = 8 * mem::size_of::<usize>();
12
/// Returns the capacity of lane `n`.
///
/// Lane 0 holds 2 elements and every following lane holds twice as many as
/// the one before it, i.e. lane `n` holds `2^(n + 1)` elements.
#[inline(always)]
fn lane_size(n: usize) -> usize {
    let pairs = 2_usize.pow(n as u32);
    pairs * 2
}
17
18#[inline(always)]
19fn lane_offset(offset: usize) -> (usize, usize) {
20    let i = offset / 2 + 1;
21    let lane = USIZE_BITS - i.leading_zeros() as usize - 1;
22    let offset = offset - (2usize.pow(lane as u32) - 1) * 2;
23    (lane, offset)
24}
25
26impl<T> Default for Bunch<T> {
27    fn default() -> Self {
28        Bunch {
29            lanes: Default::default(),
30            len: Default::default(),
31        }
32    }
33}
34
35unsafe impl<T> Send for Bunch<T> {}
36unsafe impl<T> Sync for Bunch<T> {}
37
/// The main Arena type
///
/// Elements are appended through a shared reference with `push` and are
/// stored in a sequence of lanes; an element is addressed by the global index
/// at which it was pushed.
pub struct Bunch<T> {
    /// Storage lanes. Lane `n` is created by `push` with capacity
    /// `lane_size(n)`; wrapped in `UnsafeCell` because `push` mutates it
    /// through `&self`.
    lanes: UnsafeCell<ArrayVec<[Vec<T>; N_LANES]>>,
    /// Number of elements pushed so far; `push` holds this lock while it
    /// modifies `lanes`.
    len: Mutex<usize>,
}
43
44impl<T> Bunch<T> {
45    /// Creates a new arena
46    pub fn new() -> Self {
47        Self::default()
48    }
49
50    /// Pushes to the arena, returning a reference to the pushed element
51    pub fn push(&self, t: T) -> &T {
52        let len = &mut *self.len.lock();
53        let (lane, offset) = lane_offset(*len);
54
55        if offset == 0 {
56            unsafe {
57                let lanes = &mut *self.lanes.get();
58                let size = lane_size(lane);
59                lanes.push(Vec::with_capacity(size));
60            }
61        }
62
63        unsafe {
64            let lanes = &mut *self.lanes.get();
65            lanes[lane].push(t);
66            *len += 1;
67            lanes[lane].get(offset).expect("just pushed")
68        }
69    }
70
71    /// Gets a reference into the arena
72    pub fn get(&self, idx: usize) -> &T {
73        let (lane, offset) = lane_offset(idx);
74        unsafe { &(*self.lanes.get())[lane][offset] }
75    }
76
77    /// Returns the number of elements in the Bunch
78    pub fn len(&self) -> usize {
79        *self.len.lock()
80    }
81}
82
#[cfg(test)]
mod tests {

    use super::*;
    use std::sync::Arc;

    // A single pushed value is returned by `push` and found again by `get`.
    #[test]
    fn it_works() {
        let bunch = Bunch::new();

        let pushed = bunch.push(3);
        assert_eq!(pushed, &3);

        assert_eq!(bunch.get(0), &3);
    }

    // Sequential pushes land at consecutive indices.
    #[test]
    fn multiple() {
        const COUNT: usize = 10_000;

        let bunch = Bunch::new();

        for value in 0..COUNT {
            assert_eq!(bunch.push(value), &value);
        }

        for idx in 0..COUNT {
            assert_eq!(bunch.get(idx), &idx);
        }
    }

    // Values pushed from many threads are all present exactly once.
    #[test]
    fn multithreading() {
        const TOTAL: usize = 100_000;
        const WORKERS: usize = 16;

        let bunch = Arc::new(Bunch::new());

        // Worker `w` pushes every value congruent to `w` modulo `WORKERS`.
        let handles: Vec<_> = (0..WORKERS)
            .map(|worker| {
                let bunch = Arc::clone(&bunch);
                std::thread::spawn(move || {
                    for value in (worker..TOTAL).step_by(WORKERS) {
                        bunch.push(value);
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }

        let mut seen: Vec<_> = (0..TOTAL).map(|idx| bunch.get(idx)).collect();
        seen.sort();

        for (expected, actual) in seen.iter().enumerate() {
            assert_eq!(*actual, &expected)
        }
    }

    // Dropping the arena drops every element it stores.
    #[test]
    fn dropping() {
        const TOTAL: usize = 100_000;
        const WORKERS: usize = 16;

        let arcs: Vec<_> = (0..TOTAL).map(Arc::new).collect();

        for arc in &arcs {
            assert_eq!(Arc::strong_count(arc), 1);
        }

        let shared = Arc::new(arcs);

        {
            let bunch = Arc::new(Bunch::new());

            let handles: Vec<_> = (0..WORKERS)
                .map(|worker| {
                    let bunch = Arc::clone(&bunch);
                    let shared = Arc::clone(&shared);
                    std::thread::spawn(move || {
                        for idx in (worker..TOTAL).step_by(WORKERS) {
                            bunch.push(Arc::clone(&shared[idx]));
                        }
                    })
                })
                .collect();

            for handle in handles {
                handle.join().unwrap();
            }

            // Each value is now held once by `shared` and once by the arena.
            for arc in shared.iter() {
                assert_eq!(Arc::strong_count(arc), 2);
            }
        }
        // Bunch dropped here
        for arc in shared.iter() {
            assert_eq!(Arc::strong_count(arc), 1);
        }
    }

    // Reading past the last pushed element must panic.
    #[test]
    #[should_panic]
    fn out_of_bounds_access() {
        let bunch = Bunch::new();
        bunch.push("hello");

        let _ = bunch.get(1);
    }
}