multitude 0.1.3

Fast and flexible arena allocator.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! `ChunkProvider`: factory + cache for chunks.
//!
//! The local cache is a singly-linked list mutated only by the arena's
//! owning thread, so its bookkeeping lives in [`LocalSlot`]s. The
//! shared cache is a lock-free Treiber stack: multi-producer (any
//! thread that drops the last `Arc<T>` to refcount zero), single-
//! consumer (only the arena's owning thread pops, via
//! `acquire_shared`). The single-consumer property eliminates the
//! classic Treiber field-read-before-ownership UAF and ABA hazards:
//! no other thread can pop, so the head node observed by the popper
//! cannot be removed (and therefore cannot be freed) between the
//! `head.load` and the popper's CAS.
//!
//! When the head's `capacity` is below the requested `min_bytes` the
//! popper pops anyway, frees the too-small chunk, and tries again.
//! This trades a (rare) too-small cached chunk for the simplicity of
//! head-only pops; in practice the high-water ratchet keeps cached
//! chunks at the current class, so misfits are uncommon.

use alloc::sync::Arc;
use core::ptr::NonNull;

use allocator_api2::alloc::{AllocError, Allocator};

use super::constants::{LARGE, MAX_CHUNK_BYTES, NUM_CHUNK_CLASSES, class_to_bytes, min_class_for_bytes};
use super::local_chunk::{LocalChunk, header_size as local_header_size};
use super::local_slot::LocalSlot;
use super::shared_chunk::{SharedChunk, header_size as shared_header_size};
use super::sync::{AtomicPtr, AtomicU8, AtomicUsize, Ordering};
#[cfg(feature = "stats")]
use crate::arena_stats::bump_stat;

/// Per-arena chunk factory + cache.
///
/// Held as `Arc<ChunkProvider>` by exactly one [`Arena`](crate::Arena)
/// and as `Weak<ChunkProvider>` by every chunk produced by the
/// provider so a chunk can route itself back to the cache when its
/// refcount reaches zero — *if* the provider is still alive at that
/// time.
pub(crate) struct ChunkProvider<A: Allocator + Clone> {
    /// Backing allocator the provider uses for fresh chunks. Cloned
    /// into every chunk it creates so the chunk can self-free even
    /// after the provider has been dropped.
    pub(crate) allocator: A,

    /// Per-arena `max_normal_alloc` knob: at chunk-acquisition time, a
    /// request strictly larger than this threshold causes
    /// `acquire_local`/`acquire_shared` to hand back a one-shot
    /// oversized chunk (never cached). It does **not** prevent an
    /// already-installed `current_*` chunk from satisfying a
    /// larger-than-threshold request from its tail — see
    /// [`crate::ArenaBuilder::max_normal_alloc`] for the rationale.
    pub(crate) max_normal_alloc: usize,

    /// Optional lifetime cap on total chunk bytes outstanding (live +
    /// cached). When set, the provider returns [`AllocError`] from
    /// `acquire_*` rather than allocate past the budget.
    ///
    /// **Counting convention:** the budget tracks each chunk's
    /// **total allocation size** (`header_size + capacity` =
    /// `class_to_bytes(class)` for cached chunks, or
    /// `header_size + round_payload(user_request)` for one-shot
    /// oversized chunks). Reservation and release are symmetric.
    /// The underlying VM allocation matches this exactly (up to a
    /// small structural-alignment rounding for oversized shared
    /// chunks, at most 63 bytes).
    pub(crate) byte_budget: Option<usize>,

    /// Total bytes of chunk capacity currently outstanding (live +
    /// cached). Compared against `byte_budget` on every fresh
    /// allocation.
    pub(crate) total_chunk_bytes: AtomicUsize,

    /// Local-cache list head. Touched only by the arena's owning
    /// thread (enforced structurally by `LocalChunk: !Send`). The
    /// cache is unbounded — every chunk that passes the high-water
    /// filter is retained.
    pub(crate) local_cache: LocalSlot<Option<NonNull<LocalChunk<A>>>>,

    /// Shared-cache list head, as a thin pointer to the top
    /// [`SharedChunk`] (or null if empty). Lock-free Treiber stack:
    /// pushes (from any thread) use a CAS loop; pops (only by the
    /// arena's owning thread — `acquire_shared` is reached from
    /// owner-thread code paths) are single-consumer and therefore
    /// don't need ABA defenses or hazard pointers (no other thread
    /// can pop the head between our load and CAS).
    pub(crate) shared_cache_head: AtomicPtr<u8>,

    /// Largest local class ever produced; monotonic. Touched only by
    /// the arena's owning thread.
    pub(crate) local_high_water: LocalSlot<u8>,

    /// Largest shared class ever produced; monotonic. Updated with
    /// `fetch_max(Relaxed)` because shared-flavor acquires/releases
    /// can come from any thread.
    pub(crate) shared_high_water: AtomicU8,

    /// Runtime statistics counters. Only present when the `stats`
    /// Cargo feature is enabled.
    #[cfg(feature = "stats")]
    pub(crate) stats: crate::arena_stats::StatsStorage,
}

impl<A: Allocator + Clone> ChunkProvider<A> {
    pub(crate) fn new(
        allocator: A,
        max_normal_alloc: usize,
        byte_budget: Option<usize>,
        initial_local_class: u8,
        initial_shared_class: u8,
    ) -> Arc<Self> {
        Arc::new(Self {
            allocator,
            max_normal_alloc,
            byte_budget,
            total_chunk_bytes: AtomicUsize::new(0),
            local_cache: LocalSlot::new(None),
            shared_cache_head: AtomicPtr::new(core::ptr::null_mut()),
            local_high_water: LocalSlot::new(initial_local_class),
            shared_high_water: AtomicU8::new(initial_shared_class),
            #[cfg(feature = "stats")]
            stats: crate::arena_stats::StatsStorage::default(),
        })
    }

    /// Try to reserve `bytes` against the lifetime byte budget.
    /// Returns `Ok(())` if the budget allows it (or if no budget is
    /// configured); `Err(AllocError)` otherwise. Adds `bytes` to
    /// `total_chunk_bytes` on success.
    fn reserve_budget(&self, bytes: usize) -> Result<(), AllocError> {
        // Single-writer per provider: `reserve_budget` only runs on
        // the arena's owning thread. `release_budget` runs on any
        // thread (cross-thread `Arc::drop`).
        //
        // Speculative-add then rollback on overflow. A prior `load +
        // check + fetch_add` shape had a TOCTOU window between the
        // load and the add: a concurrent `release_budget` `fetch_sub`
        // on a remote thread could let the load observe a stale
        // (too-high) value, causing a false rejection of an
        // allocation that actually fits. By committing the add first
        // we eliminate the window — overflow simply rolls the value
        // back. AcqRel pairs with `release_budget`'s Release so the
        // post-add value reflects all completed releases.
        let Some(budget) = self.byte_budget else {
            self.total_chunk_bytes.fetch_add(bytes, Ordering::Relaxed);
            return Ok(());
        };
        let prev = self.total_chunk_bytes.fetch_add(bytes, Ordering::AcqRel);
        // Single-writer for `reserve_budget` means `prev + bytes` cannot
        // overflow in a way the sub can't undo: even if the addition
        // wraps `usize`, the matching `fetch_sub` below restores the
        // original value before any other reader observes the rollback
        // (releases only ever decrement). Reject regardless on either
        // wraparound or budget overrun.
        let next = prev.wrapping_add(bytes);
        if next < prev || next > budget {
            // Release: pair with any subsequent reader that may
            // observe the rolled-back value.
            self.total_chunk_bytes.fetch_sub(bytes, Ordering::Release);
            return Err(AllocError);
        }
        Ok(())
    }

    /// Release `bytes` from the lifetime byte budget tracker.
    fn release_budget(&self, bytes: usize) {
        // Release so a subsequent owner-thread `reserve_budget`
        // `fetch_add(AcqRel)` observes the decrement and admits the
        // freed budget on the next reservation.
        self.total_chunk_bytes.fetch_sub(bytes, Ordering::Release);
    }

    /// Produce a chunk whose payload is at least `min_payload` bytes.
    ///
    /// The returned [`NonNull`] holds a `+1` on the chunk's refcount;
    /// the caller owns that hold.
    ///
    /// `min_payload > self.max_normal_alloc` at acquisition time
    /// triggers a one-shot oversized chunk sized exactly to the
    /// request (plus header + drop-list rounding); it bypasses the
    /// cache and leaves the high-water mark untouched. Note that this
    /// only governs the case where the caller actually needs a fresh
    /// chunk — large requests that fit in the tail of `current_local`
    /// are satisfied without ever reaching this function.
    #[cfg_attr(test, mutants::skip)] // Chunk-class clamp mutations still choose a class that satisfies the request.
    pub(crate) fn acquire_local(self: &Arc<Self>, min_payload: usize) -> Result<NonNull<LocalChunk<A>>, AllocError> {
        if min_payload > self.max_normal_alloc {
            // Oversized: allocation sized exactly to the request,
            // plus header and drop-list rounding. Bypasses the cache.
            let rounded_payload = super::drop_list::round_payload(min_payload, local_header_size::<A>()).ok_or(AllocError)?;
            let total_bytes = local_header_size::<A>().checked_add(rounded_payload).ok_or(AllocError)?;
            self.reserve_budget(total_bytes)?;
            #[cfg(feature = "stats")]
            bump_stat!(self, oversized_local_chunks_allocated, 1);
            match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
                Ok(c) => return Ok(c),
                Err(e) => {
                    self.release_budget(total_bytes);
                    return Err(e);
                }
            }
        }

        // Smallest class whose payload covers the request.
        // `class_to_bytes` is total, so add the header.
        let req_class = min_class_for_bytes(min_payload + local_header_size::<A>());
        // SAFETY: single-thread-local — `LocalChunk: !Send`.
        let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
        let max_class = NUM_CHUNK_CLASSES - 1;
        let target_class = req_class.max(high_water).min(max_class);
        debug_assert!(target_class < NUM_CHUNK_CLASSES);
        let target_total = class_to_bytes(target_class);

        // Walk the cached list, freeing too-small chunks and popping the
        // first chunk whose payload fits.
        // SAFETY: single-thread-local invariant.
        let popped = unsafe {
            self.local_cache.with_mut(|head| -> Option<NonNull<LocalChunk<A>>> {
                use core::cell::Cell;
                // Re-interpret `&mut Option` as `&Cell<Option>` so the
                // walk can rewrite the predecessor's `next` link
                // without juggling overlapping borrows.
                let prev_link: *const Cell<Option<NonNull<LocalChunk<A>>>> = {
                    let head_cell: &Cell<Option<NonNull<LocalChunk<A>>>> = Cell::from_mut(head);
                    core::ptr::from_ref(head_cell)
                };
                let mut cur = (*prev_link).get();
                while let Some(chunk) = cur {
                    // SAFETY: chunk lives in the cache list.
                    let c = chunk.as_ref();
                    let cap = c.capacity;
                    let next = c.next.get();
                    (*prev_link).set(next);
                    c.next.set(None);
                    if cap >= min_payload {
                        return Some(chunk);
                    }
                    self.release_budget(local_header_size::<A>() + cap);
                    // SAFETY: refcount-zero with drops already replayed
                    // at cache-push time; we own the chunk.
                    LocalChunk::free_backing(chunk);
                    cur = next;
                }
                None
            })
        };

        if let Some(chunk) = popped {
            // SAFETY: chunk just left the cache; we own it exclusively.
            unsafe { chunk.as_ref().revive_for_reuse() };
            return Ok(chunk);
        }

        let total_bytes = target_total;
        self.reserve_budget(total_bytes)?;
        #[cfg(feature = "stats")]
        bump_stat!(self, normal_local_chunks_allocated, 1);
        let chunk = match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
            Ok(c) => c,
            Err(e) => {
                self.release_budget(total_bytes);
                return Err(e);
            }
        };
        // Ratchet high-water so future chunks grow with the workload.
        let next_high_water = target_class.saturating_add(1).min(NUM_CHUNK_CLASSES - 1).min(max_class);
        // SAFETY: single-thread-local invariant.
        unsafe {
            self.local_high_water.with_mut(|h| {
                if next_high_water > *h {
                    *h = next_high_water;
                }
            });
        }
        Ok(chunk)
    }

    /// Allocate a fresh local chunk, set its refcount to 0 (cache
    /// state), and push it onto the local cache. Used by
    /// [`ArenaBuilder::with_capacity_local`](crate::ArenaBuilder::with_capacity_local).
    pub(crate) fn preallocate_local(self: &Arc<Self>) -> Result<(), AllocError> {
        // SAFETY: single-thread-local invariant.
        let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
        let target_class = high_water;
        let total_bytes = class_to_bytes(target_class);
        self.reserve_budget(total_bytes)?;
        #[cfg(feature = "stats")]
        bump_stat!(self, normal_local_chunks_allocated, 1);
        let chunk = match LocalChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
            Ok(c) => c,
            Err(e) => {
                self.release_budget(total_bytes);
                return Err(e);
            }
        };
        // SAFETY: just allocated; the chunk's refcount is inflated
        // to `LARGE`. Drop it now to 0 so the chunk sits in the
        // cache; `revive_for_reuse` will restore the inflation when
        // the chunk is later popped.
        let chunk_ref = unsafe { chunk.as_ref() };
        let prev = chunk_ref.refcount.replace(0);
        debug_assert_eq!(prev, LARGE);
        // SAFETY: single-thread-local invariant.
        unsafe {
            self.local_cache.with_mut(|head| {
                chunk_ref.next.set(*head);
                *head = Some(chunk);
            });
        }
        // `target_class == high_water` by construction, so the high-water mark
        // would not change here; skip the redundant read-modify-write.
        Ok(())
    }

    /// Allocate a fresh shared chunk, set its refcount to 0 (cache
    /// state), and push it onto the shared cache Treiber stack. Used
    /// by [`ArenaBuilder::with_capacity_shared`](crate::ArenaBuilder::with_capacity_shared).
    pub(crate) fn preallocate_shared(self: &Arc<Self>) -> Result<(), AllocError> {
        let high_water = self.shared_high_water.load(Ordering::Relaxed);
        let target_class = high_water;
        let total_bytes = class_to_bytes(target_class);
        self.reserve_budget(total_bytes)?;
        #[cfg(feature = "stats")]
        bump_stat!(self, normal_shared_chunks_allocated, 1);
        let chunk = match SharedChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
            Ok(c) => c,
            Err(e) => {
                self.release_budget(total_bytes);
                return Err(e);
            }
        };
        // SAFETY: just allocated; refcount is inflated to LARGE.
        // Drop to 0 so the chunk sits in the cache;
        // `revive_for_reuse` will restore the inflation on pop.
        unsafe { chunk.as_ref() }.refcount.0.store(0, Ordering::Relaxed);
        // SAFETY: refcount-zero — exclusive access; publish on the
        // lock-free shared cache.
        unsafe { self.push_shared_cache(chunk) };
        let _ = self.shared_high_water.fetch_max(target_class, Ordering::Relaxed);
        Ok(())
    }

    /// Get a snapshot of the current statistics counters.
    #[cfg(feature = "stats")]
    #[inline]
    pub fn stats_snapshot(&self) -> crate::arena_stats::ArenaStats {
        self.stats.snapshot()
    }

    /// Release a chunk whose refcount has just reached zero.
    ///
    /// Replays the chunk's drop list, then routes the chunk to the
    /// local cache (if it passes the high-water + budget filter) or
    /// frees its backing allocation directly.
    ///
    /// # Safety
    ///
    /// `chunk` must have refcount zero and the caller must own that
    /// last `+1` (i.e., the chunk must not be reachable from any other
    /// strand).
    pub(crate) unsafe fn release_local(&self, chunk: NonNull<LocalChunk<A>>) {
        // SAFETY: caller owns the last +1; no other strand observes the payload.
        unsafe { LocalChunk::replay_drops(chunk) };

        // SAFETY: caller owns the last +1.
        let cap = unsafe { (*chunk.as_ptr()).capacity };
        let header = local_header_size::<A>();
        // Oversized iff the chunk's total exceeds the largest cached class.
        let oversized = header.saturating_add(cap) > MAX_CHUNK_BYTES;

        // Eligible for caching iff not oversized and the payload covers
        // the high-water class (never re-issue a chunk smaller than
        // what current workloads expect).
        let eligible = if oversized {
            false
        } else {
            // SAFETY: single-thread-local invariant.
            let high_water = unsafe { self.local_high_water.with_mut(|h| *h) };
            cap >= class_to_bytes(high_water).saturating_sub(header)
        };

        if eligible {
            // SAFETY: only the owning thread mutates the cache. Use raw
            // access to keep the SharedReadOnly borrow tag off the chunk
            // (uniform with paths that do reach `free_backing`).
            unsafe {
                self.local_cache.with_mut(|head| {
                    (*chunk.as_ptr()).next.set(*head);
                    *head = Some(chunk);
                });
            }
        } else {
            self.release_budget(header + cap);
            // SAFETY: caller owns the last +1; drops just replayed.
            unsafe { LocalChunk::free_backing(chunk) };
        }
    }

    /// Produce a [`SharedChunk`] whose payload is at least `min_payload`
    /// bytes. The returned pointer holds the chunk's inflated refcount
    /// ([`LARGE`]) for the new tenant arena.
    #[cfg_attr(test, mutants::skip)] // Chunk-class clamp mutations still choose a class that satisfies the request.
    pub(crate) fn acquire_shared(self: &Arc<Self>, min_payload: usize) -> Result<NonNull<SharedChunk<A>>, AllocError> {
        if min_payload > self.max_normal_alloc {
            // Oversized: see `acquire_local`.
            let rounded_payload = super::drop_list::round_payload(min_payload, shared_header_size::<A>()).ok_or(AllocError)?;
            let total_bytes = shared_header_size::<A>().checked_add(rounded_payload).ok_or(AllocError)?;
            self.reserve_budget(total_bytes)?;
            #[cfg(feature = "stats")]
            bump_stat!(self, oversized_shared_chunks_allocated, 1);
            match SharedChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
                Ok(c) => return Ok(c),
                Err(e) => {
                    self.release_budget(total_bytes);
                    return Err(e);
                }
            }
        }

        let req_class = min_class_for_bytes(min_payload + shared_header_size::<A>());
        let high_water = self.shared_high_water.load(Ordering::Relaxed);
        let max_class = NUM_CHUNK_CLASSES - 1;
        let target_class = req_class.max(high_water).min(max_class);
        debug_assert!(target_class < NUM_CHUNK_CLASSES);
        let target_total = class_to_bytes(target_class);

        if let Some(chunk) = self.try_pop_shared_at_least(min_payload) {
            // SAFETY: chunk just left the cache; sole owner until handoff.
            unsafe { chunk.as_ref().revive_for_reuse() };
            return Ok(chunk);
        }

        let total_bytes = target_total;
        self.reserve_budget(total_bytes)?;
        #[cfg(feature = "stats")]
        bump_stat!(self, normal_shared_chunks_allocated, 1);
        let chunk = match SharedChunk::allocate(self.allocator.clone(), Arc::downgrade(self), total_bytes) {
            Ok(c) => c,
            Err(e) => {
                self.release_budget(total_bytes);
                return Err(e);
            }
        };
        let next_high_water = target_class.saturating_add(1).min(NUM_CHUNK_CLASSES - 1).min(max_class);
        let _ = self.shared_high_water.fetch_max(next_high_water, Ordering::Relaxed);
        Ok(chunk)
    }

    /// Push a refcount-zero shared chunk onto the lock-free Treiber
    /// stack. Callable from any thread.
    ///
    /// # Safety
    ///
    /// `chunk` must be at refcount zero with its drop list replayed,
    /// and the caller must own the chunk exclusively.
    #[cfg_attr(coverage_nightly, coverage(off))]
    // Treiber-stack push from any thread that drops the last reference of a shared
    // chunk. The CAS retry arm (`Err(observed)`) only fires when two threads
    // attempt to push concurrently — deterministically exercising this requires
    // precise interleaving that isn't feasible in unit tests.
    unsafe fn push_shared_cache(&self, chunk: NonNull<SharedChunk<A>>) {
        let chunk_thin = SharedChunk::to_thin_ptr(chunk);
        // SAFETY: caller owns `chunk` exclusively; safe to borrow.
        let chunk_ref = unsafe { chunk.as_ref() };
        // Relaxed initial load: the pusher has no read-side dependency
        // on the head. If the load is stale, the failure-ordering
        // `Acquire` of `compare_exchange_weak` returns the fresh
        // value on retry.
        let mut current = self.shared_cache_head.load(Ordering::Relaxed);
        loop {
            // Store `next` BEFORE the CAS so that any concurrent pop
            // that succeeds in taking `chunk` sees the settled `next`
            // link.
            chunk_ref.next.store(current, Ordering::Relaxed);
            match self
                .shared_cache_head
                .compare_exchange_weak(current, chunk_thin, Ordering::AcqRel, Ordering::Acquire)
            {
                Ok(_) => return,
                Err(observed) => current = observed,
            }
        }
    }

    /// Pop a cached chunk whose capacity is at least `min_bytes`,
    /// freeing the backing memory of any cached chunks that are too
    /// small for the request along the way.
    ///
    /// Single-consumer: callable only from the arena's owning thread
    /// (enforced by routing through `acquire_shared`). The pop reads
    /// `head.next` before the CAS that takes ownership — this is
    /// sound because no other thread can pop, so `head` remains in
    /// the list (and therefore alive) for the duration of the CAS.
    /// Concurrent pushes from other threads only add nodes above
    /// `head`; they do not mutate `head.next` of the existing top
    /// node, so the read is stable.
    #[cfg_attr(coverage_nightly, coverage(off))]
    #[cfg_attr(test, mutants::skip)] // CAS-race and cache-release arithmetic mutations are not deterministic through public APIs.
    fn try_pop_shared_at_least(&self, min_bytes: usize) -> Option<NonNull<SharedChunk<A>>> {
        loop {
            let head_thin = self.shared_cache_head.load(Ordering::Acquire);
            if head_thin.is_null() {
                return None;
            }
            // SAFETY: `head_thin` is the thin-pointer base of a live
            // cached chunk. Single-consumer: no other thread pops, so
            // the chunk cannot be freed between this load and the CAS
            // below.
            let head = unsafe { SharedChunk::<A>::from_thin_ptr(head_thin) };
            // Read `next` and `capacity` via raw pointer (no `&Self`
            // borrow) so we don't leave a SharedReadOnly tag that
            // would conflict with `free_backing` further down.
            // SAFETY: `head` is live until we either CAS it off the
            // list or another push appears above it. Relaxed is
            // sufficient: the `Acquire` on `shared_cache_head` above
            // already synchronizes-with the pusher's AcqRel CAS that
            // installed `head` (which is sequenced-after the pusher's
            // Relaxed store to `head.next`).
            let next_thin = unsafe { (*head.as_ptr()).next.load(Ordering::Relaxed) };
            if self
                .shared_cache_head
                .compare_exchange_weak(head_thin, next_thin, Ordering::AcqRel, Ordering::Acquire)
                .is_err()
            {
                // CAS Err: lost the race to a concurrent push; loop will
                // retry with the new head observed on the next iteration.
                continue;
            }
            // We now own `head` exclusively.
            // Clear our `next` link so the popped chunk is in no list.
            // SAFETY: caller-owned chunk.
            unsafe { (*head.as_ptr()).next.store(core::ptr::null_mut(), Ordering::Relaxed) };
            // SAFETY: caller-owned chunk.
            let cap = unsafe { (*head.as_ptr()).capacity };
            if cap >= min_bytes {
                return Some(head);
            }
            // Too small: free its backing and retry. We still
            // hold the budget reservation made when the chunk was
            // originally allocated, so release it now.
            self.release_budget(shared_header_size::<A>() + cap);
            // SAFETY: refcount-zero with drops already replayed
            // at cache-push time; we own the chunk.
            unsafe { SharedChunk::free_backing(head) };
        }
    }

    /// Release a [`SharedChunk`] whose refcount has just reached zero.
    ///
    /// Replays the drop list, then routes the chunk to the shared
    /// cache (if it passes the high-water + budget filter) or frees
    /// its backing allocation directly.
    ///
    /// # Safety
    ///
    /// `chunk` must have refcount zero (with Acquire-fence already
    /// taken by the caller); caller owns the last reference.
    pub(crate) unsafe fn release_shared(&self, chunk: NonNull<SharedChunk<A>>) {
        // SAFETY: refcount-zero — caller owns the last reference.
        unsafe { SharedChunk::replay_drops(chunk) };
        // SAFETY: refcount-zero — caller owns the last reference.
        let cap = unsafe { (*chunk.as_ptr()).capacity };
        let header = shared_header_size::<A>();
        let oversized = header.saturating_add(cap) > MAX_CHUNK_BYTES;

        let eligible = if oversized {
            false
        } else {
            let high_water = self.shared_high_water.load(Ordering::Relaxed);
            cap >= class_to_bytes(high_water).saturating_sub(header)
        };

        if eligible {
            // SAFETY: refcount-zero — exclusive access until pushed.
            // `push_shared_cache` publishes with Release so any popper
            // sees the chunk's settled fields.
            unsafe { self.push_shared_cache(chunk) };
        } else {
            self.release_budget(header + cap);
            // SAFETY: refcount-zero — exclusive access; drops replayed.
            unsafe { SharedChunk::free_backing(chunk) };
        }
    }
}

impl<A: Allocator + Clone> Drop for ChunkProvider<A> {
    fn drop(&mut self) {
        // Drain the local cache list, freeing each chunk's backing
        // allocation. The cache holds chunks at refcount zero with
        // empty drop ledgers; only the backing allocation needs
        // releasing here.
        //
        // SAFETY: `&mut self` proves no other strand can observe the
        // cache list, so we may drain without synchronization.
        let mut head = unsafe { self.local_cache.with_mut(Option::take) };
        while let Some(chunk) = head {
            // SAFETY: chunk is in the cache (refcount zero, drops
            // replayed); we hold exclusive access via `&mut self`.
            // Read `next` via raw pointer (no `&Self` borrow) so it
            // doesn't conflict with `free_backing`'s `drop_in_place`.
            let next = unsafe { (*chunk.as_ptr()).next.replace(None) };
            // SAFETY: see above; the cache is the sole owner.
            unsafe { LocalChunk::free_backing(chunk) };
            head = next;
        }

        // Drain the shared cache. `&mut self` proves no other strand
        // can observe the cache, so a plain load (no synchronization)
        // is sufficient.
        let mut cur_thin = self.shared_cache_head.swap(core::ptr::null_mut(), Ordering::Relaxed);
        while !cur_thin.is_null() {
            // SAFETY: `cur_thin` is the thin-pointer base of a cached
            // chunk; chunks in the cache are at refcount zero with
            // their drop ledgers already replayed.
            let chunk = unsafe { SharedChunk::<A>::from_thin_ptr(cur_thin) };
            // SAFETY: chunk is live and exclusively owned (raw access
            // to avoid SRO tag conflicting with `free_backing`).
            let next_thin = unsafe { (*chunk.as_ptr()).next.load(Ordering::Relaxed) };
            // SAFETY: refcount-zero with cleaned-up drop ledger.
            unsafe { SharedChunk::free_backing(chunk) };
            cur_thin = next_thin;
        }
    }
}

/// `ChunkProvider` is `Send + Sync`: shared-cache fields are atomic,
/// and local-cache fields are only touched by the arena's owning thread.
// SAFETY: `ChunkProvider` is `Send + Sync` even though it contains a
// `LocalSlot<Option<NonNull<LocalChunk<A>>>>` (whose `NonNull` content is
// `!Send`). This is sound only because of an additional runtime
// invariant: every code path that mutates `local_cache` originates
// from `LocalChunk::route_release`, and `LocalChunk` is `!Send` /
// `!Sync` (its header contains `Cell<u64>` and `Cell<Option<...>>`),
// so any release into the local cache is necessarily on the owning
// thread of the chunk. The provider can be shared across threads via
// `Arc<ChunkProvider>` for shared-chunk allocation (which uses
// atomic state) but the local-chunk cache is touched only by the
// owning thread. Removing `LocalChunk: !Send` would invalidate this
// reasoning — keep that constraint or revisit these impls.
unsafe impl<A: Allocator + Clone + Send + Sync> Send for ChunkProvider<A> {}
// SAFETY: see the `Send` impl above.
unsafe impl<A: Allocator + Clone + Send + Sync> Sync for ChunkProvider<A> {}

#[cfg(test)]
mod tests {
    use allocator_api2::alloc::Global;

    use super::*;

    /// `reserve_budget` must reject a request whose `prev + bytes`
    /// wraps `usize`. The seed total is set so `wrapping_add(bytes)`
    /// produces a value strictly less than `prev`; the budget itself
    /// is `usize::MAX`, so the only branch that can reject is the
    /// `next < prev` wraparound check. A mutant replacing `<` with
    /// `==` makes the wrap case `next == prev` false (they differ),
    /// the budget check also false (small `next` ≤ `usize::MAX`),
    /// and `reserve_budget` would erroneously return `Ok`.
    #[test]
    fn reserve_budget_rejects_wraparound() {
        let provider = ChunkProvider::<Global>::new(Global, 4096, Some(usize::MAX), 0, 0);
        let seed = usize::MAX - 100;
        provider.total_chunk_bytes.store(seed, Ordering::Relaxed);
        // 200 + (usize::MAX - 100) wraps to 99.
        assert!(provider.reserve_budget(200).is_err());
        // The fetch_sub rollback restores the seeded value.
        assert_eq!(provider.total_chunk_bytes.load(Ordering::Relaxed), seed);
    }

    /// A zero-byte reservation under a non-trivial seed must succeed:
    /// `next == prev` is the no-op fixed point of the `wrapping_add`,
    /// not a wraparound, and falls within budget. A mutant replacing
    /// `<` with `<=` on line 159 would erroneously reject this case
    /// (`next <= prev` would be true), so this test pins the strict
    /// inequality semantics of the wraparound check.
    #[test]
    fn reserve_budget_zero_bytes_succeeds_within_budget() {
        let provider = ChunkProvider::<Global>::new(Global, 4096, Some(1024), 0, 0);
        provider.total_chunk_bytes.store(512, Ordering::Relaxed);
        provider.reserve_budget(0).unwrap();
        // The counter must be unchanged: `fetch_add(0)` is a no-op
        // and no rollback should have occurred.
        assert_eq!(provider.total_chunk_bytes.load(Ordering::Relaxed), 512);
    }
}