// safe_bump/shared_arena.rs
1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use crate::chunked_storage::ChunkedStorage;
4use crate::{Checkpoint, Idx};
5
/// Thread-safe typed arena allocator.
///
/// Concurrent allocation via `&self`. Wait-free reads returning `&T`
/// directly (no guards or locks). Same [`Idx<T>`] handles and
/// [`Checkpoint<T>`] semantics as [`Arena`](crate::Arena).
///
/// `SharedArena<T>` is `Send + Sync` when `T: Send + Sync`.
///
/// For single-thread usage with zero overhead, see [`Arena`](crate::Arena).
pub struct SharedArena<T> {
    /// Backing chunked storage; each slot is written at most once by `alloc`.
    storage: ChunkedStorage<T>,
    /// Next slot to be reserved by `alloc`. Invariant: `reserved >= published`
    /// (slots are reserved before they can be published).
    reserved: AtomicUsize,
    /// Last index visible to readers (all slots `< published` are readable).
    /// Equals the length of the longest contiguous prefix of written slots,
    /// so reads below it never hit a gap.
    published: AtomicUsize,
}
22
23impl<T> SharedArena<T> {
24    /// Creates an empty shared arena.
25    #[must_use]
26    pub const fn new() -> Self {
27        Self {
28            storage: ChunkedStorage::new(),
29            reserved: AtomicUsize::new(0),
30            published: AtomicUsize::new(0),
31        }
32    }
33
34    /// Allocates a value, returning its stable index.
35    ///
36    /// Can be called concurrently from multiple threads (`&self`).
37    /// O(1).
38    ///
39    /// # Panics
40    ///
41    /// Panics if internal slot reservation fails (should not happen in
42    /// normal usage).
43    pub fn alloc(&self, value: T) -> Idx<T> {
44        let slot = self.reserved.fetch_add(1, Ordering::Relaxed);
45        let ok = self.storage.set(slot, value);
46        assert!(ok, "slot {slot} already occupied");
47        self.advance_published(slot);
48        Idx::from_raw(slot)
49    }
50
    /// Cooperatively advances `published` past `slot`.
    ///
    /// `published` always equals the length of the longest contiguous
    /// prefix of written slots: if slots 0..N are all written,
    /// `published = N`. This guarantees `get(i)` for any `i < published`
    /// will find a value — no gaps.
    ///
    /// Each writer helps advance `published` through all preceding ready
    /// slots, not just its own. If slots 5, 6, 7 complete out of order,
    /// the thread finishing slot 5 advances published 5→6→7→8 in one pass.
    ///
    /// # Spin behavior
    ///
    /// A thread spins only while its predecessor slot is not yet written.
    /// Between `reserved.fetch_add` and `storage.set` there is no user
    /// code — just `split_index` (pure math), `get_or_init` (chunk
    /// allocation), and `OnceLock::set` (memcpy). None of these can
    /// panic in normal operation, so the spin resolves in nanoseconds.
    /// A permanent stall would require the predecessor thread to abort
    /// (e.g. OOM or `process::abort`), which terminates the entire
    /// process anyway.
    fn advance_published(&self, slot: usize) {
        loop {
            // Acquire pairs with the Release CAS below, so a reloaded `p`
            // also makes the slots published by other threads visible.
            let p = self.published.load(Ordering::Acquire);
            if p > slot {
                break; // Already published past our slot
            }
            // Check if slot at position p is written (by us or another thread)
            if !self.storage.is_set(p) {
                // Slot p not yet written by its owner. Spin briefly.
                std::hint::spin_loop();
                continue;
            }
            // Slot p is written. Try to advance published from p to p+1.
            // If CAS fails, another thread advanced — retry from new p.
            // The weak variant may also fail spuriously; the enclosing
            // loop makes that harmless.
            let _ = self.published.compare_exchange_weak(
                p,
                p + 1,
                Ordering::Release,
                Ordering::Relaxed,
            );
        }
    }
94
95    /// Returns a reference to the value at `idx`.
96    ///
97    /// Wait-free. Returns `&T` directly (no guard or lock).
98    ///
99    /// # Panics
100    ///
101    /// Panics if `idx` is out of bounds (stale after rollback/reset).
102    #[must_use]
103    pub fn get(&self, idx: Idx<T>) -> &T {
104        let i = idx.into_raw();
105        assert!(
106            i < self.published.load(Ordering::Acquire),
107            "index out of bounds: index is {i} but published length is {}",
108            self.published.load(Ordering::Acquire),
109        );
110        self.storage.get(i)
111    }
112
    /// Returns the number of published (visible) items.
    ///
    /// Slots reserved by in-flight `alloc` calls are not counted until
    /// `advance_published` makes them visible.
    #[must_use]
    pub fn len(&self) -> usize {
        self.published.load(Ordering::Acquire)
    }
118
119    /// Returns `true` if the arena contains no items.
120    #[must_use]
121    pub fn is_empty(&self) -> bool {
122        self.len() == 0
123    }
124
125    /// Saves the current allocation state.
126    ///
127    /// Use with [`rollback`](SharedArena::rollback) to discard allocations
128    /// made after this point.
129    #[must_use]
130    pub fn checkpoint(&self) -> Checkpoint<T> {
131        Checkpoint::from_len(self.published.load(Ordering::Acquire))
132    }
133
134    /// Rolls back to a previous checkpoint, dropping all values
135    /// allocated after it.
136    ///
137    /// O(k) where k = number of items dropped (destructors run).
138    ///
139    /// # Panics
140    ///
141    /// Panics if `cp` points beyond the current length.
142    pub fn rollback(&mut self, cp: Checkpoint<T>) {
143        let current = *self.published.get_mut();
144        assert!(
145            cp.len() <= current,
146            "checkpoint {} beyond current length {}",
147            cp.len(),
148            current,
149        );
150        // Drop values in reverse order (mirrors Vec::truncate behavior)
151        for slot in (cp.len()..current).rev() {
152            self.storage.take(slot);
153        }
154        *self.published.get_mut() = cp.len();
155        *self.reserved.get_mut() = cp.len();
156    }
157
158    /// Removes all items, running their destructors.
159    ///
160    /// Retains allocated storage for reuse.
161    pub fn reset(&mut self) {
162        let current = *self.published.get_mut();
163        for slot in (0..current).rev() {
164            self.storage.take(slot);
165        }
166        *self.published.get_mut() = 0;
167        *self.reserved.get_mut() = 0;
168    }
169
170    /// Returns `true` if `idx` points to a valid item in this arena.
171    #[must_use]
172    pub fn is_valid(&self, idx: Idx<T>) -> bool {
173        idx.into_raw() < self.published.load(Ordering::Acquire)
174    }
175
176    /// Returns a reference to the value at `idx`, or `None` if the
177    /// index is out of bounds.
178    #[must_use]
179    pub fn try_get(&self, idx: Idx<T>) -> Option<&T> {
180        let i = idx.into_raw();
181        if i < self.published.load(Ordering::Acquire) {
182            Some(self.storage.get(i))
183        } else {
184            None
185        }
186    }
187
188    /// Allocates multiple values from an iterator, returning the index
189    /// of the first allocated item.
190    ///
191    /// Returns `None` if the iterator is empty.
192    ///
193    /// O(n) where n = items yielded by the iterator.
194    pub fn alloc_extend(&self, iter: impl IntoIterator<Item = T>) -> Option<Idx<T>> {
195        let mut first = None;
196        for value in iter {
197            let idx = self.alloc(value);
198            if first.is_none() {
199                first = Some(idx);
200            }
201        }
202        first
203    }
204
205    /// Removes all items, returning an iterator that yields them
206    /// in allocation order.
207    ///
208    /// The arena is empty after the iterator is consumed or dropped.
209    pub fn drain(&mut self) -> std::vec::IntoIter<T> {
210        let current = *self.published.get_mut();
211        let mut items = Vec::with_capacity(current);
212        for slot in 0..current {
213            if let Some(value) = self.storage.take(slot) {
214                items.push(value);
215            }
216        }
217        *self.published.get_mut() = 0;
218        *self.reserved.get_mut() = 0;
219        items.into_iter()
220    }
221}
222
223impl<T> Default for SharedArena<T> {
224    fn default() -> Self {
225        Self::new()
226    }
227}
228
229impl<T> std::ops::Index<Idx<T>> for SharedArena<T> {
230    type Output = T;
231
232    fn index(&self, idx: Idx<T>) -> &T {
233        self.get(idx)
234    }
235}
236
237impl<T> IntoIterator for SharedArena<T> {
238    type Item = T;
239    type IntoIter = std::vec::IntoIter<T>;
240
241    fn into_iter(mut self) -> Self::IntoIter {
242        self.drain()
243    }
244}