1use std::cell::UnsafeCell;
2use std::fmt;
3use std::future::Future;
4use std::ops::Deref;
5use std::pin::Pin;
6use std::sync::atomic::{AtomicU8, AtomicU64, Ordering as AtomicOrdering};
7use std::sync::{Arc, Mutex};
8use std::task::{Context, Poll};
9use std::time::Instant;
10
11use tokio::sync::Notify;
12use tokio::sync::futures::OwnedNotified;
13use tokio::sync::oneshot;
14
15use crate::buffer::Buffer;
16use crate::sync::atomic::{AtomicUsize, Ordering};
17use crate::{BuddyArena, FixedArena};
18
/// Wake-up strategy used by the fixed-size arena's async allocation path.
///
/// `allocate_with_waiter` in this file drives the protocol: it calls
/// [`Waiter::register`], then `prepare` on the returned registration, retries
/// the allocation, and only awaits the registration when that retry failed.
pub trait Waiter: Send + Sync + 'static {
    /// The registration (a `Future<Output = ()>`) produced by [`Waiter::register`].
    type Registration: WaitRegistration;

    /// Creates a registration for one pending allocation attempt.
    fn register(&self) -> Self::Registration;

    /// Asks the implementation to wake a waiting registration.
    // NOTE(review): presumably invoked when a buffer is returned to the arena;
    // the call site is not in this file - confirm.
    fn wake(&self);
}
36
/// Order-aware counterpart of [`Waiter`] used by the buddy arena's async path.
///
/// `allocate_with_buddy_waiter` in this file registers with the order of the
/// pending request; `wake` receives the order of the block that became free.
pub trait BuddyWaiter: Send + Sync + 'static {
    /// The registration (a `Future<Output = ()>`) produced by [`BuddyWaiter::register`].
    type Registration: WaitRegistration;

    /// Creates a registration for a pending request of the given `order`.
    fn register(&self, order: usize) -> Self::Registration;

    /// Asks the implementation to wake waiters that a block of `freed_order` may satisfy.
    // NOTE(review): presumably invoked when a block is returned to the arena;
    // the call site is not in this file - confirm.
    fn wake(&self, freed_order: usize);
}
52
/// One wait attempt: a future that resolves once the registration is woken.
///
/// The allocation helpers in this file call `prepare` before retrying the
/// allocation, `revoke` when that retry succeeded, and await the registration
/// otherwise.
pub trait WaitRegistration: Future<Output = ()> {
    /// Arms the registration; called before the allocation is retried.
    fn prepare(self: Pin<&mut Self>);

    /// Withdraws the registration; called when the retry succeeded and no wait is needed.
    fn revoke(self: Pin<&mut Self>);
}
61
/// Crate-internal, type-erasable view of a waiter exposing only `wake`.
///
/// Used as `dyn WakeOne` so a handle can be stored without naming the
/// concrete waiter type.
pub(crate) trait WakeOne: Send + Sync {
    /// Wakes a waiting registration.
    fn wake(&self);
}
69
70impl<W: Waiter> WakeOne for W {
71 fn wake(&self) {
72 Waiter::wake(self);
73 }
74}
75
/// Type-erased handle through which a waiter's `wake` can be invoked.
pub(crate) struct WakeHandle {
    // Shared waiter behind a trait object; see `WakeHandle::new`.
    inner: Arc<dyn WakeOne>,
}
79
80impl WakeHandle {
81 pub(crate) fn new<W: Waiter>(waiters: Arc<W>) -> Self {
82 let inner: Arc<dyn WakeOne> = waiters;
83 Self { inner }
84 }
85
86 pub(crate) fn wake(&self) {
87 self.inner.wake();
88 }
89}
90
/// Crate-internal, type-erasable view of a buddy waiter exposing only `wake`.
pub(crate) trait BuddyWakeOne: Send + Sync {
    /// Wakes waiters that a freed block of `freed_order` may satisfy.
    fn wake(&self, freed_order: usize);
}
94
95impl<W: BuddyWaiter> BuddyWakeOne for W {
96 fn wake(&self, freed_order: usize) {
97 BuddyWaiter::wake(self, freed_order);
98 }
99}
100
/// Type-erased handle through which a buddy waiter's `wake` can be invoked.
pub(crate) struct BuddyWakeHandle {
    // Shared waiter behind a trait object; see `BuddyWakeHandle::new`.
    inner: Arc<dyn BuddyWakeOne>,
}
104
105impl BuddyWakeHandle {
106 pub(crate) fn new<W: BuddyWaiter>(waiters: Arc<W>) -> Self {
107 let inner: Arc<dyn BuddyWakeOne> = waiters;
108 Self { inner }
109 }
110
111 pub(crate) fn wake(&self, freed_order: usize) {
112 self.inner.wake(freed_order);
113 }
114}
115
// Values stored in `WaiterEntry::state`.
/// Initial state: the entry has been neither woken nor revoked.
const LIVE: u8 = 0;
/// Set by the wake path after it wins the compare-exchange from `LIVE`.
const WOKEN: u8 = 1;
/// Set by `revoke` / `Drop` of the registration after winning the compare-exchange from `LIVE`.
const REVOKED: u8 = 2;
123
/// One queued buddy waiter, shared between the order queue and its registration.
struct WaiterEntry {
    /// One of `LIVE`, `WOKEN`, `REVOKED`; transitions away from `LIVE` use compare-exchange.
    state: AtomicU8,
    /// Sender half of the wake channel; removed at most once through `take_tx`.
    tx: UnsafeCell<Option<oneshot::Sender<usize>>>,
    /// Registration time in nanoseconds since the waiter set's `epoch`.
    timestamp: u64,
    /// Order the entry was registered under (not read by the code in this file).
    #[allow(dead_code)]
    order: usize,
}
133
134impl WaiterEntry {
135 fn new(tx: oneshot::Sender<usize>, timestamp: u64, order: usize) -> Self {
136 Self {
137 state: AtomicU8::new(LIVE),
138 tx: UnsafeCell::new(Some(tx)),
139 timestamp,
140 order,
141 }
142 }
143
144 unsafe fn take_tx(&self) -> Option<oneshot::Sender<usize>> {
151 unsafe { (*self.tx.get()).take() }
152 }
153}
154
// SAFETY (as relied on by the code in this file): the `UnsafeCell` holding the
// sender is only accessed through `take_tx`, and every caller first wins a
// compare-exchange that moves `state` away from `LIVE`, so at most one thread
// ever touches the cell.
unsafe impl Send for WaiterEntry {}
unsafe impl Sync for WaiterEntry {}
159
/// Sentinel stored in `BuddyOrderSlot::head_timestamp` when no live waiter is queued.
const NO_WAITERS_TIMESTAMP: u64 = u64::MAX;

/// Budget of queue entries a single `buddy_wake` call may pop across all orders.
const MAX_POPS_PER_WAKE: usize = 8;

/// Streak length at which `score_orders` drops the last winning order from the
/// candidates, provided at least one other candidate exists.
const MAX_CONSECUTIVE: u32 = 10;
172
173const ORDER_BONUS_NS: u64 = 50_000; const DEPTH_BONUS_NS: u64 = 5_000; struct BuddyOrderSlot {
180 queue: Mutex<std::collections::VecDeque<Arc<WaiterEntry>>>,
181 count: AtomicUsize,
182 head_timestamp: AtomicU64,
183}
184
185impl BuddyOrderSlot {
186 fn new() -> Self {
187 Self {
188 queue: Mutex::new(std::collections::VecDeque::new()),
189 count: AtomicUsize::new(0),
190 head_timestamp: AtomicU64::new(NO_WAITERS_TIMESTAMP),
191 }
192 }
193}
194
/// Wait state for the fixed-size arena path.
struct FixedOrderSlot {
    /// Notification primitive shared with every `NotifyRegistration`.
    notify: Arc<Notify>,
    /// Number of registrations currently counted as waiting; `wake` skips
    /// `notify_one` when this is zero.
    count: AtomicUsize,
}
203
/// Waiter implementation backed by `tokio::sync::Notify` (fixed path) and
/// per-order queues of oneshot senders (buddy path).
///
/// Implements both [`Waiter`] and [`BuddyWaiter`]; clones share the same state.
#[derive(Clone)]
pub struct NotifyWaiters {
    inner: Arc<NotifyWaitersInner>,
}
213
/// Shared state behind [`NotifyWaiters`] and its registrations.
struct NotifyWaitersInner {
    /// State for the fixed-size arena path.
    fixed_slot: FixedOrderSlot,
    /// One wait slot per buddy order, indexed by order.
    buddy_orders: Box<[BuddyOrderSlot]>,
    /// Packed streak record: high 32 bits = last winning order, low 32 bits =
    /// number of consecutive wins.
    streak_state: AtomicU64,
    /// Reference instant from which waiter timestamps are measured.
    epoch: Instant,
}
224
225impl NotifyWaiters {
226 pub fn new(num_orders: usize) -> Self {
231 assert!(num_orders > 0, "must have at least one order");
232 let buddy_orders: Vec<BuddyOrderSlot> =
233 (0..num_orders).map(|_| BuddyOrderSlot::new()).collect();
234 Self {
235 inner: Arc::new(NotifyWaitersInner {
236 fixed_slot: FixedOrderSlot {
237 notify: Arc::new(Notify::new()),
238 count: AtomicUsize::new(0),
239 },
240 buddy_orders: buddy_orders.into_boxed_slice(),
241 streak_state: AtomicU64::new(0),
242 epoch: Instant::now(),
243 }),
244 }
245 }
246
247 fn now_ns(&self) -> u64 {
248 self.inner.epoch.elapsed().as_nanos() as u64
249 }
250
251 fn score_orders(&self, freed_order: usize) -> Vec<usize> {
254 let max = freed_order.min(self.inner.buddy_orders.len() - 1);
255 let streak = self.inner.streak_state.load(AtomicOrdering::Relaxed);
256 let last_winner = (streak >> 32) as usize;
257 let streak_count = streak as u32;
258
259 let mut candidates: Vec<(usize, u64)> = Vec::new();
263
264 for order in 0..=max {
265 let slot = &self.inner.buddy_orders[order];
266 let count = slot.count.load(Ordering::Acquire);
267 if count == 0 {
268 continue;
269 }
270 let ts = slot.head_timestamp.load(AtomicOrdering::Acquire);
271 if ts == NO_WAITERS_TIMESTAMP {
272 continue;
273 }
274
275 let effective_age = ts
276 .saturating_sub((order as u64).saturating_mul(ORDER_BONUS_NS))
277 .saturating_sub((count as u64).saturating_mul(DEPTH_BONUS_NS));
278
279 candidates.push((order, effective_age));
280 }
281
282 if streak_count >= MAX_CONSECUTIVE
285 && candidates.len() > 1
286 && candidates.iter().any(|(o, _)| *o == last_winner)
287 {
288 candidates.retain(|(o, _)| *o != last_winner);
289 }
290
291 candidates.sort_by_key(|&(_, age)| age);
293 candidates.into_iter().map(|(order, _)| order).collect()
294 }
295
296 fn update_streak(&self, winner_order: usize) {
297 let current = self.inner.streak_state.load(AtomicOrdering::Relaxed);
298 let last_winner = (current >> 32) as usize;
299 let new = if winner_order == last_winner {
300 let streak = (current as u32).saturating_add(1);
301 ((winner_order as u64) << 32) | streak as u64
302 } else {
303 ((winner_order as u64) << 32) | 1u64
304 };
305 self.inner.streak_state.store(new, AtomicOrdering::Relaxed);
306 }
307
308 fn update_head_timestamp(
309 &self,
310 queue: &std::collections::VecDeque<Arc<WaiterEntry>>,
311 order: usize,
312 ) {
313 let slot = &self.inner.buddy_orders[order];
314 for entry in queue.iter() {
319 if entry.state.load(AtomicOrdering::Relaxed) == LIVE {
320 slot.head_timestamp
321 .store(entry.timestamp, AtomicOrdering::Release);
322 return;
323 }
324 }
325 slot.head_timestamp
326 .store(NO_WAITERS_TIMESTAMP, AtomicOrdering::Release);
327 }
328
329 fn buddy_wake(&self, freed_order: usize) {
330 let candidates = self.score_orders(freed_order);
331 let mut pops: usize = 0;
332
333 for order in candidates {
337 let mut queue = self.inner.buddy_orders[order].queue.lock().unwrap();
338 while let Some(entry) = queue.pop_front() {
339 pops += 1;
340 if pops > MAX_POPS_PER_WAKE {
341 return;
342 }
343
344 if entry.state.load(AtomicOrdering::Relaxed) != LIVE {
345 continue;
346 }
347
348 if entry
349 .state
350 .compare_exchange(LIVE, WOKEN, AtomicOrdering::AcqRel, AtomicOrdering::Relaxed)
351 .is_ok()
352 {
353 self.inner.buddy_orders[order]
354 .count
355 .fetch_sub(1, Ordering::Release);
356 self.update_head_timestamp(&queue, order);
357 drop(queue);
358
359 let tx = unsafe { entry.take_tx() };
361 if let Some(tx) = tx
362 && tx.send(freed_order).is_ok()
363 {
364 self.update_streak(order);
365 }
366 break;
368 }
369 }
370 }
371 }
372
373 fn buddy_register(&self, order: usize) -> BuddyRegistration {
374 let order = order.min(self.inner.buddy_orders.len() - 1);
375 let (tx, rx) = oneshot::channel();
376 let timestamp = self.now_ns();
377 let entry = Arc::new(WaiterEntry::new(tx, timestamp, order));
378
379 BuddyRegistration {
380 entry: Some(Arc::clone(&entry)),
381 rx: Some(rx),
382 waiters: Arc::clone(&self.inner),
383 order,
384 registered: false,
385 pending_entry: Some(entry),
386 }
387 }
388}
389
390impl Waiter for NotifyWaiters {
391 type Registration = NotifyRegistration;
392
393 fn register(&self) -> NotifyRegistration {
394 NotifyRegistration {
395 future: self.inner.fixed_slot.notify.clone().notified_owned(),
396 inner: Arc::clone(&self.inner),
397 registered: false,
398 woken: false,
399 }
400 }
401
402 fn wake(&self) {
403 if self.inner.fixed_slot.count.load(Ordering::Acquire) > 0 {
404 self.inner.fixed_slot.notify.notify_one();
405 }
406 }
407}
408
409impl BuddyWaiter for NotifyWaiters {
410 type Registration = BuddyRegistration;
411
412 fn register(&self, order: usize) -> BuddyRegistration {
413 self.buddy_register(order)
414 }
415
416 fn wake(&self, freed_order: usize) {
417 self.buddy_wake(freed_order);
418 }
419}
420
/// Registration returned by `<NotifyWaiters as Waiter>::register`.
pub struct NotifyRegistration {
    /// Owned `Notified` future on the shared `Notify`; polled in place.
    future: OwnedNotified,
    /// Shared waiter state, used for the waiting count and wake propagation.
    inner: Arc<NotifyWaitersInner>,
    /// Whether this registration is currently included in the waiting count.
    registered: bool,
    /// Set once the future completed; makes `Drop` pass a wake on to another waiter.
    woken: bool,
}
432
impl WaitRegistration for NotifyRegistration {
    /// Enables the `Notified` future and counts this registration as waiting
    /// (once).
    fn prepare(self: Pin<&mut Self>) {
        // SAFETY: `this` is only used for in-place field access below; nothing
        // is moved out of the pinned value.
        let this = unsafe { self.get_unchecked_mut() };
        // SAFETY: `future` is a field of the pinned `self` and is never moved
        // by the code in this file, so re-pinning it is sound.
        let future = unsafe { Pin::new_unchecked(&mut this.future) };
        // The return value (whether a notification was already received) is
        // intentionally ignored; `poll` observes it later.
        let _ = future.enable();
        if !this.registered {
            this.inner.fixed_slot.count.fetch_add(1, Ordering::Release);
            this.registered = true;
        }
    }

    /// Removes this registration from the waiting count and clears `woken`, so
    /// dropping it afterwards does not trigger the propagation in `Drop`.
    fn revoke(self: Pin<&mut Self>) {
        // SAFETY: only plain fields are mutated in place; nothing is moved.
        let this = unsafe { self.get_unchecked_mut() };
        if this.registered {
            this.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
            this.registered = false;
        }
        this.woken = false;
    }
}
453
impl Future for NotifyRegistration {
    type Output = ();

    /// Polls the inner `Notified` future. On completion the registration is
    /// removed from the waiting count and marked `woken`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // SAFETY: fields are only accessed in place; nothing is moved out.
        let this = unsafe { self.get_unchecked_mut() };
        // SAFETY: `future` is a field of the pinned `self` and is never moved
        // by the code in this file.
        let poll = unsafe { Pin::new_unchecked(&mut this.future) }.poll(cx);
        if poll.is_ready() {
            if this.registered {
                this.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
                this.registered = false;
            }
            this.woken = true;
        }
        poll
    }
}
470
471impl Drop for NotifyRegistration {
472 fn drop(&mut self) {
473 if self.registered {
474 self.inner.fixed_slot.count.fetch_sub(1, Ordering::Release);
475 }
476 if self.woken && self.inner.fixed_slot.count.load(Ordering::Acquire) > 0 {
483 self.inner.fixed_slot.notify.notify_one();
484 }
485 }
486}
487
/// Registration returned by `<NotifyWaiters as BuddyWaiter>::register`.
pub struct BuddyRegistration {
    /// Handle to the waiter entry, kept so `revoke` / `Drop` can mark it `REVOKED`.
    entry: Option<Arc<WaiterEntry>>,
    /// Receiver half of the wake channel; `None` once it has completed.
    rx: Option<oneshot::Receiver<usize>>,
    /// Shared waiter state that owns the per-order queues.
    waiters: Arc<NotifyWaitersInner>,
    /// Order (already clamped) whose queue this registration uses.
    order: usize,
    /// Whether the entry has been queued and not yet revoked or completed.
    registered: bool,
    /// Second handle to the same entry, moved into the queue by `prepare`.
    pending_entry: Option<Arc<WaiterEntry>>,
}
502
503impl WaitRegistration for BuddyRegistration {
504 fn prepare(self: Pin<&mut Self>) {
505 let this = unsafe { self.get_unchecked_mut() };
506 if !this.registered
507 && let Some(entry) = this.pending_entry.take()
508 {
509 let slot = &this.waiters.buddy_orders[this.order];
510 let mut queue = slot.queue.lock().unwrap();
511 queue.push_back(entry);
512 let prev = slot.count.fetch_add(1, Ordering::Release);
513 if prev == 0
514 && let Some(e) = &this.entry
515 {
516 slot.head_timestamp
517 .store(e.timestamp, AtomicOrdering::Release);
518 }
519 this.registered = true;
520 }
521 }
522
523 fn revoke(self: Pin<&mut Self>) {
524 let this = unsafe { self.get_unchecked_mut() };
525 if this.registered {
526 if let Some(entry) = &this.entry
527 && entry
528 .state
529 .compare_exchange(
530 LIVE,
531 REVOKED,
532 AtomicOrdering::AcqRel,
533 AtomicOrdering::Relaxed,
534 )
535 .is_ok()
536 {
537 let _tx = unsafe { entry.take_tx() };
539 this.waiters.buddy_orders[this.order]
540 .count
541 .fetch_sub(1, Ordering::Release);
542 }
543 this.registered = false;
544 }
545 }
546}
547
548impl Future for BuddyRegistration {
549 type Output = ();
550
551 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
552 let this = unsafe { self.get_unchecked_mut() };
553 if let Some(rx) = &mut this.rx {
554 match Pin::new(rx).poll(cx) {
555 Poll::Ready(_) => {
556 this.rx = None;
557 this.registered = false;
558 Poll::Ready(())
559 }
560 Poll::Pending => Poll::Pending,
561 }
562 } else {
563 Poll::Ready(())
564 }
565 }
566}
567
568impl Drop for BuddyRegistration {
569 fn drop(&mut self) {
570 if self.registered
571 && let Some(entry) = &self.entry
572 && entry
573 .state
574 .compare_exchange(
575 LIVE,
576 REVOKED,
577 AtomicOrdering::AcqRel,
578 AtomicOrdering::Relaxed,
579 )
580 .is_ok()
581 {
582 let _tx = unsafe { entry.take_tx() };
584 self.waiters.buddy_orders[self.order]
585 .count
586 .fetch_sub(1, Ordering::Release);
587 }
588 }
589}
590
/// Retries `try_allocate` until it yields a value, waiting on `waiters`
/// between attempts.
///
/// Each iteration registers and prepares *before* attempting the allocation,
/// so the registration is already armed when the attempt runs. A successful
/// attempt revokes the registration; a failed one awaits it and then loops.
async fn allocate_with_waiter<W, T, F>(waiters: &W, mut try_allocate: F) -> T
where
    W: Waiter,
    F: FnMut() -> Option<T>,
{
    loop {
        let registration = waiters.register();
        tokio::pin!(registration);
        registration.as_mut().prepare();
        if let Some(value) = try_allocate() {
            // No wait needed: withdraw before the registration is dropped.
            registration.as_mut().revoke();
            return value;
        }
        registration.await;
    }
}
611
/// Retries `try_allocate` until it yields a value, waiting on `waiters` under
/// the given `order` between attempts.
///
/// Each iteration registers and prepares *before* attempting the allocation,
/// so the registration is already armed when the attempt runs. A successful
/// attempt revokes the registration; a failed one awaits it and then loops.
async fn allocate_with_buddy_waiter<W, T, F>(waiters: &W, order: usize, mut try_allocate: F) -> T
where
    W: BuddyWaiter,
    F: FnMut() -> Option<T>,
{
    loop {
        let registration = waiters.register(order);
        tokio::pin!(registration);
        registration.as_mut().prepare();
        if let Some(value) = try_allocate() {
            // No wait needed: withdraw before the registration is dropped.
            registration.as_mut().revoke();
            return value;
        }
        registration.await;
    }
}
628
/// A [`FixedArena`] paired with a shared waiter, adding `allocate_async`.
///
/// Dereferences to the wrapped `FixedArena`, so its synchronous API stays
/// available.
#[derive(Clone)]
pub struct AsyncFixedArena<W = NotifyWaiters> {
    /// The wrapped synchronous arena.
    inner: FixedArena,
    /// Waiter used by `allocate_async` to park until it is woken.
    waiters: Arc<W>,
}
645
646impl<W> AsyncFixedArena<W> {
647 pub(crate) fn new(inner: FixedArena, waiters: Arc<W>) -> Self {
648 Self { inner, waiters }
649 }
650}
651
652impl<W: Waiter> AsyncFixedArena<W> {
653 pub async fn allocate_async(&self) -> Buffer {
658 allocate_with_waiter(self.waiters.as_ref(), || self.inner.allocate().ok()).await
659 }
660}
661
// Exposes the synchronous `FixedArena` API directly on the async wrapper.
impl<W> Deref for AsyncFixedArena<W> {
    type Target = FixedArena;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
669
670impl<W> fmt::Debug for AsyncFixedArena<W> {
671 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
672 f.debug_struct("AsyncFixedArena")
673 .field("inner", &self.inner)
674 .finish()
675 }
676}
677
/// A [`BuddyArena`] paired with a shared waiter, adding `allocate_async`.
///
/// Dereferences to the wrapped `BuddyArena`, so its synchronous API stays
/// available.
#[derive(Clone)]
pub struct AsyncBuddyArena<W = NotifyWaiters> {
    /// The wrapped synchronous arena.
    inner: BuddyArena,
    /// Waiter used by `allocate_async` to park until it is woken.
    waiters: Arc<W>,
}
694
695impl<W> AsyncBuddyArena<W> {
696 pub(crate) fn new(inner: BuddyArena, waiters: Arc<W>) -> Self {
697 Self { inner, waiters }
698 }
699}
700
701impl<W: BuddyWaiter> AsyncBuddyArena<W> {
702 pub async fn allocate_async(&self, len: std::num::NonZeroUsize) -> Buffer {
707 let order = self
708 .order_for_request(len.get())
709 .unwrap_or(self.max_order());
710 allocate_with_buddy_waiter(self.waiters.as_ref(), order, || {
711 self.inner.allocate(len).ok()
712 })
713 .await
714 }
715}
716
// Exposes the synchronous `BuddyArena` API directly on the async wrapper.
impl<W> Deref for AsyncBuddyArena<W> {
    type Target = BuddyArena;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
724
725impl<W> fmt::Debug for AsyncBuddyArena<W> {
726 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
727 f.debug_struct("AsyncBuddyArena")
728 .field("inner", &self.inner)
729 .finish()
730 }
731}
732
733#[cfg(test)]
738mod tests {
739 use std::num::NonZeroUsize;
740 use std::sync::Arc;
741 use std::sync::atomic::{AtomicUsize, Ordering as AtomicOrdering};
742
743 use bytes::BufMut;
744 use tokio::time::{Duration, timeout};
745
746 use crate::BuddyArena;
747 use crate::BuddyGeometry;
748 use crate::FixedArena;
749
750 use super::*;
751
    /// Test shorthand for a `NonZeroUsize`; panics when `n` is zero.
    fn nz(n: usize) -> NonZeroUsize {
        NonZeroUsize::new(n).unwrap()
    }
755
    /// Custom waiter for tests: delegates to `NotifyWaiters` while counting
    /// `register` and `wake` calls. Clones share the counters.
    #[derive(Clone)]
    struct CountingWaiters {
        inner: NotifyWaiters,
        // Number of `register` calls (fixed and buddy combined).
        registrations: Arc<AtomicUsize>,
        // Number of `wake` calls (fixed and buddy combined).
        wakes: Arc<AtomicUsize>,
    }
764
765 impl CountingWaiters {
766 fn new(num_orders: usize) -> Self {
767 Self {
768 inner: NotifyWaiters::new(num_orders),
769 registrations: Arc::new(AtomicUsize::new(0)),
770 wakes: Arc::new(AtomicUsize::new(0)),
771 }
772 }
773
774 fn registrations(&self) -> usize {
775 self.registrations.load(AtomicOrdering::Relaxed)
776 }
777
778 fn wakes(&self) -> usize {
779 self.wakes.load(AtomicOrdering::Relaxed)
780 }
781 }
782
    /// Registration returned by `CountingWaiters` on the fixed path; forwards
    /// everything to the wrapped `NotifyRegistration`.
    struct CountingFixedRegistration {
        inner: NotifyRegistration,
    }
787
    /// Registration returned by `CountingWaiters` on the buddy path; forwards
    /// everything to the wrapped `BuddyRegistration`.
    struct CountingBuddyRegistration {
        inner: BuddyRegistration,
    }
791
792 impl Waiter for CountingWaiters {
793 type Registration = CountingFixedRegistration;
794
795 fn register(&self) -> Self::Registration {
796 self.registrations.fetch_add(1, AtomicOrdering::Relaxed);
797 CountingFixedRegistration {
798 inner: Waiter::register(&self.inner),
799 }
800 }
801
802 fn wake(&self) {
803 self.wakes.fetch_add(1, AtomicOrdering::Relaxed);
804 Waiter::wake(&self.inner);
805 }
806 }
807
808 impl BuddyWaiter for CountingWaiters {
809 type Registration = CountingBuddyRegistration;
810
811 fn register(&self, order: usize) -> Self::Registration {
812 self.registrations.fetch_add(1, AtomicOrdering::Relaxed);
813 CountingBuddyRegistration {
814 inner: BuddyWaiter::register(&self.inner, order),
815 }
816 }
817
818 fn wake(&self, freed_order: usize) {
819 self.wakes.fetch_add(1, AtomicOrdering::Relaxed);
820 BuddyWaiter::wake(&self.inner, freed_order);
821 }
822 }
823
    impl WaitRegistration for CountingFixedRegistration {
        fn prepare(self: Pin<&mut Self>) {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.prepare();
        }

        fn revoke(self: Pin<&mut Self>) {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.revoke();
        }
    }
833
    impl Future for CountingFixedRegistration {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
        }
    }
841
    impl WaitRegistration for CountingBuddyRegistration {
        fn prepare(self: Pin<&mut Self>) {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.prepare();
        }

        fn revoke(self: Pin<&mut Self>) {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.revoke();
        }
    }
851
    impl Future for CountingBuddyRegistration {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // SAFETY: pin projection to the only field; `inner` is never moved
            // out of the wrapper.
            unsafe { self.map_unchecked_mut(|this| &mut this.inner) }.poll(cx)
        }
    }
859
    /// A single-slot arena can be allocated from asynchronously, and again
    /// after the first buffer's frozen bytes are dropped.
    #[tokio::test]
    async fn allocate_async_basic() {
        let arena = FixedArena::with_slot_capacity(nz(1), nz(32))
            .build_async()
            .unwrap();
        let mut buf = arena.allocate_async().await;
        buf.put_slice(b"data");
        let bytes = buf.freeze();
        drop(bytes);
        let _buf2 = arena.allocate_async().await;
    }
873
    /// A task blocked in `allocate_async` on a full single-slot arena completes
    /// once the outstanding bytes are dropped.
    #[tokio::test]
    async fn allocate_async_waits_then_succeeds() {
        let arena = Arc::new(
            FixedArena::with_slot_capacity(nz(1), nz(32))
                .build_async()
                .unwrap(),
        );
        let mut buf = arena.allocate_async().await;
        buf.put_slice(b"blocking");
        let bytes = buf.freeze();
        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move {
            let buf = arena2.allocate_async().await;
            buf.capacity()
        });
        // Give the spawned task time to start waiting before freeing the slot.
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(bytes);
        let cap = timeout(Duration::from_secs(2), handle)
            .await
            .expect("should not timeout")
            .expect("task should not panic");
        assert_eq!(cap, 32);
    }
897
    /// The synchronous `allocate` reached through `Deref` still fails
    /// immediately with `ArenaFull` instead of waiting.
    #[tokio::test]
    async fn sync_allocate_still_fast_fails() {
        let arena = FixedArena::with_slot_capacity(nz(1), nz(32))
            .build_async()
            .unwrap();
        let _buf = arena.allocate().unwrap();
        let err = arena.allocate().unwrap_err();
        assert_eq!(err, crate::AllocError::ArenaFull);
    }
907
    /// Two tasks waiting on a full two-slot arena both complete after both
    /// slots are released.
    #[tokio::test]
    async fn multiple_waiters_all_served() {
        let arena = Arc::new(
            FixedArena::with_slot_capacity(nz(2), nz(32))
                .build_async()
                .unwrap(),
        );
        let buf1 = arena.allocate().unwrap();
        let buf2 = arena.allocate().unwrap();
        let a1 = Arc::clone(&arena);
        let h1 = tokio::spawn(async move { a1.allocate_async().await.capacity() });
        let a2 = Arc::clone(&arena);
        let h2 = tokio::spawn(async move { a2.allocate_async().await.capacity() });
        // Let both tasks start waiting before any slot is freed.
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf1);
        drop(buf2);
        let (r1, r2) = tokio::join!(
            timeout(Duration::from_secs(2), h1),
            timeout(Duration::from_secs(2), h2),
        );
        assert_eq!(r1.unwrap().unwrap(), 32);
        assert_eq!(r2.unwrap().unwrap(), 32);
    }
931
    /// The async wrapper exposes the wrapped arena's accessors via `Deref`.
    #[tokio::test]
    async fn deref_exposes_sync_methods() {
        let arena = FixedArena::with_slot_capacity(nz(4), nz(64))
            .build_async()
            .unwrap();
        assert_eq!(arena.slot_count(), 4);
        assert_eq!(arena.slot_capacity(), 64);
    }
940
    /// Aborting a task that is waiting in `allocate_async` does not consume
    /// the slot: after the holder is dropped the slot can be allocated again.
    #[tokio::test]
    async fn fixed_cancellation_no_leak() {
        let arena = Arc::new(
            FixedArena::with_slot_capacity(nz(1), nz(32))
                .build_async()
                .unwrap(),
        );
        let buf = arena.allocate().unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move { arena2.allocate_async().await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;

        drop(buf);
        let _buf2 = arena.allocate().unwrap();
    }
959
    /// With two prepared registrations and a single `wake`, the second waiter
    /// must still complete: the first registration passes the wake on when it
    /// is dropped after being woken.
    #[tokio::test(flavor = "current_thread")]
    async fn fixed_woken_drop_propagates_to_next_waiter() {
        let waiters = Arc::new(NotifyWaiters::new(1));

        let (ready_tx, ready_rx) = tokio::sync::oneshot::channel::<()>();

        let w = Arc::clone(&waiters);
        // Task A: registers first and signals once it is prepared.
        let h_a = tokio::spawn(async move {
            let reg = Waiter::register(&*w);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            ready_tx.send(()).ok();
            reg.await;
        });

        ready_rx.await.ok();

        let w2 = Arc::clone(&waiters);
        // Task B: registers after A.
        let h_b = tokio::spawn(async move {
            let reg = Waiter::register(&*w2);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            reg.await;
        });

        // Let B run up to its await point.
        tokio::task::yield_now().await;

        // A single wake only.
        Waiter::wake(&*waiters);

        let _ = h_a.await;

        timeout(Duration::from_secs(2), h_b)
            .await
            .expect("task B must not stall when A drops after wake")
            .expect("task B should not panic");
    }
1008
    /// When the only waiter is woken and dropped, no notification may be left
    /// behind: a registration created afterwards must stay pending until an
    /// explicit `wake`.
    #[tokio::test(flavor = "current_thread")]
    async fn fixed_last_waiter_woken_drop_no_stale_permit() {
        let waiters = Arc::new(NotifyWaiters::new(1));

        let w = Arc::clone(&waiters);
        let h = tokio::spawn(async move {
            let reg = Waiter::register(&*w);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            reg.await;
        });

        tokio::task::yield_now().await;

        Waiter::wake(&*waiters);
        let _ = h.await;

        let w2 = Arc::clone(&waiters);
        // A fresh waiter registered after the first one finished.
        let h2 = tokio::spawn(async move {
            let reg = Waiter::register(&*w2);
            tokio::pin!(reg);
            reg.as_mut().prepare();
            reg.await;
        });

        tokio::task::yield_now().await;

        assert!(!h2.is_finished(), "stale permit caused spurious wake");

        // Release the second waiter so the test terminates.
        Waiter::wake(&*waiters);
        timeout(Duration::from_secs(1), h2)
            .await
            .expect("cleanup wake should work")
            .expect("no panic");
    }
1055
    /// A fixed arena built with a custom waiter uses it for registration and
    /// wake-up: both counters are non-zero after a blocked allocation completes.
    #[tokio::test]
    async fn fixed_custom_waiter_supported() {
        let waiters = CountingWaiters::new(1);
        let arena = Arc::new(
            FixedArena::with_slot_capacity(nz(1), nz(32))
                .build_async_with(waiters.clone())
                .unwrap(),
        );
        let buf = arena.allocate().unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move { arena2.allocate_async().await.capacity() });
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf);

        let cap = timeout(Duration::from_secs(2), handle)
            .await
            .expect("should not timeout")
            .expect("task should not panic");
        assert_eq!(cap, 32);
        assert!(waiters.registrations() >= 1);
        assert!(waiters.wakes() >= 1);
    }
1079
    /// A 2048-byte async request that is queued while another 2048-byte buffer
    /// is outstanding completes after that buffer is dropped.
    #[tokio::test]
    async fn buddy_allocate_async_waits_then_succeeds() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf = arena.allocate(nz(2048)).unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move {
            let buf = arena2.allocate_async(nz(2048)).await;
            buf.capacity()
        });

        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf);

        let cap = timeout(Duration::from_secs(2), handle)
            .await
            .expect("should not timeout")
            .expect("task should not panic");
        assert_eq!(cap, 2048);
    }
1106
    /// Two 2048-byte waiters on a fully used 4096-byte arena both complete
    /// after both halves are released.
    #[tokio::test]
    async fn buddy_multiple_waiters_all_served() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf1 = arena.allocate(nz(2048)).unwrap();
        let buf2 = arena.allocate(nz(2048)).unwrap();

        let a1 = Arc::clone(&arena);
        let h1 = tokio::spawn(async move { a1.allocate_async(nz(2048)).await.capacity() });
        let a2 = Arc::clone(&arena);
        let h2 = tokio::spawn(async move { a2.allocate_async(nz(2048)).await.capacity() });

        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf1);
        drop(buf2);

        let (r1, r2) = tokio::join!(
            timeout(Duration::from_secs(2), h1),
            timeout(Duration::from_secs(2), h2),
        );
        assert_eq!(r1.unwrap().unwrap(), 2048);
        assert_eq!(r2.unwrap().unwrap(), 2048);
    }
1133
    /// A waiter for the whole arena (4096 bytes) must eventually be served
    /// even though a small (512-byte) waiter is queued alongside it and holds
    /// its buffer until told to release it.
    #[tokio::test]
    async fn buddy_large_waiter_not_starved_by_small() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf1 = arena.allocate(nz(2048)).unwrap();
        let buf2 = arena.allocate(nz(2048)).unwrap();

        let (small_tx, small_rx) = tokio::sync::oneshot::channel::<()>();

        let arena_large = Arc::clone(&arena);
        let large =
            tokio::spawn(async move { arena_large.allocate_async(nz(4096)).await.capacity() });
        tokio::task::yield_now().await;

        let arena_small = Arc::clone(&arena);
        let small = tokio::spawn(async move {
            let buf = arena_small.allocate_async(nz(512)).await;
            let cap = buf.capacity();
            // Keep the small buffer until the main task signals.
            small_rx.await.ok();
            drop(buf);
            cap
        });
        tokio::task::yield_now().await;

        drop(buf1);
        tokio::task::yield_now().await;

        drop(buf2);
        tokio::task::yield_now().await;

        small_tx.send(()).ok();

        let large_cap = timeout(Duration::from_secs(2), large)
            .await
            .expect("large waiter should not starve")
            .expect("task should not panic");
        assert_eq!(large_cap, 4096);

        let small_cap = timeout(Duration::from_secs(2), small)
            .await
            .expect("small waiter should complete")
            .expect("task should not panic");
        assert_eq!(small_cap, 512);
    }
1183
    /// A 4096-byte request stays pending while only one 2048-byte half is free
    /// and completes once the second half is released as well.
    #[tokio::test]
    async fn buddy_large_request_unblocks_after_coalesce() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf1 = arena.allocate(nz(2048)).unwrap();
        let buf2 = arena.allocate(nz(2048)).unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move {
            let buf = arena2.allocate_async(nz(4096)).await;
            buf.capacity()
        });

        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf1);
        tokio::time::sleep(Duration::from_millis(25)).await;
        // Half the arena is still in use, so the request cannot have finished.
        assert!(!handle.is_finished());
        drop(buf2);

        let cap = timeout(Duration::from_secs(2), handle)
            .await
            .expect("should not timeout")
            .expect("task should not panic");
        assert_eq!(cap, 4096);
    }
1212
    /// Aborting a task that is waiting in `allocate_async` leaves the arena
    /// intact: after the holder is dropped the full 4096 bytes can be allocated.
    #[tokio::test]
    async fn buddy_cancellation_does_not_leak() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf = arena.allocate(nz(4096)).unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move { arena2.allocate_async(nz(512)).await });
        tokio::time::sleep(Duration::from_millis(50)).await;
        handle.abort();
        let _ = handle.await;

        drop(buf);
        let _buf2 = arena.allocate(nz(4096)).unwrap();
    }
1231
    /// A buddy arena built with a custom waiter uses it for registration and
    /// wake-up: both counters are non-zero after the async allocation completes.
    #[tokio::test]
    async fn buddy_custom_waiter_supported() {
        let waiters = CountingWaiters::new(4);
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async_with(waiters.clone())
                .unwrap(),
        );
        let buf = arena.allocate(nz(2048)).unwrap();

        let arena2 = Arc::clone(&arena);
        let handle = tokio::spawn(async move { arena2.allocate_async(nz(2048)).await.capacity() });
        tokio::time::sleep(Duration::from_millis(50)).await;
        drop(buf);

        let cap = timeout(Duration::from_secs(2), handle)
            .await
            .expect("should not timeout")
            .expect("task should not panic");
        assert_eq!(cap, 2048);
        assert!(waiters.registrations() >= 1);
        assert!(waiters.wakes() >= 1);
    }
1255
    /// Freeing one whole-arena block must serve waiters of two different,
    /// smaller sizes (2048 and 512 bytes).
    #[tokio::test]
    async fn buddy_multi_order_waiters_served_via_split() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );
        let buf = arena.allocate(nz(4096)).unwrap();

        let a1 = Arc::clone(&arena);
        let h1 = tokio::spawn(async move { a1.allocate_async(nz(2048)).await.capacity() });

        let a2 = Arc::clone(&arena);
        let h2 = tokio::spawn(async move { a2.allocate_async(nz(512)).await.capacity() });

        tokio::task::yield_now().await;

        // A single release of the full block.
        drop(buf);

        let (r1, r2) = tokio::join!(
            timeout(Duration::from_secs(2), h1),
            timeout(Duration::from_secs(2), h2),
        );
        assert_eq!(r1.unwrap().unwrap(), 2048);
        assert_eq!(r2.unwrap().unwrap(), 512);
    }
1284
    /// Repeatedly mixes cancelled and surviving waiters: in each round four
    /// waiters queue on a full arena, two are aborted, and exactly as many
    /// buffers as surviving waiters are released. Every survivor must complete.
    #[tokio::test]
    async fn buddy_cancel_wake_interleaving_count_invariant() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );

        for _ in 0..20 {
            let mut bufs = Vec::new();
            // Exhaust the arena with minimum-size blocks.
            while let Ok(buf) = arena.allocate(nz(512)) {
                bufs.push(buf);
            }

            let waiter_count = 4;
            let cancel_count = 2;
            let mut handles = Vec::new();
            for _ in 0..waiter_count {
                let a = Arc::clone(&arena);
                handles.push(tokio::spawn(async move { a.allocate_async(nz(512)).await }));
            }
            tokio::task::yield_now().await;

            // Cancel the first `cancel_count` waiters.
            for h in handles.drain(..cancel_count) {
                h.abort();
                let _ = h.await;
            }
            tokio::task::yield_now().await;

            let remaining = waiter_count - cancel_count;
            // Free one block per surviving waiter.
            for buf in bufs.drain(..remaining) {
                drop(buf);
                tokio::task::yield_now().await;
            }

            for h in handles {
                let buf = timeout(Duration::from_secs(2), h)
                    .await
                    .expect("waiter should complete")
                    .expect("task should not panic");
                drop(buf);
            }

            // Release everything so the next round starts from an empty arena.
            drop(bufs);
        }
    }
1339
    /// Dropping the test's arena handle and the outstanding buffer while
    /// waiters are still queued, then aborting those waiters, must not panic
    /// or hang.
    #[tokio::test]
    async fn buddy_teardown_with_live_waiters() {
        for _ in 0..20 {
            let arena = Arc::new(
                BuddyArena::builder(BuddyGeometry::exact(nz(4096), nz(512)).unwrap())
                    .build_async()
                    .unwrap(),
            );
            let _buf = arena.allocate(nz(4096)).unwrap();

            let mut handles = Vec::new();
            for _ in 0..4 {
                let a = Arc::clone(&arena);
                handles.push(tokio::spawn(async move { a.allocate_async(nz(512)).await }));
            }
            tokio::task::yield_now().await;

            // The spawned tasks still hold their own `Arc` clones.
            drop(arena);
            drop(_buf);

            for h in handles {
                h.abort();
                let _ = h.await;
            }
        }
    }
1370
    /// With the arena exhausted by 512-byte blocks, a 4096-byte waiter and a
    /// 512-byte waiter are queued and the blocks are released one at a time.
    /// The large waiter must complete within the timeout.
    #[tokio::test]
    async fn buddy_fairness_large_not_starved_by_repeated_small() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );

        let mut bufs = Vec::new();
        // Exhaust the arena with minimum-size blocks.
        while let Ok(buf) = arena.allocate(nz(512)) {
            bufs.push(buf);
        }

        let arena_large = Arc::clone(&arena);
        let large_handle =
            tokio::spawn(async move { arena_large.allocate_async(nz(4096)).await.capacity() });
        tokio::task::yield_now().await;

        let arena_small = Arc::clone(&arena);
        let (small_done_tx, small_done_rx) = tokio::sync::oneshot::channel::<()>();
        let small_handle = tokio::spawn(async move {
            let buf = arena_small.allocate_async(nz(512)).await;
            let cap = buf.capacity();
            // Hold the small buffer until the main task signals.
            small_done_rx.await.ok();
            drop(buf);
            cap
        });
        tokio::task::yield_now().await;

        // Release the blocks one by one, yielding between releases.
        for buf in bufs.drain(..) {
            drop(buf);
            tokio::task::yield_now().await;
        }

        small_done_tx.send(()).ok();

        let large_cap = timeout(Duration::from_secs(5), large_handle)
            .await
            .expect("large waiter must not starve")
            .expect("task should not panic");
        assert_eq!(large_cap, 4096);

        let small_cap = timeout(Duration::from_secs(2), small_handle)
            .await
            .expect("small waiter should complete")
            .expect("task should not panic");
        assert_eq!(small_cap, 512);
    }
1431
    /// Alternates between 2048- and 512-byte requests for ten rounds; in each
    /// round an async request issued while a same-sized buffer is held must
    /// complete with the requested capacity within the timeout.
    #[tokio::test]
    async fn buddy_fairness_mixed_sizes_no_deadlock() {
        let arena = Arc::new(
            BuddyArena::builder(BuddyGeometry::exact(nz(8192), nz(512)).unwrap())
                .build_async()
                .unwrap(),
        );

        for round in 0..10 {
            let size = if round % 2 == 0 { 2048 } else { 512 };
            let buf = arena.allocate(nz(size)).unwrap();

            let a = Arc::clone(&arena);
            let handle = tokio::spawn(async move { a.allocate_async(nz(size)).await.capacity() });

            tokio::time::sleep(Duration::from_millis(10)).await;
            drop(buf);

            let cap = timeout(Duration::from_secs(2), handle)
                .await
                .expect("waiter should not deadlock")
                .expect("task should not panic");
            assert_eq!(cap, size);
        }
    }
1459}