forge_alloc/layout/slab_owner.rs
1//! `SlabOwner<T, B>` + `SlabRemote<T, B>` — cross-thread typed allocation
2//! via the ownership-return model. Replaces the previously-named
3//! `MessagePassingSlab`.
4//!
//! `Slab<T, B>` is `!Sync`. Another thread cannot call `deallocate` directly
//! without racing on the freelist head. The snmalloc / mimalloc pattern,
//! adopted here: a cross-thread free routes the freed `(ptr, layout)` pair
//! to the owner via a queue; the owner drains the queue back into its local
//! freelist on a later owner-side `allocate` / `deallocate` once the batch
//! policy's threshold is reached (or immediately via [`SlabOwner::drain`]).
//!
//! v0.1 ships a `Mutex<VecDeque<RemoteEntry>>`-backed queue for correctness;
//! v1.0 will swap in a lock-free MPSC ring. The visible API
//! is identical either way.
14//!
15//! Requires `std` (uses `Arc`, `Mutex`, `VecDeque`).
16//!
17//! See `docs/ARCHITECTURE.md` for the cross-thread ownership design.
18
19#![cfg(feature = "std")]
20
21use core::ptr::NonNull;
22use std::collections::VecDeque;
23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::{Arc, Mutex};
25
26use forge_alloc_core::{
27 AllocError, Allocator, CachePadded, Deallocator, FixedRange, NonZeroLayout, CACHE_LINE,
28};
29
30use crate::layout::Slab;
31
/// Controls when a [`SlabOwner`] folds queued remote frees back into its
/// local freelist.
#[derive(Clone, Copy, Debug)]
pub enum BatchPolicy {
    /// Drain once roughly `N` remote frees are pending; the check runs on
    /// each owner-side `allocate` / `deallocate`. The v0.1 default is
    /// `Fixed(64)`.
    Fixed(usize),
    /// Stepped adaptive policy.
    ///
    /// The drain threshold moves between the five levels in
    /// [`ADAPTIVE_LEVELS`] (`8, 16, 32, 64, 128`). Moves are driven by the
    /// observed queue depth relative to `queue_capacity`:
    ///
    /// - depth above 75% of capacity: move one level **down** (smaller
    ///   batches, drain sooner) to ease back-pressure on remote senders;
    /// - depth below 25% of capacity: move one level **up** (larger
    ///   batches, drain less often) to spread the drain cost over more frees.
    ///
    /// After each move the level is frozen for [`ADAPTIVE_COOLDOWN_TICKS`]
    /// `maybe_drain` calls, so it cannot oscillate around a band edge.
    ///
    /// The starting level is index 3 (threshold 64), the same as `Fixed(64)`.
    /// An adaptive owner therefore behaves like the default fixed owner until
    /// the workload pushes it off that level.
    ///
    /// The 25% / 75% bands are evaluated in integer arithmetic as
    /// `q * 4 < cap` and `q * 4 > cap * 3`; no floating point is involved.
    ///
    /// This is the v1.0 control law. A v2.0 EMA-based variant is gated on
    /// benchmarks against this baseline.
    Adaptive,
}

impl Default for BatchPolicy {
    /// `Fixed(64)`, the v0.1 default.
    fn default() -> Self {
        BatchPolicy::Fixed(64)
    }
}
65
/// Drain-threshold ladder used by [`BatchPolicy::Adaptive`], smallest batch
/// first.
pub const ADAPTIVE_LEVELS: [usize; 5] = [8, 16, 32, 64, 128];

/// How many `maybe_drain` ticks the adaptive level stays frozen after a step
/// change. Chosen to keep overhead low while still settling quickly on
/// bursty workloads.
pub const ADAPTIVE_COOLDOWN_TICKS: u32 = 16;

/// Mutable bookkeeping behind [`BatchPolicy::Adaptive`].
///
/// Stored in a `core::cell::Cell` on [`SlabOwner`]. Because `SlabOwner` is
/// `!Sync`, only the thread currently owning it can read or update this.
#[derive(Clone, Copy, Debug)]
struct AdaptiveState {
    /// Index into [`ADAPTIVE_LEVELS`]; kept within `0..=4` by the owner.
    step: u8,
    /// `maybe_drain` ticks left before `step` may change again. While this is
    /// non-zero the level is locked; it counts down to 0.
    cooldown: u32,
}

impl AdaptiveState {
    /// Fresh state: level index 3 (threshold 64) with no cooldown pending.
    /// A workload that never leaves the hysteresis band therefore sees
    /// exactly the `Fixed(64)` behavior.
    const fn initial() -> Self {
        Self {
            cooldown: 0,
            step: 3,
        }
    }

    /// Drain threshold for the current level.
    ///
    /// Panics if `step` is out of range for [`ADAPTIVE_LEVELS`].
    #[inline]
    fn threshold(&self) -> usize {
        let idx = usize::from(self.step);
        ADAPTIVE_LEVELS[idx]
    }
}
101
102/// Shared state between [`SlabOwner`] and any number of [`SlabRemote`] handles.
103///
104/// Layout note: `remote_queue` is wrapped in `CachePadded` so the
105/// cross-thread mutex sits on its own cache line. Without this,
106/// `SlabRemote::deallocate` (which slams the mutex word) would evict the
107/// owner-only `slab` field from L1 on every remote push, costing the
108/// owner a cache miss on every subsequent `allocate`.
109struct SlabInner<T, B: Allocator + forge_alloc_core::FixedRange> {
110 /// The actual slab. Mutated only by `SlabOwner` (we enforce single-owner
111 /// via `!Sync` on the owner type).
112 slab: core::cell::UnsafeCell<Slab<T, B>>,
113 /// Max queue depth before `try_deallocate` returns `Err`. Read-only after
114 /// construction so it's safe to share a line with `slab`.
115 queue_capacity: usize,
116 /// Remote-free queue: `(ptr, layout)` pairs queued for return to the local
117 /// freelist. Mutex-protected for v0.1; will become lock-free MPSC in v1.0.
118 /// Cache-line-isolated from `slab` to prevent false sharing — see above.
119 remote_queue: CachePadded<Mutex<VecDeque<RemoteEntry>>>,
120 /// Mirror of `remote_queue.lock().len()` updated under the same lock.
121 ///
122 /// Lets the owner-thread fast path in [`SlabOwner::maybe_drain`] sample
123 /// the queue depth with a single Relaxed load instead of acquiring the
124 /// mutex and reading `VecDeque::len()` — the prior design paid an
125 /// uncontended `try_lock + queue.len()` round-trip (~8–12 ns/op) on
126 /// every owner-side `allocate` / `deallocate`, even when the queue was
127 /// empty (the common case for owner-heavy workloads). The fast path
128 /// now collapses to a load + compare (~2–3 ns).
129 ///
130 /// Correctness: the counter is written under the queue mutex
131 /// (`store(q.len())` after every `push_back` / `drain`), so the
132 /// counter and the queue length are always in sync at lock-release
133 /// time. The owner reads with `Relaxed` outside the lock; staleness
134 /// is bounded by one remote push between the read and the next
135 /// `maybe_drain` tick. Since `maybe_drain` is called on every
136 /// `allocate` / `deallocate`, the queue cannot grow unboundedly past
137 /// the threshold — the next tick samples a fresh value and drains.
138 ///
139 /// Not wrapped in `CachePadded`. With the preceding
140 /// `CachePadded<Mutex<...>>` field rounding up to a fresh cache-line
141 /// boundary, `remote_queue_len` already starts on a different cache
142 /// line from the mutex word — it's not sharing a line whether we pad
143 /// it or not. Adding a second `CachePadded` here would just bloat the
144 /// struct by another cache line for no perf benefit; the field is
145 /// already isolated from the mutex's cache traffic. (An earlier
146 /// version of this comment claimed the field shared a line with the
147 /// mutex on purpose — that was never actually the case post-padding.)
148 remote_queue_len: AtomicUsize,
149 /// Set by [`SlabOwner::drop`] under the `remote_queue` mutex.
150 ///
151 /// Once `true`, no thread will ever drain the queue again, so
152 /// [`SlabRemote::try_deallocate`] returns `Err(ptr)` and the
153 /// infallible [`SlabRemote::deallocate`] bails out instead of
154 /// spinning forever. The flag lives next to the queue rather than
155 /// on `SlabOwner` because `SlabRemote` clones outlive the owner;
156 /// they read it through their shared `Arc<SlabInner>`.
157 closed: AtomicBool,
158}
159
160impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabInner<T, B> {
161 /// Layout-pin: `slab` (owner-only, hot) and `remote_queue` (cross-thread
162 /// mutex word, hammered by remotes) must live on different cache lines.
163 /// Without this, every remote `push_back` would invalidate the owner's
164 /// L1-cached `slab` field and cost the owner a cache miss on its next
165 /// allocate. The `CachePadded` wrapper on `remote_queue` enforces it;
166 /// this const fires a compile error if a future refactor unwraps or
167 /// reorders the fields. Forced to evaluate by reference in
168 /// [`SlabOwner::with_batch_policy`].
169 const LAYOUT_PIN: () = {
170 use core::mem::offset_of;
171 let slab_off = offset_of!(SlabInner<T, B>, slab);
172 let queue_off = offset_of!(SlabInner<T, B>, remote_queue);
173 assert!(
174 slab_off / CACHE_LINE != queue_off / CACHE_LINE,
175 "SlabInner layout regression: `slab` and `remote_queue` share a cache line",
176 );
177 };
178}
179
180/// One entry in the remote-free queue: the (ptr, layout) pair to deallocate.
181#[derive(Copy, Clone)]
182struct RemoteEntry {
183 ptr: NonNull<u8>,
184 layout: NonZeroLayout,
185}
186
187// SAFETY: NonNull<u8> is !Send by default, but the mapping it points to is
188// owned by the slab inside the same Arc; sending the entry between threads
189// is equivalent to sending a u8 address — sound when the slab is Send.
190unsafe impl Send for RemoteEntry {}
191
192/// Owns the slab. Has exclusive `allocate` access. `Send` (can be moved
193/// across threads) but `!Sync` (one-at-a-time access — enforced by
194/// `UnsafeCell` on the inner slab plus our manual `Send` impl with no
195/// corresponding `Sync` impl).
196///
197/// # API-misuse compile-failures (pinned)
198///
199/// `SlabOwner` is `!Sync` by construction (`_not_sync:
200/// PhantomData<Cell<()>>`); a future refactor that accidentally
201/// rederived `Sync` would let two threads share an `&SlabOwner` and race
202/// on the `UnsafeCell<Slab>` inside `inner.slab`. The compile_fail
203/// below pins that rejection:
204///
205/// ```compile_fail
206/// // FAILS TO COMPILE: SlabOwner is deliberately !Sync. The
207/// // `_not_sync: PhantomData<Cell<()>>` marker blocks the auto-derive,
208/// // and `assert_sync` cannot accept a `!Sync` type.
209/// use forge_alloc::InlineBacked;
210/// use forge_alloc::SlabOwner;
211/// fn assert_sync<T: Sync>() {}
212/// assert_sync::<SlabOwner<u64, InlineBacked<512>>>();
213/// ```
214pub struct SlabOwner<T, B: Allocator + forge_alloc_core::FixedRange> {
215 inner: Arc<SlabInner<T, B>>,
216 batch_policy: BatchPolicy,
217 /// Adaptive-policy state. Cell because `maybe_drain` takes `&self`
218 /// and needs to mutate; `!Sync` guarantees only the owning thread
219 /// reads or writes it. Unused when `batch_policy` is `Fixed`.
220 adaptive: core::cell::Cell<AdaptiveState>,
221 /// Hold a `!Sync` marker so we definitely don't accidentally derive
222 /// `Sync` if all other fields become `Sync` later. `Arc<SlabInner<...>>`
223 /// is `Sync` (it has to be to be shared between `SlabOwner` and
224 /// `SlabRemote`), so without this we'd lose the `!Sync` guarantee.
225 /// (The `Cell<AdaptiveState>` above is already `!Sync`, but keep
226 /// the explicit marker so a future field swap can't accidentally
227 /// re-enable `Sync`.)
228 _not_sync: core::marker::PhantomData<core::cell::Cell<()>>,
229}
230
231/// Remote deallocation handle. `Send + Sync` — freely cloneable across
232/// threads. Implements [`Deallocator`] only; cannot allocate.
233///
234/// # API-misuse compile-failures (pinned)
235///
236/// `SlabRemote<T, B>` is `Send + Sync` only when `T: Send` (and
237/// `B: Send`). Instantiating with a non-`Send` `T` (e.g. `Rc<u64>`)
238/// and then trying to ship the remote across threads is rejected at
239/// compile time:
240///
241/// ```compile_fail
242/// // FAILS TO COMPILE: SlabRemote's `Send` bound requires `T: Send`,
243/// // so `SlabRemote<Rc<u64>, _>` is not `Send` and cannot satisfy
244/// // `assert_send`.
245/// use std::rc::Rc;
246/// use forge_alloc::InlineBacked;
247/// use forge_alloc::SlabRemote;
248/// fn assert_send<T: Send>() {}
249/// assert_send::<SlabRemote<Rc<u64>, InlineBacked<512>>>();
250/// ```
251#[derive(Clone)]
252pub struct SlabRemote<T, B: Allocator + forge_alloc_core::FixedRange> {
253 inner: Arc<SlabInner<T, B>>,
254}
255
256impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabOwner<T, B> {
257 /// Construct, taking ownership of a freshly-built slab.
258 pub fn new(capacity: usize, backing: B) -> Result<Self, AllocError> {
259 Self::with_batch_policy(capacity, backing, BatchPolicy::default(), 1024)
260 }
261
262 /// Construct with explicit batch policy and queue capacity.
263 pub fn with_batch_policy(
264 capacity: usize,
265 backing: B,
266 batch_policy: BatchPolicy,
267 queue_capacity: usize,
268 ) -> Result<Self, AllocError> {
269 // Force evaluation of the layout-pin const for this instantiation
270 // of (T, B). Compiles away to nothing; fails the build if a future
271 // refactor reshuffles SlabInner so that `slab` and `remote_queue`
272 // share a cache line, restoring the false-sharing problem.
273 let _: () = SlabInner::<T, B>::LAYOUT_PIN;
274 let slab = Slab::new(capacity, backing)?;
275 let inner = Arc::new(SlabInner {
276 slab: core::cell::UnsafeCell::new(slab),
277 queue_capacity,
278 remote_queue: CachePadded::new(Mutex::new(VecDeque::with_capacity(queue_capacity))),
279 remote_queue_len: AtomicUsize::new(0),
280 closed: AtomicBool::new(false),
281 });
282 Ok(Self {
283 inner,
284 batch_policy,
285 adaptive: core::cell::Cell::new(AdaptiveState::initial()),
286 _not_sync: core::marker::PhantomData,
287 })
288 }
289
290 /// Create a remote handle. Cheap — just an `Arc` clone.
291 pub fn remote(&self) -> SlabRemote<T, B> {
292 SlabRemote {
293 inner: Arc::clone(&self.inner),
294 }
295 }
296
297 /// Drain the remote-free queue into the local freelist now.
298 ///
299 /// Holds the queue mutex *only* long enough to swap out the pending
300 /// entries; releases the lock before calling `slab.deallocate` for
301 /// each entry. Without this two-phase pattern, remote senders would
302 /// be blocked through the entire drain loop — death by lock-hold time
303 /// proportional to queue depth.
304 pub fn drain(&self) {
305 // Phase 1: under the lock, snapshot the queue into a local Vec.
306 // `drain(..).collect()` empties the deque without resetting its
307 // capacity, so subsequent `push_back`s from `SlabRemote` continue
308 // to use the pre-allocated buffer rather than re-allocating.
309 let pending: Vec<RemoteEntry> = {
310 let mut q = self
311 .inner
312 .remote_queue
313 .lock()
314 .expect("SlabOwner remote queue poisoned");
315 // Fast-path the empty case: avoid the Vec allocation when the
316 // queue has nothing. `drain(..).collect()` on an empty deque
317 // still allocates a zero-length Vec on most allocators (the
318 // standard Vec sets `cap = 0` so this is actually cheap, but
319 // skipping the empty `for` below also saves a function call
320 // boundary that the optimizer can't always elide because
321 // `slab` is behind UnsafeCell).
322 if q.is_empty() {
323 // Invariant: every push and drain maintains
324 // `remote_queue_len == q.len()` under the lock, so an
325 // empty queue means the mirror is already 0. Assert it
326 // here so a future bookkeeping regression fires loudly
327 // in debug instead of silently being papered over by an
328 // unconditional store.
329 debug_assert_eq!(
330 self.inner.remote_queue_len.load(Ordering::Relaxed),
331 0,
332 "drain: queue empty but remote_queue_len mirror non-zero — push site dropped a mirror update",
333 );
334 return;
335 }
336 let entries: Vec<_> = q.drain(..).collect();
337 // Mirror reset MUST stay inside the critical section so the next
338 // owner-fast-path Relaxed load sees a consistent (queue, mirror)
339 // pair after lock release.
340 self.inner.remote_queue_len.store(0, Ordering::Relaxed);
341 entries
342 };
343 // Lock dropped here. Remote senders can resume pushing.
344
345 // Phase 2: process locally. SAFETY: !Sync on SlabOwner ensures
346 // we're the only thread touching the slab via this owner reference.
347 //
348 // We use `&*` (Shared retag) rather than `&mut *` (Unique retag) here:
349 // every `Slab` method we invoke (allocate / deallocate / base / size /
350 // capacity_bytes) takes `&self` and handles its own interior mutation
351 // through `UnsafeCell`. A `&mut Slab` would create a Unique retag
352 // covering the full slab AND its embedded backing buffer, which
353 // invalidates the SharedReadWrite tag the backing returned via
354 // `InlineBacked::buffer_base` (and which the live slot pointers were
355 // derived from). Miri caught this as a Stacked Borrows violation
356 // when `Slab::deallocate` later wrote the freelist link
357 // through one of those slot pointers.
358 let slab = unsafe { &*self.inner.slab.get() };
359 for entry in pending {
360 // SAFETY: the entry came from our SlabRemote::deallocate caller,
361 // who promised (ptr, layout) was issued by this slab.
362 unsafe { slab.deallocate(entry.ptr, entry.layout) };
363 }
364 }
365
366 /// Internal: check the batch-policy condition and drain if met.
367 ///
368 /// Hot path: called from every owner-side `allocate` and `deallocate`.
369 /// Two-tier design:
370 ///
371 /// 1. **Fast path** — sample the cached `remote_queue_len` mirror with
372 /// a single `Relaxed` load. The mirror is maintained under the
373 /// queue mutex by `SlabRemote::try_deallocate` (on push) and by
374 /// `drain` / `maybe_drain` (on consume), so it tracks the queue
375 /// depth at lock-release granularity. If the observed depth is
376 /// below threshold we return immediately — no mutex acquisition,
377 /// no `VecDeque::len()` indirection. Cost: one Relaxed load +
378 /// compare (~2–3 ns on x86_64 / AArch64).
379 /// 2. **Slow path** — depth ≥ threshold. We `try_lock` the queue and,
380 /// if uncontended, drain in one shot. `try_lock` (not `lock`) so
381 /// we don't contend with an in-flight remote push; the next tick
382 /// will catch up.
383 ///
384 /// Bounded staleness: the Relaxed load may observe a value behind a
385 /// concurrent remote push that hasn't yet released the mutex. The
386 /// drift is at most one push per tick; since `maybe_drain` is called
387 /// on every owner alloc/dealloc, the queue cannot grow unboundedly.
388 /// (If the owner is genuinely idle for long stretches, the queue is
389 /// also bounded by `queue_capacity` — remote pushes start rejecting
390 /// at that point.)
391 ///
392 /// Why we don't update the mirror outside the lock: the mirror and
393 /// the queue must stay in sync at lock-release time, otherwise the
394 /// owner's Relaxed load could observe an inconsistent pair (mirror=0
395 /// while queue has pending entries → owner skips draining and the
396 /// remote-pushed slot leaks until the next tick that happens to
397 /// observe a non-zero mirror). Keeping both writes inside the same
398 /// critical section makes the consistency check trivial.
399 fn maybe_drain(&self) {
400 // Fast path: relaxed load of the queue-length mirror. The
401 // updates to this counter live inside the queue mutex
402 // critical sections (push in `SlabRemote::try_deallocate`,
403 // reset in `drain` / `maybe_drain` / `Drop`), so a load that
404 // observes value `n` means the queue had ≥ n entries at the
405 // most recent mutex unlock visible to this CPU — possibly more
406 // since then, but never artificially inflated.
407 let pending = self.inner.remote_queue_len.load(Ordering::Relaxed);
408 let threshold = match self.batch_policy {
409 BatchPolicy::Fixed(n) => n,
410 BatchPolicy::Adaptive => self.adaptive_threshold(pending),
411 };
412 if pending < threshold {
413 return;
414 }
415
416 // Slow path: threshold tripped. Take the queue under `try_lock`
417 // and drain in one shot. `try_lock` (not `lock`) so we don't
418 // serialize against an in-flight remote push; the next tick
419 // will catch up.
420 let entries: Vec<RemoteEntry> = {
421 let mut q = match self.inner.remote_queue.try_lock() {
422 Ok(q) => q,
423 Err(_) => return, // contended; skip — we'll catch up next time
424 };
425 // Re-check inside the lock — the queue may have been drained
426 // by another owner-thread path (e.g. an `allocate` that
427 // bottomed out and called `drain()` between our load and
428 // try_lock). Skip the Vec allocation in that case.
429 if q.is_empty() {
430 // Mirror invariant: empty queue ⇒ mirror == 0.
431 debug_assert_eq!(
432 self.inner.remote_queue_len.load(Ordering::Relaxed),
433 0,
434 "maybe_drain: queue empty but remote_queue_len mirror non-zero",
435 );
436 return;
437 }
438 let entries: Vec<_> = q.drain(..).collect();
439 self.inner.remote_queue_len.store(0, Ordering::Relaxed);
440 entries
441 };
442 // Lock dropped. Process the snapshot under exclusive !Sync access.
443 // SAFETY: SlabOwner is !Sync, so no other thread holds an alias
444 // to the slab. `&*` (Shared retag) is sufficient — see `drain()`
445 // for the rationale; `Slab::deallocate` takes `&self`.
446 let slab = unsafe { &*self.inner.slab.get() };
447 for entry in entries {
448 // SAFETY: the entry came from our SlabRemote::deallocate caller,
449 // who promised (ptr, layout) was issued by this slab.
450 unsafe { slab.deallocate(entry.ptr, entry.layout) };
451 }
452 }
453
454 /// Update the adaptive state based on observed `pending` queue
455 /// depth and return the current threshold.
456 ///
457 /// Step DOWN (smaller batch, drain sooner) when `pending` exceeds
458 /// 75% of `queue_capacity` — queue is filling up, relieve back-
459 /// pressure on remote senders.
460 ///
461 /// Step UP (larger batch, drain later) when `pending` is below
462 /// 25% of `queue_capacity` — queue is mostly empty, amortize the
463 /// drain cost.
464 ///
465 /// A cooldown of [`ADAPTIVE_COOLDOWN_TICKS`] calls between
466 /// adjustments prevents oscillation around a band edge. All
467 /// arithmetic integer; the 0.25 / 0.75 bands are encoded as
468 /// `pending * 4 < cap` and `pending * 4 > cap * 3`.
469 fn adaptive_threshold(&self, pending: usize) -> usize {
470 let mut state = self.adaptive.get();
471 if state.cooldown > 0 {
472 state.cooldown -= 1;
473 } else {
474 let cap = self.inner.queue_capacity;
475 // q > 75% — step down toward smaller batch.
476 if pending.saturating_mul(4) > cap.saturating_mul(3) && state.step > 0 {
477 state.step -= 1;
478 state.cooldown = ADAPTIVE_COOLDOWN_TICKS;
479 // q < 25% — step up toward larger batch.
480 } else if pending.saturating_mul(4) < cap && state.step < 4 {
481 state.step += 1;
482 state.cooldown = ADAPTIVE_COOLDOWN_TICKS;
483 }
484 }
485 self.adaptive.set(state);
486 state.threshold()
487 }
488
489 /// Current adaptive-step threshold (in remote-queue entries), or
490 /// `None` if the owner is configured with `BatchPolicy::Fixed`.
491 /// Useful in tests and adaptive-tuning telemetry.
492 #[inline]
493 pub fn adaptive_threshold_snapshot(&self) -> Option<usize> {
494 match self.batch_policy {
495 BatchPolicy::Adaptive => Some(self.adaptive.get().threshold()),
496 BatchPolicy::Fixed(_) => None,
497 }
498 }
499}
500
501unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Deallocator for SlabOwner<T, B> {
502 #[inline]
503 unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: NonZeroLayout) {
504 // Drain pending remote deallocations on the owner-side dealloc
505 // path so a long-lived owner that allocates rarely (or never)
506 // and only deallocs locally still services the remote queue.
507 // Without this, the remote queue accumulates indefinitely on
508 // dealloc-heavy workloads.
509 self.maybe_drain();
510 // Owner-side dealloc: direct push to local freelist (no queue).
511 // SAFETY: !Sync ensures exclusive access to the slab. `&*` (Shared
512 // retag) is sufficient — Slab::deallocate takes `&self` and uses
513 // interior mutability for the freelist head.
514 let slab = unsafe { &*self.inner.slab.get() };
515 unsafe { slab.deallocate(ptr, layout) };
516 }
517}
518
519unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Allocator for SlabOwner<T, B> {
520 #[inline]
521 fn allocate(&self, layout: NonZeroLayout) -> Result<NonNull<[u8]>, AllocError> {
522 self.maybe_drain();
523 // SAFETY: !Sync on SlabOwner — exclusive access to inner slab.
524 // `&*` (Shared retag) rather than `&mut *`: `Slab::allocate` takes
525 // `&self`, and a `&mut Slab` retag would invalidate the inner
526 // backing's SharedReadWrite tag covering its storage region —
527 // every previously-issued slot pointer that the slab's freelist
528 // (or its consumers) still holds would become a stale tag, which
529 // is UB under Stacked Borrows.
530 let slab = unsafe { &*self.inner.slab.get() };
531 // If first attempt fails (local list empty + uncarved exhausted),
532 // drain in case the queue has frees we can recover.
533 match slab.allocate(layout) {
534 Ok(block) => Ok(block),
535 Err(_) => {
536 // NLL ends the borrow of `slab` after its last use above (the
537 // failing allocate call). `drain()` re-borrows the UnsafeCell
538 // through its own &self path; the prior &Slab is no longer
539 // live, so the second & creation does not violate aliasing.
540 self.drain();
541 // SAFETY: !Sync, re-borrow shared access. The drain()
542 // borrow has already ended by the time control reaches here.
543 let slab = unsafe { &*self.inner.slab.get() };
544 slab.allocate(layout)
545 }
546 }
547 }
548
549 #[inline]
550 unsafe fn usable_size(&self, ptr: NonNull<u8>, layout: NonZeroLayout) -> Option<usize> {
551 // Forward to the inner Slab, which reports the full slot stride — so an
552 // outer scrub wrapper (`PoisonOnFree`/`ZeroizeOnFree`) over a
553 // `SlabOwner` wipes the whole slot on free, not just the requested
554 // prefix. Mirrors the `capacity_bytes`/`corruption_events` forwards.
555 // SAFETY: !Sync — exclusive access; `ptr` came from the inner slab's
556 // allocate via this owner, and the caller upholds usable_size's
557 // contract.
558 let slab = unsafe { &*self.inner.slab.get() };
559 unsafe { slab.usable_size(ptr, layout) }
560 }
561
562 fn capacity_bytes(&self) -> Option<usize> {
563 // SAFETY: !Sync — exclusive access.
564 let slab = unsafe { &*self.inner.slab.get() };
565 slab.capacity_bytes()
566 }
567
568 #[inline]
569 fn corruption_events(&self) -> u64 {
570 // SAFETY: !Sync — exclusive access. The owner-thread is the only
571 // reader of this counter via the SlabOwner allocate path; remote
572 // pushes don't touch the inner Slab's counter directly (they
573 // enqueue and the owner drains).
574 let slab = unsafe { &*self.inner.slab.get() };
575 slab.corruption_events()
576 }
577}
578
/// Final drain and close of the remote queue when the owner is dropped.
///
/// Without this:
///
/// - `SlabRemote` clones outliving the owner would push entries into
///   `remote_queue` that nothing ever drains.
/// - Slots already queued for return (routed by a remote, not yet drained
///   by the owner) would never be relinked into the slab's local freelist.
///   They would stay marked live for as long as the last remote keeps the
///   `Arc<SlabInner>` alive, which is operationally a slot-table leak.
///   - `T`'s destructor is the remote caller's responsibility BEFORE calling
///     `SlabRemote::deallocate` (per the `Slab::deallocate` safety contract).
///   - The drain here only relinks the freelist entry; it does not run
///     `T::drop`.
/// - A later `SlabRemote::deallocate` (the spinning, infallible variant)
///   would spin forever on a full queue, hanging the calling thread.
///
/// The queue is closed while holding its mutex, which is race-free against
/// any in-flight remote push, and the pending entries are drained into the
/// local freelist. After that the slab is consistent. Any further remote
/// push observes `closed == true` and returns `Err(ptr)` / no-ops without
/// spinning.
impl<T, B: Allocator + forge_alloc_core::FixedRange> Drop for SlabOwner<T, B> {
    fn drop(&mut self) {
        // Phase 1: close + snapshot under the queue mutex.
        //
        // The close MUST happen under the mutex. A concurrent remote push
        // that acquires the lock then either:
        //   (a) runs before us, sees `closed == false`, and pushes — we
        //       drain its entry in this critical section below; or
        //   (b) runs after us, sees `closed == true`, and returns
        //       `Err(ptr)` without pushing — the caller keeps ownership.
        //
        // No remote push can slip in between "we set closed" and "we
        // drain", because both happen under the same lock acquisition.
        let pending: Vec<RemoteEntry> = {
            let mut q = match self.inner.remote_queue.lock() {
                Ok(q) => q,
                // Mutex poisoned: a prior holder panicked while holding the
                // lock. Recover the guard and drain anyway. The panic path
                // is already broken, so don't compound it by bailing out on
                // poison.
                Err(p) => p.into_inner(),
            };
            // This Release pairs with the Acquire load in
            // `SlabRemote::try_deallocate`. That load is taken while holding
            // this same mutex, so the unlock → lock hand-off already orders
            // it after this store. The explicit Release/Acquire pair
            // documents the intended publication; it is not the only thing
            // providing it.
            self.inner.closed.store(true, Ordering::Release);
            let entries: Vec<_> = q.drain(..).collect();
            // Reset the mirror in the same critical section as the drain.
            // `SlabRemote` clones may keep the `Arc<SlabInner>` alive after
            // the owner is gone, and the shared state should then read as
            // "queue empty + closed" rather than a stale length. No owner
            // fast path reads the mirror after this point; this is for
            // consistency and the debug assertion below.
            self.inner.remote_queue_len.store(0, Ordering::Relaxed);
            entries
        };
        // Phase 2: relink the drained entries under our exclusive `!Sync`
        // access. SAFETY: SlabOwner is `!Sync`, so no other thread holds an
        // alias to the slab through this owner. The queue is now closed, so
        // no new entries will appear.
        //
        // **Panic inside the loop**: if `slab.deallocate` panics partway
        // through (e.g. a debug assertion in the slab's own bookkeeping —
        // TODO(review): confirm which checks `Slab::deallocate` has):
        // - The loop stops.
        // - The remaining `pending` entries are dropped without being
        //   relinked, so those slots leak from the freelist.
        // - The `Arc<SlabInner>` itself still drops normally.
        // If this Drop is already running *during* an unwind, that second
        // panic aborts the process, per Rust's rules. Either way the queue
        // was closed in Phase 1, so no later push can succeed; the
        // closed-flag promise holds even on the abort path.
        //
        // `&*` (Shared retag): Slab::deallocate takes `&self`. A `&mut`
        // retag would invalidate the backing's SharedReadWrite tag that live
        // slot pointers were derived from (Stacked Borrows).
        let slab = unsafe { &*self.inner.slab.get() };
        for entry in pending {
            // SAFETY: the entry was pushed by a `SlabRemote::try_deallocate`
            // caller who promised the pointer originated from this slab.
            unsafe { slab.deallocate(entry.ptr, entry.layout) };
        }
        // Defense-in-depth, debug builds only. Phase 1 closed and drained
        // the queue in one critical section, so nothing can have been added
        // since; the queue must be empty.
        //
        // The re-lock recovers from poisoning via `into_inner`, so a
        // poisoned mutex does not itself panic here. Only a failed invariant
        // panics, and during an unwind that second panic aborts, which is
        // the intended posture for a corrupted Drop path.
        debug_assert!(
            self.inner
                .remote_queue
                .lock()
                .map(|q| q.is_empty())
                .unwrap_or_else(|p| p.into_inner().is_empty()),
            "SlabOwner::drop: remote_queue non-empty after Phase 1+2 drain — \
             closed-flag invariant violated, a remote push raced past the close",
        );
        debug_assert_eq!(
            self.inner.remote_queue_len.load(Ordering::Relaxed),
            0,
            "SlabOwner::drop: remote_queue_len mirror non-zero after drain — \
             push-side store missed the lock or drain-side reset was elided",
        );
        // The slab itself is dropped when the last `Arc<SlabInner>` is
        // released, i.e. when the last `SlabRemote` clone is dropped. That is
        // outside our control; the closed flag keeps the queue from growing
        // in the meantime.
        //
        // NOTE(review): whether `T` values in still-live slots (never routed
        // through the queue) are dropped at that point depends on `Slab`'s
        // own `Drop` impl, which is not shown here — confirm before relying
        // on it.
    }
}
696
697impl<T, B: Allocator + forge_alloc_core::FixedRange> FixedRange for SlabOwner<T, B> {
698 fn base(&self) -> NonNull<u8> {
699 // SAFETY: !Sync — exclusive access.
700 let slab = unsafe { &*self.inner.slab.get() };
701 slab.base()
702 }
703
704 fn size(&self) -> usize {
705 // SAFETY: !Sync — exclusive access.
706 let slab = unsafe { &*self.inner.slab.get() };
707 slab.size()
708 }
709}
710
711impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabRemote<T, B> {
712 /// Non-spinning remote deallocate. Returns `Err(ptr)` if the queue is
713 /// full **or** the owner has been dropped (queue is closed); caller
714 /// retains ownership and must handle the pointer (typically by
715 /// dropping it once the whole slab tears down).
716 ///
717 /// # Safety
718 ///
719 /// `ptr` must have been allocated from the corresponding `SlabOwner`.
720 /// On `Err`, the pointer is still owned by the caller.
721 pub unsafe fn try_deallocate(
722 &self,
723 ptr: NonNull<u8>,
724 layout: NonZeroLayout,
725 ) -> Result<(), NonNull<u8>> {
726 let mut q = self
727 .inner
728 .remote_queue
729 .lock()
730 .expect("SlabRemote queue poisoned");
731 // Closed check under the lock pairs with the close-under-lock
732 // in `SlabOwner::drop`: any push that observes `closed == false`
733 // here is guaranteed to be drained by the owner's final
734 // drain-and-close critical section (which can't interleave with
735 // ours).
736 if self.inner.closed.load(Ordering::Acquire) {
737 return Err(ptr);
738 }
739 if q.len() >= self.inner.queue_capacity {
740 return Err(ptr);
741 }
742 q.push_back(RemoteEntry { ptr, layout });
743 // Update the owner-fast-path mirror under the lock. `store(q.len())`
744 // rather than `fetch_add(1)` so the post-store value matches the
745 // queue length at lock-release time exactly.
746 //
747 // Ordering: `Relaxed` is sufficient because the mirror is
748 // **advisory only**. The owner's fast path samples the counter
749 // outside the lock (no formal happens-before edge with this
750 // store), but the slow path then re-acquires the queue mutex
751 // and re-verifies the queue itself before draining — so any
752 // visibility lag of the Relaxed load is bounded by one mutex
753 // round-trip. On real hardware the Relaxed store is also
754 // committed by the unlock fence, so steady-state owner reads
755 // observe the update within a few hundred nanoseconds of the
756 // push. See `maybe_drain` for the bounded-staleness argument.
757 self.inner
758 .remote_queue_len
759 .store(q.len(), Ordering::Relaxed);
760 Ok(())
761 }
762}
763
764unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Deallocator for SlabRemote<T, B> {
765 /// Spins until the queue accepts the deallocation, **except** when
766 /// the owner has been dropped — in that case the queue will never
767 /// drain again, so we return immediately. The slot remains
768 /// marked-live in the slab until the last `SlabRemote` clone
769 /// releases the shared `Arc<SlabInner>` (at which point the slab
770 /// itself drops and the backing region is fully reclaimed). Any
771 /// `T: Drop` whose destructor was already run by the caller before
772 /// `deallocate` is not affected — the destructor ran on schedule;
773 /// only the freelist entry for the slot is forfeited.
774 ///
775 /// Latency-sensitive callers should use [`try_deallocate`](Self::try_deallocate)
776 /// and handle the `Err(ptr)` overflow / closed-queue explicitly.
777 unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: NonZeroLayout) {
778 let mut p = ptr;
779 loop {
780 // SAFETY: forwarded from caller.
781 match unsafe { self.try_deallocate(p, layout) } {
782 Ok(()) => return,
783 Err(returned) => {
784 // Distinguish "queue closed, owner gone" (permanent —
785 // bail to avoid an infinite spin) from "queue full,
786 // transient" (continue spinning).
787 if self.inner.closed.load(Ordering::Acquire) {
788 return;
789 }
790 p = returned;
791 core::hint::spin_loop();
792 }
793 }
794 }
795 }
796}
797
798// SlabOwner is Send but !Sync. The Arc<SlabInner> is Send when its contents
799// are; we have UnsafeCell<Slab<T, B>> inside, which is !Sync by default —
800// good. But SlabInner needs to be Sync (because Arc<SlabInner> being shared
801// across Owner + Remote(s) requires Sync) — the Mutex provides Sync for the
802// queue, and the UnsafeCell-wrapped slab is accessed only by the !Sync
803// Owner. Manual impl needed.
804unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabOwner<T, B> {}
805// SlabOwner is deliberately !Sync — `_not_sync: PhantomData<Cell<()>>`
806// blocks the auto-derive. We rely on this for soundness: if two threads
807// could share `&SlabOwner`, both could call `allocate` and race on the
808// UnsafeCell<Slab<T, B>>.
809
810unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabRemote<T, B> {}
811unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Sync for SlabRemote<T, B> {}
812
813// The Arc<SlabInner> requires SlabInner: Send + Sync.
814unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabInner<T, B> {}
815unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Sync for SlabInner<T, B> {}
816
#[cfg(test)]
mod tests {
    use super::*;
    use crate::backing::InlineBacked;

    /// Positive companion to the `SlabOwner !Sync` compile_fail pin: the
    /// owner IS `Send` (single-threaded ownership transfer is the v0.1
    /// use case).
    ///
    /// This pin is needed because `multi_threaded_remote_dealloc` never
    /// moves the owner itself across threads; only `SlabRemote` handles are
    /// sent. If a future refactor accidentally removed `Send`, that test
    /// would still compile, but this pin would fail to compile.
    #[test]
    fn slab_owner_is_send() {
        fn assert_send<T: Send>() {}
        assert_send::<SlabOwner<u64, InlineBacked<512>>>();
    }

    /// Positive companion to the `SlabRemote<!Send T>` compile_fail pin:
    /// `SlabRemote<u64, InlineBacked<N>>` IS `Send + Sync` because both
    /// `T = u64` and `B: Send`. Pinning this stops a refactor that
    /// accidentally weakened the bound from breaking the cross-thread
    /// deallocate API silently.
    #[test]
    fn slab_remote_is_send_and_sync_when_t_is_send() {
        fn assert_send_sync<T: Send + Sync>() {}
        assert_send_sync::<SlabRemote<u64, InlineBacked<512>>>();
    }

    /// Owner-thread round trip: one allocate followed by a local
    /// deallocate through the owner itself (no remote queue involved).
    #[test]
    fn owner_can_alloc_and_dealloc_locally() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let p = owner.allocate(layout).unwrap();
        unsafe { owner.deallocate(p.cast(), layout) };
    }

    /// `usable_size` forwards to the inner Slab's full slot stride, so an outer
    /// scrub wrapper wipes the whole slot. `SlabOwner<u8>` → 8-byte slots.
    #[test]
    fn usable_size_forwards_slot_stride() {
        let owner: SlabOwner<u8, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        let layout = NonZeroLayout::from_size_align(1, 1).unwrap();
        let p = owner.allocate(layout).unwrap();
        let ptr = p.cast::<u8>();
        let us = unsafe { owner.usable_size(ptr, layout) };
        assert_eq!(
            us,
            Some(8),
            "usable_size must forward the inner slot stride"
        );
        unsafe { owner.deallocate(ptr, layout) };
    }

    /// A remote free followed by an explicit owner `drain()` must put the
    /// slot back on the owner's freelist, so the next allocate reuses it.
    #[test]
    fn remote_can_deallocate_owner_allocations() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let p = owner.allocate(layout).unwrap();
        // Cross-thread deallocate via remote.
        unsafe { remote.deallocate(p.cast(), layout) };
        // Force a drain so the slot returns to the local freelist.
        owner.drain();
        // Subsequent allocate must reuse the freed slot.
        let p2 = owner.allocate(layout).unwrap();
        assert_eq!(p.cast::<u8>().as_ptr(), p2.cast::<u8>().as_ptr());
    }

    /// Boundary: `queue_capacity = 0` means every remote deallocation is
    /// rejected: `try_deallocate` returns `Err(ptr)` on the very first
    /// call. This is the documented contract; pinning it here protects
    /// against an accidental "off-by-one allows one push" regression.
    /// Only the non-spinning `try_deallocate` path is exercised here.
    #[test]
    fn queue_capacity_zero_rejects_every_remote_dealloc() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 0)
                .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let p = owner.allocate(layout).unwrap();
        // Cap 0: first try_deallocate must already fail.
        let r = unsafe { remote.try_deallocate(p.cast(), layout) };
        assert!(
            r.is_err(),
            "queue_capacity=0 must reject every remote_deallocate",
        );
        // The rejected pointer is still ours; free it through the owner to
        // keep the slab balanced under the test.
        unsafe { owner.deallocate(p.cast(), layout) };
    }

    /// Boundary: `queue_capacity = 1` accepts exactly one push; the
    /// second must fail.
    #[test]
    fn queue_capacity_one_accepts_one_push_only() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 1)
                .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let a = owner.allocate(layout).unwrap();
        let b = owner.allocate(layout).unwrap();
        unsafe {
            assert!(remote.try_deallocate(a.cast(), layout).is_ok());
            assert!(remote.try_deallocate(b.cast(), layout).is_err());
            // Free the rejected pointer through the owner.
            owner.deallocate(b.cast(), layout);
        }
    }

    /// Boundary: with the adaptive policy pushed toward the floor (step 0,
    /// threshold 8), repeated step-down attempts must not underflow the
    /// step counter. We feed sustained > 75% queue depth (the queue is
    /// topped up to 14 of 16 entries) for 100 allocate ticks. We then
    /// check that the threshold is still a valid configured level.
    ///
    /// NOTE(review): the assertion only checks membership in
    /// `ADAPTIVE_LEVELS`; it does not pin which level the policy ends on.
    /// Once the threshold reaches 8, the 14-entry queue presumably gets
    /// drained. After that, the empty-queue band can step the policy back
    /// up, so the final level depends on drain timing.
    #[test]
    fn adaptive_policy_step_floor_does_not_underflow() {
        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
            256,
            InlineBacked::<8192>::new(),
            BatchPolicy::Adaptive,
            16,
        )
        .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        // Pre-allocate a pool of live slots to remote-free from. Each pool
        // entry is pushed at most once (`pushed` only grows), so no slot is
        // freed twice.
        let pool: Vec<_> = (0..200).map(|_| owner.allocate(layout).unwrap()).collect();
        let mut pushed = 0;
        // Drive the policy toward the floor across many cycles. Each cycle
        // tops the queue up to 14 entries, then ticks `maybe_drain` with
        // one allocate. The allocate result is ignored.
        for _ in 0..100 {
            while pushed < pool.len() && {
                let q = owner.inner.remote_queue.lock().unwrap();
                q.len() < 14
            } {
                let _ = unsafe { remote.try_deallocate(pool[pushed].cast(), layout) };
                pushed += 1;
            }
            let _ = owner.allocate(layout);
        }
        // Whatever level the policy settled on, it must be one of the
        // configured levels, never a wrapped or invalid value.
        let thr = owner.adaptive_threshold_snapshot().unwrap();
        assert!(
            ADAPTIVE_LEVELS.contains(&thr),
            "threshold {thr} must remain one of the configured levels"
        );
    }

    /// With queue capacity 2, the third `try_deallocate` must be rejected
    /// while the first two are still queued.
    #[test]
    fn try_deallocate_returns_err_on_full_queue() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 2)
                .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let a = owner.allocate(layout).unwrap();
        let b = owner.allocate(layout).unwrap();
        let c = owner.allocate(layout).unwrap();
        unsafe {
            remote.try_deallocate(a.cast(), layout).unwrap();
            remote.try_deallocate(b.cast(), layout).unwrap();
            // Queue capacity 2: the third must fail.
            let r = remote.try_deallocate(c.cast(), layout);
            assert!(r.is_err());
            // Free c via the owner so we don't leak under the test.
            owner.deallocate(c.cast(), layout);
        }
    }

    /// 128 owner-allocated slots are freed from 4 worker threads (32 each)
    /// through the spinning `Deallocator::deallocate`. After one owner
    /// `drain()`, re-allocating 128 slots must hand back exactly the freed
    /// set.
    #[test]
    #[cfg_attr(miri, ignore = "miri-incompatible: mmap / threads")]
    fn multi_threaded_remote_dealloc() {
        use std::sync::Arc;
        use std::thread;

        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
            256,
            InlineBacked::<8192>::new(),
            BatchPolicy::Fixed(64),
            1024,
        )
        .unwrap();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();

        // Pre-allocate a bunch on the owner thread, then send pointers off
        // to worker threads for cross-thread deallocate. Addresses travel
        // as `usize` because `NonNull` is not `Send`.
        let mut ptrs = Vec::new();
        for _ in 0..128 {
            ptrs.push(owner.allocate(layout).unwrap());
        }
        let ptrs_addrs: Vec<usize> = ptrs
            .iter()
            .map(|p| p.cast::<u8>().as_ptr() as usize)
            .collect();
        let remote = Arc::new(owner.remote());

        let mut handles = Vec::new();
        for chunk in ptrs_addrs.chunks(32) {
            let chunk = chunk.to_vec();
            let r = Arc::clone(&remote);
            handles.push(thread::spawn(move || {
                for addr in chunk {
                    let p = unsafe { NonNull::new_unchecked(addr as *mut u8) };
                    unsafe { r.deallocate(p, layout) };
                }
            }));
        }
        for h in handles {
            h.join().unwrap();
        }
        owner.drain();
        // After the drain, the freelist must hold exactly the 128 freed
        // slots. Re-allocating 128 must therefore return those SAME slots,
        // because the freelist is popped before any fresh carve from
        // next_uncarved.
        //
        // Asserting that the re-allocated addresses equal the freed set is
        // what actually proves the drain reclaimed them. A mere `is_ok()`
        // check would pass even if the drain dropped every entry: the slab
        // would just carve fresh slots from the remaining capacity
        // (256 > 128).
        use std::collections::HashSet;
        let freed: HashSet<usize> = ptrs_addrs.iter().copied().collect();
        let mut reclaimed: HashSet<usize> = HashSet::new();
        for _ in 0..128 {
            let p = owner
                .allocate(layout)
                .expect("a reclaimed slot must be allocatable");
            let addr = p.cast::<u8>().as_ptr() as usize;
            assert!(
                freed.contains(&addr),
                "re-allocated slot {addr:#x} is not one of the drained slots — drain failed to reclaim",
            );
            reclaimed.insert(addr);
        }
        assert_eq!(
            reclaimed, freed,
            "the 128 re-allocated slots must be exactly the 128 drained slots",
        );
    }

    // ====================================================================
    // BatchPolicy::Adaptive — stepped-threshold control law tests
    // ====================================================================

    /// Helper: build a 256-slot slab with the `Adaptive` policy and a small
    /// queue, so the 25%/75% hysteresis bands trigger on a few entries.
    fn build_adaptive_owner(queue_capacity: usize) -> SlabOwner<u64, InlineBacked<8192>> {
        SlabOwner::with_batch_policy(
            256,
            InlineBacked::<8192>::new(),
            BatchPolicy::Adaptive,
            queue_capacity,
        )
        .unwrap()
    }

    /// A freshly built adaptive owner starts at threshold 64 (step 3),
    /// before any allocate tick has run.
    #[test]
    fn adaptive_initial_threshold_is_64() {
        let owner = build_adaptive_owner(64);
        assert_eq!(owner.adaptive_threshold_snapshot(), Some(64));
    }

    /// `adaptive_threshold_snapshot` is `None` for a non-adaptive policy.
    #[test]
    fn fixed_policy_has_no_adaptive_snapshot() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        // Default is Fixed(64).
        assert_eq!(owner.adaptive_threshold_snapshot(), None);
    }

    #[test]
    fn adaptive_steps_down_when_queue_exceeds_75_percent() {
        // queue_capacity = 16; > 75% means q > 12 (since q*4 > 16*3 = 48 ⇔ q > 12).
        //
        // Path-dependence: every allocate calls maybe_drain, which observes
        // the queue and may step the policy.
        // - The 13 pre-allocates below step the policy UP to the ceiling
        //   (128) on call #1: the queue is empty, so q*4 < cap and the
        //   policy steps up. That engages a cooldown of
        //   ADAPTIVE_COOLDOWN_TICKS = 16.
        // - Calls #2..#13 just decrement the cooldown.
        // - After pushing 13 entries to the queue, we need (16 − 12) = 4
        //   more allocates to drain the cooldown, then a 5th to observe the
        //   step-down trigger.
        // Total: 13 + 5 = 18 allocates.
        let owner = build_adaptive_owner(16);
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        // 13 pre-allocates: step climbs to 128 on call 1, cooldown to 4 by 13.
        let ptrs: Vec<_> = (0..13).map(|_| owner.allocate(layout).unwrap()).collect();
        assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
        // Push all 13 to the remote queue (q is now 13, > 75% of 16).
        for p in &ptrs {
            unsafe { remote.try_deallocate(p.cast(), layout).unwrap() };
        }
        // 4 more allocates to drain the remaining cooldown ticks. Each one
        // observes q=13, but cooldown > 0, so the band check is skipped.
        // The threshold stays at 128, so q < threshold: no drain, and the
        // queue stays at 13.
        for _ in 0..4 {
            let _ = owner.allocate(layout).unwrap();
            assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
        }
        // 18th allocate: cooldown=0, so the band check fires.
        // q*4=52 > cap*3=48, so the step drops from 4 → 3 ⇒ threshold 64.
        let _ = owner.allocate(layout).unwrap();
        assert_eq!(
            owner.adaptive_threshold_snapshot(),
            Some(64),
            "step should have dropped one level after q > 75% sample",
        );
    }

    #[test]
    fn adaptive_steps_up_when_queue_below_25_percent() {
        // queue_capacity = 16; < 25% means q*4 < 16 ⇔ q < 4. q=0 qualifies.
        let owner = build_adaptive_owner(16);
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        assert_eq!(owner.adaptive_threshold_snapshot(), Some(64));
        // One allocate observes empty queue → step up to 128.
        let _ = owner.allocate(layout).unwrap();
        assert_eq!(
            owner.adaptive_threshold_snapshot(),
            Some(128),
            "step should have climbed one level after q < 25% sample",
        );
    }

    #[test]
    fn adaptive_cooldown_prevents_oscillation() {
        // Intent: after one step adjustment, the cooldown counter prevents
        // the next ADAPTIVE_COOLDOWN_TICKS maybe_drain calls from changing
        // the step. Checked by watching the threshold stay put.
        //
        // NOTE(review): no remote frees are pushed, so the queue stays
        // empty and the band check can only ever ask to step UP. The
        // policy is already at the ceiling (128). This test therefore
        // cannot distinguish "cooldown blocked the move" from "ceiling
        // clamped the move". A real cooldown test would fill the queue past
        // 75% during the cooldown window.
        let owner = build_adaptive_owner(16);
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        // First call: step 3 → 4 (queue empty, < 25%).
        let _ = owner.allocate(layout).unwrap();
        let after_first = owner.adaptive_threshold_snapshot().unwrap();
        assert_eq!(after_first, 128);
        // Subsequent allocates DURING cooldown must not change the level.
        for _ in 0..(ADAPTIVE_COOLDOWN_TICKS - 1) {
            let _ = owner.allocate(layout).unwrap();
            assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
        }
    }

    #[test]
    fn adaptive_ceiling_is_respected() {
        // Step 4 (= 128) is the ceiling. Climb to it via many empty-queue
        // ticks (with cooldown waits in between).
        let owner = build_adaptive_owner(16);
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        // Each "step + cooldown_ticks" cycle moves the threshold one
        // level. A single step from 3→4 reaches the ceiling; the remaining
        // ticks check that the policy never climbs past it.
        for _ in 0..(2 * ADAPTIVE_COOLDOWN_TICKS) {
            let _ = owner.allocate(layout).unwrap();
        }
        assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
    }

    #[test]
    fn adaptive_floor_is_respected() {
        // Intent: step 0 (= 8) is the floor. Push enough cross-thread frees
        // to keep the queue > 75% across repeated ticks, so the policy steps
        // down 3→2→1→0 and then stops, with no step below the floor.
        //
        // NOTE(review): as written, this test does NOT reach or assert the
        // floor. With cap=16, q > 12 triggers a step-down, and each
        // step-down engages a 16-tick cooldown. Walking 3→2→1→0 plus a
        // "floor reached" check would therefore need roughly
        // 4 × 17 = 68 ticks. The loop below runs only 6 cycles (one
        // allocate each) and then asserts `thr <= 64`.
        //
        // The 200 pre-allocates run against an empty queue and should
        // first step the policy up to 128. So that assertion only proves
        // that sustained pressure moved the threshold down at least one
        // level. Floor/underflow behaviour is exercised over 100 cycles by
        // `adaptive_policy_step_floor_does_not_underflow`.
        const SLAB_CAP: usize = 256;
        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
            SLAB_CAP,
            InlineBacked::<8192>::new(),
            BatchPolicy::Adaptive,
            16,
        )
        .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        // Phase 1: pre-allocate a pool of pointers to push from.
        let pool: Vec<_> = (0..200).map(|_| owner.allocate(layout).unwrap()).collect();
        // Phase 2: keep the observed q > 75% with a tight loop. Each cycle
        // tops the queue up to 14 entries, then allocates once. That
        // allocate either decrements the cooldown or, on the cooldown=0
        // tick, triggers a step-down.
        let mut pushed = 0usize;
        for cycle in 0..6 {
            // Top the queue back up to ~14 entries each cycle.
            while pushed < pool.len() && {
                let q = owner.inner.remote_queue.lock().unwrap();
                q.len() < 14
            } {
                // SAFETY: pool came from this owner.
                let _ = unsafe { remote.try_deallocate(pool[pushed].cast(), layout) };
                pushed += 1;
            }
            // Allocate once to trigger maybe_drain.
            let _ = owner.allocate(layout);
            // On the last cycle the threshold must be at or below 64, i.e.
            // at least one level below the 128 ceiling reached during
            // Phase 1.
            if cycle == 5 {
                let thr = owner.adaptive_threshold_snapshot().unwrap();
                assert!(
                    thr <= 64,
                    "policy should have stepped down toward floor, got {thr}",
                );
            }
        }
    }

    /// Pins the adaptive level table to the documented spec.
    #[test]
    fn adaptive_levels_are_sorted_and_match_spec() {
        assert_eq!(ADAPTIVE_LEVELS, [8, 16, 32, 64, 128]);
    }

    /// Regression: the `remote_queue_len` mirror tracks the queue
    /// length at lock-release granularity. Verifies the bookkeeping
    /// across push, drain, and `maybe_drain`:
    /// - After N remote pushes, mirror == N.
    /// - After `drain()`, mirror == 0 even though no allocate happened.
    /// - After a fresh push following drain, mirror == 1 (no stale-add).
    #[test]
    fn remote_queue_len_mirror_tracks_lock_release_state() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 16)
                .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();

        // Start: mirror is 0.
        assert_eq!(owner.inner.remote_queue_len.load(Ordering::Relaxed), 0);

        // Push three: mirror must be 3.
        let p1 = owner.allocate(layout).unwrap();
        let p2 = owner.allocate(layout).unwrap();
        let p3 = owner.allocate(layout).unwrap();
        unsafe {
            remote.try_deallocate(p1.cast(), layout).unwrap();
            remote.try_deallocate(p2.cast(), layout).unwrap();
            remote.try_deallocate(p3.cast(), layout).unwrap();
        }
        assert_eq!(
            owner.inner.remote_queue_len.load(Ordering::Relaxed),
            3,
            "mirror should equal the number of pushed entries"
        );

        // Drain: mirror resets to 0 without an allocate tick.
        owner.drain();
        assert_eq!(
            owner.inner.remote_queue_len.load(Ordering::Relaxed),
            0,
            "drain() must reset the mirror under the same critical section"
        );

        // Push one more: the mirror must be exactly 1, not 4.
        let p4 = owner.allocate(layout).unwrap();
        unsafe { remote.try_deallocate(p4.cast(), layout).unwrap() };
        assert_eq!(
            owner.inner.remote_queue_len.load(Ordering::Relaxed),
            1,
            "post-drain push must start from 0, not stale-add"
        );

        // Drain the last queued entry before the owner drops. Drop also
        // drains pending entries itself before its debug_asserts on queue
        // emptiness and the mirror, so this is belt-and-braces.
        owner.drain();
    }

    /// Regression: when an owner-side `allocate` tips the queue over
    /// threshold via `maybe_drain`, the mirror must be reset in the same
    /// critical section as the actual VecDeque drain. Otherwise a
    /// subsequent fast-path load would observe a stale non-zero value
    /// and keep firing the slow path with an empty queue.
    #[test]
    fn maybe_drain_resets_mirror_under_lock() {
        // Threshold low so a single tick drains.
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(2), 16)
                .unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let p1 = owner.allocate(layout).unwrap();
        let p2 = owner.allocate(layout).unwrap();
        unsafe {
            remote.try_deallocate(p1.cast(), layout).unwrap();
            remote.try_deallocate(p2.cast(), layout).unwrap();
        }
        assert_eq!(owner.inner.remote_queue_len.load(Ordering::Relaxed), 2);
        // This allocate's maybe_drain hits the slow path (pending >= 2),
        // drains the two entries, and must reset the mirror.
        let _p3 = owner.allocate(layout).unwrap();
        assert_eq!(
            owner.inner.remote_queue_len.load(Ordering::Relaxed),
            0,
            "maybe_drain must reset the mirror inside the same critical section as the drain"
        );
    }

    /// Regression: dropping the owner must drain pending remote frees
    /// (no T: Drop leaks beyond the natural Slab teardown). It must also
    /// close the queue, so further remote pushes return `Err(ptr)` instead
    /// of piling into an undrainable queue.
    ///
    /// Only the "queue closed" half is observable from here. The drain of
    /// `a` and `b` is checked indirectly by the Drop impl's own
    /// debug_asserts (queue empty, mirror zero) in debug builds.
    #[test]
    fn owner_drop_drains_pending_remote_frees_and_closes_queue() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();

        // Pre-allocate two slots through the owner, then queue two
        // remote frees without draining.
        let a = owner.allocate(layout).unwrap();
        let b = owner.allocate(layout).unwrap();
        unsafe {
            remote.try_deallocate(a.cast(), layout).unwrap();
            remote.try_deallocate(b.cast(), layout).unwrap();
        }
        // Drop the owner. The Drop impl must drain a and b into the local
        // freelist; otherwise the slab keeps them marked-live until the
        // last Arc drops.
        drop(owner);

        // After owner drop, remote pushes must NOT block and must NOT
        // silently succeed: they return Err(ptr).
        let c_layout = layout;
        // Use a fake but well-formed pointer. This formally breaks
        // `try_deallocate`'s safety contract, but the closed check returns
        // `Err(ptr)` before anything touches the slab.
        let fake = unsafe { NonNull::new_unchecked(0x1000_usize as *mut u8) };
        let result = unsafe { remote.try_deallocate(fake, c_layout) };
        assert!(
            result.is_err(),
            "remote.try_deallocate must return Err after owner drop"
        );
    }

    /// Regression: the spinning `Deallocator::deallocate` impl must
    /// bail out (not spin forever) when owner drop has closed the queue.
    /// Without the closed check, a long-running task holding a
    /// `SlabRemote` would hang when its `deallocate` runs after the owner
    /// is torn down.
    #[test]
    fn remote_deallocate_does_not_spin_after_owner_drop() {
        let owner: SlabOwner<u64, InlineBacked<512>> =
            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
        let remote = owner.remote();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();
        let _p = owner.allocate(layout).unwrap();
        drop(owner);
        // After owner drop, this would spin forever before the fix.
        // The closed check inside deallocate returns immediately, so
        // this completes synchronously. The fake pointer never reaches the
        // slab because the closed queue rejects it first.
        let fake = unsafe { NonNull::new_unchecked(0x1000_usize as *mut u8) };
        unsafe { remote.deallocate(fake, layout) };
    }

    /// Multi-threaded stress test: the owner plus 4 remote workers run
    /// concurrent allocates/frees under the Adaptive policy. The test
    /// asserts two things:
    /// - (a) after the workers stop and a final drain, the slab reports
    ///   zero freelist corruption events;
    /// - (b) the adaptive threshold moved off its initial value of 64 at
    ///   least once, i.e. the policy actually adapted rather than sat.
    ///
    /// It does not check slot balance. Two kinds of addresses are never
    /// freed: those still in the work queue at stop time, and those
    /// abandoned by a worker whose retry loop exits on `stop`.
    #[test]
    #[cfg_attr(miri, ignore = "miri-incompatible: mmap / threads")]
    fn adaptive_multi_threaded_stress() {
        use std::collections::VecDeque;
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::{Arc, Mutex};
        use std::thread;

        const CAP: usize = 1024;
        const QUEUE_CAP: usize = 64;
        let owner: SlabOwner<u64, InlineBacked<{ 16 * 1024 }>> = SlabOwner::with_batch_policy(
            CAP,
            InlineBacked::<{ 16 * 1024 }>::new(),
            BatchPolicy::Adaptive,
            QUEUE_CAP,
        )
        .unwrap();
        let layout = NonZeroLayout::for_type::<u64>().unwrap();

        // Producer-consumer cycle, so every address is remote-freed exactly
        // ONCE per allocation. (The previous version re-freed a fixed pool
        // thousands of times: a wholesale double-free that exercised UB
        // rather than the adaptive path.)
        //
        // The cycle works like this:
        // - the owner allocates LIVE blocks and publishes their addresses
        //   to a shared work queue;
        // - remote workers pop the addresses and remote-free them;
        // - the owner drains to reclaim the slots and re-allocates.
        //
        // A slot only becomes allocatable again after it has been
        // remote-freed and drained, so the owner never re-issues a live
        // slot. No address is freed twice without an intervening allocate.
        let work: Arc<Mutex<VecDeque<usize>>> = Arc::new(Mutex::new(VecDeque::new()));
        let stop = Arc::new(AtomicBool::new(false));

        let mut handles = Vec::new();
        for _ in 0..4 {
            let remote = owner.remote();
            let stop = Arc::clone(&stop);
            let work = Arc::clone(&work);
            handles.push(thread::spawn(move || {
                while !stop.load(Ordering::Relaxed) {
                    let addr = work.lock().unwrap().pop_front();
                    let Some(addr) = addr else {
                        thread::yield_now();
                        continue;
                    };
                    let p = unsafe { NonNull::new_unchecked(addr as *mut u8) };
                    // Retry until the bounded remote queue accepts it (the
                    // owner drains it concurrently). Each `addr` was a live
                    // owner-issued slot, so this frees it exactly once.
                    // If `stop` is raised mid-retry, the address is
                    // abandoned (never freed).
                    loop {
                        if unsafe { remote.try_deallocate(p, layout) }.is_ok() {
                            break;
                        }
                        if stop.load(Ordering::Relaxed) {
                            break;
                        }
                        thread::yield_now();
                    }
                }
            }));
        }

        // Owner loop: reclaim remote frees, allocate a fresh batch of LIVE
        // slots, and hand their addresses to the workers. Track whether the
        // adaptive threshold moves off its initial value of 64.
        let mut saw_step = false;
        let mut live = Vec::new();
        for _ in 0..3_000 {
            owner.drain();
            for _ in 0..16 {
                match owner.allocate(layout) {
                    Ok(block) => live.push(block.cast::<u8>().as_ptr() as usize),
                    Err(_) => break, // slab full — workers will free some
                }
            }
            if !live.is_empty() {
                let mut q = work.lock().unwrap();
                for addr in live.drain(..) {
                    q.push_back(addr);
                }
            }
            if let Some(t) = owner.adaptive_threshold_snapshot() {
                if t != 64 {
                    saw_step = true;
                }
            }
        }
        stop.store(true, Ordering::Relaxed);
        for h in handles {
            let _ = h.join();
        }
        owner.drain();
        // The invariant that actually matters: after a full produce /
        // remote-free / drain cycle across threads, the freelist must be
        // uncorrupted. A real double-free or freelist-link regression would
        // trip the slab's MAC/tripwire and bump this counter, so assert it
        // explicitly. (The previous version of this test checked only
        // `saw_step`, so a genuine corruption could have slipped through.)
        assert_eq!(
            owner.corruption_events(),
            0,
            "remote-free/drain cycle must not corrupt the freelist",
        );
        assert!(
            saw_step,
            "Adaptive policy should have moved the threshold off 64 under contention",
        );
    }
}