1use std::alloc::{Layout, alloc, dealloc};
43use std::cell::UnsafeCell;
44use std::ptr::NonNull;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
47
/// Capacity, in bytes, of the blocks an arena creates when the caller does not
/// choose a size (2 MiB).
const DEFAULT_BLOCK_SIZE: usize = 2 << 20;

/// Alignment, in bytes, of each block's backing storage and of allocations
/// that do not request a specific alignment.
const MIN_ALIGN: usize = 1 << 3;

/// Longest key, in bytes, that the `allocate_key` helpers accept.
const MAX_INLINE_KEY_SIZE: usize = 1 << 8;
56
/// A copyable, non-owning reference to a byte range inside an arena block.
///
/// A handle records where the bytes start, how many there are, and the epoch
/// value it was created with. It carries no lifetime, so nothing prevents a
/// handle from outliving the memory it points at; every accessor that touches
/// the bytes is therefore `unsafe`.
#[derive(Debug, Clone, Copy)]
pub struct ArenaHandle {
    /// Address of the first byte of the range.
    ptr: NonNull<u8>,
    /// Length of the range in bytes.
    len: u32,
    /// Epoch value supplied when the handle was created.
    epoch: u64,
}

impl ArenaHandle {
    /// Builds a handle for `len` bytes starting at `ptr`, tagged with `epoch`.
    ///
    /// # Safety
    ///
    /// * `ptr` must address at least `len` bytes that stay allocated for as
    ///   long as the handle (or any copy of it) is dereferenced.
    /// * `len` must fit in a `u32`. The length is stored in 32 bits, so a
    ///   larger value would be truncated; debug builds assert on it.
    #[inline]
    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, epoch: u64) -> Self {
        debug_assert!(
            len <= u32::MAX as usize,
            "ArenaHandle length {} does not fit in u32",
            len
        );
        Self {
            ptr,
            // Narrowing is lossless whenever the documented contract holds.
            len: len as u32,
            epoch,
        }
    }

    /// Returns the epoch value the handle was created with.
    #[inline]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Returns the length of the referenced range in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// Returns `true` when the referenced range is zero bytes long.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Views the referenced bytes as a shared slice.
    ///
    /// # Safety
    ///
    /// The memory behind the handle must still be allocated, must have been
    /// initialized, and must not be written through any other handle or
    /// pointer while the returned slice is alive.
    #[inline]
    pub unsafe fn as_slice(&self) -> &[u8] {
        // SAFETY: the caller guarantees `ptr..ptr + len` is live, initialized
        // and not concurrently mutated.
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len as usize) }
    }

    /// Views the referenced bytes as a mutable slice.
    ///
    /// # Safety
    ///
    /// Everything required by [`ArenaHandle::as_slice`], plus exclusivity:
    /// handles are `Copy`, so the `&mut self` borrow alone does not prove that
    /// no other copy is reading or writing the same bytes. The caller must
    /// ensure no other access overlaps the lifetime of the returned slice.
    #[inline]
    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: the caller guarantees the range is live and exclusively
        // accessed for the lifetime of the returned slice.
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len as usize) }
    }
}

// SAFETY: a handle is plain data (pointer, length, epoch). Moving or sharing
// it across threads performs no memory access; every dereference goes through
// an `unsafe` method whose contract the caller must uphold.
unsafe impl Send for ArenaHandle {}
// SAFETY: see the `Send` impl above.
unsafe impl Sync for ArenaHandle {}
128
129struct MemoryBlock {
135 data: NonNull<u8>,
137 size: usize,
139 offset: AtomicUsize,
141 layout: Layout,
143}
144
145impl MemoryBlock {
146 fn new(size: usize) -> Option<Self> {
148 let layout = Layout::from_size_align(size, MIN_ALIGN).ok()?;
149
150 let ptr = unsafe { alloc(layout) };
152 let data = NonNull::new(ptr)?;
153
154 Some(Self {
155 data,
156 size,
157 offset: AtomicUsize::new(0),
158 layout,
159 })
160 }
161
162 #[inline]
166 fn allocate(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
167 loop {
168 let current = self.offset.load(Ordering::Relaxed);
169
170 let aligned = (current + align - 1) & !(align - 1);
172 let new_offset = aligned + size;
173
174 if new_offset > self.size {
175 return None;
176 }
177
178 match self.offset.compare_exchange_weak(
179 current,
180 new_offset,
181 Ordering::Release,
182 Ordering::Relaxed,
183 ) {
184 Ok(_) => {
185 let ptr = unsafe { self.data.as_ptr().add(aligned) };
186 return NonNull::new(ptr);
187 }
188 Err(_) => continue, }
190 }
191 }
192
193 #[inline]
195 #[allow(dead_code)]
196 fn remaining(&self) -> usize {
197 self.size
198 .saturating_sub(self.offset.load(Ordering::Relaxed))
199 }
200
201 #[inline]
203 fn used(&self) -> usize {
204 self.offset.load(Ordering::Relaxed)
205 }
206
207 fn reset(&self) {
209 self.offset.store(0, Ordering::Release);
210 }
211}
212
213impl Drop for MemoryBlock {
214 fn drop(&mut self) {
215 unsafe {
216 dealloc(self.data.as_ptr(), self.layout);
217 }
218 }
219}
220
221unsafe impl Send for MemoryBlock {}
223unsafe impl Sync for MemoryBlock {}
224
225pub struct EpochArena {
234 epoch: AtomicU64,
236 blocks: UnsafeCell<Vec<MemoryBlock>>,
238 active_block: AtomicUsize,
240 block_size: usize,
242 total_allocated: AtomicUsize,
244 allocation_count: AtomicUsize,
246 block_lock: std::sync::Mutex<()>,
248}
249
250impl EpochArena {
251 pub fn new(epoch: u64) -> Self {
253 Self::with_block_size(epoch, DEFAULT_BLOCK_SIZE)
254 }
255
256 pub fn with_block_size(epoch: u64, block_size: usize) -> Self {
258 let initial_block = MemoryBlock::new(block_size).expect("Failed to allocate initial block");
259
260 Self {
261 epoch: AtomicU64::new(epoch),
262 blocks: UnsafeCell::new(vec![initial_block]),
263 active_block: AtomicUsize::new(0),
264 block_size,
265 total_allocated: AtomicUsize::new(0),
266 allocation_count: AtomicUsize::new(0),
267 block_lock: std::sync::Mutex::new(()),
268 }
269 }
270
271 #[inline]
273 pub fn epoch(&self) -> u64 {
274 self.epoch.load(Ordering::Relaxed)
275 }
276
277 #[inline]
281 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
282 self.allocate_aligned(size, MIN_ALIGN)
283 }
284
285 pub fn allocate_aligned(&self, size: usize, align: usize) -> Option<ArenaHandle> {
287 if size == 0 {
288 return None;
289 }
290
291 let active_idx = self.active_block.load(Ordering::Acquire);
293 let blocks = unsafe { &*self.blocks.get() };
294
295 if active_idx < blocks.len() {
296 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
297 self.total_allocated.fetch_add(size, Ordering::Relaxed);
298 self.allocation_count.fetch_add(1, Ordering::Relaxed);
299 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
300 }
301 }
302
303 self.allocate_slow(size, align)
305 }
306
307 #[cold]
309 fn allocate_slow(&self, size: usize, align: usize) -> Option<ArenaHandle> {
310 let _guard = self.block_lock.lock().ok()?;
311
312 let active_idx = self.active_block.load(Ordering::Acquire);
314 let blocks = unsafe { &mut *self.blocks.get() };
315
316 if active_idx < blocks.len() {
317 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
318 self.total_allocated.fetch_add(size, Ordering::Relaxed);
319 self.allocation_count.fetch_add(1, Ordering::Relaxed);
320 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
321 }
322 }
323
324 let new_block_size = self.block_size.max(size + align);
326 let new_block = MemoryBlock::new(new_block_size)?;
327
328 let ptr = new_block.allocate(size, align)?;
329 blocks.push(new_block);
330 self.active_block.store(blocks.len() - 1, Ordering::Release);
331
332 self.total_allocated.fetch_add(size, Ordering::Relaxed);
333 self.allocation_count.fetch_add(1, Ordering::Relaxed);
334
335 Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) })
336 }
337
338 #[inline]
340 pub fn allocate_copy(&self, data: &[u8]) -> Option<ArenaHandle> {
341 let handle = self.allocate(data.len())?;
342 unsafe {
343 std::ptr::copy_nonoverlapping(data.as_ptr(), handle.ptr.as_ptr(), data.len());
344 }
345 Some(handle)
346 }
347
348 #[inline]
350 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
351 if key.len() > MAX_INLINE_KEY_SIZE {
352 return None;
353 }
354 self.allocate_aligned(key.len(), 16).map(|handle| {
355 unsafe {
356 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
357 }
358 handle
359 })
360 }
361
362 pub fn stats(&self) -> ArenaStats {
364 let blocks = unsafe { &*self.blocks.get() };
365
366 ArenaStats {
367 epoch: self.epoch(),
368 block_count: blocks.len(),
369 total_capacity: blocks.iter().map(|b| b.size).sum(),
370 total_used: blocks.iter().map(|b| b.used()).sum(),
371 total_allocated: self.total_allocated.load(Ordering::Relaxed),
372 allocation_count: self.allocation_count.load(Ordering::Relaxed),
373 }
374 }
375
376 pub fn reset(&self, new_epoch: u64) {
380 let _guard = self.block_lock.lock().unwrap();
381
382 let blocks = unsafe { &*self.blocks.get() };
384 for block in blocks {
385 block.reset();
386 }
387
388 self.epoch.store(new_epoch, Ordering::Release);
389 self.active_block.store(0, Ordering::Release);
390 self.total_allocated.store(0, Ordering::Relaxed);
391 self.allocation_count.store(0, Ordering::Relaxed);
392 }
393}
394
395unsafe impl Send for EpochArena {}
397unsafe impl Sync for EpochArena {}
398
/// Point-in-time usage counters for a single arena.
///
/// `PartialEq`/`Eq` allow snapshots to be compared directly and `Default`
/// yields an all-zero snapshot.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ArenaStats {
    /// Epoch the arena was in when the snapshot was taken.
    pub epoch: u64,
    /// Number of memory blocks the arena holds.
    pub block_count: usize,
    /// Combined capacity of all blocks, in bytes.
    pub total_capacity: usize,
    /// Bytes consumed across all blocks, alignment padding included.
    pub total_used: usize,
    /// Sum of the sizes requested by callers, in bytes.
    pub total_allocated: usize,
    /// Number of allocations performed.
    pub allocation_count: usize,
}
415
416pub struct ArenaPool {
424 arenas: Vec<Arc<EpochArena>>,
426 current_epoch: AtomicU64,
428 pool_size: usize,
430 #[allow(dead_code)]
432 block_size: usize,
433}
434
435impl ArenaPool {
436 pub fn new(pool_size: usize) -> Self {
438 Self::with_block_size(pool_size, DEFAULT_BLOCK_SIZE)
439 }
440
441 pub fn with_block_size(pool_size: usize, block_size: usize) -> Self {
443 let arenas = (0..pool_size)
444 .map(|i| Arc::new(EpochArena::with_block_size(i as u64, block_size)))
445 .collect();
446
447 Self {
448 arenas,
449 current_epoch: AtomicU64::new(0),
450 pool_size,
451 block_size,
452 }
453 }
454
455 #[inline]
457 pub fn current_epoch(&self) -> u64 {
458 self.current_epoch.load(Ordering::Acquire)
459 }
460
461 #[inline]
463 pub fn current_arena(&self) -> Arc<EpochArena> {
464 let epoch = self.current_epoch();
465 let idx = (epoch as usize) % self.pool_size;
466 self.arenas[idx].clone()
467 }
468
469 #[inline]
471 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
472 self.current_arena().allocate(size)
473 }
474
475 #[inline]
477 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
478 self.current_arena().allocate_key(key)
479 }
480
481 pub fn advance_epoch(&self) -> u64 {
485 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
486
487 let next_idx = (new_epoch as usize) % self.pool_size;
489 self.arenas[next_idx].reset(new_epoch);
490
491 new_epoch
492 }
493
494 #[inline]
498 pub fn is_epoch_valid(&self, epoch: u64) -> bool {
499 let current = self.current_epoch();
500 epoch + (self.pool_size as u64) > current
501 }
502
503 pub fn stats(&self) -> Vec<ArenaStats> {
505 self.arenas.iter().map(|a| a.stats()).collect()
506 }
507}
508
509pub struct ThreadLocalArena {
515 pool: Arc<ArenaPool>,
517 cached_arena: AtomicPtr<EpochArena>,
519 cached_epoch: AtomicU64,
521}
522
523impl ThreadLocalArena {
524 pub fn new(pool: Arc<ArenaPool>) -> Self {
526 let arena = pool.current_arena();
527 let epoch = arena.epoch();
528
529 Self {
530 pool,
531 cached_arena: AtomicPtr::new(Arc::into_raw(arena) as *mut _),
532 cached_epoch: AtomicU64::new(epoch),
533 }
534 }
535
536 #[inline]
538 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
539 let current_epoch = self.pool.current_epoch();
540 let cached_epoch = self.cached_epoch.load(Ordering::Relaxed);
541
542 if current_epoch == cached_epoch {
543 let arena_ptr = self.cached_arena.load(Ordering::Acquire);
545 if !arena_ptr.is_null() {
546 let arena = unsafe { &*arena_ptr };
547 return arena.allocate(size);
548 }
549 }
550
551 self.allocate_slow(size, current_epoch)
553 }
554
555 #[cold]
556 fn allocate_slow(&self, size: usize, _current_epoch: u64) -> Option<ArenaHandle> {
557 let new_arena = self.pool.current_arena();
558 let new_epoch = new_arena.epoch();
559
560 let old_ptr = self
562 .cached_arena
563 .swap(Arc::into_raw(new_arena.clone()) as *mut _, Ordering::AcqRel);
564 self.cached_epoch.store(new_epoch, Ordering::Release);
565
566 if !old_ptr.is_null() {
568 unsafe { Arc::from_raw(old_ptr as *const EpochArena) };
569 }
570
571 new_arena.allocate(size)
572 }
573
574 #[inline]
576 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
577 if key.len() > MAX_INLINE_KEY_SIZE {
578 return None;
579 }
580 self.allocate(key.len()).map(|handle| {
581 unsafe {
582 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
583 }
584 handle
585 })
586 }
587}
588
589impl Drop for ThreadLocalArena {
590 fn drop(&mut self) {
591 let ptr = self.cached_arena.load(Ordering::Acquire);
592 if !ptr.is_null() {
593 unsafe { Arc::from_raw(ptr as *const EpochArena) };
594 }
595 }
596}
597
598unsafe impl Send for ThreadLocalArena {}
600unsafe impl Sync for ThreadLocalArena {}
601
602#[derive(Clone, Copy)]
608pub struct ArenaKey {
609 handle: ArenaHandle,
610}
611
612impl ArenaKey {
613 #[inline]
615 pub fn new(handle: ArenaHandle) -> Self {
616 Self { handle }
617 }
618
619 #[inline]
624 pub unsafe fn as_bytes(&self) -> &[u8] {
625 unsafe { self.handle.as_slice() }
626 }
627
628 #[inline]
630 pub fn epoch(&self) -> u64 {
631 self.handle.epoch()
632 }
633
634 #[inline]
636 pub fn len(&self) -> usize {
637 self.handle.len()
638 }
639
640 #[inline]
642 pub fn is_empty(&self) -> bool {
643 self.handle.is_empty()
644 }
645}
646
647unsafe impl Send for ArenaKey {}
649unsafe impl Sync for ArenaKey {}
650
// Unit tests for the arena, the arena pool, the caching front-end and keys.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Handles report the requested length and the arena's epoch, and the
    /// arena counts each allocation.
    #[test]
    fn test_epoch_arena_basic() {
        let arena = EpochArena::new(1);

        let h1 = arena.allocate(16).unwrap();
        let h2 = arena.allocate(32).unwrap();
        let h3 = arena.allocate(64).unwrap();

        assert_eq!(h1.len(), 16);
        assert_eq!(h2.len(), 32);
        assert_eq!(h3.len(), 64);
        assert_eq!(h1.epoch(), 1);

        let stats = arena.stats();
        assert_eq!(stats.allocation_count, 3);
    }

    /// `allocate_copy` stores a byte-for-byte copy of its input.
    #[test]
    fn test_allocate_copy() {
        let arena = EpochArena::new(1);
        let data = b"hello world";

        let handle = arena.allocate_copy(data).unwrap();
        assert_eq!(handle.len(), data.len());

        let slice = unsafe { handle.as_slice() };
        assert_eq!(slice, data);
    }

    /// `allocate_key` stores a byte-for-byte copy of the key.
    #[test]
    fn test_allocate_key() {
        let arena = EpochArena::new(1);
        let key = b"my_test_key";

        let handle = arena.allocate_key(key).unwrap();
        let slice = unsafe { handle.as_slice() };
        assert_eq!(slice, key);
    }

    /// `reset` switches the epoch and clears the allocation counter.
    #[test]
    fn test_arena_reset() {
        let arena = EpochArena::new(1);

        // 1000 allocations of 64 bytes each.
        for _ in 0..1000 {
            arena.allocate(64).unwrap();
        }

        let stats_before = arena.stats();
        assert!(stats_before.total_allocated > 0);

        arena.reset(2);

        let stats_after = arena.stats();
        assert_eq!(stats_after.epoch, 2);
        assert_eq!(stats_after.allocation_count, 0);
    }

    /// Pool allocations carry the pool's current epoch, before and after an
    /// advance.
    #[test]
    fn test_arena_pool() {
        let pool = ArenaPool::new(4);

        let h1 = pool.allocate(16).unwrap();
        assert_eq!(h1.epoch(), 0);

        pool.advance_epoch();

        let h2 = pool.allocate(16).unwrap();
        assert_eq!(h2.epoch(), 1);

        // With four arenas, epochs 0 and 1 are both still valid at epoch 1.
        assert!(pool.is_epoch_valid(0));
        assert!(pool.is_epoch_valid(1));
    }

    /// The caching front-end serves both plain and key allocations.
    #[test]
    fn test_thread_local_arena() {
        let pool = Arc::new(ArenaPool::new(4));
        let tla = ThreadLocalArena::new(pool.clone());

        let h1 = tla.allocate(32).unwrap();
        assert_eq!(h1.len(), 32);

        let h2 = tla.allocate_key(b"test").unwrap();
        assert_eq!(h2.len(), 4);
    }

    /// Eight threads allocate 10_000 times each from a shared pool; every
    /// allocation must succeed and be counted.
    #[test]
    fn test_concurrent_allocation() {
        let pool = Arc::new(ArenaPool::new(4));
        let mut handles = vec![];

        for _ in 0..8 {
            let pool_clone = pool.clone();
            handles.push(thread::spawn(move || {
                for i in 0..10000 {
                    // Sizes cycle through 8..=71 bytes.
                    let size = (i % 64) + 8;
                    pool_clone.allocate(size).expect("allocation failed");
                }
            }));
        }

        for handle in handles {
            handle.join().unwrap();
        }

        let stats = pool.stats();
        let total_allocs: usize = stats.iter().map(|s| s.allocation_count).sum();
        assert_eq!(total_allocs, 80000);
    }

    /// A request larger than the default block size is still served, by
    /// adding a block.
    #[test]
    fn test_large_allocation() {
        let arena = EpochArena::new(1);

        // 3 MiB exceeds the 2 MiB default block size.
        let large_size = 3 * 1024 * 1024;
        let handle = arena.allocate(large_size).unwrap();
        assert_eq!(handle.len(), large_size);

        let stats = arena.stats();
        assert!(stats.block_count >= 2);
    }

    /// The returned addresses must honor the requested alignment.
    #[test]
    fn test_alignment() {
        let arena = EpochArena::new(1);

        let h1 = arena.allocate_aligned(17, 16).unwrap();
        assert!((h1.ptr.as_ptr() as usize) % 16 == 0);

        let h2 = arena.allocate_aligned(65, 64).unwrap();
        assert!((h2.ptr.as_ptr() as usize) % 64 == 0);
    }

    /// `ArenaKey` forwards length, epoch and bytes from the wrapped handle.
    #[test]
    fn test_arena_key() {
        let arena = EpochArena::new(42);
        let key_data = b"user:12345:profile";

        let handle = arena.allocate_key(key_data).unwrap();
        let key = ArenaKey::new(handle);

        assert_eq!(key.len(), key_data.len());
        assert_eq!(key.epoch(), 42);

        let bytes = unsafe { key.as_bytes() };
        assert_eq!(bytes, key_data);
    }

    /// `advance_epoch` returns consecutive epochs, and validity follows
    /// `epoch + pool_size > current_epoch`.
    #[test]
    fn test_epoch_advancement() {
        let pool = ArenaPool::new(4);

        for expected_epoch in 1..=10 {
            let new_epoch = pool.advance_epoch();
            assert_eq!(new_epoch, expected_epoch);
        }

        // Current epoch is 10 with a pool of 4: 0 + 4 <= 10 is stale, while
        // 7 + 4 and 10 + 4 are both greater than 10.
        assert!(!pool.is_epoch_valid(0));
        assert!(pool.is_epoch_valid(7));
        assert!(pool.is_epoch_valid(10));
    }
}