use std::{
    cell::UnsafeCell,
    marker::PhantomData,
    sync::{
        Arc,
        atomic::{AtomicU32, Ordering},
    },
    vec::Vec,
};

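/// Fixed-size, cache-line-aligned (64-byte) backing storage for a single
/// arena slot. Values are written into `bytes` in place by
/// `UninitReservedMemory::init`.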
#[repr(C, align(64))]
// `ReservedMemory`'s `Debug` impl formats the slot, so `Bytes` must be `Debug` too.
#[derive(Debug)]
pub struct Bytes<const MAX_ITEM_SIZE: usize> {
    bytes: [u8; MAX_ITEM_SIZE],
}

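/// A fixed-capacity pool of `MAX_ITEM_COUNT` slots of `MAX_ITEM_SIZE` bytes
/// each. The backing storage is allocated lazily on the first `reserve`; a
/// slot counts as free while the arena holds the only `Arc` pointing at it,
/// and `cursor` remembers where the next free-slot search should begin.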
pub struct Arena<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> {
    buffer: Vec<Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>>,
    ref_counts: Vec<Arc<AtomicU32>>,
    cursor: usize,
}

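/// A clonable handle to an initialized slot. All clones share one `AtomicU32`
/// reference count; when the last clone is dropped, `drop_fn` runs the stored
/// value's destructor in place and the slot becomes reusable by the arena.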
pub struct ReservedMemory<const MAX_ITEM_SIZE: usize> {
    data: Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>,
    ref_count: Arc<AtomicU32>,
    drop_fn: fn(&mut Bytes<MAX_ITEM_SIZE>),
}

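/// A slot that has been reserved but not yet initialized. Call `init` to move
/// a value into it and obtain a `ReservedMemory`. The raw-pointer
/// `PhantomData` keeps this handle `!Send` and `!Sync`, so an uninitialized
/// slot never crosses a thread boundary.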
pub struct UninitReservedMemory<const MAX_ITEM_SIZE: usize> {
    data: Arc<UnsafeCell<Bytes<MAX_ITEM_SIZE>>>,
    ref_count: Arc<AtomicU32>,
    #[cfg(test)]
    index: usize,
    not_sync: PhantomData<*const ()>,
}

impl<const MAX_ITEM_SIZE: usize> UninitReservedMemory<MAX_ITEM_SIZE> {
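    /// Moves `obj` into the reserved slot and returns the initialized handle.
    /// Panics if `O` is larger or more strictly aligned than the slot
    /// (see `accept_obj`).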
    pub fn init<O>(self, obj: O) -> ReservedMemory<MAX_ITEM_SIZE> {
        assert!(
            accept_obj::<O, MAX_ITEM_SIZE>(),
            "Object isn't safe to store in this arena"
        );

        self.init_with_func(
            |bytes| {
                let ptr = core::ptr::from_mut(bytes);
                unsafe {
                    core::ptr::write(ptr as *mut O, obj);
                };
            },
            |bytes| {
                let ptr = core::ptr::from_mut(bytes);
                unsafe {
                    core::ptr::drop_in_place(ptr as *mut O);
                }
            },
        )
    }

    fn init_with_func<F>(
        self,
        init_data: F,
        drop_fn: fn(&mut Bytes<MAX_ITEM_SIZE>),
    ) -> ReservedMemory<MAX_ITEM_SIZE>
    where
        F: FnOnce(&mut Bytes<MAX_ITEM_SIZE>),
    {
        assert_eq!(
            Arc::strong_count(&self.data),
            2,
            "Slot must be held by exactly two owners (the arena and this \
             UninitReservedMemory) to guarantee exclusive write access."
        );

        let bytes_mut = unsafe { self.data.as_ref().get().as_mut().unwrap() };
        init_data(bytes_mut);

        ReservedMemory {
            data: self.data,
            ref_count: self.ref_count,
            drop_fn,
        }
    }
}

impl<const MAX_ITEM_SIZE: usize> core::fmt::Debug for ReservedMemory<MAX_ITEM_SIZE> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ReservedMemory")
            .field("data", &self.data)
            .field("drop_fn", &self.drop_fn)
            .finish()
    }
}

impl<const MAX_ITEM_SIZE: usize> Clone for ReservedMemory<MAX_ITEM_SIZE> {
    fn clone(&self) -> Self {
        self.ref_count.fetch_add(1, Ordering::Release);

        Self {
            data: self.data.clone(),
            ref_count: self.ref_count.clone(),
            drop_fn: self.drop_fn,
        }
    }
}

impl<const MAX_ITEM_SIZE: usize> Drop for ReservedMemory<MAX_ITEM_SIZE> {
    fn drop(&mut self) {
        let previous = self.ref_count.fetch_sub(1, Ordering::Release);

        if previous == 1 {
            // Pair an `Acquire` fence with the `Release` decrements of the
            // other clones before touching the payload, mirroring `Arc::drop`.
            std::sync::atomic::fence(Ordering::Acquire);

            let bytes_mut = unsafe { self.data.get().as_mut().unwrap() };
            (self.drop_fn)(bytes_mut);
        }
    }
}

// SAFETY: the payload is written once, before any clone exists, and is only
// read through `&Bytes` afterwards; the atomic reference count makes the final
// destructor run exactly once. These impls are unconditional, so only payloads
// that are themselves safe to send and share should be stored.
unsafe impl<const MAX_ITEM_SIZE: usize> Send for ReservedMemory<MAX_ITEM_SIZE> {}
unsafe impl<const MAX_ITEM_SIZE: usize> Sync for ReservedMemory<MAX_ITEM_SIZE> {}

impl<const MAX_ITEM_SIZE: usize> std::convert::AsRef<Bytes<MAX_ITEM_SIZE>>
    for ReservedMemory<MAX_ITEM_SIZE>
{
    fn as_ref(&self) -> &Bytes<MAX_ITEM_SIZE> {
        unsafe { self.data.as_ref().get().as_ref().unwrap() }
    }
}

impl<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> Default
    for Arena<MAX_ITEM_COUNT, MAX_ITEM_SIZE>
{
    fn default() -> Self {
        Self::new()
    }
}

impl<const MAX_ITEM_COUNT: usize, const MAX_ITEM_SIZE: usize> Arena<MAX_ITEM_COUNT, MAX_ITEM_SIZE> {
    pub const fn new() -> Self {
        Self {
            buffer: Vec::new(),
            ref_counts: Vec::new(),
            cursor: 0,
        }
    }

    pub const fn accept<O>() -> bool {
        accept_obj::<O, MAX_ITEM_SIZE>()
    }

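    /// Reserves a free slot, lazily allocating the backing storage on first
    /// use. A slot is free when the arena's `Arc` is its only strong
    /// reference; the search starts at `cursor` and wraps around. Returns
    /// `None` when every slot is still held by a live reservation.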
    pub fn reserve(&mut self) -> Option<UninitReservedMemory<MAX_ITEM_SIZE>> {
        if self.buffer.is_empty() {
            for _ in 0..MAX_ITEM_COUNT {
                self.ref_counts.push(Arc::new(AtomicU32::new(0)));

                #[allow(clippy::arc_with_non_send_sync)]
                self.buffer.push(Arc::new(UnsafeCell::new(Bytes {
                    bytes: [0; MAX_ITEM_SIZE],
                })));
            }
        }

        for i in 0..MAX_ITEM_COUNT {
            let i = (i + self.cursor) % MAX_ITEM_COUNT;
            let item = &self.buffer[i];

            if Arc::strong_count(item) == 1 {
                self.cursor = (i + 1) % MAX_ITEM_COUNT;
                let data = item.clone();
                let ref_count = self.ref_counts[i].clone();
                ref_count.store(1, Ordering::Release);

                return Some(UninitReservedMemory {
                    data,
                    ref_count,
                    #[cfg(test)]
                    index: i,
                    not_sync: PhantomData,
                });
            }
        }

        None
    }
}

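/// A type fits a slot when it is no larger and no more strictly aligned than
/// the slot's `Bytes<MAX_ITEM_SIZE>` storage.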
const fn accept_obj<O, const MAX_ITEM_SIZE: usize>() -> bool {
    size_of::<O>() <= size_of::<Bytes<MAX_ITEM_SIZE>>()
        && align_of::<O>() <= align_of::<Bytes<MAX_ITEM_SIZE>>()
}

#[cfg(test)]
mod tests {
    use super::*;

    const MAX_ITEM_SIZE: usize = 2048;

    #[test]
    fn test_lazy_initialization() {
        let mut arena = Arena::<10, MAX_ITEM_SIZE>::new();
        assert_eq!(
            arena.buffer.len(),
            0,
            "Buffer should be empty before first reservation"
        );

        arena.reserve();

        assert_eq!(
            arena.buffer.len(),
            10,
            "Buffer should be initialized to MAX_ITEM_COUNT on first reserve"
        );
    }

    #[test]
    fn test_sequential_allocation_moves_cursor() {
        let mut arena = Arena::<3, MAX_ITEM_SIZE>::new();

        let _ = arena.reserve().expect("Should allocate");
        assert_eq!(arena.cursor, 1);

        let _ = arena.reserve().expect("Should allocate");
        assert_eq!(arena.cursor, 2);
    }
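
    // Added illustrative test (not part of the original suite): exercises the
    // public `accept` check. `Bytes<N>` is 64-byte aligned, so its size is the
    // item size rounded up to a multiple of 64.
    #[test]
    fn test_accept_rejects_oversized_and_overaligned_types() {
        assert!(Arena::<1, 64>::accept::<[u8; 64]>());
        assert!(!Arena::<1, 64>::accept::<[u8; 65]>());

        #[repr(align(128))]
        struct Overaligned(#[allow(dead_code)] u8);
        assert!(!Arena::<1, 64>::accept::<Overaligned>());
    }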

    #[test]
    fn test_reuse_of_freed_data() {
        let mut arena = Arena::<2, MAX_ITEM_SIZE>::new();

        let data0 = arena.reserve().unwrap();
        let _data1 = arena.reserve().unwrap();

        assert!(arena.reserve().is_none(), "Should be full");

        let data0_index = data0.index;
        core::mem::drop(data0);

        let data2 = arena.reserve().expect("Should reuse index 0");
        assert_eq!(data0_index, data2.index);
    }

    #[test]
    fn test_circular_cursor_search() {
        let mut arena = Arena::<3, MAX_ITEM_SIZE>::new();

        let _d0 = arena.reserve().unwrap();
        let d1 = arena.reserve().unwrap();
        let _d2 = arena.reserve().unwrap();

        core::mem::drop(d1);

        let _ = arena.reserve().expect("Should find the hole at index 1");
        assert_eq!(arena.cursor, 2);
    }

    #[test]
    fn test_full_arena_returns_none() {
        let mut arena = Arena::<5, MAX_ITEM_SIZE>::new();

        let mut reserved = Vec::new();

        for _ in 0..5 {
            let item = arena.reserve();
            assert!(item.is_some());
            reserved.push(item);
        }

        assert!(arena.reserve().is_none());
    }
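
    // Added illustrative test (not part of the original suite): writes a value
    // through `init` and reads it back through `AsRef<Bytes<_>>` using the same
    // pointer cast that `init` uses internally.
    #[test]
    fn test_init_then_read_back_roundtrip() {
        let mut arena = Arena::<2, MAX_ITEM_SIZE>::new();
        let reserved = arena.reserve().unwrap().init(0xDEAD_BEEF_u64);

        let bytes: &Bytes<MAX_ITEM_SIZE> = reserved.as_ref();
        // SAFETY: `init` stored a `u64` at the start of the 64-byte-aligned slot.
        let value = unsafe { *(core::ptr::from_ref(bytes) as *const u64) };
        assert_eq!(value, 0xDEAD_BEEF_u64);
    }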
}

#[cfg(test)]
mod drop_lifecycle_tests {
    use super::*;
    use std::boxed::Box;
    use std::vec::Vec;
    use std::sync::Arc;

    struct Payload {
        _anchor: Arc<()>,
    }

    #[test]
    fn last_clone_runs_destructor_with_one_clone() {
        let anchor = Arc::new(());
        let mut arena = Arena::<4, 256>::new();
        let reserved = arena.reserve().unwrap().init(Payload {
            _anchor: anchor.clone(),
        });
        assert_eq!(Arc::strong_count(&anchor), 2);
        drop(reserved);
        assert_eq!(
            Arc::strong_count(&anchor),
            1,
            "single ReservedMemory must run drop_fn on drop"
        );
    }

    #[test]
    fn destructor_deferred_until_last_of_two_clones() {
        let anchor = Arc::new(());
        let mut arena = Arena::<4, 256>::new();
        let a = arena.reserve().unwrap().init(Payload {
            _anchor: anchor.clone(),
        });
        let b = a.clone();

        drop(a);
        assert_eq!(
            Arc::strong_count(&anchor),
            2,
            "destructor fired prematurely: `b` still owns the payload"
        );
        drop(b);
        assert_eq!(Arc::strong_count(&anchor), 1);
    }

    #[test]
    fn destructor_deferred_until_last_of_many_clones() {
        let anchor = Arc::new(());
        let mut arena = Arena::<4, 256>::new();
        let first = arena.reserve().unwrap().init(Payload {
            _anchor: anchor.clone(),
        });

        const N: usize = 16;
        let clones: Vec<_> = (0..N).map(|_| first.clone()).collect();
        drop(first);
        assert_eq!(Arc::strong_count(&anchor), 2);

        for (i, c) in clones.into_iter().enumerate() {
            drop(c);
            let expected = if i + 1 == N { 1 } else { 2 };
            assert_eq!(
                Arc::strong_count(&anchor),
                expected,
                "premature destructor after dropping clone {i}"
            );
        }
    }

    #[test]
    fn destructor_runs_exactly_once_across_refill_cycle() {
        let anchor1 = Arc::new(());
        let anchor2 = Arc::new(());
        let mut arena = Arena::<1, 256>::new();

        let first = arena.reserve().unwrap().init(Payload {
            _anchor: anchor1.clone(),
        });
        drop(first);
        assert_eq!(Arc::strong_count(&anchor1), 1);

        let second = arena.reserve().unwrap().init(Payload {
            _anchor: anchor2.clone(),
        });
        assert_eq!(
            Arc::strong_count(&anchor1),
            1,
            "refilling the slot must not touch the prior payload's anchor"
        );
        assert_eq!(Arc::strong_count(&anchor2), 2);
        drop(second);
        assert_eq!(Arc::strong_count(&anchor2), 1);
    }

    #[test]
    fn heap_owning_payload_drops_exactly_once() {
        struct HeapOwner(#[allow(dead_code)] Box<[u64; 8]>);

        let mut arena = Arena::<2, 256>::new();
        let a = arena
            .reserve()
            .unwrap()
            .init(HeapOwner(Box::new([1, 2, 3, 4, 5, 6, 7, 8])));
        let b = a.clone();
        let c = a.clone();
        drop(a);
        drop(b);
        drop(c);
    }
}

#[cfg(test)]
mod concurrent_drop_timing_tests {
    use super::*;
    use std::sync::{Arc, Barrier};
    use std::thread;

    #[test]
    fn concurrent_drops_release_anchor_exactly_once() {
        let anchor = Arc::new(());
        let mut arena = Arena::<4, 256>::new();

        struct Payload {
            #[allow(dead_code)]
            anchor: Arc<()>,
        }

        let reserved = arena.reserve().unwrap().init(Payload {
            anchor: anchor.clone(),
        });

        const N: usize = 8;
        let barrier = Arc::new(Barrier::new(N));
        let mut handles = Vec::with_capacity(N);
        for _ in 0..N - 1 {
            let clone = reserved.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                drop(clone);
            }));
        }
        {
            let b = barrier.clone();
            let original = reserved;
            handles.push(thread::spawn(move || {
                b.wait();
                drop(original);
            }));
        }

        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(
            Arc::strong_count(&anchor),
            1,
            "after all clones drop, payload anchor must be released exactly once"
        );
    }
}

#[cfg(test)]
mod concurrent_tests {
    use super::*;
    use std::sync::{Arc, Barrier, Mutex};
    use std::{thread, vec};

    const MAX_ITEM_SIZE: usize = 2048;

    #[allow(clippy::arc_with_non_send_sync)]
    fn shared_arena<const N: usize>() -> Arc<Mutex<Arena<N, MAX_ITEM_SIZE>>> {
        Arc::new(Mutex::new(Arena::<N, MAX_ITEM_SIZE>::new()))
    }

    #[test]
    fn test_drop_called_exactly_once_under_contention() {
        let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));
        let arena = shared_arena::<4>();

        let uninit = arena.lock().unwrap().reserve().unwrap();

        struct Probe(Arc<std::sync::atomic::AtomicUsize>);
        impl Drop for Probe {
            fn drop(&mut self) {
                self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        let reserved = uninit.init(Probe(drop_count.clone()));

        let barrier = Arc::new(Barrier::new(32));
        let mut handles = vec![];

        for _ in 0..32 {
            let r = reserved.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                drop(r);
            }));
        }

        drop(reserved);
        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(
            drop_count.load(std::sync::atomic::Ordering::Relaxed),
            1,
            "drop_fn must be called exactly once"
        );
    }

    #[test]
    fn test_slot_reuse_after_concurrent_drop() {
        let arena = shared_arena::<1>();
        let uninit = arena.lock().unwrap().reserve().unwrap();
        let reserved = uninit.init(42u64);

        let barrier = Arc::new(Barrier::new(8));
        let mut handles = vec![];

        for _ in 0..8 {
            let r = reserved.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                drop(r);
            }));
        }

        drop(reserved);
        for h in handles {
            h.join().unwrap();
        }

        assert!(
            arena.lock().unwrap().reserve().is_some(),
            "Slot should be available after all clones are dropped"
        );
    }

    #[test]
    fn test_drop_after_arena_dropped() {
        let drop_count = Arc::new(std::sync::atomic::AtomicUsize::new(0));

        struct Probe(Arc<std::sync::atomic::AtomicUsize>);
        impl Drop for Probe {
            fn drop(&mut self) {
                self.0.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
            }
        }

        let reserved = {
            let mut arena = Arena::<4, MAX_ITEM_SIZE>::new();
            let uninit = arena.reserve().unwrap();
            uninit.init(Probe(drop_count.clone()))
        };

        let barrier = Arc::new(Barrier::new(8));
        let mut handles = vec![];

        for _ in 0..8 {
            let r = reserved.clone();
            let b = barrier.clone();
            handles.push(thread::spawn(move || {
                b.wait();
                drop(r);
            }));
        }

        drop(reserved);
        for h in handles {
            h.join().unwrap();
        }

        assert_eq!(
            drop_count.load(std::sync::atomic::Ordering::Relaxed),
            1,
            "drop_fn must fire exactly once even when arena is dropped first"
        );
    }
}