//! Async allocation support for `arena_alligator`.
//!
//! File: async_alloc.rs
1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, AtomicU64, Ordering as AtomicOrdering};
7use std::sync::{Arc, Mutex};
8use std::task::{Context, Poll};
9use std::time::Instant;
10
11use tokio::sync::Notify;
12use tokio::sync::futures::OwnedNotified;
13use tokio::sync::oneshot;
14
15use crate::buffer::Buffer;
16use crate::sync::atomic::{AtomicUsize, Ordering};
17use crate::{BuddyArena, FixedArena};
18
19// ---------------------------------------------------------------------------
20// Waiter traits
21// ---------------------------------------------------------------------------
22
/// Wait strategy for fixed arena async allocation.
///
/// Fixed arenas have uniform slot sizes, so no order awareness is needed.
pub trait Waiter: Send + Sync + 'static {
    /// Future returned when a waiter registers interest in allocation progress.
    type Registration: WaitRegistration;

    /// Register a waiter before retrying allocation.
    ///
    /// The allocation loop `prepare()`s the returned registration *before*
    /// the retry so that a wake landing between a failed retry and the await
    /// is not lost.
    fn register(&self) -> Self::Registration;

    /// Wake one waiter after a slot becomes available.
    fn wake(&self);
}
36
/// Wait strategy for buddy arena async allocation.
///
/// Buddy arenas have per-order block sizes. Waiters register at the order
/// matching their request, and wakes target the best candidate order via
/// scoring.
pub trait BuddyWaiter: Send + Sync + 'static {
    /// Future returned when a waiter registers interest.
    type Registration: WaitRegistration;

    /// Register a waiter at the given allocation order.
    ///
    /// Implementations may clamp `order` to their highest supported order.
    fn register(&self, order: usize) -> Self::Registration;

    /// Wake the best-scoring waiter after a block at `freed_order` becomes available.
    ///
    /// A freed block can be split to serve requests at orders `<= freed_order`,
    /// so any order up to `freed_order` is a candidate.
    fn wake(&self, freed_order: usize);
}
52
/// Registration returned by waiter traits.
///
/// Lifecycle: `register` → `prepare` → (allocation retry) → either `revoke`
/// on immediate success or `await` until a wake resolves the future.
pub trait WaitRegistration: Future<Output = ()> {
    /// Arm the registration before the post-registration allocation retry.
    fn prepare(self: Pin<&mut Self>);

    /// Revoke the registration when the retry succeeds immediately.
    fn revoke(self: Pin<&mut Self>);
}
61
62// ---------------------------------------------------------------------------
63// Wake handles (type-erased dispatch for arena inner structs)
64// ---------------------------------------------------------------------------
65
/// Type-erased "wake one waiter" dispatch used by arena inner structs.
pub(crate) trait WakeOne: Send + Sync {
    fn wake(&self);
}

/// Every [`Waiter`] is trivially a `WakeOne`.
impl<W: Waiter> WakeOne for W {
    fn wake(&self) {
        // Fully qualified call: both traits declare `wake`.
        Waiter::wake(self);
    }
}
75
76pub(crate) struct WakeHandle {
77    inner: Arc<dyn WakeOne>,
78}
79
80impl WakeHandle {
81    pub(crate) fn new<W: Waiter>(waiters: Arc<W>) -> Self {
82        let inner: Arc<dyn WakeOne> = waiters;
83        Self { inner }
84    }
85
86    pub(crate) fn wake(&self) {
87        self.inner.wake();
88    }
89}
90
/// Type-erased "wake one waiter at a freed order" dispatch for buddy arenas.
pub(crate) trait BuddyWakeOne: Send + Sync {
    fn wake(&self, freed_order: usize);
}

/// Every [`BuddyWaiter`] is trivially a `BuddyWakeOne`.
impl<W: BuddyWaiter> BuddyWakeOne for W {
    fn wake(&self, freed_order: usize) {
        // Fully qualified call: both traits declare `wake`.
        BuddyWaiter::wake(self, freed_order);
    }
}
100
101pub(crate) struct BuddyWakeHandle {
102    inner: Arc<dyn BuddyWakeOne>,
103}
104
105impl BuddyWakeHandle {
106    pub(crate) fn new<W: BuddyWaiter>(waiters: Arc<W>) -> Self {
107        let inner: Arc<dyn BuddyWakeOne> = waiters;
108        Self { inner }
109    }
110
111    pub(crate) fn wake(&self, freed_order: usize) {
112        self.inner.wake(freed_order);
113    }
114}
115
116// ---------------------------------------------------------------------------
117// WaiterEntry — CAS-arbitrated shared state between queue and registration
118// ---------------------------------------------------------------------------
119
// WaiterEntry lifecycle states, arbitrated by a single CAS on
// `WaiterEntry::state`. Exactly one side transitions away from LIVE
// and thereby claims exclusive access to the oneshot sender.
/// Entry is queued and may still be woken or revoked.
const LIVE: u8 = 0;
/// A wake path won the CAS and owns the sender.
const WOKEN: u8 = 1;
/// The registration revoked itself (immediate success, or drop/cancel).
const REVOKED: u8 = 2;
123
/// Shared state between an order queue and one registered waiter.
struct WaiterEntry {
    // LIVE / WOKEN / REVOKED; single CAS decides who owns `tx`.
    state: AtomicU8,
    // SAFETY: accessed exactly once by whichever side wins the CAS on `state`.
    // See the safety contract in .plan/2026-03-06-buddy-wake-starvation.md.
    tx: UnsafeCell<Option<oneshot::Sender<usize>>>,
    // Registration time in ns since the waiter-set epoch; input to scoring.
    timestamp: u64,
    #[allow(dead_code)]
    order: usize,
}
133
impl WaiterEntry {
    /// New entry in the LIVE state holding the wake-delivery sender.
    fn new(tx: oneshot::Sender<usize>, timestamp: u64, order: usize) -> Self {
        Self {
            state: AtomicU8::new(LIVE),
            tx: UnsafeCell::new(Some(tx)),
            timestamp,
            order,
        }
    }

    /// Take the oneshot sender out of this entry.
    ///
    /// # Safety
    ///
    /// Caller must have won the CAS on `state` (Live→Woken or Live→Revoked).
    /// This guarantees exclusive access — no other code path will call this.
    unsafe fn take_tx(&self) -> Option<oneshot::Sender<usize>> {
        unsafe { (*self.tx.get()).take() }
    }
}
154
// SAFETY: tx is only accessed by the CAS winner, so no data races.
// The oneshot::Sender is Send, and AtomicU8/u64/usize are Sync.
// The UnsafeCell is what suppresses the auto impls; the CAS protocol on
// `state` restores the exclusivity those impls require.
unsafe impl Send for WaiterEntry {}
unsafe impl Sync for WaiterEntry {}
159
160// ---------------------------------------------------------------------------
161// Buddy wake internals — per-order state + scoring
162// ---------------------------------------------------------------------------
163
/// Sentinel: no waiters at this order (`head_timestamp` value).
const NO_WAITERS_TIMESTAMP: u64 = u64::MAX;

/// Maximum stale entries to pop per wake call (bounds free-path work).
const MAX_POPS_PER_WAKE: usize = 8;

/// Maximum consecutive wins by one order before it's banned from scoring
/// (anti-starvation; see `score_orders`).
const MAX_CONSECUTIVE: u32 = 10;

/// Nanosecond bonus per order level for scoring (higher order = looks older).
const ORDER_BONUS_NS: u64 = 50_000; // 50μs per order level

/// Per-waiter depth bonus in nanoseconds (deeper queue = looks older).
const DEPTH_BONUS_NS: u64 = 5_000; // 5μs per waiting peer
178
/// Per-order waiter queue plus lock-free summaries read by scoring.
struct BuddyOrderSlot {
    // FIFO of entries; may contain WOKEN/REVOKED tombstones.
    queue: Mutex<std::collections::VecDeque<Arc<WaiterEntry>>>,
    // Number of LIVE entries (tombstones excluded).
    count: AtomicUsize,
    // Timestamp of the oldest LIVE entry, or NO_WAITERS_TIMESTAMP.
    head_timestamp: AtomicU64,
}
184
185impl BuddyOrderSlot {
186    fn new() -> Self {
187        Self {
188            queue: Mutex::new(std::collections::VecDeque::new()),
189            count: AtomicUsize::new(0),
190            head_timestamp: AtomicU64::new(NO_WAITERS_TIMESTAMP),
191        }
192    }
193}
194
195// ---------------------------------------------------------------------------
196// NotifyWaiters — implements both Waiter (fixed) and BuddyWaiter (buddy)
197// ---------------------------------------------------------------------------
198
/// Fixed-arena waiter state: a `Notify` FIFO plus a live-waiter count.
struct FixedOrderSlot {
    notify: Arc<Notify>,
    // Registered-and-not-yet-woken waiters; gates notify_one in `wake`.
    count: AtomicUsize,
}
203
/// Per-order waiter system.
///
/// Fixed arenas use a single `Notify`-based FIFO (order 0 only).
/// Buddy arenas use per-order mutex queues with CAS-arbitrated oneshot
/// delivery and 4-factor scoring to prevent starvation.
///
/// Cloning is cheap: all state lives behind one `Arc`.
#[derive(Clone)]
pub struct NotifyWaiters {
    inner: Arc<NotifyWaitersInner>,
}
213
/// Shared state behind [`NotifyWaiters`].
struct NotifyWaitersInner {
    // Fixed arena path (always has exactly one slot for order 0)
    fixed_slot: FixedOrderSlot,
    // Buddy arena path (empty vec for fixed-only arenas)
    buddy_orders: Box<[BuddyOrderSlot]>,
    // Packed streak state: upper 32 bits = last_winner order, lower 32 = streak count
    streak_state: AtomicU64,
    // Monotonic clock epoch for timestamps
    epoch: Instant,
}
224
225impl NotifyWaiters {
226    /// Create a waiter set with the given number of buddy orders.
227    ///
228    /// Fixed arenas use `num_orders = 1` (only the fixed slot is used).
229    /// Buddy arenas use `num_orders = max_order + 1`.
230    pub fn new(num_orders: usize) -> Self {
231        assert!(num_orders > 0, "must have at least one order");
232        let buddy_orders: Vec<BuddyOrderSlot> =
233            (0..num_orders).map(|_| BuddyOrderSlot::new()).collect();
234        Self {
235            inner: Arc::new(NotifyWaitersInner {
236                fixed_slot: FixedOrderSlot {
237                    notify: Arc::new(Notify::new()),
238                    count: AtomicUsize::new(0),
239                },
240                buddy_orders: buddy_orders.into_boxed_slice(),
241                streak_state: AtomicU64::new(0),
242                epoch: Instant::now(),
243            }),
244        }
245    }
246
247    fn now_ns(&self) -> u64 {
248        self.inner.epoch.elapsed().as_nanos() as u64
249    }
250
251    // -- Buddy wake scoring --
252
253    fn score_orders(&self, freed_order: usize) -> Vec<usize> {
254        let max = freed_order.min(self.inner.buddy_orders.len() - 1);
255        let streak = self.inner.streak_state.load(AtomicOrdering::Relaxed);
256        let last_winner = (streak >> 32) as usize;
257        let streak_count = streak as u32;
258
259        // Collect all eligible orders first, then apply ban as post-filter.
260        // Building candidates before ban check avoids order-dependent evaluation
261        // where early-scanned orders see an empty candidate list.
262        let mut candidates: Vec<(usize, u64)> = Vec::new();
263
264        for order in 0..=max {
265            let slot = &self.inner.buddy_orders[order];
266            let count = slot.count.load(Ordering::Acquire);
267            if count == 0 {
268                continue;
269            }
270            let ts = slot.head_timestamp.load(AtomicOrdering::Acquire);
271            if ts == NO_WAITERS_TIMESTAMP {
272                continue;
273            }
274
275            let effective_age = ts
276                .saturating_sub((order as u64).saturating_mul(ORDER_BONUS_NS))
277                .saturating_sub((count as u64).saturating_mul(DEPTH_BONUS_NS));
278
279            candidates.push((order, effective_age));
280        }
281
282        // Streak ban: remove the streak winner if it has exceeded MAX_CONSECUTIVE
283        // and there is at least one other candidate to serve instead.
284        if streak_count >= MAX_CONSECUTIVE
285            && candidates.len() > 1
286            && candidates.iter().any(|(o, _)| *o == last_winner)
287        {
288            candidates.retain(|(o, _)| *o != last_winner);
289        }
290
291        // Sort by effective_age ascending (lowest = oldest = highest priority)
292        candidates.sort_by_key(|&(_, age)| age);
293        candidates.into_iter().map(|(order, _)| order).collect()
294    }
295
296    fn update_streak(&self, winner_order: usize) {
297        let current = self.inner.streak_state.load(AtomicOrdering::Relaxed);
298        let last_winner = (current >> 32) as usize;
299        let new = if winner_order == last_winner {
300            let streak = (current as u32).saturating_add(1);
301            ((winner_order as u64) << 32) | streak as u64
302        } else {
303            ((winner_order as u64) << 32) | 1u64
304        };
305        self.inner.streak_state.store(new, AtomicOrdering::Relaxed);
306    }
307
308    fn update_head_timestamp(
309        &self,
310        queue: &std::collections::VecDeque<Arc<WaiterEntry>>,
311        order: usize,
312    ) {
313        let slot = &self.inner.buddy_orders[order];
314        // Scan past tombstones (WOKEN/REVOKED) to find the first live entry.
315        // Without this, a tombstone at front would set NO_WAITERS_TIMESTAMP
316        // even when live waiters exist further back, making them invisible
317        // to scoring until an unrelated event repairs head state.
318        for entry in queue.iter() {
319            if entry.state.load(AtomicOrdering::Relaxed) == LIVE {
320                slot.head_timestamp
321                    .store(entry.timestamp, AtomicOrdering::Release);
322                return;
323            }
324        }
325        slot.head_timestamp
326            .store(NO_WAITERS_TIMESTAMP, AtomicOrdering::Release);
327    }
328
329    fn buddy_wake(&self, freed_order: usize) {
330        let candidates = self.score_orders(freed_order);
331        let mut pops: usize = 0;
332
333        // Deliver one wake per candidate order. A freed block at order N can
334        // serve waiters at multiple lower orders via splitting, so the wake
335        // across orders rather than stopping at the first delivery.
336        for order in candidates {
337            let mut queue = self.inner.buddy_orders[order].queue.lock().unwrap();
338            while let Some(entry) = queue.pop_front() {
339                pops += 1;
340                if pops > MAX_POPS_PER_WAKE {
341                    return;
342                }
343
344                if entry.state.load(AtomicOrdering::Relaxed) != LIVE {
345                    continue;
346                }
347
348                if entry
349                    .state
350                    .compare_exchange(LIVE, WOKEN, AtomicOrdering::AcqRel, AtomicOrdering::Relaxed)
351                    .is_ok()
352                {
353                    self.inner.buddy_orders[order]
354                        .count
355                        .fetch_sub(1, Ordering::Release);
356                    self.update_head_timestamp(&queue, order);
357                    drop(queue);
358
359                    // SAFETY: this path won the CAS Live→Woken
360                    let tx = unsafe { entry.take_tx() };
361                    if let Some(tx) = tx
362                        && tx.send(freed_order).is_ok()
363                    {
364                        self.update_streak(order);
365                    }
366                    // Move to next order (one delivery per order)
367                    break;
368                }
369            }
370        }
371    }
372
373    fn buddy_register(&self, order: usize) -> BuddyRegistration {
374        let order = order.min(self.inner.buddy_orders.len() - 1);
375        let (tx, rx) = oneshot::channel();
376        let timestamp = self.now_ns();
377        let entry = Arc::new(WaiterEntry::new(tx, timestamp, order));
378
379        BuddyRegistration {
380            entry: Some(Arc::clone(&entry)),
381            rx: Some(rx),
382            waiters: Arc::clone(&self.inner),
383            order,
384            registered: false,
385            pending_entry: Some(entry),
386        }
387    }
388}
389
390impl Waiter for NotifyWaiters {
391    type Registration = NotifyRegistration;
392
393    fn register(&self) -> NotifyRegistration {
394        NotifyRegistration {
395            future: self.inner.fixed_slot.notify.clone().notified_owned(),
396            inner: Arc::clone(&self.inner),
397            registered: false,
398            woken: false,
399        }
400    }
401
402    fn wake(&self) {
403        if self.inner.fixed_slot.count.load(Ordering::Acquire) > 0 {
404            self.inner.fixed_slot.notify.notify_one();
405        }
406    }
407}
408
/// Buddy-path facade: delegates to the CAS/oneshot machinery above.
impl BuddyWaiter for NotifyWaiters {
    type Registration = BuddyRegistration;

    fn register(&self, order: usize) -> BuddyRegistration {
        self.buddy_register(order)
    }

    fn wake(&self, freed_order: usize) {
        self.buddy_wake(freed_order);
    }
}
420
421// ---------------------------------------------------------------------------
422// NotifyRegistration — fixed arena path (unchanged Notify-based FIFO)
423// ---------------------------------------------------------------------------
424
/// Registration future for fixed arena [`NotifyWaiters`].
pub struct NotifyRegistration {
    // Owned Notify future; enabled in prepare(), polled in Future::poll.
    future: OwnedNotified,
    inner: Arc<NotifyWaitersInner>,
    // True while counted in fixed_slot.count.
    registered: bool,
    // True once poll() returned Ready, i.e. a Notify permit was consumed;
    // Drop uses this to re-propagate an unconsumed wake.
    woken: bool,
}
432
impl WaitRegistration for NotifyRegistration {
    /// Arm the registration: join the `Notify` waiter list and count this
    /// task so `wake()` knows someone is listening.
    fn prepare(self: Pin<&mut Self>) {
        // SAFETY: `future` is never moved out; it is immediately re-pinned.
        let this = unsafe { self.get_unchecked_mut() };
        let future = unsafe { Pin::new_unchecked(&mut this.future) };
        // enable() inserts into the Notify waiter list without requiring a
        // poll, so a wake arriving before the first poll is not lost.
        let _ = future.enable();
        if !this.registered {
            this.inner.fixed_slot.count.fetch_add(1, Ordering::Release);
            this.registered = true;
        }
    }

    /// Revoke after an immediate allocation success: stop counting as a
    /// waiter and clear any recorded wake so Drop does not re-propagate.
    fn revoke(self: Pin<&mut Self>) {
        let this = unsafe { self.get_unchecked_mut() };
        if this.registered {
            this.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
            this.registered = false;
        }
        this.woken = false;
    }
}
453
impl Future for NotifyRegistration {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: `future` is structurally pinned and never moved out.
        let this = unsafe { self.get_unchecked_mut() };
        let poll = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx);
        if poll.is_ready() {
            // A Notify permit was consumed: uncount ourselves and record the
            // wake so Drop can propagate it if we never get to retry.
            if this.registered {
                this.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
                this.registered = false;
            }
            this.woken = true;
        }
        poll
    }
}
470
impl Drop for NotifyRegistration {
    fn drop(&mut self) {
        // Still queued (never woken, never revoked): just uncount.
        if self.registered {
            self.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
        }
        // If a Notify permit was consumed (poll returned Ready) but drop ran
        // before the allocation loop could retry, propagate the wake so the
        // next waiter isn't stalled. The OwnedNotified won't propagate on its
        // own because it was already polled to completion.
        // Only propagate when another waiter exists — otherwise the stored
        // permit creates a hot retry loop under full-arena conditions.
        if self.woken && self.inner.fixed_slot.count.load(Ordering::Acquire) > 0 {
            self.inner.fixed_slot.notify.notify_one();
        }
    }
}
487
488// ---------------------------------------------------------------------------
489// BuddyRegistration — buddy arena path (CAS + oneshot)
490// ---------------------------------------------------------------------------
491
/// Registration future for buddy arena [`NotifyWaiters`].
pub struct BuddyRegistration {
    // Shared entry; revoke()/Drop CAS it Live→Revoked to reclaim the sender.
    entry: Option<Arc<WaiterEntry>>,
    // Receiving half of the oneshot; resolved by a winning wake.
    rx: Option<oneshot::Receiver<usize>>,
    waiters: Arc<NotifyWaitersInner>,
    // Order this registration queues at (already clamped by buddy_register).
    order: usize,
    // True while the entry is queued and counted at `order`.
    registered: bool,
    // Entry waiting to be pushed into queue on prepare()
    pending_entry: Option<Arc<WaiterEntry>>,
}
502
impl WaitRegistration for BuddyRegistration {
    /// Arm the registration: enqueue the entry at its order, bump the live
    /// count, and publish the head timestamp if the slot had no live waiters
    /// (so scoring can see this waiter immediately).
    fn prepare(self: Pin<&mut Self>) {
        // SAFETY: no field of BuddyRegistration is structurally pinned.
        let this = unsafe { self.get_unchecked_mut() };
        if !this.registered
            && let Some(entry) = this.pending_entry.take()
        {
            let slot = &this.waiters.buddy_orders[this.order];
            let mut queue = slot.queue.lock().unwrap();
            queue.push_back(entry);
            let prev = slot.count.fetch_add(1, Ordering::Release);
            // prev == 0 means every queued entry (if any) is a tombstone and
            // head_timestamp holds the sentinel; ours is now the head.
            if prev == 0
                && let Some(e) = &this.entry
            {
                slot.head_timestamp
                    .store(e.timestamp, AtomicOrdering::Release);
            }
            this.registered = true;
        }
    }

    /// Revoke after an immediate allocation success: CAS Live→Revoked so no
    /// wake is spent on this entry, and drop the sender, leaving the queued
    /// entry as a tombstone for buddy_wake to skip.
    fn revoke(self: Pin<&mut Self>) {
        let this = unsafe { self.get_unchecked_mut() };
        if this.registered {
            if let Some(entry) = &this.entry
                && entry
                    .state
                    .compare_exchange(
                        LIVE,
                        REVOKED,
                        AtomicOrdering::AcqRel,
                        AtomicOrdering::Relaxed,
                    )
                    .is_ok()
            {
                // SAFETY: this path won the CAS Live→Revoked
                let _tx = unsafe { entry.take_tx() };
                this.waiters.buddy_orders[this.order]
                    .count
                    .fetch_sub(1, Ordering::Release);
            }
            this.registered = false;
        }
    }
}
547
impl Future for BuddyRegistration {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: no field is structurally pinned; the receiver is Unpin.
        let this = unsafe { self.get_unchecked_mut() };
        if let Some(rx) = &mut this.rx {
            match Pin::new(rx).poll(cx) {
                // Ready on Ok (woken with a freed order — value is only a
                // retry hint) or Err (sender dropped). Either way the entry
                // is no longer live, so stop tracking it.
                Poll::Ready(_) => {
                    this.rx = None;
                    this.registered = false;
                    Poll::Ready(())
                }
                Poll::Pending => Poll::Pending,
            }
        } else {
            // Already resolved on an earlier poll.
            Poll::Ready(())
        }
    }
}
567
/// Cancellation safety: a dropped-while-registered future revokes itself so
/// no wake is wasted on it and the live count stays accurate.
impl Drop for BuddyRegistration {
    fn drop(&mut self) {
        if self.registered
            && let Some(entry) = &self.entry
            && entry
                .state
                .compare_exchange(
                    LIVE,
                    REVOKED,
                    AtomicOrdering::AcqRel,
                    AtomicOrdering::Relaxed,
                )
                .is_ok()
        {
            // SAFETY: this path won the CAS Live→Revoked
            let _tx = unsafe { entry.take_tx() };
            self.waiters.buddy_orders[self.order]
                .count
                .fetch_sub(1, Ordering::Release);
        }
    }
}
590
591// ---------------------------------------------------------------------------
592// Allocation loops
593// ---------------------------------------------------------------------------
594
595async fn allocate_with_waiter<W, T, F>(waiters: &W, mut try_allocate: F) -> T
596where
597    W: Waiter,
598    F: FnMut() -> Option<T>,
599{
600    loop {
601        let registration = waiters.register();
602        tokio::pin!(registration);
603        registration.as_mut().prepare();
604        if let Some(value) = try_allocate() {
605            registration.as_mut().revoke();
606            return value;
607        }
608        registration.await;
609    }
610}
611
612async fn allocate_with_buddy_waiter<W, T, F>(waiters: &W, order: usize, mut try_allocate: F) -> T
613where
614    W: BuddyWaiter,
615    F: FnMut() -> Option<T>,
616{
617    loop {
618        let registration = waiters.register(order);
619        tokio::pin!(registration);
620        registration.as_mut().prepare();
621        if let Some(value) = try_allocate() {
622            registration.as_mut().revoke();
623            return value;
624        }
625        registration.await;
626    }
627}
628
629// ---------------------------------------------------------------------------
630// AsyncFixedArena
631// ---------------------------------------------------------------------------
632
/// Async-capable wrapper around [`FixedArena`].
///
/// Created via
/// [`FixedArenaBuilder::build_async()`](crate::FixedArenaBuilder::build_async).
/// [`allocate_async()`](AsyncFixedArena::allocate_async) parks until a slot
/// becomes available. Sync methods remain accessible through
/// `Deref<Target = FixedArena>`.
#[derive(Clone)]
pub struct AsyncFixedArena<W = NotifyWaiters> {
    inner: FixedArena,
    waiters: Arc<W>,
}

impl<W> AsyncFixedArena<W> {
    /// Pair an arena with its waiter set (builder-internal constructor).
    pub(crate) fn new(inner: FixedArena, waiters: Arc<W>) -> Self {
        Self { inner, waiters }
    }
}
651
impl<W: Waiter> AsyncFixedArena<W> {
    /// Allocate a buffer, waiting asynchronously if the arena is full.
    ///
    /// Returns once a slot becomes available. The bitmap is the source
    /// of truth; notifications are hints to retry.
    ///
    /// Cancellation-safe: dropping the returned future deregisters the
    /// waiter, and a wake consumed before the drop is passed on to the next
    /// waiter (see `NotifyRegistration`'s `Drop`).
    pub async fn allocate_async(&self) -> Buffer {
        allocate_with_waiter(self.waiters.as_ref(), || self.inner.allocate().ok()).await
    }
}
661
/// Expose the sync `FixedArena` API on the async wrapper.
impl<W> Deref for AsyncFixedArena<W> {
    type Target = FixedArena;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
669
// Manual impl: `W` need not be Debug, so the waiter field is omitted.
impl<W> fmt::Debug for AsyncFixedArena<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AsyncFixedArena")
            .field("inner", &self.inner)
            .finish()
    }
}
677
678// ---------------------------------------------------------------------------
679// AsyncBuddyArena
680// ---------------------------------------------------------------------------
681
/// Async-capable wrapper around [`BuddyArena`].
///
/// Created via
/// [`BuddyArenaBuilder::build_async()`](crate::BuddyArenaBuilder::build_async).
/// [`allocate_async()`](AsyncBuddyArena::allocate_async) parks until a
/// large-enough block becomes available. Sync methods remain accessible
/// through `Deref<Target = BuddyArena>`.
#[derive(Clone)]
pub struct AsyncBuddyArena<W = NotifyWaiters> {
    inner: BuddyArena,
    waiters: Arc<W>,
}

impl<W> AsyncBuddyArena<W> {
    /// Pair an arena with its waiter set (builder-internal constructor).
    pub(crate) fn new(inner: BuddyArena, waiters: Arc<W>) -> Self {
        Self { inner, waiters }
    }
}
700
impl<W: BuddyWaiter> AsyncBuddyArena<W> {
    /// Allocate a buffer, waiting asynchronously if the arena is full.
    ///
    /// The buddy bitmaps remain the source of truth; notifications are hints
    /// to retry after free or coalesce publishes a usable block.
    ///
    /// NOTE(review): when `order_for_request` returns `None` the waiter is
    /// registered at `max_order()`. If `None` means the request exceeds what
    /// the arena can ever satisfy, this future can never complete — confirm
    /// `order_for_request`'s contract; a fallible variant may be warranted.
    pub async fn allocate_async(&self, len: std::num::NonZeroUsize) -> Buffer {
        let order = self
            .order_for_request(len.get())
            .unwrap_or(self.max_order());
        allocate_with_buddy_waiter(self.waiters.as_ref(), order, || {
            self.inner.allocate(len).ok()
        })
        .await
    }
}
716
/// Expose the sync `BuddyArena` API on the async wrapper.
impl<W> Deref for AsyncBuddyArena<W> {
    type Target = BuddyArena;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
724
// Manual impl: `W` need not be Debug, so the waiter field is omitted.
impl<W> fmt::Debug for AsyncBuddyArena<W> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("AsyncBuddyArena")
            .field("inner", &self.inner)
            .finish()
    }
}
732
733// ---------------------------------------------------------------------------
734// Tests
735// ---------------------------------------------------------------------------
736
737#[cfg(test)]
738mod tests {
739    use std::num::NonZeroUsize;
740    use std::sync::Arc;
741    use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
742
743    use bytes::BufMut;
744    use tokio::time::{Duration, timeout};
745
746    use crate::BuddyArena;
747    use crate::BuddyGeometry;
748    use crate::FixedArena;
749
750    use super::*;
751
752    fn nz(n: usize) -> NonZeroUsize {
753        NonZeroUsize::new(n).unwrap()
754    }
755
756    // -- CountingWaiters: implements Waiter (fixed) and BuddyWaiter (buddy) --
757
758    #[derive(Clone)]
759    struct CountingWaiters {
760        inner: NotifyWaiters,
761        registrations: Arc<AtomicUsize>,
762        wakes: Arc<AtomicUsize>,
763    }
764
765    impl CountingWaiters {
766        fn new(num_orders: usize) -> Self {
767            Self {
768                inner: NotifyWaiters::new(num_orders),
769                registrations: Arc::new(AtomicUsize::new(0)),
770                wakes: Arc::new(AtomicUsize::new(0)),
771            }
772        }
773
774        fn registrations(&self) -> usize {
775            self.registrations.load(AtomicOrdering::Relaxed)
776        }
777
778        fn wakes(&self) -> usize {
779            self.wakes.load(AtomicOrdering::Relaxed)
780        }
781    }
782
783    // Wrapper to project through to the correct inner registration type
784    struct CountingFixedRegistration {
785        inner: NotifyRegistration,
786    }
787
788    struct CountingBuddyRegistration {
789        inner: BuddyRegistration,
790    }
791
792    impl Waiter for CountingWaiters {
793        type Registration = CountingFixedRegistration;
794
795        fn register(&self) -> Self::Registration {
796            self.registrations.fetch_add(1, AtomicOrdering::Relaxed);
797            CountingFixedRegistration {
798                inner: Waiter::register(&self.inner),
799            }
800        }
801
802        fn wake(&self) {
803            self.wakes.fetch_add(1, AtomicOrdering::Relaxed);
804            Waiter::wake(&self.inner);
805        }
806    }
807
808    impl BuddyWaiter for CountingWaiters {
809        type Registration = CountingBuddyRegistration;
810
811        fn register(&self, order: usize) -> Self::Registration {
812            self.registrations.fetch_add(1, AtomicOrdering::Relaxed);
813            CountingBuddyRegistration {
814                inner: BuddyWaiter::register(&self.inner, order),
815            }
816        }
817
818        fn wake(&self, freed_order: usize) {
819            self.wakes.fetch_add(1, AtomicOrdering::Relaxed);
820            BuddyWaiter::wake(&self.inner, freed_order);
821        }
822    }
823
824    impl WaitRegistration for CountingFixedRegistration {
825        fn prepare(self: Pin<&mut Self>) {
826            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.prepare();
827        }
828
829        fn revoke(self: Pin<&mut Self>) {
830            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.revoke();
831        }
832    }
833
834    impl Future for CountingFixedRegistration {
835        type Output = ();
836
837        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
838            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
839        }
840    }
841
842    impl WaitRegistration for CountingBuddyRegistration {
843        fn prepare(self: Pin<&mut Self>) {
844            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.prepare();
845        }
846
847        fn revoke(self: Pin<&mut Self>) {
848            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.revoke();
849        }
850    }
851
852    impl Future for CountingBuddyRegistration {
853        type Output = ();
854
855        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
856            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
857        }
858    }
859
860    // -- Fixed arena tests --
861
862    #[tokio::test]
863    async fn allocate_async_basic() {
864        let arena = FixedArena::with_slot_capacity(nz(1), nz(32))
865            .build_async()
866            .unwrap();
867        let mut buf = arena.allocate_async().await;
868        buf.put_slice(b"data");
869        let bytes = buf.freeze();
870        drop(bytes);
871        let _buf2 = arena.allocate_async().await;
872    }
873
874    #[tokio::test]
875    async fn allocate_async_waits_then_succeeds() {
876        let arena = Arc::new(
877            FixedArena::with_slot_capacity(nz(1), nz(32))
878                .build_async()
879                .unwrap(),
880        );
881        let mut buf = arena.allocate_async().await;
882        buf.put_slice(b"blocking");
883        let bytes = buf.freeze();
884        let arena2 = Arc::clone(&arena);
885        let handle = tokio::spawn(async move {
886            let buf = arena2.allocate_async().await;
887            buf.capacity()
888        });
889        tokio::time::sleep(Duration::from_millis(50)).await;
890        drop(bytes);
891        let cap = timeout(Duration::from_secs(2), handle)
892            .await
893            .expect("should not timeout")
894            .expect("task should not panic");
895        assert_eq!(cap, 32);
896    }
897
898    #[tokio::test]
899    async fn sync_allocate_still_fast_fails() {
900        let arena = FixedArena::with_slot_capacity(nz(1), nz(32))
901            .build_async()
902            .unwrap();
903        let _buf = arena.allocate().unwrap();
904        let err = arena.allocate().unwrap_err();
905        assert_eq!(err, crate::AllocError::ArenaFull);
906    }
907
908    #[tokio::test]
909    async fn multiple_waiters_all_served() {
910        let arena = Arc::new(
911            FixedArena::with_slot_capacity(nz(2), nz(32))
912                .build_async()
913                .unwrap(),
914        );
915        let buf1 = arena.allocate().unwrap();
916        let buf2 = arena.allocate().unwrap();
917        let a1 = Arc::clone(&arena);
918        let h1 = tokio::spawn(async move { a1.allocate_async().await.capacity() });
919        let a2 = Arc::clone(&arena);
920        let h2 = tokio::spawn(async move { a2.allocate_async().await.capacity() });
921        tokio::time::sleep(Duration::from_millis(50)).await;
922        drop(buf1);
923        drop(buf2);
924        let (r1, r2) = tokio::join!(
925            timeout(Duration::from_secs(2), h1),
926            timeout(Duration::from_secs(2), h2),
927        );
928        assert_eq!(r1.unwrap().unwrap(), 32);
929        assert_eq!(r2.unwrap().unwrap(), 32);
930    }
931
932    #[tokio::test]
933    async fn deref_exposes_sync_methods() {
934        let arena = FixedArena::with_slot_capacity(nz(4), nz(64))
935            .build_async()
936            .unwrap();
937        assert_eq!(arena.slot_count(), 4);
938        assert_eq!(arena.slot_capacity(), 64);
939    }
940
941    #[tokio::test]
942    async fn fixed_cancellation_no_leak() {
943        let arena = Arc::new(
944            FixedArena::with_slot_capacity(nz(1), nz(32))
945                .build_async()
946                .unwrap(),
947        );
948        let buf = arena.allocate().unwrap();
949
950        let arena2 = Arc::clone(&arena);
951        let handle = tokio::spawn(async move { arena2.allocate_async().await });
952        tokio::time::sleep(Duration::from_millis(50)).await;
953        handle.abort();
954        let _ = handle.await;
955
956        drop(buf);
957        let _buf2 = arena.allocate().unwrap();
958    }
959
    /// A woken registration that is dropped without allocating must propagate
    /// the wake to the next waiter. Without this, the Notify permit is consumed
    /// and the second waiter stalls forever.
    // current_thread flavor: spawned tasks only make progress at explicit
    // await points, so the A-registers-before-B ordering below is deterministic.
    #[tokio::test(flavor = "current_thread")]
    async fn fixed_woken_drop_propagates_to_next_waiter() {
        let waiters = Arc::new(NotifyWaiters::new(1));

        // Handshake so the test body knows A has registered before B starts.
        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();

        // Task A: registers, gets woken, then drops registration without
        // allocating — simulates a cancelled task that consumed the permit.
        let w = Arc::clone(&waiters);
        let h_a = tokio::spawn(async move {
            let reg = Waiter::register(&*w);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            ready_tx.send(()).ok();
            reg.await;
            // Drop reg without doing anything — permit consumed, no allocation
        });

        // Wait for task A to register
        ready_rx.await.ok();

        // Task B: registers after A
        let w2 = Arc::clone(&waiters);
        let h_b = tokio::spawn(async move {
            let reg = Waiter::register(&*w2);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            reg.await;
        });

        // Let task B register and start awaiting
        tokio::task::yield_now().await;

        // One wake — task A gets the permit (first in Notify queue)
        Waiter::wake(&*waiters);

        // Let task A run to completion (consumes permit, drops registration)
        let _ = h_a.await;

        // Task B must complete — propagation from A's drop must re-notify
        timeout(Duration::from_secs(2), h_b)
            .await
            .expect("task B must not stall when A drops after wake")
            .expect("task B should not panic");
    }
1008
    /// When the woken waiter is the last waiter (count == 0), dropping it
    /// must NOT store a stale permit. A stale permit would cause the next
    /// registrant to immediately wake, fail allocation, drop, re-notify —
    /// creating a hot retry loop.
    // current_thread flavor keeps the register/wake/register sequence
    // deterministic across the yield points below.
    #[tokio::test(flavor = "current_thread")]
    async fn fixed_last_waiter_woken_drop_no_stale_permit() {
        let waiters = Arc::new(NotifyWaiters::new(1));

        // Single waiter — no peers
        let w = Arc::clone(&waiters);
        let h = tokio::spawn(async move {
            let reg = Waiter::register(&*w);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            reg.await;
        });

        // Yield so the spawned task registers before we wake.
        tokio::task::yield_now().await;

        // Wake the sole waiter
        Waiter::wake(&*waiters);
        let _ = h.await;

        // No waiters remain. If a stale permit was stored, the next
        // registration would resolve immediately (spurious wake).
        let w2 = Arc::clone(&waiters);
        let h2 = tokio::spawn(async move {
            let reg = Waiter::register(&*w2);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            // This must NOT resolve immediately — no real wake happened
            reg.await;
        });

        tokio::task::yield_now().await;

        // h2 should still be pending (no stale permit)
        assert!(!h2.is_finished(), "stale permit caused spurious wake");

        // Clean up: wake h2 so it completes
        Waiter::wake(&*waiters);
        timeout(Duration::from_secs(1), h2)
            .await
            .expect("cleanup wake should work")
            .expect("no panic");
    }
1055
1056    #[tokio::test]
1057    async fn fixed_custom_waiter_supported() {
1058        let waiters = CountingWaiters::new(1);
1059        let arena = Arc::new(
1060            FixedArena::with_slot_capacity(nz(1), nz(32))
1061                .build_async_with(waiters.clone())
1062                .unwrap(),
1063        );
1064        let buf = arena.allocate().unwrap();
1065
1066        let arena2 = Arc::clone(&arena);
1067        let handle = tokio::spawn(async move { arena2.allocate_async().await.capacity() });
1068        tokio::time::sleep(Duration::from_millis(50)).await;
1069        drop(buf);
1070
1071        let cap = timeout(Duration::from_secs(2), handle)
1072            .await
1073            .expect("should not timeout")
1074            .expect("task should not panic");
1075        assert_eq!(cap, 32);
1076        assert!(waiters.registrations() >= 1);
1077        assert!(waiters.wakes() >= 1);
1078    }
1079
1080    // -- Buddy arena tests --
1081
1082    #[tokio::test]
1083    async fn buddy_allocate_async_waits_then_succeeds() {
1084        let arena = Arc::new(
1085            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1086                .build_async()
1087                .unwrap(),
1088        );
1089        let buf = arena.allocate(nz(2048)).unwrap();
1090
1091        let arena2 = Arc::clone(&arena);
1092        let handle = tokio::spawn(async move {
1093            let buf = arena2.allocate_async(nz(2048)).await;
1094            buf.capacity()
1095        });
1096
1097        tokio::time::sleep(Duration::from_millis(50)).await;
1098        drop(buf);
1099
1100        let cap = timeout(Duration::from_secs(2), handle)
1101            .await
1102            .expect("should not timeout")
1103            .expect("task should not panic");
1104        assert_eq!(cap, 2048);
1105    }
1106
1107    #[tokio::test]
1108    async fn buddy_multiple_waiters_all_served() {
1109        let arena = Arc::new(
1110            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1111                .build_async()
1112                .unwrap(),
1113        );
1114        let buf1 = arena.allocate(nz(2048)).unwrap();
1115        let buf2 = arena.allocate(nz(2048)).unwrap();
1116
1117        let a1 = Arc::clone(&arena);
1118        let h1 = tokio::spawn(async move { a1.allocate_async(nz(2048)).await.capacity() });
1119        let a2 = Arc::clone(&arena);
1120        let h2 = tokio::spawn(async move { a2.allocate_async(nz(2048)).await.capacity() });
1121
1122        tokio::time::sleep(Duration::from_millis(50)).await;
1123        drop(buf1);
1124        drop(buf2);
1125
1126        let (r1, r2) = tokio::join!(
1127            timeout(Duration::from_secs(2), h1),
1128            timeout(Duration::from_secs(2), h2),
1129        );
1130        assert_eq!(r1.unwrap().unwrap(), 2048);
1131        assert_eq!(r2.unwrap().unwrap(), 2048);
1132    }
1133
    /// Per-order wake prevents the starvation scenario where a large waiter
    /// is starved because a small waiter steals the coalesced block.
    #[tokio::test]
    async fn buddy_large_waiter_not_starved_by_small() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        // Occupy the whole arena as two 2048 halves.
        let buf1 = arena.allocate(nz(2048)).unwrap();
        let buf2 = arena.allocate(nz(2048)).unwrap();

        let (small_tx, small_rx) = tokio::sync::oneshot::channel::<()>();

        // Large waiter registers first, wanting the full 4096 block.
        let arena_large = Arc::clone(&arena);
        let large =
            tokio::spawn(async move { arena_large.allocate_async(nz(4096)).await.capacity() });
        tokio::task::yield_now().await;

        // Small waiter registers second. It holds its buffer until signaled,
        // so its block cannot return to the free lists early.
        let arena_small = Arc::clone(&arena);
        let small = tokio::spawn(async move {
            let buf = arena_small.allocate_async(nz(512)).await;
            let cap = buf.capacity();
            small_rx.await.ok();
            drop(buf);
            cap
        });
        tokio::task::yield_now().await;

        // Free the halves one at a time, yielding so each wake is processed.
        drop(buf1);
        tokio::task::yield_now().await;

        drop(buf2);
        tokio::task::yield_now().await;

        // Allow the small waiter to release its block.
        small_tx.send(()).ok();

        let large_cap = timeout(Duration::from_secs(2), large)
            .await
            .expect("large waiter should not starve")
            .expect("task should not panic");
        assert_eq!(large_cap, 4096);

        let small_cap = timeout(Duration::from_secs(2), small)
            .await
            .expect("small waiter should complete")
            .expect("task should not panic");
        assert_eq!(small_cap, 512);
    }
1183
1184    #[tokio::test]
1185    async fn buddy_large_request_unblocks_after_coalesce() {
1186        let arena = Arc::new(
1187            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1188                .build_async()
1189                .unwrap(),
1190        );
1191        let buf1 = arena.allocate(nz(2048)).unwrap();
1192        let buf2 = arena.allocate(nz(2048)).unwrap();
1193
1194        let arena2 = Arc::clone(&arena);
1195        let handle = tokio::spawn(async move {
1196            let buf = arena2.allocate_async(nz(4096)).await;
1197            buf.capacity()
1198        });
1199
1200        tokio::time::sleep(Duration::from_millis(50)).await;
1201        drop(buf1);
1202        tokio::time::sleep(Duration::from_millis(25)).await;
1203        assert!(!handle.is_finished());
1204        drop(buf2);
1205
1206        let cap = timeout(Duration::from_secs(2), handle)
1207            .await
1208            .expect("should not timeout")
1209            .expect("task should not panic");
1210        assert_eq!(cap, 4096);
1211    }
1212
1213    #[tokio::test]
1214    async fn buddy_cancellation_does_not_leak() {
1215        let arena = Arc::new(
1216            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1217                .build_async()
1218                .unwrap(),
1219        );
1220        let buf = arena.allocate(nz(4096)).unwrap();
1221
1222        let arena2 = Arc::clone(&arena);
1223        let handle = tokio::spawn(async move { arena2.allocate_async(nz(512)).await });
1224        tokio::time::sleep(Duration::from_millis(50)).await;
1225        handle.abort();
1226        let _ = handle.await;
1227
1228        drop(buf);
1229        let _buf2 = arena.allocate(nz(4096)).unwrap();
1230    }
1231
1232    #[tokio::test]
1233    async fn buddy_custom_waiter_supported() {
1234        let waiters = CountingWaiters::new(4);
1235        let arena = Arc::new(
1236            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1237                .build_async_with(waiters.clone())
1238                .unwrap(),
1239        );
1240        let buf = arena.allocate(nz(2048)).unwrap();
1241
1242        let arena2 = Arc::clone(&arena);
1243        let handle = tokio::spawn(async move { arena2.allocate_async(nz(2048)).await.capacity() });
1244        tokio::time::sleep(Duration::from_millis(50)).await;
1245        drop(buf);
1246
1247        let cap = timeout(Duration::from_secs(2), handle)
1248            .await
1249            .expect("should not timeout")
1250            .expect("task should not panic");
1251        assert_eq!(cap, 2048);
1252        assert!(waiters.registrations() >= 1);
1253        assert!(waiters.wakes() >= 1);
1254    }
1255
1256    /// Multiple waiters at different orders are all served from one freed block
1257    /// via buddy splitting.
1258    #[tokio::test]
1259    async fn buddy_multi_order_waiters_served_via_split() {
1260        let arena = Arc::new(
1261            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
1262                .build_async()
1263                .unwrap(),
1264        );
1265        let buf = arena.allocate(nz(4096)).unwrap();
1266
1267        let a1 = Arc::clone(&arena);
1268        let h1 = tokio::spawn(async move { a1.allocate_async(nz(2048)).await.capacity() });
1269
1270        let a2 = Arc::clone(&arena);
1271        let h2 = tokio::spawn(async move { a2.allocate_async(nz(512)).await.capacity() });
1272
1273        tokio::task::yield_now().await;
1274
1275        drop(buf);
1276
1277        let (r1, r2) = tokio::join!(
1278            timeout(Duration::from_secs(2), h1),
1279            timeout(Duration::from_secs(2), h2),
1280        );
1281        assert_eq!(r1.unwrap().unwrap(), 2048);
1282        assert_eq!(r2.unwrap().unwrap(), 512);
1283    }
1284
1285    // -- Race-heavy cancellation/wake interleaving tests --
1286
    /// Many concurrent cancellations interleaved with wakes must not corrupt
    /// the count invariant or double-take tx.
    // 20 rounds to shake out timing-dependent interleavings between abort,
    // registration Drop, and wake delivery.
    #[tokio::test]
    async fn buddy_cancel_wake_interleaving_count_invariant() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );

        for _ in 0..20 {
            // Allocate individual blocks so each free wakes one waiter
            let mut bufs = Vec::new();
            while let Ok(buf) = arena.allocate(nz(512)) {
                bufs.push(buf);
            }

            let waiter_count = 4;
            let cancel_count = 2;
            let mut handles = Vec::new();
            for _ in 0..waiter_count {
                let a = Arc::clone(&arena);
                handles.push(tokio::spawn(async move { a.allocate_async(nz(512)).await }));
            }
            tokio::task::yield_now().await;

            // Cancel some waiters
            for h in handles.drain(..cancel_count) {
                h.abort();
                // Await the aborted handle so its registration's Drop has run
                // before the next wake.
                let _ = h.await;
            }
            tokio::task::yield_now().await;

            // Free enough blocks for remaining waiters (one per waiter)
            let remaining = waiter_count - cancel_count;
            for buf in bufs.drain(..remaining) {
                drop(buf);
                tokio::task::yield_now().await;
            }

            for h in handles {
                let buf = timeout(Duration::from_secs(2), h)
                    .await
                    .expect("waiter should complete")
                    .expect("task should not panic");
                drop(buf);
            }

            // Free remaining blocks
            drop(bufs);
        }
    }
1339
    /// Teardown while waiters are live must not panic or leak.
    #[tokio::test]
    async fn buddy_teardown_with_live_waiters() {
        // Repeat to exercise different task/drop interleavings.
        for _ in 0..20 {
            let arena = Arc::new(
                BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                    .build_async()
                    .unwrap(),
            );
            let _buf = arena.allocate(nz(4096)).unwrap();

            let mut handles = Vec::new();
            for _ in 0..4 {
                let a = Arc::clone(&arena);
                handles.push(tokio::spawn(async move { a.allocate_async(nz(512)).await }));
            }
            tokio::task::yield_now().await;

            // Drop arena while waiters are still registered — they hold Arc clones
            // so no UB, but the waiters will never complete
            drop(arena);
            drop(_buf);

            // Abort all waiters — this exercises the Drop path with Live state
            for h in handles {
                h.abort();
                // Await so each task (and its registration Drop) has finished.
                let _ = h.await;
            }
        }
    }
1370
1371    // -- Fairness regression tests --
1372
    /// Under repeated small frees, a large waiter must eventually be served
    /// (not starved indefinitely). This tests the scoring system's time-based
    /// priority escalation.
    #[tokio::test]
    async fn buddy_fairness_large_not_starved_by_repeated_small() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );

        // Fill the arena with small blocks
        let mut bufs = Vec::new();
        while let Ok(buf) = arena.allocate(nz(512)) {
            bufs.push(buf);
        }

        // Large waiter wants 4096
        let arena_large = Arc::clone(&arena);
        let large_handle =
            tokio::spawn(async move { arena_large.allocate_async(nz(4096)).await.capacity() });
        tokio::task::yield_now().await;

        // Small waiter wants 512. It holds its buffer until signaled so the
        // large waiter must be served from the other frees.
        let arena_small = Arc::clone(&arena);
        let (small_done_tx, small_done_rx) = tokio::sync::oneshot::channel::<()>();
        let small_handle = tokio::spawn(async move {
            let buf = arena_small.allocate_async(nz(512)).await;
            let cap = buf.capacity();
            // Hold until signaled
            small_done_rx.await.ok();
            drop(buf);
            cap
        });
        tokio::task::yield_now().await;

        // Free blocks one at a time — small should get served first (lower order),
        // but eventually enough blocks free to coalesce for large
        for buf in bufs.drain(..) {
            drop(buf);
            tokio::task::yield_now().await;
        }

        // Signal small to release its buffer
        small_done_tx.send(()).ok();

        // Longer timeout here: the large request is only satisfiable after
        // multiple frees coalesce up to a 4096 block.
        let large_cap = timeout(Duration::from_secs(5), large_handle)
            .await
            .expect("large waiter must not starve")
            .expect("task should not panic");
        assert_eq!(large_cap, 4096);

        let small_cap = timeout(Duration::from_secs(2), small_handle)
            .await
            .expect("small waiter should complete")
            .expect("task should not panic");
        assert_eq!(small_cap, 512);
    }
1431
1432    /// Mixed small/large free patterns over many iterations should serve
1433    /// all waiters without deadlock.
1434    #[tokio::test]
1435    async fn buddy_fairness_mixed_sizes_no_deadlock() {
1436        let arena = Arc::new(
1437            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
1438                .build_async()
1439                .unwrap(),
1440        );
1441
1442        for round in 0..10 {
1443            let size = if round % 2 == 0 { 2048 } else { 512 };
1444            let buf = arena.allocate(nz(size)).unwrap();
1445
1446            let a = Arc::clone(&arena);
1447            let handle = tokio::spawn(async move { a.allocate_async(nz(size)).await.capacity() });
1448
1449            tokio::time::sleep(Duration::from_millis(10)).await;
1450            drop(buf);
1451
1452            let cap = timeout(Duration::from_secs(2), handle)
1453                .await
1454                .expect("waiter should not deadlock")
1455                .expect("task should not panic");
1456            assert_eq!(cap, size);
1457        }
1458    }
1459}