safe_bump/shared_arena.rs
1use std::sync::atomic::{AtomicUsize, Ordering};
2
3use crate::chunked_storage::ChunkedStorage;
4use crate::{Checkpoint, Idx};
5
/// Thread-safe typed arena allocator.
///
/// Values are allocated through `&self` (see [`SharedArena::alloc`]), so
/// several threads can allocate at the same time. Reads hand back `&T`
/// directly (no guards or locks). Uses the same [`Idx<T>`] handles and
/// [`Checkpoint<T>`] semantics as [`Arena`](crate::Arena).
///
/// `SharedArena<T>` is `Send + Sync` when `T: Send + Sync`.
///
/// For single-thread usage with zero overhead, see [`Arena`](crate::Arena).
pub struct SharedArena<T> {
    /// Slot storage: written by `alloc`, read by `get`/`try_get`, and
    /// emptied by `rollback`/`reset`/`drain`.
    storage: ChunkedStorage<T>,
    /// Next slot to be reserved by `alloc`.
    reserved: AtomicUsize,
    /// Number of slots visible to readers, i.e. one past the last readable
    /// index (all slots `< published` are readable).
    published: AtomicUsize,
}
22
23impl<T> SharedArena<T> {
24 /// Creates an empty shared arena.
25 #[must_use]
26 pub const fn new() -> Self {
27 Self {
28 storage: ChunkedStorage::new(),
29 reserved: AtomicUsize::new(0),
30 published: AtomicUsize::new(0),
31 }
32 }
33
34 /// Allocates a value, returning its stable index.
35 ///
36 /// Can be called concurrently from multiple threads (`&self`).
37 /// O(1).
38 ///
39 /// # Panics
40 ///
41 /// Panics if internal slot reservation fails (should not happen in
42 /// normal usage).
43 pub fn alloc(&self, value: T) -> Idx<T> {
44 let slot = self.reserved.fetch_add(1, Ordering::Relaxed);
45 let ok = self.storage.set(slot, value);
46 assert!(ok, "slot {slot} already occupied");
47 self.advance_published(slot);
48 Idx::from_raw(slot)
49 }
50
/// Cooperatively advances `published` until it is past `slot`.
///
/// `published` is only ever moved from `p` to `p + 1`, and only after
/// `storage.is_set(p)` reports slot `p` as written. It therefore always
/// counts a contiguous prefix of written slots: if slots 0..N are all
/// written, `published` can reach `N`, and `get(i)` for any
/// `i < published` will find a value — no gaps.
///
/// Each writer helps publish the slots *before* its own, not just its
/// own: the loop runs until `published > slot`. If slots 5, 6, 7 are
/// written out of order with 5 last, the writer of slot 7 can carry
/// `published` 5→6→7→8 by itself once slot 5 is set, whereas the writer
/// of slot 5 returns as soon as `published` exceeds 5.
///
/// # Spin behavior
///
/// A thread spins only while the slot at the current `published`
/// position has not been written by its owner yet. In `alloc` there is
/// no user code between `reserved.fetch_add` and `storage.set`, so the
/// wait is expected to be very short.
///
/// NOTE(review): the original note says that path consists of
/// `split_index` (pure math), `get_or_init` (chunk allocation) and
/// `OnceLock::set` (memcpy), none of which panic in normal operation.
/// Those live inside `ChunkedStorage` and are not visible here — confirm.
/// A permanent stall would require the predecessor thread to abort
/// (e.g. OOM or `process::abort`), which terminates the entire
/// process anyway.
fn advance_published(&self, slot: usize) {
    loop {
        let p = self.published.load(Ordering::Acquire);
        if p > slot {
            break; // `published` is already past our slot; nothing left to do.
        }
        // Slot `p` is the next one to publish (ours or an earlier one).
        if !self.storage.is_set(p) {
            // Its owner has reserved it but not written it yet. Spin briefly
            // and re-read `published`.
            std::hint::spin_loop();
            continue;
        }
        // Slot `p` is written. Try to advance `published` from p to p+1.
        // The result is deliberately ignored: on failure (another thread
        // advanced first, or the weak CAS failed spuriously) the next
        // iteration simply reloads `published` and retries.
        let _ = self.published.compare_exchange_weak(
            p,
            p + 1,
            Ordering::Release,
            Ordering::Relaxed,
        );
    }
}
94
95 /// Returns a reference to the value at `idx`.
96 ///
97 /// Wait-free. Returns `&T` directly (no guard or lock).
98 ///
99 /// # Panics
100 ///
101 /// Panics if `idx` is out of bounds (stale after rollback/reset).
102 #[must_use]
103 pub fn get(&self, idx: Idx<T>) -> &T {
104 let i = idx.into_raw();
105 assert!(
106 i < self.published.load(Ordering::Acquire),
107 "index out of bounds: index is {i} but published length is {}",
108 self.published.load(Ordering::Acquire),
109 );
110 self.storage.get(i)
111 }
112
113 /// Returns the number of published (visible) items.
114 #[must_use]
115 pub fn len(&self) -> usize {
116 self.published.load(Ordering::Acquire)
117 }
118
119 /// Returns `true` if the arena contains no items.
120 #[must_use]
121 pub fn is_empty(&self) -> bool {
122 self.len() == 0
123 }
124
125 /// Saves the current allocation state.
126 ///
127 /// Use with [`rollback`](SharedArena::rollback) to discard allocations
128 /// made after this point.
129 #[must_use]
130 pub fn checkpoint(&self) -> Checkpoint<T> {
131 Checkpoint::from_len(self.published.load(Ordering::Acquire))
132 }
133
134 /// Rolls back to a previous checkpoint, dropping all values
135 /// allocated after it.
136 ///
137 /// O(k) where k = number of items dropped (destructors run).
138 ///
139 /// # Panics
140 ///
141 /// Panics if `cp` points beyond the current length.
142 pub fn rollback(&mut self, cp: Checkpoint<T>) {
143 let current = *self.published.get_mut();
144 assert!(
145 cp.len() <= current,
146 "checkpoint {} beyond current length {}",
147 cp.len(),
148 current,
149 );
150 // Drop values in reverse order (mirrors Vec::truncate behavior)
151 for slot in (cp.len()..current).rev() {
152 self.storage.take(slot);
153 }
154 *self.published.get_mut() = cp.len();
155 *self.reserved.get_mut() = cp.len();
156 }
157
158 /// Removes all items, running their destructors.
159 ///
160 /// Retains allocated storage for reuse.
161 pub fn reset(&mut self) {
162 let current = *self.published.get_mut();
163 for slot in (0..current).rev() {
164 self.storage.take(slot);
165 }
166 *self.published.get_mut() = 0;
167 *self.reserved.get_mut() = 0;
168 }
169
170 /// Returns `true` if `idx` points to a valid item in this arena.
171 #[must_use]
172 pub fn is_valid(&self, idx: Idx<T>) -> bool {
173 idx.into_raw() < self.published.load(Ordering::Acquire)
174 }
175
176 /// Returns a reference to the value at `idx`, or `None` if the
177 /// index is out of bounds.
178 #[must_use]
179 pub fn try_get(&self, idx: Idx<T>) -> Option<&T> {
180 let i = idx.into_raw();
181 if i < self.published.load(Ordering::Acquire) {
182 Some(self.storage.get(i))
183 } else {
184 None
185 }
186 }
187
188 /// Allocates multiple values from an iterator, returning the index
189 /// of the first allocated item.
190 ///
191 /// Returns `None` if the iterator is empty.
192 ///
193 /// O(n) where n = items yielded by the iterator.
194 pub fn alloc_extend(&self, iter: impl IntoIterator<Item = T>) -> Option<Idx<T>> {
195 let mut first = None;
196 for value in iter {
197 let idx = self.alloc(value);
198 if first.is_none() {
199 first = Some(idx);
200 }
201 }
202 first
203 }
204
205 /// Removes all items, returning an iterator that yields them
206 /// in allocation order.
207 ///
208 /// The arena is empty after the iterator is consumed or dropped.
209 pub fn drain(&mut self) -> std::vec::IntoIter<T> {
210 let current = *self.published.get_mut();
211 let mut items = Vec::with_capacity(current);
212 for slot in 0..current {
213 if let Some(value) = self.storage.take(slot) {
214 items.push(value);
215 }
216 }
217 *self.published.get_mut() = 0;
218 *self.reserved.get_mut() = 0;
219 items.into_iter()
220 }
221}
222
223impl<T> Default for SharedArena<T> {
224 fn default() -> Self {
225 Self::new()
226 }
227}
228
229impl<T> std::ops::Index<Idx<T>> for SharedArena<T> {
230 type Output = T;
231
232 fn index(&self, idx: Idx<T>) -> &T {
233 self.get(idx)
234 }
235}
236
237impl<T> IntoIterator for SharedArena<T> {
238 type Item = T;
239 type IntoIter = std::vec::IntoIter<T>;
240
241 fn into_iter(mut self) -> Self::IntoIter {
242 self.drain()
243 }
244}