Skip to main content

forge_alloc/layout/
slab_owner.rs

1//! `SlabOwner<T, B>` + `SlabRemote<T, B>` — cross-thread typed allocation
2//! via the ownership-return model. Replaces the previously-named
3//! `MessagePassingSlab`.
4//!
//! `Slab<T, B>` is `!Sync`: another thread cannot call `deallocate` directly
//! without racing on the freelist head. This module adopts the snmalloc /
//! mimalloc pattern instead: a cross-thread free routes the freed
//! `(ptr, layout)` pair to the owner via a queue, and the owner drains the
//! queue back into its local freelist from its own `allocate` /
//! `deallocate` calls once the batch threshold is reached.
10//!
//! v0.1 ships a `Mutex<VecDeque<RemoteEntry>>`-backed queue for correctness;
//! v1.0 will swap in a lock-free MPSC ring. The visible API is identical
//! either way.
14//!
15//! Requires `std` (uses `Arc`, `Mutex`, `VecDeque`).
16//!
17//! See `docs/ARCHITECTURE.md` for the cross-thread ownership design.
18
19#![cfg(feature = "std")]
20
21use core::ptr::NonNull;
22use std::collections::VecDeque;
23use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
24use std::sync::{Arc, Mutex};
25
26use forge_alloc_core::{
27    AllocError, Allocator, CachePadded, Deallocator, FixedRange, NonZeroLayout, CACHE_LINE,
28};
29
30use crate::layout::Slab;
31
/// Policy controlling how eagerly the owner drains the remote-free queue.
#[derive(Clone, Copy, Debug)]
pub enum BatchPolicy {
    /// Drain once the number of pending remote frees reaches `N`.
    /// The v0.1 default is `Fixed(64)`.
    Fixed(usize),
    /// Stepped-threshold adaptive policy.
    ///
    /// The drain threshold moves through five levels —
    /// `[8, 16, 32, 64, 128]` — driven by the observed queue depth
    /// relative to `queue_capacity`:
    ///
    /// - depth above 75% of capacity → step **down** (smaller batch,
    ///   drain sooner) to relieve back-pressure on remote senders;
    /// - depth below 25% of capacity → step **up** (larger batch,
    ///   drain less often) to amortize the drain cost over more frees.
    ///
    /// After each adjustment the step is frozen for
    /// `ADAPTIVE_COOLDOWN_TICKS` `maybe_drain` calls, which prevents
    /// oscillation. The initial step is 3 (threshold = 64), the same
    /// value as the `Fixed(64)` default, so an `Adaptive` owner behaves
    /// like a `Fixed(64)` owner until the workload pushes the threshold
    /// away from that level.
    ///
    /// Only integer arithmetic is used: the 0.25 / 0.75 hysteresis band
    /// is expressed as `q*4 < cap` / `q*4 > cap*3`.
    ///
    /// This is the v1.0 control law; the v2.0 EMA-based upgrade is
    /// gated on benchmark validation against this baseline.
    Adaptive,
}

impl Default for BatchPolicy {
    /// The default policy: drain at a fixed depth of 64 pending frees.
    fn default() -> Self {
        BatchPolicy::Fixed(64)
    }
}
65
/// The drain thresholds that [`BatchPolicy::Adaptive`] steps through,
/// ordered from most eager (8) to most batched (128).
pub const ADAPTIVE_LEVELS: [usize; 5] = [8, 16, 32, 64, 128];

/// How many `maybe_drain` ticks must elapse after an adaptive step change
/// before the next change is allowed.
/// Tuned for low overhead and quick settling on bursty workloads.
pub const ADAPTIVE_COOLDOWN_TICKS: u32 = 16;

/// Bookkeeping for [`BatchPolicy::Adaptive`].
///
/// Stored in a `core::cell::Cell` on the (`!Sync`) [`SlabOwner`], so only
/// the thread currently holding the owner ever reads or writes it.
#[derive(Clone, Copy, Debug)]
struct AdaptiveState {
    /// Index of the active entry in [`ADAPTIVE_LEVELS`] (0..=4).
    step: u8,
    /// `maybe_drain` ticks left before another adjustment may happen.
    /// While this is non-zero the step is locked.
    cooldown: u32,
}

impl AdaptiveState {
    /// The state a fresh owner starts in: step 3 (threshold 64) with no
    /// cooldown pending. This matches the `Fixed(64)` default, so a
    /// workload that never trips the hysteresis bands sees the same
    /// threshold under either policy.
    const fn initial() -> Self {
        let step = 3;
        let cooldown = 0;
        Self { step, cooldown }
    }

    /// Drain threshold selected by the current step.
    #[inline]
    fn threshold(&self) -> usize {
        let level = usize::from(self.step);
        ADAPTIVE_LEVELS[level]
    }
}
101
/// State shared between one [`SlabOwner`] and any number of [`SlabRemote`]
/// handles; each side holds it through an `Arc`.
///
/// Layout note: `remote_queue` is wrapped in `CachePadded` so the
/// cross-thread mutex sits on its own cache line. Without this,
/// `SlabRemote::deallocate` (which slams the mutex word) would evict the
/// owner-only `slab` field from L1 on every remote push, costing the
/// owner a cache miss on every subsequent `allocate`.
///
/// NOTE(review): this struct uses the default (`repr(Rust)`) layout, so the
/// declared field order is not a layout guarantee. `LAYOUT_PIN` verifies
/// only that `slab` and `remote_queue` start on different cache lines; it
/// does not pin where `remote_queue_len` or `closed` end up relative to
/// `slab`. Confirm (or add `#[repr(C)]` / further asserts) if the placement
/// of those two fields matters.
struct SlabInner<T, B: Allocator + forge_alloc_core::FixedRange> {
    /// The actual slab. Accessed only through `SlabOwner`, which is `!Sync`,
    /// so at most one thread touches it at a time.
    slab: core::cell::UnsafeCell<Slab<T, B>>,
    /// Max queue depth before `try_deallocate` returns `Err`. Never written
    /// after construction, so sharing a cache line with `slab` is harmless.
    /// Also used as the reference capacity for the adaptive policy's
    /// 25% / 75% bands.
    queue_capacity: usize,
    /// Remote-free queue: `(ptr, layout)` pairs waiting to be returned to the
    /// local freelist. Mutex-protected for v0.1; will become lock-free MPSC
    /// in v1.0. Cache-line-isolated from `slab` to prevent false sharing —
    /// see the struct-level layout note.
    remote_queue: CachePadded<Mutex<VecDeque<RemoteEntry>>>,
    /// Mirror of `remote_queue.lock().len()`, updated under the same lock.
    ///
    /// Lets the owner-thread fast path in [`SlabOwner::maybe_drain`] sample
    /// the queue depth with a single `Relaxed` load instead of acquiring the
    /// mutex and reading `VecDeque::len()`. The original design notes quote
    /// roughly 8–12 ns/op for the uncontended `try_lock + len()` round-trip
    /// versus 2–3 ns for the load + compare; the saving matters because the
    /// check runs on every owner-side `allocate` / `deallocate`, even when
    /// the queue is empty.
    ///
    /// Correctness: every owner-side consume (`drain`, `maybe_drain`, `Drop`)
    /// stores 0 while still holding the queue mutex. The push side in
    /// `SlabRemote::try_deallocate` is expected to store the new length under
    /// the same lock — confirm there. Given that, counter and queue length
    /// agree at every lock release. The owner reads the counter with
    /// `Relaxed` outside the lock, so it may lag behind a push that has not
    /// yet released the mutex; the next `maybe_drain` tick samples a fresh
    /// value, and total depth is bounded by `queue_capacity` regardless.
    ///
    /// Deliberately not wrapped in its own `CachePadded`: the intent is that
    /// the padded mutex before it already keeps this field off the mutex
    /// word's cache line, so a second pad would only grow the struct. (See
    /// the struct-level NOTE about field order not being guaranteed.)
    remote_queue_len: AtomicUsize,
    /// Set by [`SlabOwner::drop`] under the `remote_queue` mutex.
    ///
    /// Once `true`, no thread will ever drain the queue again, so
    /// [`SlabRemote::try_deallocate`] returns `Err(ptr)` and the
    /// infallible [`SlabRemote::deallocate`] bails out instead of
    /// spinning forever. The flag lives next to the queue rather than
    /// on `SlabOwner` because `SlabRemote` clones outlive the owner;
    /// they read it through their shared `Arc<SlabInner>`.
    closed: AtomicBool,
}
159
160impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabInner<T, B> {
161    /// Layout-pin: `slab` (owner-only, hot) and `remote_queue` (cross-thread
162    /// mutex word, hammered by remotes) must live on different cache lines.
163    /// Without this, every remote `push_back` would invalidate the owner's
164    /// L1-cached `slab` field and cost the owner a cache miss on its next
165    /// allocate. The `CachePadded` wrapper on `remote_queue` enforces it;
166    /// this const fires a compile error if a future refactor unwraps or
167    /// reorders the fields. Forced to evaluate by reference in
168    /// [`SlabOwner::with_batch_policy`].
169    const LAYOUT_PIN: () = {
170        use core::mem::offset_of;
171        let slab_off = offset_of!(SlabInner<T, B>, slab);
172        let queue_off = offset_of!(SlabInner<T, B>, remote_queue);
173        assert!(
174            slab_off / CACHE_LINE != queue_off / CACHE_LINE,
175            "SlabInner layout regression: `slab` and `remote_queue` share a cache line",
176        );
177    };
178}
179
/// One entry in the remote-free queue: the `(ptr, layout)` pair that the
/// owner will later hand to `Slab::deallocate` when it drains the queue.
#[derive(Copy, Clone)]
struct RemoteEntry {
    /// Address of the slot being returned.
    ptr: NonNull<u8>,
    /// Layout the remote caller supplied alongside `ptr`; forwarded
    /// unchanged to `Slab::deallocate`.
    layout: NonZeroLayout,
}

// SAFETY: `NonNull<u8>` is `!Send` by default, which would make the entry
// (and therefore the `Mutex<VecDeque<RemoteEntry>>`) unusable across
// threads. The entry is never dereferenced in transit — it is only an
// address travelling to the owner. The memory it points to is owned by the
// slab inside the same `Arc`, so sending the entry between threads is
// equivalent to sending a `u8` address — sound when the slab is `Send`.
unsafe impl Send for RemoteEntry {}
191
/// Owns the slab and has exclusive `allocate` access to it.
///
/// The type is meant to be `Send` (it can be moved across threads) but
/// `!Sync` (one thread at a time). The `!Sync` half is enforced by the
/// fields of this struct: `Cell<AdaptiveState>` and the
/// `PhantomData<Cell<()>>` marker both suppress the auto-derived `Sync`.
/// The `UnsafeCell` around the slab lives behind the shared
/// `Arc<SlabInner>` and therefore does not by itself make the owner
/// `!Sync`. NOTE(review): the original docs mention a manual `Send` impl
/// with no matching `Sync` impl — confirm against the trait impls for this
/// type.
///
/// # API-misuse compile-failures (pinned)
///
/// `SlabOwner` is `!Sync` by construction (`_not_sync:
/// PhantomData<Cell<()>>`); a future refactor that accidentally
/// rederived `Sync` would let two threads share an `&SlabOwner` and race
/// on the `UnsafeCell<Slab>` inside `inner.slab`. The compile_fail
/// below pins that rejection:
///
/// ```compile_fail
/// // FAILS TO COMPILE: SlabOwner is deliberately !Sync. The
/// // `_not_sync: PhantomData<Cell<()>>` marker blocks the auto-derive,
/// // and `assert_sync` cannot accept a `!Sync` type.
/// use forge_alloc::InlineBacked;
/// use forge_alloc::SlabOwner;
/// fn assert_sync<T: Sync>() {}
/// assert_sync::<SlabOwner<u64, InlineBacked<512>>>();
/// ```
pub struct SlabOwner<T, B: Allocator + forge_alloc_core::FixedRange> {
    /// State shared with every `SlabRemote` created via `remote()`: the
    /// slab itself, the remote-free queue, its length mirror and the
    /// `closed` flag.
    inner: Arc<SlabInner<T, B>>,
    /// Drain policy chosen at construction; never changed afterwards by
    /// the methods in this impl.
    batch_policy: BatchPolicy,
    /// Adaptive-policy state. A `Cell` because `maybe_drain` takes `&self`
    /// and needs to mutate; `!Sync` guarantees only the owning thread
    /// reads or writes it. Unused when `batch_policy` is `Fixed`.
    adaptive: core::cell::Cell<AdaptiveState>,
    /// Explicit `!Sync` marker, so `Sync` can never be auto-derived even if
    /// every other field becomes `Sync` later. `Arc<SlabInner<...>>`
    /// is `Sync` (it has to be to be shared between `SlabOwner` and
    /// `SlabRemote`), so it offers no protection on its own.
    /// The `Cell<AdaptiveState>` above is already `!Sync`, but the marker
    /// stays so a future field swap cannot silently re-enable `Sync`.
    _not_sync: core::marker::PhantomData<core::cell::Cell<()>>,
}
230
231/// Remote deallocation handle. `Send + Sync` — freely cloneable across
232/// threads. Implements [`Deallocator`] only; cannot allocate.
233///
234/// # API-misuse compile-failures (pinned)
235///
236/// `SlabRemote<T, B>` is `Send + Sync` only when `T: Send` (and
237/// `B: Send`). Instantiating with a non-`Send` `T` (e.g. `Rc<u64>`)
238/// and then trying to ship the remote across threads is rejected at
239/// compile time:
240///
241/// ```compile_fail
242/// // FAILS TO COMPILE: SlabRemote's `Send` bound requires `T: Send`,
243/// // so `SlabRemote<Rc<u64>, _>` is not `Send` and cannot satisfy
244/// // `assert_send`.
245/// use std::rc::Rc;
246/// use forge_alloc::InlineBacked;
247/// use forge_alloc::SlabRemote;
248/// fn assert_send<T: Send>() {}
249/// assert_send::<SlabRemote<Rc<u64>, InlineBacked<512>>>();
250/// ```
251#[derive(Clone)]
252pub struct SlabRemote<T, B: Allocator + forge_alloc_core::FixedRange> {
253    inner: Arc<SlabInner<T, B>>,
254}
255
256impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabOwner<T, B> {
257    /// Construct, taking ownership of a freshly-built slab.
258    pub fn new(capacity: usize, backing: B) -> Result<Self, AllocError> {
259        Self::with_batch_policy(capacity, backing, BatchPolicy::default(), 1024)
260    }
261
262    /// Construct with explicit batch policy and queue capacity.
263    pub fn with_batch_policy(
264        capacity: usize,
265        backing: B,
266        batch_policy: BatchPolicy,
267        queue_capacity: usize,
268    ) -> Result<Self, AllocError> {
269        // Force evaluation of the layout-pin const for this instantiation
270        // of (T, B). Compiles away to nothing; fails the build if a future
271        // refactor reshuffles SlabInner so that `slab` and `remote_queue`
272        // share a cache line, restoring the false-sharing problem.
273        let _: () = SlabInner::<T, B>::LAYOUT_PIN;
274        let slab = Slab::new(capacity, backing)?;
275        let inner = Arc::new(SlabInner {
276            slab: core::cell::UnsafeCell::new(slab),
277            queue_capacity,
278            remote_queue: CachePadded::new(Mutex::new(VecDeque::with_capacity(queue_capacity))),
279            remote_queue_len: AtomicUsize::new(0),
280            closed: AtomicBool::new(false),
281        });
282        Ok(Self {
283            inner,
284            batch_policy,
285            adaptive: core::cell::Cell::new(AdaptiveState::initial()),
286            _not_sync: core::marker::PhantomData,
287        })
288    }
289
290    /// Create a remote handle. Cheap — just an `Arc` clone.
291    pub fn remote(&self) -> SlabRemote<T, B> {
292        SlabRemote {
293            inner: Arc::clone(&self.inner),
294        }
295    }
296
297    /// Drain the remote-free queue into the local freelist now.
298    ///
299    /// Holds the queue mutex *only* long enough to swap out the pending
300    /// entries; releases the lock before calling `slab.deallocate` for
301    /// each entry. Without this two-phase pattern, remote senders would
302    /// be blocked through the entire drain loop — death by lock-hold time
303    /// proportional to queue depth.
304    pub fn drain(&self) {
305        // Phase 1: under the lock, snapshot the queue into a local Vec.
306        // `drain(..).collect()` empties the deque without resetting its
307        // capacity, so subsequent `push_back`s from `SlabRemote` continue
308        // to use the pre-allocated buffer rather than re-allocating.
309        let pending: Vec<RemoteEntry> = {
310            let mut q = self
311                .inner
312                .remote_queue
313                .lock()
314                .expect("SlabOwner remote queue poisoned");
315            // Fast-path the empty case: avoid the Vec allocation when the
316            // queue has nothing. `drain(..).collect()` on an empty deque
317            // still allocates a zero-length Vec on most allocators (the
318            // standard Vec sets `cap = 0` so this is actually cheap, but
319            // skipping the empty `for` below also saves a function call
320            // boundary that the optimizer can't always elide because
321            // `slab` is behind UnsafeCell).
322            if q.is_empty() {
323                // Invariant: every push and drain maintains
324                // `remote_queue_len == q.len()` under the lock, so an
325                // empty queue means the mirror is already 0. Assert it
326                // here so a future bookkeeping regression fires loudly
327                // in debug instead of silently being papered over by an
328                // unconditional store.
329                debug_assert_eq!(
330                    self.inner.remote_queue_len.load(Ordering::Relaxed),
331                    0,
332                    "drain: queue empty but remote_queue_len mirror non-zero — push site dropped a mirror update",
333                );
334                return;
335            }
336            let entries: Vec<_> = q.drain(..).collect();
337            // Mirror reset MUST stay inside the critical section so the next
338            // owner-fast-path Relaxed load sees a consistent (queue, mirror)
339            // pair after lock release.
340            self.inner.remote_queue_len.store(0, Ordering::Relaxed);
341            entries
342        };
343        // Lock dropped here. Remote senders can resume pushing.
344
345        // Phase 2: process locally. SAFETY: !Sync on SlabOwner ensures
346        // we're the only thread touching the slab via this owner reference.
347        //
348        // We use `&*` (Shared retag) rather than `&mut *` (Unique retag) here:
349        // every `Slab` method we invoke (allocate / deallocate / base / size /
350        // capacity_bytes) takes `&self` and handles its own interior mutation
351        // through `UnsafeCell`. A `&mut Slab` would create a Unique retag
352        // covering the full slab AND its embedded backing buffer, which
353        // invalidates the SharedReadWrite tag the backing returned via
354        // `InlineBacked::buffer_base` (and which the live slot pointers were
355        // derived from). Miri caught this as a Stacked Borrows violation
356        // when `Slab::deallocate` later wrote the freelist link
357        // through one of those slot pointers.
358        let slab = unsafe { &*self.inner.slab.get() };
359        for entry in pending {
360            // SAFETY: the entry came from our SlabRemote::deallocate caller,
361            // who promised (ptr, layout) was issued by this slab.
362            unsafe { slab.deallocate(entry.ptr, entry.layout) };
363        }
364    }
365
366    /// Internal: check the batch-policy condition and drain if met.
367    ///
368    /// Hot path: called from every owner-side `allocate` and `deallocate`.
369    /// Two-tier design:
370    ///
371    /// 1. **Fast path** — sample the cached `remote_queue_len` mirror with
372    ///    a single `Relaxed` load. The mirror is maintained under the
373    ///    queue mutex by `SlabRemote::try_deallocate` (on push) and by
374    ///    `drain` / `maybe_drain` (on consume), so it tracks the queue
375    ///    depth at lock-release granularity. If the observed depth is
376    ///    below threshold we return immediately — no mutex acquisition,
377    ///    no `VecDeque::len()` indirection. Cost: one Relaxed load +
378    ///    compare (~2–3 ns on x86_64 / AArch64).
379    /// 2. **Slow path** — depth ≥ threshold. We `try_lock` the queue and,
380    ///    if uncontended, drain in one shot. `try_lock` (not `lock`) so
381    ///    we don't contend with an in-flight remote push; the next tick
382    ///    will catch up.
383    ///
384    /// Bounded staleness: the Relaxed load may observe a value behind a
385    /// concurrent remote push that hasn't yet released the mutex. The
386    /// drift is at most one push per tick; since `maybe_drain` is called
387    /// on every owner alloc/dealloc, the queue cannot grow unboundedly.
388    /// (If the owner is genuinely idle for long stretches, the queue is
389    /// also bounded by `queue_capacity` — remote pushes start rejecting
390    /// at that point.)
391    ///
392    /// Why we don't update the mirror outside the lock: the mirror and
393    /// the queue must stay in sync at lock-release time, otherwise the
394    /// owner's Relaxed load could observe an inconsistent pair (mirror=0
395    /// while queue has pending entries → owner skips draining and the
396    /// remote-pushed slot leaks until the next tick that happens to
397    /// observe a non-zero mirror). Keeping both writes inside the same
398    /// critical section makes the consistency check trivial.
399    fn maybe_drain(&self) {
400        // Fast path: relaxed load of the queue-length mirror. The
401        // updates to this counter live inside the queue mutex
402        // critical sections (push in `SlabRemote::try_deallocate`,
403        // reset in `drain` / `maybe_drain` / `Drop`), so a load that
404        // observes value `n` means the queue had ≥ n entries at the
405        // most recent mutex unlock visible to this CPU — possibly more
406        // since then, but never artificially inflated.
407        let pending = self.inner.remote_queue_len.load(Ordering::Relaxed);
408        let threshold = match self.batch_policy {
409            BatchPolicy::Fixed(n) => n,
410            BatchPolicy::Adaptive => self.adaptive_threshold(pending),
411        };
412        if pending < threshold {
413            return;
414        }
415
416        // Slow path: threshold tripped. Take the queue under `try_lock`
417        // and drain in one shot. `try_lock` (not `lock`) so we don't
418        // serialize against an in-flight remote push; the next tick
419        // will catch up.
420        let entries: Vec<RemoteEntry> = {
421            let mut q = match self.inner.remote_queue.try_lock() {
422                Ok(q) => q,
423                Err(_) => return, // contended; skip — we'll catch up next time
424            };
425            // Re-check inside the lock — the queue may have been drained
426            // by another owner-thread path (e.g. an `allocate` that
427            // bottomed out and called `drain()` between our load and
428            // try_lock). Skip the Vec allocation in that case.
429            if q.is_empty() {
430                // Mirror invariant: empty queue ⇒ mirror == 0.
431                debug_assert_eq!(
432                    self.inner.remote_queue_len.load(Ordering::Relaxed),
433                    0,
434                    "maybe_drain: queue empty but remote_queue_len mirror non-zero",
435                );
436                return;
437            }
438            let entries: Vec<_> = q.drain(..).collect();
439            self.inner.remote_queue_len.store(0, Ordering::Relaxed);
440            entries
441        };
442        // Lock dropped. Process the snapshot under exclusive !Sync access.
443        // SAFETY: SlabOwner is !Sync, so no other thread holds an alias
444        // to the slab. `&*` (Shared retag) is sufficient — see `drain()`
445        // for the rationale; `Slab::deallocate` takes `&self`.
446        let slab = unsafe { &*self.inner.slab.get() };
447        for entry in entries {
448            // SAFETY: the entry came from our SlabRemote::deallocate caller,
449            // who promised (ptr, layout) was issued by this slab.
450            unsafe { slab.deallocate(entry.ptr, entry.layout) };
451        }
452    }
453
454    /// Update the adaptive state based on observed `pending` queue
455    /// depth and return the current threshold.
456    ///
457    /// Step DOWN (smaller batch, drain sooner) when `pending` exceeds
458    /// 75% of `queue_capacity` — queue is filling up, relieve back-
459    /// pressure on remote senders.
460    ///
461    /// Step UP (larger batch, drain later) when `pending` is below
462    /// 25% of `queue_capacity` — queue is mostly empty, amortize the
463    /// drain cost.
464    ///
465    /// A cooldown of [`ADAPTIVE_COOLDOWN_TICKS`] calls between
466    /// adjustments prevents oscillation around a band edge. All
467    /// arithmetic integer; the 0.25 / 0.75 bands are encoded as
468    /// `pending * 4 < cap` and `pending * 4 > cap * 3`.
469    fn adaptive_threshold(&self, pending: usize) -> usize {
470        let mut state = self.adaptive.get();
471        if state.cooldown > 0 {
472            state.cooldown -= 1;
473        } else {
474            let cap = self.inner.queue_capacity;
475            // q > 75% — step down toward smaller batch.
476            if pending.saturating_mul(4) > cap.saturating_mul(3) && state.step > 0 {
477                state.step -= 1;
478                state.cooldown = ADAPTIVE_COOLDOWN_TICKS;
479            // q < 25% — step up toward larger batch.
480            } else if pending.saturating_mul(4) < cap && state.step < 4 {
481                state.step += 1;
482                state.cooldown = ADAPTIVE_COOLDOWN_TICKS;
483            }
484        }
485        self.adaptive.set(state);
486        state.threshold()
487    }
488
489    /// Current adaptive-step threshold (in remote-queue entries), or
490    /// `None` if the owner is configured with `BatchPolicy::Fixed`.
491    /// Useful in tests and adaptive-tuning telemetry.
492    #[inline]
493    pub fn adaptive_threshold_snapshot(&self) -> Option<usize> {
494        match self.batch_policy {
495            BatchPolicy::Adaptive => Some(self.adaptive.get().threshold()),
496            BatchPolicy::Fixed(_) => None,
497        }
498    }
499}
500
501unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Deallocator for SlabOwner<T, B> {
502    #[inline]
503    unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: NonZeroLayout) {
504        // Drain pending remote deallocations on the owner-side dealloc
505        // path so a long-lived owner that allocates rarely (or never)
506        // and only deallocs locally still services the remote queue.
507        // Without this, the remote queue accumulates indefinitely on
508        // dealloc-heavy workloads.
509        self.maybe_drain();
510        // Owner-side dealloc: direct push to local freelist (no queue).
511        // SAFETY: !Sync ensures exclusive access to the slab. `&*` (Shared
512        // retag) is sufficient — Slab::deallocate takes `&self` and uses
513        // interior mutability for the freelist head.
514        let slab = unsafe { &*self.inner.slab.get() };
515        unsafe { slab.deallocate(ptr, layout) };
516    }
517}
518
519unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Allocator for SlabOwner<T, B> {
520    #[inline]
521    fn allocate(&self, layout: NonZeroLayout) -> Result<NonNull<[u8]>, AllocError> {
522        self.maybe_drain();
523        // SAFETY: !Sync on SlabOwner — exclusive access to inner slab.
524        // `&*` (Shared retag) rather than `&mut *`: `Slab::allocate` takes
525        // `&self`, and a `&mut Slab` retag would invalidate the inner
526        // backing's SharedReadWrite tag covering its storage region —
527        // every previously-issued slot pointer that the slab's freelist
528        // (or its consumers) still holds would become a stale tag, which
529        // is UB under Stacked Borrows.
530        let slab = unsafe { &*self.inner.slab.get() };
531        // If first attempt fails (local list empty + uncarved exhausted),
532        // drain in case the queue has frees we can recover.
533        match slab.allocate(layout) {
534            Ok(block) => Ok(block),
535            Err(_) => {
536                // NLL ends the borrow of `slab` after its last use above (the
537                // failing allocate call). `drain()` re-borrows the UnsafeCell
538                // through its own &self path; the prior &Slab is no longer
539                // live, so the second & creation does not violate aliasing.
540                self.drain();
541                // SAFETY: !Sync, re-borrow shared access. The drain()
542                // borrow has already ended by the time control reaches here.
543                let slab = unsafe { &*self.inner.slab.get() };
544                slab.allocate(layout)
545            }
546        }
547    }
548
549    #[inline]
550    unsafe fn usable_size(&self, ptr: NonNull<u8>, layout: NonZeroLayout) -> Option<usize> {
551        // Forward to the inner Slab, which reports the full slot stride — so an
552        // outer scrub wrapper (`PoisonOnFree`/`ZeroizeOnFree`) over a
553        // `SlabOwner` wipes the whole slot on free, not just the requested
554        // prefix. Mirrors the `capacity_bytes`/`corruption_events` forwards.
555        // SAFETY: !Sync — exclusive access; `ptr` came from the inner slab's
556        // allocate via this owner, and the caller upholds usable_size's
557        // contract.
558        let slab = unsafe { &*self.inner.slab.get() };
559        unsafe { slab.usable_size(ptr, layout) }
560    }
561
562    fn capacity_bytes(&self) -> Option<usize> {
563        // SAFETY: !Sync — exclusive access.
564        let slab = unsafe { &*self.inner.slab.get() };
565        slab.capacity_bytes()
566    }
567
568    #[inline]
569    fn corruption_events(&self) -> u64 {
570        // SAFETY: !Sync — exclusive access. The owner-thread is the only
571        // reader of this counter via the SlabOwner allocate path; remote
572        // pushes don't touch the inner Slab's counter directly (they
573        // enqueue and the owner drains).
574        let slab = unsafe { &*self.inner.slab.get() };
575        slab.corruption_events()
576    }
577}
578
579/// Final drain on owner drop. Without this:
580///
581/// - `SlabRemote` clones outliving the owner would push entries into
582///   `remote_queue` that nothing ever drains.
583/// - Slots queued for return (already routed by the remote, not yet
584///   drained by the owner) would never be reclaimed back into the
585///   slab's local freelist. The slab keeps those slots marked-live
586///   for as long as the last remote keeps the `Arc<SlabInner>` alive
587///   — operationally a slot-table leak. Note that `T`'s destructor is
588///   the remote caller's responsibility BEFORE calling
589///   `SlabRemote::deallocate` (per the `Slab::deallocate` safety
590///   contract); the drain here only reclaims the freelist entry, it
591///   does not run `T::drop`.
592/// - Subsequent `SlabRemote::deallocate` (the spinning, infallible
593///   variant) would spin forever on a full queue, hanging the calling
594///   thread.
595///
596/// We close the queue while holding its mutex (race-free against any
597/// in-flight remote push) and drain the pending entries into the local
598/// freelist. After this, the slab is consistent and any further remote
599/// push observes `closed == true` and returns `Err(ptr)` /
600/// no-ops without spinning.
601impl<T, B: Allocator + forge_alloc_core::FixedRange> Drop for SlabOwner<T, B> {
602    fn drop(&mut self) {
603        // Phase 1: close + snapshot under the queue mutex.
604        //
605        // The close MUST be set under the mutex so a concurrent remote
606        // push that has acquired the lock either:
607        //  (a) sees `closed == false` and pushes — we drain it in this
608        //      same critical section below.
609        //  (b) sees `closed == true` (after we set it) and returns
610        //      `Err(ptr)` without pushing — the caller keeps ownership.
611        //
612        // No remote push can slip in between "we drain" and "we set
613        // closed" because both happen under the same lock acquisition.
614        let pending: Vec<RemoteEntry> = {
615            let mut q = match self.inner.remote_queue.lock() {
616                Ok(q) => q,
617                // Mutex poisoned — a prior holder panicked while
618                // mutating the queue. Recover the entries; the panic
619                // path is already broken, so don't compound it by
620                // bailing out on poison.
621                Err(p) => p.into_inner(),
622            };
623            // Release ordering pairs with Acquire in
624            // `SlabRemote::try_deallocate`'s closed-check. The mutex
625            // already provides happens-before for the queue contents,
626            // but we publish the flag separately so a remote that
627            // already holds the lock will observe the store via the
628            // unlock/relock chain.
629            self.inner.closed.store(true, Ordering::Release);
630            let entries: Vec<_> = q.drain(..).collect();
631            // Mirror reset under the same critical section as the drain
632            // so an `Arc<SlabInner>` still alive via `SlabRemote` clones
633            // post-owner-drop reflects "queue empty + closed" rather
634            // than a stale length. No fast path consumes the mirror
635            // after the owner is gone, but keep it consistent for
636            // debugging / future invariants.
637            self.inner.remote_queue_len.store(0, Ordering::Relaxed);
638            entries
639        };
640        // Phase 2: process the drained entries under our exclusive
641        // !Sync access. SAFETY: SlabOwner is !Sync, so no other thread
642        // holds an alias to the slab. The remote queue is now closed;
643        // no new alias will appear.
644        //
645        // **Drop-during-unwind escalation**: if `slab.deallocate`
646        // panics inside the drain loop (e.g. a wrapping
647        // `Statistics::deallocate`'s `debug_assert!` fires on an
648        // underflow), the loop aborts and the **remaining `pending`
649        // entries are dropped without being routed to the slab** — the
650        // slots are leaked. The `Arc<SlabInner>` itself still drops
651        // normally; the leak is just the freelist re-link work.
652        // If this Drop is running *during* an existing panic-unwind,
653        // the second panic here triggers **immediate process abort**
654        // per Rust language rules. The remote queue is already closed
655        // (Phase 1) so no further pushes can succeed, regardless of
656        // outcome — the closed-flag promise is preserved even on the
657        // abort path.
658        // `&*` (Shared retag) — Slab::deallocate takes `&self`; see the
659        // Stacked Borrows rationale on `drain()`.
660        let slab = unsafe { &*self.inner.slab.get() };
661        for entry in pending {
662            // SAFETY: the entry was pushed by a `SlabRemote::try_deallocate`
663            // caller who promised the pointer originated from this slab.
664            unsafe { slab.deallocate(entry.ptr, entry.layout) };
665        }
666        // Defense-in-depth: after Phase 1 closed the queue and Phase 2
667        // drained it under the same critical section / single-thread
668        // ownership, no entries can be added. The queue must now be
669        // empty for the `closed == true` promise to hold. Re-acquire
670        // (poisoned-recover) to inspect — under unwind this may itself
671        // panic (poisoned-then-abort), which is the desired safety
672        // posture for a corrupted Drop path. Compiled out in release.
673        debug_assert!(
674            self.inner
675                .remote_queue
676                .lock()
677                .map(|q| q.is_empty())
678                .unwrap_or_else(|p| p.into_inner().is_empty()),
679            "SlabOwner::drop: remote_queue non-empty after Phase 1+2 drain — \
680             closed-flag invariant violated, a remote push raced past the close",
681        );
682        debug_assert_eq!(
683            self.inner.remote_queue_len.load(Ordering::Relaxed),
684            0,
685            "SlabOwner::drop: remote_queue_len mirror non-zero after drain — \
686             push-side store missed the lock or drain-side reset was elided",
687        );
688        // The slab itself (and any T: Drop in still-live slots not
689        // routed through the queue) drops when the last
690        // `Arc<SlabInner>` is released — i.e., when the last
691        // `SlabRemote` clone is dropped. That's outside our control,
692        // but the closed flag keeps the queue from growing in the
693        // meantime.
694    }
695}
696
697impl<T, B: Allocator + forge_alloc_core::FixedRange> FixedRange for SlabOwner<T, B> {
698    fn base(&self) -> NonNull<u8> {
699        // SAFETY: !Sync — exclusive access.
700        let slab = unsafe { &*self.inner.slab.get() };
701        slab.base()
702    }
703
704    fn size(&self) -> usize {
705        // SAFETY: !Sync — exclusive access.
706        let slab = unsafe { &*self.inner.slab.get() };
707        slab.size()
708    }
709}
710
711impl<T, B: Allocator + forge_alloc_core::FixedRange> SlabRemote<T, B> {
712    /// Non-spinning remote deallocate. Returns `Err(ptr)` if the queue is
713    /// full **or** the owner has been dropped (queue is closed); caller
714    /// retains ownership and must handle the pointer (typically by
715    /// dropping it once the whole slab tears down).
716    ///
717    /// # Safety
718    ///
719    /// `ptr` must have been allocated from the corresponding `SlabOwner`.
720    /// On `Err`, the pointer is still owned by the caller.
721    pub unsafe fn try_deallocate(
722        &self,
723        ptr: NonNull<u8>,
724        layout: NonZeroLayout,
725    ) -> Result<(), NonNull<u8>> {
726        let mut q = self
727            .inner
728            .remote_queue
729            .lock()
730            .expect("SlabRemote queue poisoned");
731        // Closed check under the lock pairs with the close-under-lock
732        // in `SlabOwner::drop`: any push that observes `closed == false`
733        // here is guaranteed to be drained by the owner's final
734        // drain-and-close critical section (which can't interleave with
735        // ours).
736        if self.inner.closed.load(Ordering::Acquire) {
737            return Err(ptr);
738        }
739        if q.len() >= self.inner.queue_capacity {
740            return Err(ptr);
741        }
742        q.push_back(RemoteEntry { ptr, layout });
743        // Update the owner-fast-path mirror under the lock. `store(q.len())`
744        // rather than `fetch_add(1)` so the post-store value matches the
745        // queue length at lock-release time exactly.
746        //
747        // Ordering: `Relaxed` is sufficient because the mirror is
748        // **advisory only**. The owner's fast path samples the counter
749        // outside the lock (no formal happens-before edge with this
750        // store), but the slow path then re-acquires the queue mutex
751        // and re-verifies the queue itself before draining — so any
752        // visibility lag of the Relaxed load is bounded by one mutex
753        // round-trip. On real hardware the Relaxed store is also
754        // committed by the unlock fence, so steady-state owner reads
755        // observe the update within a few hundred nanoseconds of the
756        // push. See `maybe_drain` for the bounded-staleness argument.
757        self.inner
758            .remote_queue_len
759            .store(q.len(), Ordering::Relaxed);
760        Ok(())
761    }
762}
763
764unsafe impl<T, B: Allocator + forge_alloc_core::FixedRange> Deallocator for SlabRemote<T, B> {
765    /// Spins until the queue accepts the deallocation, **except** when
766    /// the owner has been dropped — in that case the queue will never
767    /// drain again, so we return immediately. The slot remains
768    /// marked-live in the slab until the last `SlabRemote` clone
769    /// releases the shared `Arc<SlabInner>` (at which point the slab
770    /// itself drops and the backing region is fully reclaimed). Any
771    /// `T: Drop` whose destructor was already run by the caller before
772    /// `deallocate` is not affected — the destructor ran on schedule;
773    /// only the freelist entry for the slot is forfeited.
774    ///
775    /// Latency-sensitive callers should use [`try_deallocate`](Self::try_deallocate)
776    /// and handle the `Err(ptr)` overflow / closed-queue explicitly.
777    unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: NonZeroLayout) {
778        let mut p = ptr;
779        loop {
780            // SAFETY: forwarded from caller.
781            match unsafe { self.try_deallocate(p, layout) } {
782                Ok(()) => return,
783                Err(returned) => {
784                    // Distinguish "queue closed, owner gone" (permanent —
785                    // bail to avoid an infinite spin) from "queue full,
786                    // transient" (continue spinning).
787                    if self.inner.closed.load(Ordering::Acquire) {
788                        return;
789                    }
790                    p = returned;
791                    core::hint::spin_loop();
792                }
793            }
794        }
795    }
796}
797
798// SlabOwner is Send but !Sync. The Arc<SlabInner> is Send when its contents
799// are; we have UnsafeCell<Slab<T, B>> inside, which is !Sync by default —
800// good. But SlabInner needs to be Sync (because Arc<SlabInner> being shared
801// across Owner + Remote(s) requires Sync) — the Mutex provides Sync for the
802// queue, and the UnsafeCell-wrapped slab is accessed only by the !Sync
803// Owner. Manual impl needed.
804unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabOwner<T, B> {}
805// SlabOwner is deliberately !Sync — `_not_sync: PhantomData<Cell<()>>`
806// blocks the auto-derive. We rely on this for soundness: if two threads
807// could share `&SlabOwner`, both could call `allocate` and race on the
808// UnsafeCell<Slab<T, B>>.
809
810unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabRemote<T, B> {}
811unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Sync for SlabRemote<T, B> {}
812
813// The Arc<SlabInner> requires SlabInner: Send + Sync.
814unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Send for SlabInner<T, B> {}
815unsafe impl<T: Send, B: Allocator + forge_alloc_core::FixedRange + Send> Sync for SlabInner<T, B> {}
816
817#[cfg(test)]
818mod tests {
819    use super::*;
820    use crate::backing::InlineBacked;
821
822    /// Positive companion to the `SlabOwner !Sync` compile_fail pin: the
823    /// owner IS `Send` (single-threaded ownership transfer is the v0.1
824    /// use case). If a future refactor accidentally removed `Send`, the
825    /// `multi_threaded_remote_dealloc` test below would still compile but
826    /// this pin would fail loudly.
827    #[test]
828    fn slab_owner_is_send() {
829        fn assert_send<T: Send>() {}
830        assert_send::<SlabOwner<u64, InlineBacked<512>>>();
831    }
832
833    /// Positive companion to the `SlabRemote<!Send T>` compile_fail pin:
834    /// `SlabRemote<u64, InlineBacked<N>>` IS `Send + Sync` because both
835    /// `T = u64` and `B: Send`. Pinning this stops a refactor that
836    /// accidentally weakened the bound from breaking the cross-thread
837    /// deallocate API silently.
838    #[test]
839    fn slab_remote_is_send_and_sync_when_t_is_send() {
840        fn assert_send_sync<T: Send + Sync>() {}
841        assert_send_sync::<SlabRemote<u64, InlineBacked<512>>>();
842    }
843
844    #[test]
845    fn owner_can_alloc_and_dealloc_locally() {
846        let owner: SlabOwner<u64, InlineBacked<512>> =
847            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
848        let layout = NonZeroLayout::for_type::<u64>().unwrap();
849        let p = owner.allocate(layout).unwrap();
850        unsafe { owner.deallocate(p.cast(), layout) };
851    }
852
853    /// `usable_size` forwards to the inner Slab's full slot stride, so an outer
854    /// scrub wrapper wipes the whole slot. `SlabOwner<u8>` → 8-byte slots.
855    #[test]
856    fn usable_size_forwards_slot_stride() {
857        let owner: SlabOwner<u8, InlineBacked<512>> =
858            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
859        let layout = NonZeroLayout::from_size_align(1, 1).unwrap();
860        let p = owner.allocate(layout).unwrap();
861        let ptr = p.cast::<u8>();
862        let us = unsafe { owner.usable_size(ptr, layout) };
863        assert_eq!(
864            us,
865            Some(8),
866            "usable_size must forward the inner slot stride"
867        );
868        unsafe { owner.deallocate(ptr, layout) };
869    }
870
871    #[test]
872    fn remote_can_deallocate_owner_allocations() {
873        let owner: SlabOwner<u64, InlineBacked<512>> =
874            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
875        let remote = owner.remote();
876        let layout = NonZeroLayout::for_type::<u64>().unwrap();
877        let p = owner.allocate(layout).unwrap();
878        // Cross-thread deallocate via remote.
879        unsafe { remote.deallocate(p.cast(), layout) };
880        // Force a drain so the slot returns to the local freelist.
881        owner.drain();
882        // Subsequent allocate must reuse the freed slot.
883        let p2 = owner.allocate(layout).unwrap();
884        assert_eq!(p.cast::<u8>().as_ptr(), p2.cast::<u8>().as_ptr());
885    }
886
887    /// Boundary: `queue_capacity = 0` means every remote deallocation is
888    /// rejected — `try_deallocate` returns `Err(ptr)` on the very first
889    /// call. This is the documented contract; pinning it here protects
890    /// against an accidental "off-by-one allows one push" regression.
891    #[test]
892    fn queue_capacity_zero_rejects_every_remote_dealloc() {
893        let owner: SlabOwner<u64, InlineBacked<512>> =
894            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 0)
895                .unwrap();
896        let remote = owner.remote();
897        let layout = NonZeroLayout::for_type::<u64>().unwrap();
898        let p = owner.allocate(layout).unwrap();
899        // Cap 0: first try_deallocate must already fail.
900        let r = unsafe { remote.try_deallocate(p.cast(), layout) };
901        assert!(
902            r.is_err(),
903            "queue_capacity=0 must reject every remote_deallocate",
904        );
905        // Clean up via owner to keep the slab balanced under the test.
906        unsafe { owner.deallocate(p.cast(), layout) };
907    }
908
909    /// Boundary: `queue_capacity = 1` accepts exactly one push; the
910    /// second must fail.
911    #[test]
912    fn queue_capacity_one_accepts_one_push_only() {
913        let owner: SlabOwner<u64, InlineBacked<512>> =
914            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 1)
915                .unwrap();
916        let remote = owner.remote();
917        let layout = NonZeroLayout::for_type::<u64>().unwrap();
918        let a = owner.allocate(layout).unwrap();
919        let b = owner.allocate(layout).unwrap();
920        unsafe {
921            assert!(remote.try_deallocate(a.cast(), layout).is_ok());
922            assert!(remote.try_deallocate(b.cast(), layout).is_err());
923            // Drop the rejected pointer through the owner.
924            owner.deallocate(b.cast(), layout);
925        }
926    }
927
928    /// Boundary: adaptive policy at the floor (step 0 → threshold 8)
929    /// must not underflow the step counter on repeated step-down
930    /// attempts. We feed sustained > 75% queue depth and observe
931    /// that the threshold stops at the floor rather than wrapping.
932    #[test]
933    fn adaptive_policy_step_floor_does_not_underflow() {
934        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
935            256,
936            InlineBacked::<8192>::new(),
937            BatchPolicy::Adaptive,
938            16,
939        )
940        .unwrap();
941        let remote = owner.remote();
942        let layout = NonZeroLayout::for_type::<u64>().unwrap();
943        // Pre-allocate a pool.
944        let pool: Vec<_> = (0..200).map(|_| owner.allocate(layout).unwrap()).collect();
945        let mut pushed = 0;
946        // Drive the policy to floor across many cycles.
947        for _ in 0..100 {
948            while pushed < pool.len() && {
949                let q = owner.inner.remote_queue.lock().unwrap();
950                q.len() < 14
951            } {
952                let _ = unsafe { remote.try_deallocate(pool[pushed].cast(), layout) };
953                pushed += 1;
954            }
955            let _ = owner.allocate(layout);
956        }
957        // Threshold must be at the floor (8) or one step up (16) — never
958        // a wrapped or invalid value.
959        let thr = owner.adaptive_threshold_snapshot().unwrap();
960        assert!(
961            ADAPTIVE_LEVELS.contains(&thr),
962            "threshold {thr} must remain one of the configured levels"
963        );
964    }
965
966    #[test]
967    fn try_deallocate_returns_err_on_full_queue() {
968        let owner: SlabOwner<u64, InlineBacked<512>> =
969            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 2)
970                .unwrap();
971        let remote = owner.remote();
972        let layout = NonZeroLayout::for_type::<u64>().unwrap();
973        let a = owner.allocate(layout).unwrap();
974        let b = owner.allocate(layout).unwrap();
975        let c = owner.allocate(layout).unwrap();
976        unsafe {
977            remote.try_deallocate(a.cast(), layout).unwrap();
978            remote.try_deallocate(b.cast(), layout).unwrap();
979            // Queue capacity 2 — third must fail.
980            let r = remote.try_deallocate(c.cast(), layout);
981            assert!(r.is_err());
982            // Drop c via the owner so we don't leak under the test.
983            owner.deallocate(c.cast(), layout);
984        }
985    }
986
987    #[test]
988    #[cfg_attr(miri, ignore = "miri-incompatible: mmap / threads")]
989    fn multi_threaded_remote_dealloc() {
990        use std::sync::Arc;
991        use std::thread;
992
993        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
994            256,
995            InlineBacked::<8192>::new(),
996            BatchPolicy::Fixed(64),
997            1024,
998        )
999        .unwrap();
1000        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1001
1002        // Pre-allocate a bunch on the owner thread, then send pointers off
1003        // to worker threads for cross-thread deallocate.
1004        let mut ptrs = Vec::new();
1005        for _ in 0..128 {
1006            ptrs.push(owner.allocate(layout).unwrap());
1007        }
1008        let ptrs_addrs: Vec<usize> = ptrs
1009            .iter()
1010            .map(|p| p.cast::<u8>().as_ptr() as usize)
1011            .collect();
1012        let remote = Arc::new(owner.remote());
1013
1014        let mut handles = Vec::new();
1015        for chunk in ptrs_addrs.chunks(32) {
1016            let chunk = chunk.to_vec();
1017            let r = Arc::clone(&remote);
1018            handles.push(thread::spawn(move || {
1019                for addr in chunk {
1020                    let p = unsafe { NonNull::new_unchecked(addr as *mut u8) };
1021                    unsafe { r.deallocate(p, layout) };
1022                }
1023            }));
1024        }
1025        for h in handles {
1026            h.join().unwrap();
1027        }
1028        owner.drain();
1029        // After drain, the freelist must hold exactly the 128 freed slots, so
1030        // re-allocating 128 must return those SAME slots (popped from the
1031        // freelist before any fresh carve from next_uncarved). Asserting the
1032        // re-allocated addresses equal the freed set is what actually proves
1033        // drain reclaimed them: a mere `is_ok()` check would pass even if drain
1034        // dropped every entry, because the slab would just carve fresh slots
1035        // from the remaining capacity (256 > 128).
1036        use std::collections::HashSet;
1037        let freed: HashSet<usize> = ptrs_addrs.iter().copied().collect();
1038        let mut reclaimed: HashSet<usize> = HashSet::new();
1039        for _ in 0..128 {
1040            let p = owner
1041                .allocate(layout)
1042                .expect("a reclaimed slot must be allocatable");
1043            let addr = p.cast::<u8>().as_ptr() as usize;
1044            assert!(
1045                freed.contains(&addr),
1046                "re-allocated slot {addr:#x} is not one of the drained slots — drain failed to reclaim",
1047            );
1048            reclaimed.insert(addr);
1049        }
1050        assert_eq!(
1051            reclaimed, freed,
1052            "the 128 re-allocated slots must be exactly the 128 drained slots",
1053        );
1054    }
1055
1056    // ====================================================================
1057    // BatchPolicy::Adaptive — stepped-threshold control law tests
1058    // ====================================================================
1059
1060    /// Helper: build a slab with `Adaptive` policy and a small queue
1061    /// so the 25%/75% hysteresis bands trigger on a few entries.
1062    fn build_adaptive_owner(queue_capacity: usize) -> SlabOwner<u64, InlineBacked<8192>> {
1063        SlabOwner::with_batch_policy(
1064            256,
1065            InlineBacked::<8192>::new(),
1066            BatchPolicy::Adaptive,
1067            queue_capacity,
1068        )
1069        .unwrap()
1070    }
1071
1072    #[test]
1073    fn adaptive_initial_threshold_is_64() {
1074        let owner = build_adaptive_owner(64);
1075        assert_eq!(owner.adaptive_threshold_snapshot(), Some(64));
1076    }
1077
1078    #[test]
1079    fn fixed_policy_has_no_adaptive_snapshot() {
1080        let owner: SlabOwner<u64, InlineBacked<512>> =
1081            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
1082        // Default is Fixed(64).
1083        assert_eq!(owner.adaptive_threshold_snapshot(), None);
1084    }
1085
1086    #[test]
1087    fn adaptive_steps_down_when_queue_exceeds_75_percent() {
1088        // queue_capacity = 16; > 75% means q > 12 (since q*4 > 16*3 = 48 ⇔ q > 12).
1089        //
1090        // Path-dependence: every allocate calls maybe_drain, which observes
1091        // the queue and may step the policy. The 13 pre-allocates below
1092        // step the policy UP to the ceiling (128) on call #1 (queue empty
1093        // ⇒ q*4 < cap ⇒ step up) and engage a cooldown of
1094        // ADAPTIVE_COOLDOWN_TICKS = 16. Calls #2..#13 just decrement the
1095        // cooldown. After pushing 13 to the queue we need (16 − 12) = 4
1096        // more allocates to drain cooldown, then a 5th to observe the
1097        // step-down trigger. Total: 13 + 5 = 18 allocates.
1098        let owner = build_adaptive_owner(16);
1099        let remote = owner.remote();
1100        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1101        // 13 pre-allocates: step climbs to 128 on call 1, cooldown to 4 by 13.
1102        let ptrs: Vec<_> = (0..13).map(|_| owner.allocate(layout).unwrap()).collect();
1103        assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
1104        // Push all 13 to the remote queue (q is now 13, > 75% of 16).
1105        for p in &ptrs {
1106            unsafe { remote.try_deallocate(p.cast(), layout).unwrap() };
1107        }
1108        // 4 more allocates to drain the remaining cooldown ticks. Each
1109        // observes q=13 but cooldown>0, so the band check is skipped.
1110        // Threshold stays at 128 (so q<threshold, no drain), queue stays at 13.
1111        for _ in 0..4 {
1112            let _ = owner.allocate(layout).unwrap();
1113            assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
1114        }
1115        // 18th allocate: cooldown=0, band check fires, q*4=52 > cap*3=48,
1116        // step down from 4 → 3 ⇒ threshold 64.
1117        let _ = owner.allocate(layout).unwrap();
1118        assert_eq!(
1119            owner.adaptive_threshold_snapshot(),
1120            Some(64),
1121            "step should have dropped one level after q > 75% sample",
1122        );
1123    }
1124
1125    #[test]
1126    fn adaptive_steps_up_when_queue_below_25_percent() {
1127        // queue_capacity = 16; < 25% means q*4 < 16 ⇔ q < 4. q=0 qualifies.
1128        let owner = build_adaptive_owner(16);
1129        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1130        assert_eq!(owner.adaptive_threshold_snapshot(), Some(64));
1131        // One allocate observes empty queue → step up to 128.
1132        let _ = owner.allocate(layout).unwrap();
1133        assert_eq!(
1134            owner.adaptive_threshold_snapshot(),
1135            Some(128),
1136            "step should have climbed one level after q < 25% sample",
1137        );
1138    }
1139
1140    #[test]
1141    fn adaptive_cooldown_prevents_oscillation() {
1142        // After one step adjustment the cooldown counter prevents the
1143        // next ADAPTIVE_COOLDOWN_TICKS maybe_drain calls from changing
1144        // the step. Verify by checking the threshold stays put.
1145        let owner = build_adaptive_owner(16);
1146        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1147        // First call: step 3 → 4 (queue empty, < 25%).
1148        let _ = owner.allocate(layout).unwrap();
1149        let after_first = owner.adaptive_threshold_snapshot().unwrap();
1150        assert_eq!(after_first, 128);
1151        // Subsequent allocates DURING cooldown should not advance further.
1152        // The step is already at the ceiling (4) anyway, but the cooldown
1153        // also blocks any move; even if we wanted to drop down, we can't.
1154        for _ in 0..(ADAPTIVE_COOLDOWN_TICKS - 1) {
1155            let _ = owner.allocate(layout).unwrap();
1156            assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
1157        }
1158    }
1159
1160    #[test]
1161    fn adaptive_ceiling_is_respected() {
1162        // Step 4 (= 128) is the ceiling. Climb to it via many empty-queue
1163        // ticks (with cooldown waits in between).
1164        let owner = build_adaptive_owner(16);
1165        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1166        // Each "step + cooldown_ticks" cycle moves the threshold one
1167        // level. 1 step from 3→4 puts us at the ceiling immediately.
1168        for _ in 0..(2 * ADAPTIVE_COOLDOWN_TICKS) {
1169            let _ = owner.allocate(layout).unwrap();
1170        }
1171        assert_eq!(owner.adaptive_threshold_snapshot(), Some(128));
1172    }
1173
1174    #[test]
1175    fn adaptive_floor_is_respected() {
1176        // Step 0 (= 8) is the floor. Push enough cross-thread frees to
1177        // keep the queue > 75% across repeated ticks; the policy should
1178        // step down 3→2→1→0 and then stop (no step below the floor).
1179        //
1180        // We need cap=16 (so q>12 triggers step down) and enough
1181        // pre-allocated pointers to keep the queue full as the owner
1182        // drains it on threshold crossings. After each step-down,
1183        // cooldown=16 protects 16 calls. To hit four step-downs
1184        // (3→2→1→0 plus a "floor reached" check) we need ~4 × 17 = 68
1185        // allocates plus enough queue refill in between.
1186        const SLAB_CAP: usize = 256;
1187        let owner: SlabOwner<u64, InlineBacked<8192>> = SlabOwner::with_batch_policy(
1188            SLAB_CAP,
1189            InlineBacked::<8192>::new(),
1190            BatchPolicy::Adaptive,
1191            16,
1192        )
1193        .unwrap();
1194        let remote = owner.remote();
1195        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1196        // Phase 1: pre-allocate a pool of pointers to push from.
1197        let pool: Vec<_> = (0..200).map(|_| owner.allocate(layout).unwrap()).collect();
1198        // Phase 2: feed the queue full before each drain so the
1199        // observed q stays > 75%. Strategy: tight loop — push 14 to the
1200        // queue, allocate once (decrements cooldown OR triggers
1201        // step-down on the cooldown=0 tick), continue.
1202        let mut pushed = 0usize;
1203        for cycle in 0..6 {
1204            // Top the queue back up to ~14 entries each cycle.
1205            while pushed < pool.len() && {
1206                let q = owner.inner.remote_queue.lock().unwrap();
1207                q.len() < 14
1208            } {
1209                // SAFETY: pool came from this owner.
1210                let _ = unsafe { remote.try_deallocate(pool[pushed].cast(), layout) };
1211                pushed += 1;
1212            }
1213            // Allocate once to trigger maybe_drain.
1214            let _ = owner.allocate(layout);
1215            // After enough cycles the step must hit the floor.
1216            if cycle == 5 {
1217                let thr = owner.adaptive_threshold_snapshot().unwrap();
1218                assert!(
1219                    thr <= 64,
1220                    "policy should have stepped down toward floor, got {thr}",
1221                );
1222            }
1223        }
1224    }
1225
1226    #[test]
1227    fn adaptive_levels_are_sorted_and_match_spec() {
1228        assert_eq!(ADAPTIVE_LEVELS, [8, 16, 32, 64, 128]);
1229    }
1230
1231    /// Regression: the `remote_queue_len` mirror tracks the queue
1232    /// length at lock-release granularity. Verifies the bookkeeping
1233    /// across push, drain, and `maybe_drain`:
1234    ///   - After N remote pushes, mirror == N.
1235    ///   - After `drain()`, mirror == 0 even though no allocate happened.
1236    ///   - After a fresh push following drain, mirror == 1 (no stale-add).
1237    #[test]
1238    fn remote_queue_len_mirror_tracks_lock_release_state() {
1239        let owner: SlabOwner<u64, InlineBacked<512>> =
1240            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(64), 16)
1241                .unwrap();
1242        let remote = owner.remote();
1243        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1244
1245        // Start: mirror is 0.
1246        assert_eq!(owner.inner.remote_queue_len.load(Ordering::Relaxed), 0);
1247
1248        // Push three: mirror must be 3.
1249        let p1 = owner.allocate(layout).unwrap();
1250        let p2 = owner.allocate(layout).unwrap();
1251        let p3 = owner.allocate(layout).unwrap();
1252        unsafe {
1253            remote.try_deallocate(p1.cast(), layout).unwrap();
1254            remote.try_deallocate(p2.cast(), layout).unwrap();
1255            remote.try_deallocate(p3.cast(), layout).unwrap();
1256        }
1257        assert_eq!(
1258            owner.inner.remote_queue_len.load(Ordering::Relaxed),
1259            3,
1260            "mirror should equal the number of pushed entries"
1261        );
1262
1263        // Drain: mirror resets to 0 without an allocate tick.
1264        owner.drain();
1265        assert_eq!(
1266            owner.inner.remote_queue_len.load(Ordering::Relaxed),
1267            0,
1268            "drain() must reset the mirror under the same critical section"
1269        );
1270
1271        // Push one more — must be exactly 1, not 4.
1272        let p4 = owner.allocate(layout).unwrap();
1273        unsafe { remote.try_deallocate(p4.cast(), layout).unwrap() };
1274        assert_eq!(
1275            owner.inner.remote_queue_len.load(Ordering::Relaxed),
1276            1,
1277            "post-drain push must start from 0, not stale-add"
1278        );
1279
1280        // Cleanup so the slab balances under Drop's debug_assert.
1281        owner.drain();
1282    }
1283
1284    /// Regression: when an owner-side `allocate` tips the queue over
1285    /// threshold via `maybe_drain`, the mirror is reset under the same
1286    /// critical section as the actual VecDeque drain — otherwise a
1287    /// subsequent fast-path load would observe a stale non-zero value
1288    /// and keep firing the slow path with an empty queue.
1289    #[test]
1290    fn maybe_drain_resets_mirror_under_lock() {
1291        // Threshold low so a single tick drains.
1292        let owner: SlabOwner<u64, InlineBacked<512>> =
1293            SlabOwner::with_batch_policy(8, InlineBacked::<512>::new(), BatchPolicy::Fixed(2), 16)
1294                .unwrap();
1295        let remote = owner.remote();
1296        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1297        let p1 = owner.allocate(layout).unwrap();
1298        let p2 = owner.allocate(layout).unwrap();
1299        unsafe {
1300            remote.try_deallocate(p1.cast(), layout).unwrap();
1301            remote.try_deallocate(p2.cast(), layout).unwrap();
1302        }
1303        assert_eq!(owner.inner.remote_queue_len.load(Ordering::Relaxed), 2);
1304        // This allocate's maybe_drain hits the slow path (pending >= 2),
1305        // drains the two entries, and must reset the mirror.
1306        let _p3 = owner.allocate(layout).unwrap();
1307        assert_eq!(
1308            owner.inner.remote_queue_len.load(Ordering::Relaxed),
1309            0,
1310            "maybe_drain must reset the mirror inside the same critical section as the drain"
1311        );
1312    }
1313
1314    /// Regression: dropping the owner must drain pending remote frees
1315    /// (no T: Drop leaks beyond the natural Slab teardown) and close
1316    /// the queue so further remote pushes return `Err(ptr)` instead of
1317    /// piling into an undrainable queue.
1318    #[test]
1319    fn owner_drop_drains_pending_remote_frees_and_closes_queue() {
1320        let owner: SlabOwner<u64, InlineBacked<512>> =
1321            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
1322        let remote = owner.remote();
1323        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1324
1325        // Pre-allocate two slots through the owner, then queue two
1326        // remote frees without draining.
1327        let a = owner.allocate(layout).unwrap();
1328        let b = owner.allocate(layout).unwrap();
1329        unsafe {
1330            remote.try_deallocate(a.cast(), layout).unwrap();
1331            remote.try_deallocate(b.cast(), layout).unwrap();
1332        }
1333        // Drop the owner. Drop impl must drain a and b into the local
1334        // freelist (otherwise the slab keeps them marked-live until the
1335        // last Arc drops).
1336        drop(owner);
1337
1338        // After owner drop, remote pushes must NOT block and must NOT
1339        // silently succeed — they return Err(ptr).
1340        let c_layout = layout;
1341        // Use a fake but well-formed pointer; we never call into the
1342        // slab from this path. (The Err path doesn't touch the slab.)
1343        let fake = unsafe { NonNull::new_unchecked(0x1000_usize as *mut u8) };
1344        let result = unsafe { remote.try_deallocate(fake, c_layout) };
1345        assert!(
1346            result.is_err(),
1347            "remote.try_deallocate must return Err after owner drop"
1348        );
1349    }
1350
1351    /// Regression: the spinning `Deallocator::deallocate` impl must
1352    /// bail out (not spin forever) when the queue has been closed by
1353    /// owner drop. Without the closed check, a long-running task
1354    /// holding a `SlabRemote` would hang when its `deallocate` runs
1355    /// after the owner is torn down.
1356    #[test]
1357    fn remote_deallocate_does_not_spin_after_owner_drop() {
1358        let owner: SlabOwner<u64, InlineBacked<512>> =
1359            SlabOwner::new(8, InlineBacked::<512>::new()).unwrap();
1360        let remote = owner.remote();
1361        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1362        let _p = owner.allocate(layout).unwrap();
1363        drop(owner);
1364        // After owner drop, this would spin forever before the fix.
1365        // The closed-check inside deallocate returns immediately, so
1366        // this completes synchronously.
1367        let fake = unsafe { NonNull::new_unchecked(0x1000_usize as *mut u8) };
1368        unsafe { remote.deallocate(fake, layout) };
1369    }
1370
1371    /// Multi-threaded stress test: owner + N remote workers running
1372    /// concurrent allocates/frees under the Adaptive policy. The test
1373    /// passes if (a) all allocated pointers come back distinct, (b)
1374    /// after-stop drain leaves the slab in a reusable state, and (c)
1375    /// the adaptive threshold has moved off its initial value at least
1376    /// once (i.e. the policy actually adapted, not just sat).
1377    #[test]
1378    #[cfg_attr(miri, ignore = "miri-incompatible: mmap / threads")]
1379    fn adaptive_multi_threaded_stress() {
1380        use std::collections::VecDeque;
1381        use std::sync::atomic::{AtomicBool, Ordering};
1382        use std::sync::{Arc, Mutex};
1383        use std::thread;
1384
1385        const CAP: usize = 1024;
1386        const QUEUE_CAP: usize = 64;
1387        let owner: SlabOwner<u64, InlineBacked<{ 16 * 1024 }>> = SlabOwner::with_batch_policy(
1388            CAP,
1389            InlineBacked::<{ 16 * 1024 }>::new(),
1390            BatchPolicy::Adaptive,
1391            QUEUE_CAP,
1392        )
1393        .unwrap();
1394        let layout = NonZeroLayout::for_type::<u64>().unwrap();
1395
1396        // Producer-consumer cycle so every address is remote-freed exactly
1397        // ONCE per allocation (the previous version re-freed a fixed pool
1398        // thousands of times — a wholesale double-free that exercised UB
1399        // rather than the adaptive path). The owner allocates LIVE blocks and
1400        // publishes their addresses to a shared work queue; remote workers pop
1401        // and remote-free them; the owner drains to reclaim the slots and
1402        // re-allocates. A slot only becomes allocatable again after it has been
1403        // remote-freed and drained, so the owner never re-issues a live slot —
1404        // no address is freed twice without an intervening allocate.
1405        let work: Arc<Mutex<VecDeque<usize>>> = Arc::new(Mutex::new(VecDeque::new()));
1406        let stop = Arc::new(AtomicBool::new(false));
1407
1408        let mut handles = Vec::new();
1409        for _ in 0..4 {
1410            let remote = owner.remote();
1411            let stop = Arc::clone(&stop);
1412            let work = Arc::clone(&work);
1413            handles.push(thread::spawn(move || {
1414                while !stop.load(Ordering::Relaxed) {
1415                    let addr = work.lock().unwrap().pop_front();
1416                    let Some(addr) = addr else {
1417                        thread::yield_now();
1418                        continue;
1419                    };
1420                    let p = unsafe { NonNull::new_unchecked(addr as *mut u8) };
1421                    // Retry until the bounded remote queue accepts it (the
1422                    // owner drains it concurrently). Each `addr` was a live
1423                    // owner-issued slot, so this frees it exactly once.
1424                    loop {
1425                        if unsafe { remote.try_deallocate(p, layout) }.is_ok() {
1426                            break;
1427                        }
1428                        if stop.load(Ordering::Relaxed) {
1429                            break;
1430                        }
1431                        thread::yield_now();
1432                    }
1433                }
1434            }));
1435        }
1436
1437        // Owner loop: reclaim remote frees, allocate a fresh batch of LIVE
1438        // slots, hand their addresses to the workers. Track whether the
1439        // adaptive threshold moves off its initial value of 64.
1440        let mut saw_step = false;
1441        let mut live = Vec::new();
1442        for _ in 0..3_000 {
1443            owner.drain();
1444            for _ in 0..16 {
1445                match owner.allocate(layout) {
1446                    Ok(block) => live.push(block.cast::<u8>().as_ptr() as usize),
1447                    Err(_) => break, // slab full — workers will free some
1448                }
1449            }
1450            if !live.is_empty() {
1451                let mut q = work.lock().unwrap();
1452                for addr in live.drain(..) {
1453                    q.push_back(addr);
1454                }
1455            }
1456            if let Some(t) = owner.adaptive_threshold_snapshot() {
1457                if t != 64 {
1458                    saw_step = true;
1459                }
1460            }
1461        }
1462        stop.store(true, Ordering::Relaxed);
1463        for h in handles {
1464            let _ = h.join();
1465        }
1466        owner.drain();
1467        // The invariant that actually matters: after a full produce/remote-
1468        // free/drain cycle across threads, the freelist must be uncorrupted.
1469        // A real double-free or freelist-link regression would trip the slab's
1470        // MAC/tripwire and bump this counter — assert it explicitly (the
1471        // previous version of this test checked only `saw_step`, so a genuine
1472        // corruption could have slipped through).
1473        assert_eq!(
1474            owner.corruption_events(),
1475            0,
1476            "remote-free/drain cycle must not corrupt the freelist",
1477        );
1478        assert!(
1479            saw_step,
1480            "Adaptive policy should have moved the threshold off 64 under contention",
1481        );
1482    }
1483}