1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

//! Bump allocator over a single chunk.
//!
//! [`ChunkMutator<C>`] owns one strong reference to a chunk and exposes safe
//! allocation primitives that hand out [`InChunk`] pointers, [`Uninit`]
//! tickets, and [`UninitDrop`] tickets. All `unsafe` interaction with the
//! chunk's raw memory is concentrated here.
//!
//! When the mutator is dropped it decrements the chunk's refcount; if that
//! drops the count to zero it replays pending drop entries and routes the
//! chunk back through [`ChunkOps::teardown_and_release`].

use core::cell::Cell;
use core::ptr::{self, NonNull};
use core::{hint, mem};

use super::chunk_ops::ChunkOps;
use super::drop_entry::DropEntry;
use super::in_chunk::InChunk;
use super::uninit::{Uninit, UninitDrop};

/// Owns one strong reference to a chunk and tracks the bump cursor and the
/// growing-down drop-entry top.
///
/// Allocations advance `bump` upward from the payload start; drop entries
/// are reserved by moving `drop_top` downward from the (aligned) payload
/// end. The free space is whatever lies between the two cursors.
///
/// Hot-path layout is intentionally minimal: only `chunk`, `bump`, and
/// `drop_top` are stored. The payload start/end addresses are re-derived
/// from `chunk` in the cold paths that need them (capacity reporting,
/// drop-publish on `Drop`, drop-entry rollback, value-offset encoding for
/// drop-requiring types).
pub(crate) struct ChunkMutator<C: ?Sized + ChunkOps> {
    /// The owned chunk, or `None` for the chunk-less mutator built by
    /// `ChunkMutator::empty`.
    chunk: Option<NonNull<C>>,
    /// Bump cursor stored as a pointer so that derivations preserve
    /// the chunk's full provenance under Stacked / Tree Borrows.
    /// Storing it as a `usize` and recovering the pointer via
    /// `addr as *mut u8` would produce a no-provenance pointer that
    /// would fail Miri whenever a derived value pointer is later read
    /// back (e.g., during drop-entry replay).
    bump: Cell<NonNull<u8>>,
    /// Top of the drop-entry region (entries grow downward). Same
    /// pointer-preserves-provenance rationale as `bump`.
    drop_top: Cell<NonNull<u8>>,
}

// SAFETY: `ChunkMutator` owns one strong refcount on its chunk and
// accesses its payload via interior-mutable cells. The raw `NonNull`
// pointers it stores (`chunk`, plus the `bump` / `drop_top` cursors
// held in `Cell<NonNull<u8>>`) are what prevent auto-derivation of
// `Send`. Both implementors of `ChunkOps` (`LocalChunk`, `SharedChunk`)
// support cross-thread ownership transfer of a single owning reference
// (atomic refcount for shared, single-thread invariant for local that
// follows the owning thread). The `?Sized` bound matters: both chunk
// types are DSTs with a `[UnsafeCell<u8>]` tail, so a `C: Sized` bound
// would exclude every real instantiation and silently break
// `Arena: Send` (because the `Send` impl wouldn't apply). `Sync` is
// intentionally NOT implemented: the `Cell` fields make the mutator
// unsuitable for concurrent shared access.
unsafe impl<C: ?Sized + ChunkOps + Send> Send for ChunkMutator<C> {}

impl<C: ?Sized + ChunkOps> ChunkMutator<C> {
    /// Wraps `chunk` in a fresh mutator, adopting a strong reference the
    /// caller has already taken.
    ///
    /// The bump cursor starts at the payload start. The drop-entry top
    /// starts at the payload end rounded down to
    /// `align_of::<DropEntry>()`.
    ///
    /// # Safety
    ///
    /// `chunk` must point at a live chunk whose refcount already includes
    /// the +1 being handed to this mutator. The mutator becomes the sole
    /// owner of that +1 and gives it back when dropped.
    pub(crate) unsafe fn from_owned(chunk: NonNull<C>) -> Self {
        // SAFETY: the caller guarantees `chunk` is live, so its payload
        // pointer and capacity can be read.
        let (payload, capacity) = unsafe { (C::payload_ptr(chunk), chunk.as_ref().capacity()) };
        let payload_addr = payload.as_ptr() as usize;
        // Round the payload end down so drop entries are naturally aligned.
        let align_mask = mem::align_of::<DropEntry>() - 1;
        let usable_len = ((payload_addr + capacity) & !align_mask) - payload_addr;
        // SAFETY: `usable_len <= capacity`, so the result is inside the
        // payload or exactly one past its end.
        let drop_top = unsafe { payload.byte_add(usable_len) };
        Self {
            bump: Cell::new(payload),
            drop_top: Cell::new(drop_top),
            chunk: Some(chunk),
        }
    }

    /// Creates a mutator that owns no chunk.
    ///
    /// Every `try_alloc*` on it returns `None`, which sends the arena's
    /// hot path into a `refill_*` call that installs a real chunk. This
    /// lets the first chunk allocation be deferred until the first
    /// user-visible alloc.
    pub(crate) const fn empty() -> Self {
        // Sentinel cursors chosen so that `bump > drop_top`: the ordinary
        // bound check in each `try_alloc*` then rejects every request
        // without an explicit `self.chunk?` test. Both `dangling()`
        // values are non-null, fit in `isize`, and are never dereferenced.
        let bump_sentinel = NonNull::<u16>::dangling().cast::<u8>();
        let top_sentinel = NonNull::<u8>::dangling();
        Self {
            chunk: None,
            bump: Cell::new(bump_sentinel),
            drop_top: Cell::new(top_sentinel),
        }
    }

    /// Number of unclaimed bytes between the bump cursor and the
    /// drop-entry top. Saturates to zero when the cursor is above the
    /// top.
    #[cfg(test)]
    #[inline]
    #[cfg_attr(coverage_nightly, coverage(off))]
    #[cfg_attr(test, mutants::skip)] // test-only helper
    pub(crate) fn free_bytes(&self) -> usize {
        let cursor_addr = self.bump.get().as_ptr() as usize;
        let ceiling_addr = self.drop_top.get().as_ptr() as usize;
        ceiling_addr.saturating_sub(cursor_addr)
    }

    /// Returns `(start, end)` addresses of the chunk payload, where `end`
    /// is the payload end rounded down to `align_of::<DropEntry>()`.
    ///
    /// # Panics
    ///
    /// Panics if the mutator owns no chunk. Callers in this file reach
    /// this only after a reservation has succeeded or after checking
    /// that `chunk` is set.
    #[inline]
    fn payload_range(&self) -> (usize, usize) {
        let chunk = self.chunk.expect("payload_range: chunk must be set");
        // SAFETY: the mutator holds a strong reference to `chunk` for as
        // long as `&self` is borrowed, so the chunk is live here.
        let (payload, capacity) = unsafe { (C::payload_ptr(chunk), chunk.as_ref().capacity()) };
        let first_addr = payload.as_ptr() as usize;
        // Use the same round-down as `from_owned` applies to `drop_top`,
        // so drop-entry arithmetic agrees with the initial cursor.
        let align_mask = mem::align_of::<DropEntry>() - 1;
        (first_addr, (first_addr + capacity) & !align_mask)
    }

    /// Returns the underlying chunk pointer without checking the
    /// `Option` discriminant. Hot-path helper for smart-pointer
    /// reservations.
    ///
    /// # Safety
    ///
    /// Caller must have observed a successful `try_alloc*` /
    /// `try_reserve_*` on this mutator immediately prior (without
    /// intervening `CurrentChunk::replace` / `drop_replace`).
    #[inline]
    pub(crate) unsafe fn chunk_ptr_unchecked(&self) -> NonNull<C> {
        // SAFETY: see contract.
        unsafe { self.chunk.unwrap_unchecked() }
    }

    /// Reserves `size` bytes aligned to `align`. Returns an `InChunk<u8>`
    /// pointing at the start, or `None` if the chunk has insufficient
    /// room or the request would overflow.
    ///
    /// # Overflow safety
    ///
    /// `cur_addr` is asserted to fit in `isize` so the alignment math
    /// has no overflow guard; chunks live in the lower half of the
    /// address space on every realistic 64-bit platform. `size` is *not*
    /// constrained, so the `aligned_addr + size` step uses `checked_add`
    /// to refuse oversized requests.
    #[inline]
    // Mutation testing is suppressed: any mutation that always rejects
    // sends callers into an infinite refill spin (OOM).
    #[cfg_attr(test, mutants::skip)]
    pub(crate) fn try_alloc(&self, size: usize, align: usize) -> Option<InChunk<u8>> {
        debug_assert!(align.is_power_of_two(), "align must be a power of two");
        debug_assert!(size.checked_add(align).is_some(), "size + align overflows usize");
        let cur = self.bump.get();
        let cur_addr = cur.as_ptr() as usize;
        let drop_top_addr = self.drop_top.get().as_ptr() as usize;
        // SAFETY: see the overflow-safety note above.
        unsafe {
            hint::assert_unchecked(cur_addr > 0);
            hint::assert_unchecked(isize::try_from(cur_addr).is_ok());
        }
        let aligned_addr = (cur_addr + (align - 1)) & !(align - 1);
        // For zero-size allocations, probe one extra byte: a ZST alloc
        // at the chunk tail (`cur_addr == drop_top_addr`) would otherwise
        // return a value pointer equal to `chunk_base + CHUNK_ALIGN` for
        // the largest chunk class, which masks to the next 64 KiB tile
        // and breaks the smart-pointer chunk-header recovery. Non-zero-
        // size allocs already satisfy this because the last payload byte
        // sits at `end - 1`, strictly inside `[chunk_base, drop_top)`.
        // `size.max(1)` is a branchless CMOV; the checked_add cannot
        // overflow under the `cur_addr + (align - 1)` debug assertion
        // above.
        let probe_end = aligned_addr.checked_add(size.max(1))?;
        if probe_end > drop_top_addr {
            return None;
        }
        // SAFETY: `end_addr <= drop_top_addr`, so both `aligned_ptr`
        // and `new_bump` land within the chunk payload; `byte_add` on
        // the current bump cursor preserves chunk-wide provenance.
        let aligned_ptr = unsafe { cur.byte_add(aligned_addr - cur_addr) };
        // SAFETY: see above.
        let new_bump = unsafe { aligned_ptr.byte_add(size) };
        self.bump.set(new_bump);
        Some(InChunk::from_raw(aligned_ptr))
    }

    /// Bump-allocates room for a single `T` (using its size and
    /// alignment) and wraps it in an `Uninit<'_, T>` ticket. Returns
    /// `None` when [`Self::try_alloc`] does.
    #[inline]
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`: body→None ⇒ refill spin
    pub(crate) fn try_alloc_uninit<T>(&self) -> Option<Uninit<'_, T>> {
        self.try_alloc(mem::size_of::<T>(), mem::align_of::<T>())
            .map(|raw| Uninit::new(raw.cast::<T>()))
    }

    /// Same reservation as [`Self::try_alloc`], returned together with
    /// the owning chunk pointer.
    ///
    /// A successful `try_alloc` proves the mutator owns a chunk, so this
    /// wrapper spares the caller the `unsafe` block that
    /// [`Self::chunk_ptr_unchecked`] would otherwise require.
    #[inline]
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`: body→None ⇒ refill spin
    pub(crate) fn try_alloc_with_chunk(&self, size: usize, align: usize) -> Option<(InChunk<u8>, NonNull<C>)> {
        let reserved = self.try_alloc(size, align)?;
        // SAFETY: `try_alloc` just succeeded, which only happens when the
        // mutator owns a chunk.
        let owner = unsafe { self.chunk_ptr_unchecked() };
        Some((reserved, owner))
    }

    /// Same reservation as [`Self::try_alloc_thin_dst_smart`], returned
    /// together with the owning chunk pointer. See
    /// [`Self::try_alloc_with_chunk`] for why this is safe to expose.
    #[inline]
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    #[allow(
        clippy::type_complexity,
        reason = "matches try_alloc_thin_dst_smart's shape plus the chunk pointer"
    )]
    #[cfg(feature = "dst")]
    pub(crate) fn try_alloc_thin_dst_smart_with_chunk(
        &self,
        total: usize,
        align: usize,
        payload_offset: usize,
        needs_drop: bool,
        metadata_u16: u16,
    ) -> Option<(InChunk<u8>, Option<InChunk<DropEntry>>, NonNull<C>)> {
        self.try_alloc_thin_dst_smart(total, align, payload_offset, needs_drop, metadata_u16)
            .map(|(base, drop_slot)| {
                // SAFETY: the reservation succeeded, which only happens
                // when the mutator owns a chunk.
                let owner = unsafe { self.chunk_ptr_unchecked() };
                (base, drop_slot, owner)
            })
    }

    /// Fast path for reserving `len` raw bytes.
    ///
    /// Because the element is `u8` (align 1, size 1) there is no
    /// alignment rounding, no `checked_mul`, and no zero-size probe: the
    /// reservation starts at the current cursor and a `len` of 0
    /// succeeds whenever the cursor is at or below the drop-entry top.
    /// Returns `None` if `cursor + len` overflows or passes the
    /// drop-entry top.
    #[inline]
    pub(crate) fn try_alloc_bytes(&self, len: usize) -> Option<Uninit<'_, [u8]>> {
        let start = self.bump.get();
        let start_addr = start.as_ptr() as usize;
        let limit_addr = self.drop_top.get().as_ptr() as usize;
        hint_chunk_cur_addr_nonnull(start_addr);
        let fits = matches!(start_addr.checked_add(len), Some(end_addr) if end_addr <= limit_addr);
        if !fits {
            return None;
        }
        // SAFETY: `start_addr + len <= limit_addr`, so the new cursor is
        // still inside the chunk payload.
        let next = unsafe { start.byte_add(len) };
        self.bump.set(next);
        let bytes = InChunk::from_raw(start).into_slice::<u8>(len);
        Some(Uninit::new(bytes))
    }

    /// Bump-allocates room for `len` contiguous `T`s and wraps it in an
    /// `Uninit<'_, [T]>` ticket. Returns `None` when the byte size
    /// overflows `usize` or [`Self::try_alloc`] rejects the request.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`: body→None ⇒ refill spin
    pub(crate) fn try_alloc_uninit_slice<T>(&self, len: usize) -> Option<Uninit<'_, [T]>> {
        let byte_len = mem::size_of::<T>().checked_mul(len)?;
        self.try_alloc(byte_len, mem::align_of::<T>())
            .map(|raw| Uninit::new(raw.into_slice::<T>(len)))
    }

    /// Variant of [`Self::try_alloc_uninit_slice`] that also stores the
    /// slice length in a `size_of::<usize>()`-byte prefix placed
    /// immediately before the payload, written with
    /// [`ptr::write_unaligned`].
    ///
    /// Used by the smart-pointer slice paths
    /// ([`Arc<[T]>`](crate::Arc), [`Box<[T]>`](crate::Box)) so the
    /// resulting handles can be 8 bytes (thin) and recover `T::Metadata`
    /// (slice length) from the chunk prefix.
    ///
    /// The returned [`Uninit<[T]>`] ticket covers the payload only; init
    /// helpers fill exactly `len * size_of::<T>()` bytes and know
    /// nothing about the prefix.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    #[allow(
        clippy::cast_ptr_alignment,
        reason = "prefix slot may be unaligned for T's whose align < align_of::<usize>(); paired with write_unaligned/read_unaligned"
    )]
    pub(crate) fn try_alloc_uninit_slice_prefixed<T>(&self, len: usize) -> Option<Uninit<'_, [T]>> {
        let prefix_len = mem::size_of::<usize>();
        let align = mem::align_of::<T>();
        // The payload sits at the lowest `align`-aligned offset that
        // leaves room for the prefix. Both operands are powers of two,
        // so `max` yields exactly that offset.
        let data_offset = prefix_len.max(align);
        // Reserve at least one payload byte so the payload pointer is
        // strictly below the reservation's end. Otherwise an empty slice
        // (`len == 0` or a ZST element) at the chunk tail could get a
        // payload pointer at `chunk_base + CHUNK_ALIGN`, which masks to
        // the wrong tile on smart-pointer Drop.
        let data_len = mem::size_of::<T>().checked_mul(len)?.max(1);
        let reserve_len = data_offset.checked_add(data_len)?;
        let base = self.try_alloc(reserve_len, align.max(1))?;
        // SAFETY: the reservation spans `reserve_len >= data_offset + 1`
        // bytes from `base`, so `base + data_offset` is in bounds and
        // `align`-aligned (base is `align`-aligned and `data_offset` is a
        // multiple of `align`). The prefix word occupies the
        // `prefix_len` bytes just below the payload, which is inside the
        // reservation because `data_offset >= prefix_len`;
        // `write_unaligned` tolerates any alignment there.
        let data = unsafe {
            let data_ptr = base.as_ptr().add(data_offset);
            ptr::write_unaligned(data_ptr.sub(prefix_len).cast::<usize>(), len);
            NonNull::new_unchecked(data_ptr)
        };
        Some(Uninit::new(InChunk::from_raw(data).into_slice::<T>(len)))
    }

    /// Bump-allocates room for a single `T` together with a drop-entry
    /// slot, returning both as an `UninitDrop<'_, T>` ticket.
    ///
    /// Returns `None` if either reservation fails. If the value's offset
    /// from the payload start does not fit in a `u16`, the drop slot is
    /// released and `None` is returned as well.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`: body→None ⇒ refill spin
    pub(crate) fn try_alloc_uninit_with_drop<T>(&self) -> Option<UninitDrop<'_, T>> {
        // Take the drop slot first: if it does not fit there is nothing
        // to back out, and if the value does not fit afterwards only the
        // slot needs to be released.
        let slot = self.try_reserve_drop_entry()?;
        let raw = match self.try_alloc(mem::size_of::<T>(), mem::align_of::<T>()) {
            Some(raw) => raw,
            None => {
                self.unwind_drop_entry();
                return None;
            }
        };
        let offset = self.offset_or_unwind(raw.addr())?;
        // Record a placeholder (offset and element count, no `drop_fn`);
        // the ticket's init step commits the drop function later.
        // SAFETY: `slot` was just reserved inside the payload, is aligned
        // for `DropEntry`, and is exclusively ours.
        unsafe {
            ptr::write(slot.as_ptr(), DropEntry::placeholder(offset, 1));
        }
        Some(UninitDrop::new(raw.cast::<T>(), slot))
    }

    /// Bump-allocates room for `len` contiguous `T`s together with a
    /// drop-entry slot, returning both as an `UninitDrop<'_, [T]>`
    /// ticket.
    ///
    /// Returns `None` when the byte size overflows, when `len` does not
    /// fit the drop entry's `u16` count, or when either reservation
    /// fails.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`: body→None ⇒ refill spin
    pub(crate) fn try_alloc_uninit_slice_with_drop<T>(&self, len: usize) -> Option<UninitDrop<'_, [T]>> {
        let byte_len = mem::size_of::<T>().checked_mul(len)?;
        // The drop entry stores the element count as a `u16`. Longer
        // slices are refused before anything is reserved, so no
        // counted-but-uninitialized slot is ever left behind. Callers
        // also guard this earlier (see `alloc_slice_*` layout checks) to
        // turn it into a clean `AllocError`/panic instead of a refill
        // spin.
        let count = u16::try_from(len).ok()?;
        let slot = self.try_reserve_drop_entry()?;
        let raw = match self.try_alloc(byte_len, mem::align_of::<T>()) {
            Some(raw) => raw,
            None => {
                self.unwind_drop_entry();
                return None;
            }
        };
        let elements = raw.into_slice::<T>(len);
        let offset = self.offset_or_unwind(raw.addr())?;
        // SAFETY: `slot` was just reserved inside the payload, is aligned
        // for `DropEntry`, and is exclusively ours.
        unsafe {
            ptr::write(slot.as_ptr(), DropEntry::placeholder(offset, count));
        }
        Some(UninitDrop::new(elements, slot))
    }

    /// Variant of [`Self::try_alloc_uninit_slice_with_drop`] that also
    /// writes a thin-pointer DST length prefix (`size_of::<usize>()`
    /// bytes, unaligned) immediately before the payload. See
    /// [`Self::try_alloc_uninit_slice_prefixed`] for the layout.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    #[allow(
        clippy::cast_ptr_alignment,
        reason = "prefix slot may be unaligned for T's whose align < align_of::<usize>(); paired with write_unaligned/read_unaligned"
    )]
    pub(crate) fn try_alloc_uninit_slice_with_drop_prefixed<T>(&self, len: usize) -> Option<UninitDrop<'_, [T]>> {
        let prefix_len = mem::size_of::<usize>();
        let align = mem::align_of::<T>();
        let data_offset = prefix_len.max(align);
        // As in `try_alloc_uninit_slice_prefixed`: reserve at least one
        // payload byte so the payload pointer stays strictly inside the
        // reservation and never lands at `chunk_base + CHUNK_ALIGN`.
        let data_len = mem::size_of::<T>().checked_mul(len)?.max(1);
        let reserve_len = data_offset.checked_add(data_len)?;
        // `len` must fit in the drop entry's `u16` element-count field.
        let count = u16::try_from(len).ok()?;
        let slot = self.try_reserve_drop_entry()?;
        let base = match self.try_alloc(reserve_len, align.max(1)) {
            Some(base) => base,
            None => {
                self.unwind_drop_entry();
                return None;
            }
        };
        // SAFETY: the reservation spans `reserve_len >= data_offset + 1`
        // bytes from `base`, so `base + data_offset` is in bounds and
        // `align`-aligned; the prefix word occupies the `prefix_len`
        // bytes just below it, inside the reservation, and
        // `write_unaligned` tolerates any alignment.
        let data = unsafe {
            let data_ptr = base.as_ptr().add(data_offset);
            ptr::write_unaligned(data_ptr.sub(prefix_len).cast::<usize>(), len);
            NonNull::new_unchecked(data_ptr)
        };
        // The drop entry records the *payload* address (post-prefix), so
        // `replay_drops` runs `drop_in_place::<[T]>` on the real elements.
        let offset = self.offset_or_unwind(data.as_ptr() as usize)?;
        // SAFETY: `slot` is a freshly reserved, aligned, exclusively
        // owned slot in the chunk's drop region.
        unsafe {
            ptr::write(slot.as_ptr(), DropEntry::placeholder(offset, count));
        }
        Some(UninitDrop::new(InChunk::from_raw(data).into_slice::<T>(len), slot))
    }

    /// Attempts to reclaim the unused tail of the most recent bump
    /// allocation in O(1).
    ///
    /// `end_addr` is the one-past-the-end address of an allocation and
    /// `bytes` is how much of its tail to hand back. When `end_addr`
    /// equals the current bump cursor — i.e. nothing has been allocated
    /// after it — the cursor is rewound by `bytes`, returning that span
    /// to the chunk, and `true` is returned. Returns `false` (leaving
    /// the cursor untouched) when the allocation is not at the cursor or
    /// the mutator owns no chunk.
    ///
    /// `bytes` must not exceed the span that was bump-allocated ending
    /// at `end_addr`. Debug builds assert that the rewind stays at or
    /// above the payload start; release builds rely on the caller.
    #[inline]
    pub(crate) fn try_reclaim_tail(&self, end_addr: usize, bytes: usize) -> bool {
        if self.chunk.is_none() {
            return false;
        }
        let cur = self.bump.get();
        if cur.as_ptr() as usize != end_addr {
            return false;
        }
        // Check against the payload start rather than address zero: the
        // `byte_sub` below is only sound if the rewound cursor is still
        // inside this chunk's payload. `chunk` is known to be `Some`
        // here, so `payload_range` cannot panic.
        debug_assert!(
            bytes <= end_addr.saturating_sub(self.payload_range().0),
            "try_reclaim_tail: rewind underflows"
        );
        // SAFETY: caller guarantees `bytes` was previously consumed
        // forward from the bump cursor; rolling back by `bytes` stays
        // within the chunk payload.
        let new_bump = unsafe { cur.byte_sub(bytes) };
        self.bump.set(new_bump);
        true
    }

    /// Tries to extend an earlier allocation from `prev_len` to
    /// `new_len` bytes without moving it.
    ///
    /// Returns `false` when the mutator owns no chunk. Otherwise returns
    /// `true` without touching the cursor when `new_len <= prev_len`.
    /// For a real growth the allocation `[prev_addr, prev_addr +
    /// prev_len)` must end exactly at the bump cursor and the grown end
    /// must not pass the drop-entry top; on success the cursor advances
    /// by `new_len - prev_len`. Any address overflow yields `false`.
    pub(crate) fn try_grow_in_place(&self, prev_addr: usize, prev_len: usize, new_len: usize) -> bool {
        if self.chunk.is_none() {
            return false;
        }
        if new_len <= prev_len {
            return true;
        }
        let cursor = self.bump.get();
        let ends_at_cursor = prev_addr.checked_add(prev_len) == Some(cursor.as_ptr() as usize);
        if !ends_at_cursor {
            return false;
        }
        let limit_addr = self.drop_top.get().as_ptr() as usize;
        match prev_addr.checked_add(new_len) {
            Some(grown_end) if grown_end <= limit_addr => {
                // SAFETY: the cursor equals `prev_addr + prev_len`, so
                // advancing it by `new_len - prev_len` lands on
                // `grown_end`, which is within the chunk payload.
                let advanced = unsafe { cursor.byte_add(new_len - prev_len) };
                self.bump.set(advanced);
                true
            }
            _ => false,
        }
    }

    /// Reservation for thin-DST smart pointers.
    ///
    /// Reserves `total` bytes aligned to `align` and returns the
    /// reservation start. When `needs_drop` is set it also reserves a
    /// drop entry whose offset refers to the *payload* address
    /// (`reservation_start + payload_offset`, not the reservation
    /// start) and whose count field carries `metadata_u16`; that slot is
    /// returned alongside. The caller writes the metadata prefix at
    /// `[0, payload_offset)` and the payload at `[payload_offset,
    /// total)`.
    ///
    /// Used by the thin generic-DST smart-pointer alloc paths
    /// ([`Arc<T>`](crate::Arc) / [`Box<T>`](crate::Box) for `T: ?Sized`).
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    #[cfg(feature = "dst")]
    pub(crate) fn try_alloc_thin_dst_smart(
        &self,
        total: usize,
        align: usize,
        payload_offset: usize,
        needs_drop: bool,
        metadata_u16: u16,
    ) -> Option<(InChunk<u8>, Option<InChunk<DropEntry>>)> {
        debug_assert!(align.is_power_of_two(), "align must be a power of two");
        debug_assert!(payload_offset <= total, "payload_offset must fit inside the reservation");
        if !needs_drop {
            return self.try_alloc(total, align).map(|base| (base, None));
        }
        let slot = self.try_reserve_drop_entry()?;
        let base = match self.try_alloc(total, align) {
            Some(base) => base,
            None => {
                self.unwind_drop_entry();
                return None;
            }
        };
        // The entry records the payload address (post-prefix), so
        // `replay_drops` runs `drop_in_place::<T>` on the real value
        // bytes.
        let payload_addr = base.addr().wrapping_add(payload_offset);
        let offset = self.offset_or_unwind(payload_addr)?;
        // SAFETY: `slot` is a freshly reserved, aligned, exclusively
        // owned drop-entry slot.
        unsafe {
            ptr::write(slot.as_ptr(), DropEntry::placeholder(offset, metadata_u16));
        }
        Some((base, Some(slot)))
    }
    /// Carves one [`DropEntry`]-sized slot off the top of the drop-entry
    /// region by moving `drop_top` down. Entries are packed end-to-end
    /// from the payload's high end downward, matching the layout walked
    /// by [`replay_drops`](super::drop_entry::replay_drops).
    ///
    /// Returns `None` (leaving `drop_top` unchanged) when the new top
    /// would fall below the bump cursor or underflow the address space.
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    fn try_reserve_drop_entry(&self) -> Option<InChunk<DropEntry>> {
        let slot_len = mem::size_of::<DropEntry>();
        let old_top = self.drop_top.get();
        let slot_addr = (old_top.as_ptr() as usize).checked_sub(slot_len)?;
        let floor_addr = self.bump.get().as_ptr() as usize;
        if slot_addr < floor_addr {
            return None;
        }
        // SAFETY: `slot_addr >= floor_addr`, i.e. the new top is still at
        // or above the bump cursor and therefore inside the payload.
        let slot = unsafe { old_top.byte_sub(slot_len) };
        self.drop_top.set(slot);
        Some(InChunk::from_raw(slot).cast::<DropEntry>())
    }

    /// Undoes the latest `try_reserve_drop_entry` by moving `drop_top`
    /// back up one entry, clamped to the aligned payload end. Used when
    /// a later step of the same compound reservation fails.
    ///
    /// Cold: this fires only on the compound-reservation failure path,
    /// which co-occurs with a refill miss and is by definition rare.
    #[cold]
    #[inline(never)]
    #[cfg_attr(test, mutants::skip)] // only observable via skip'd callers
    fn unwind_drop_entry(&self) {
        let top = self.drop_top.get();
        let top_addr = top.as_ptr() as usize;
        let (_, payload_end) = self.payload_range();
        let restored_addr = (top_addr + mem::size_of::<DropEntry>()).min(payload_end);
        // SAFETY: `restored_addr <= payload_end`, so the restored top is
        // inside the payload or exactly at its aligned end.
        let restored = unsafe { top.byte_add(restored_addr - top_addr) };
        self.drop_top.set(restored);
    }

    /// Cold helper: roll back the most recently reserved drop entry and
    /// return `None`. Extracted from compound-reservation paths so the
    /// genuinely-unreachable `u16::try_from(...) == Err` arm is a single
    /// line at the call site.
    ///
    /// Generic over `U` so it can stand in for the `None` of whatever
    /// `Option` type the call site returns.
    #[cold]
    #[inline(never)]
    #[cfg_attr(coverage_nightly, coverage(off))]
    #[cfg_attr(test, mutants::skip)] // unreachable in practice
    fn unwind_drop_entry_then_none<U>(&self) -> Option<U> {
        // Release the slot first; the caller propagates the `None`.
        self.unwind_drop_entry();
        None
    }

    /// Encodes `addr` as a `u16` offset from the payload start.
    ///
    /// If the offset does not fit in a `u16`, the most recently reserved
    /// drop entry is rolled back and `None` is returned. That case is
    /// unreachable in practice — single values live below 64 KiB in a
    /// normal chunk, or at offset 0 in a one-shot oversized chunk.
    #[inline]
    #[cfg_attr(coverage_nightly, coverage(off))]
    #[cfg_attr(test, mutants::skip)] // see `try_alloc`
    fn offset_or_unwind(&self, addr: usize) -> Option<u16> {
        let (payload_start, _) = self.payload_range();
        u16::try_from(addr - payload_start)
            .ok()
            .or_else(|| self.unwind_drop_entry_then_none())
    }

    /// Number of drop entries reserved so far, computed from how far
    /// `drop_top` has moved down from the aligned payload end.
    #[inline]
    fn local_drop_entry_count(&self) -> usize {
        let (_, payload_end) = self.payload_range();
        let top_addr = self.drop_top.get().as_ptr() as usize;
        (payload_end - top_addr) / mem::size_of::<DropEntry>()
    }

    /// Eagerly writes the locally-tracked drop-entry count into the
    /// chunk header, ahead of the mutator's own drop. Does nothing when
    /// the mutator owns no chunk.
    ///
    /// The count is normally published exactly once in [`Drop`]; teardown
    /// reads it only after the refcount reaches zero, so the deferred
    /// publish is sufficient for the common path. The uninit-`Arc`
    /// reservation path (`Arena::alloc_uninit_arc`) calls this after
    /// writing a placeholder so that
    /// `Arc::<MaybeUninit<T>>::assume_init` can locate the entry while
    /// the chunk is still the arena's active chunk. It must be invoked
    /// only after the placeholder slot it counts has been fully written.
    #[inline]
    pub(crate) fn publish_drop_count(&self) {
        if let Some(chunk) = self.chunk {
            let count = self.local_drop_entry_count();
            // SAFETY: the mutator owns a +1 on `chunk` for its whole
            // lifetime, so the header is live for this store.
            unsafe {
                chunk.as_ref().set_drop_entry_count(count);
            }
        }
    }
}

impl<C: ?Sized + ChunkOps> Drop for ChunkMutator<C> {
    /// Publishes the drop-entry count, releases this mutator's strong
    /// reference, and tears the chunk down if that was the last one.
    /// A chunk-less mutator drops without doing anything.
    fn drop(&mut self) {
        if let Some(chunk) = self.chunk {
            let pending = self.local_drop_entry_count();
            // SAFETY: the chunk is live because we still hold one of its
            // refcount tickets. The count is stored before the ticket is
            // released: if `dec_ref` reports that ours was the last
            // reference, `teardown_and_release` reads that count to walk
            // the drop list.
            unsafe {
                let header = chunk.as_ref();
                header.set_drop_entry_count(pending);
                if header.dec_ref() {
                    C::teardown_and_release(chunk);
                }
            }
        }
    }
}

/// Hint for `try_alloc_bytes`: tells the optimizer that the bump cursor
/// address is non-zero and, on targets where that can be relied on, that
/// it fits in `isize`.
///
/// * Non-zero always holds, because the address is taken from a
///   `NonNull<u8>`.
/// * "Fits in `isize`" is a statement about the *address*, not about the
///   allocation's size. Rust only bounds the size of an allocation by
///   `isize::MAX`; the address itself may have its top bit set. That
///   happens on 32-bit targets whose user space extends past the 2 GiB
///   mark, and on 64-bit Android, where heap pointers can carry a tag in
///   the top byte. Asserting the bound there would be undefined behavior,
///   so that hint is emitted only on 64-bit non-Android targets, where
///   chunks are expected to live in the lower half of the address space.
///
/// `cur_addr` must be the address of a real chunk's bump cursor: a false
/// hint is undefined behavior, not a recoverable error.
#[expect(clippy::inline_always, reason = "pure codegen hint; must inline to take effect")]
#[inline(always)]
#[cfg_attr(test, mutants::skip)] // pure hint, no observable behavior
fn hint_chunk_cur_addr_nonnull(cur_addr: usize) {
    // SAFETY: `cur_addr` is the address of a `NonNull<u8>`, so it is never
    // zero.
    unsafe {
        hint::assert_unchecked(cur_addr > 0);
    }
    // `cfg!` folds to a constant, so excluded targets compile this branch
    // away entirely instead of paying for a runtime check.
    if cfg!(all(target_pointer_width = "64", not(target_os = "android"))) {
        // SAFETY: see fn doc; on these targets the chunk address lies in
        // the lower half of the address space.
        unsafe {
            hint::assert_unchecked(isize::try_from(cur_addr).is_ok());
        }
    }
}

/// Rounds `value` up to the next multiple of `align`, returning `None`
/// when the rounding would overflow `usize`.
///
/// `align` must be a power of two; this is only checked in debug builds.
#[cfg(test)]
#[cfg_attr(coverage_nightly, coverage(off))]
fn align_up(value: usize, align: usize) -> Option<usize> {
    debug_assert!(align.is_power_of_two());
    // For a power of two, `align - 1` is exactly the low bits that must be
    // cleared after bumping past the next boundary.
    let low_bits = align - 1;
    let bumped = value.checked_add(low_bits)?;
    Some(bumped & !low_bits)
}

#[cfg(test)]
mod tests {
    use allocator_api2::alloc::Global;

    use super::*;
    use crate::internal::local_chunk::LocalChunk;

    /// Builds a mutator through `ChunkMutator::empty`, specialised to the
    /// local chunk type over the global allocator; the tests use it to
    /// drive the chunk-is-`None` arms.
    fn empty_mutator() -> ChunkMutator<LocalChunk<Global>> {
        // The chunk type is inferred from the return type.
        ChunkMutator::empty()
    }

    // Covers `try_reclaim_tail`'s chunk-is-`None` arm: with no chunk
    // attached the call must report `false`.
    #[test]
    fn try_reclaim_tail_on_empty_mutator_is_false() {
        let reclaimed = empty_mutator().try_reclaim_tail(0, 0);
        assert!(!reclaimed);
    }

    // Covers `try_grow_in_place`'s chunk-is-`None` arm: with no chunk
    // attached even a genuine grow request must report `false`.
    #[test]
    fn try_grow_in_place_on_empty_mutator_is_false() {
        let grown = empty_mutator().try_grow_in_place(0, 0, 1);
        assert!(!grown);
    }

    // `try_grow_in_place`'s shrink/equal short-circuit and its
    // `checked_add` overflow arm are covered by the direct
    // `try_grow_in_place_non_growing_returns_true` /
    // `try_grow_in_place_new_len_overflow_returns_false` tests below,
    // which drive `ChunkMutator` directly (public Vec paths reject
    // these inputs before reaching `try_grow_in_place`).

    // --- Mutation-kill targets: helpers with no production call site.
    //
    // `free_bytes` / `capacity` carry `#[allow(dead_code)]`, and
    // `align_up` is compiled only under `#[cfg(test)]`, because no
    // production call site is wired up yet. They are still mutated by
    // cargo-mutants, so exercising them here keeps mutation testing
    // honest.

    // A mutator built by `empty_mutator` must report no free bytes.
    #[test]
    fn free_bytes_on_empty_mutator_is_zero() {
        assert_eq!(empty_mutator().free_bytes(), 0);
    }

    // Checks `free_bytes` against a real chunk rather than only the
    // empty-mutator path: consuming `n` bytes through the mutator must
    // shrink `free_bytes` by exactly `n`.
    #[test]
    fn capacity_and_free_bytes_match_chunk_layout() {
        // Build a real chunk via the arena's preallocate path: a
        // `with_capacity_local`-built arena owns at least one chunk with a
        // known capacity.
        let arena = crate::Arena::builder().with_capacity_local(1024).build();
        // Allocate one tiny value to be sure the chunk exists, then a
        // second one whose contents are checked end to end.
        let _ = arena.alloc(0_u32);
        let v: &u32 = arena.alloc(42_u32);
        assert_eq!(*v, 42);

        // Reach the live mutator the same way the boundary tests below do.
        let m = arena.current_local();
        let before = m.free_bytes();
        let step = 8_usize;
        assert!(before >= step, "a 1024-byte chunk must have room left after two u32 allocations");
        // `try_alloc_bytes` is a byte-granular (align-1) allocation, so it
        // advances the cursor by exactly `step`.
        let _ = m.try_alloc_bytes(step).expect("chunk has room for the probe allocation");
        assert_eq!(
            m.free_bytes(),
            before - step,
            "free_bytes must shrink by exactly the number of bytes allocated",
        );
    }

    // Pins `align_up`'s arithmetic with a table of
    // `(value, align, expected)` cases. The `None` (overflow) path is
    // unreachable for any align <= 64K and value <= usize::MAX - align + 1,
    // but it is pinned here as well.
    #[test]
    fn align_up_round_trips_powers_of_two() {
        let cases: [(usize, usize, Option<usize>); 12] = [
            (0, 8, Some(0)),
            (1, 8, Some(8)),
            (7, 8, Some(8)),
            (8, 8, Some(8)),
            (9, 8, Some(16)),
            (16, 16, Some(16)),
            (17, 16, Some(32)),
            // Powers-of-two non-aligned values.
            (33, 32, Some(64)),
            (100, 64, Some(128)),
            // align == 1 → identity.
            (0, 1, Some(0)),
            (7, 1, Some(7)),
            // checked_add overflow path: value + mask must overflow.
            (usize::MAX, 8, None),
        ];
        for &(value, align, expected) in &cases {
            assert_eq!(align_up(value, align), expected, "align_up({value}, {align})");
        }
    }

    // Kills the `> → >=` boundary mutation on `try_alloc_bytes`'s
    // `end_addr > drop_top_addr` check. An allocation that exactly fills
    // the remaining payload ends with `end_addr == drop_top_addr`: the
    // original `>` accepts that case, the mutated `>=` would reject it.
    #[test]
    fn try_alloc_bytes_at_exact_remaining_capacity_succeeds() {
        let arena = crate::Arena::new();
        // Force the first refill so `current_local` carries a live chunk.
        let _ = arena.alloc(0_u8);
        let mutator = arena.current_local();
        let remaining = mutator.free_bytes();
        assert!(remaining > 0, "post-refill chunk must have remaining capacity");
        assert!(
            mutator.try_alloc_bytes(remaining).is_some(),
            "try_alloc_bytes(free_bytes) must succeed at the exact boundary",
        );
        // With everything consumed, even a single extra byte must fail.
        assert!(mutator.try_alloc_bytes(1).is_none());
    }

    // Kills the `> → >=` boundary mutation on `try_grow_in_place`'s
    // `new_bump_addr > drop_top_addr` check: a grow whose new end lands
    // exactly on `drop_top_addr` must succeed.
    #[test]
    fn try_grow_in_place_at_exact_remaining_capacity_succeeds() {
        let arena = crate::Arena::new();
        let _ = arena.alloc(0_u8);
        let mutator = arena.current_local();
        let remaining = mutator.free_bytes();
        assert!(remaining > 16, "need slack for an initial alloc plus grow");
        // Read the bump cursor *before* allocating: that address is where
        // the allocation starts, i.e. the exact `prev_addr` to grow from.
        let start_addr = mutator.bump.get().as_ptr() as usize;
        let first_len = 16_usize;
        let _ = mutator.try_alloc_bytes(first_len).expect("initial alloc");
        // Growing to `remaining` bytes puts the new end at
        // `start_addr + remaining`, which is exactly `drop_top_addr`.
        assert!(
            mutator.try_grow_in_place(start_addr, first_len, remaining),
            "try_grow_in_place at exact remaining capacity must succeed",
        );
        // The bump cursor now sits at `drop_top`, so nothing is left.
        assert_eq!(mutator.free_bytes(), 0);
    }

    // Drives `try_grow_in_place`'s `new_len <= prev_len` short-circuit
    // directly: it must return `true` and leave the bump cursor alone.
    // Public Vec paths never get here, because they reject non-growing
    // reservations before calling `try_grow_in_place`.
    #[test]
    fn try_grow_in_place_non_growing_returns_true() {
        let arena = crate::Arena::new();
        let _ = arena.alloc(0_u8);
        let mutator = arena.current_local();
        let cursor_addr = || mutator.bump.get().as_ptr() as usize;
        let start = cursor_addr();
        // The `prev_addr` argument is irrelevant: the `new_len <= prev_len`
        // guard short-circuits before it is inspected.
        let equal_ok = mutator.try_grow_in_place(0, 8, 8);
        assert!(equal_ok, "equal lengths must succeed");
        let shrink_ok = mutator.try_grow_in_place(0, 8, 4);
        assert!(shrink_ok, "shrink must succeed");
        assert_eq!(cursor_addr(), start, "non-growing grow must not move the bump cursor");
    }

    // Drives `try_grow_in_place`'s `checked_add` overflow arm directly:
    // when `prev_addr + new_len` wraps `usize` the grow must return
    // `false`. Public Vec paths reject `usize::MAX` capacities before
    // reaching this line.
    #[test]
    fn try_grow_in_place_new_len_overflow_returns_false() {
        let arena = crate::Arena::new();
        let _ = arena.alloc(0_u8);
        let mutator = arena.current_local();
        // Read the cursor before allocating so `prev_addr` is known
        // exactly (the align-1 byte allocation lands right there).
        let start_addr = mutator.bump.get().as_ptr() as usize;
        let first_len = 16_usize;
        let _ = mutator.try_alloc_bytes(first_len).expect("initial alloc");
        // `usize::MAX` is greater than `prev_len`, and
        // `prev_addr + prev_len` equals the current bump, so control
        // reaches the `checked_add(new_len)` overflow guard, which must
        // reject the grow.
        let grown = mutator.try_grow_in_place(start_addr, first_len, usize::MAX);
        assert!(!grown, "overflowing new_len must fail");
    }
}