1use std::alloc::{alloc, dealloc, Layout};
43use std::cell::UnsafeCell;
44use std::ptr::NonNull;
45use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
46use std::sync::Arc;
47
/// Block size, in bytes, used when the caller does not choose one (2 MiB).
const DEFAULT_BLOCK_SIZE: usize = 2 << 20;

/// Alignment, in bytes, of block storage and of plain `allocate` requests (8).
const MIN_ALIGN: usize = 1 << 3;

/// Longest key, in bytes, that the `allocate_key` methods accept (256).
const MAX_INLINE_KEY_SIZE: usize = 1 << 8;
56
/// Copyable, non-owning reference to a byte range.
///
/// A handle stores the start address, the length, and the epoch value it was
/// created with. It carries no lifetime, so nothing ties it to the memory it
/// points at; that is why every way of reading through it is `unsafe`.
#[derive(Debug, Clone, Copy)]
pub struct ArenaHandle {
    /// Address of the first byte of the range.
    ptr: NonNull<u8>,
    /// Length of the range in bytes. Stored as `u32`, so a single handle can
    /// describe at most `u32::MAX` bytes.
    len: u32,
    /// Epoch value supplied when the handle was created.
    epoch: u64,
}

impl ArenaHandle {
    /// Builds a handle for `len` bytes starting at `ptr`, tagged with `epoch`.
    ///
    /// # Safety
    ///
    /// The handle later exposes the range through [`ArenaHandle::as_slice`]
    /// and [`ArenaHandle::as_mut_slice`], so `ptr` must address at least `len`
    /// bytes. `len` must not exceed `u32::MAX`; larger values are truncated in
    /// release builds and trip an assertion in debug builds.
    #[inline]
    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, epoch: u64) -> Self {
        debug_assert!(
            len <= u32::MAX as usize,
            "ArenaHandle length {} does not fit in u32",
            len
        );
        Self {
            ptr,
            // Truncation is excluded by the contract above.
            len: len as u32,
            epoch,
        }
    }

    /// Returns the epoch value recorded at construction.
    #[inline]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Returns the length of the range in bytes.
    #[inline]
    pub fn len(&self) -> usize {
        self.len as usize
    }

    /// Returns `true` when the range is zero bytes long.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Views the range as a shared byte slice.
    ///
    /// # Safety
    ///
    /// The memory must still be allocated and initialised for `len()` bytes,
    /// and must not be written through any other pointer while the returned
    /// slice is alive. The handle itself cannot check either condition.
    #[inline]
    pub unsafe fn as_slice(&self) -> &[u8] {
        // SAFETY: validity of `ptr..ptr + len` is the caller's obligation.
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len as usize) }
    }

    /// Views the range as a mutable byte slice.
    ///
    /// # Safety
    ///
    /// Same requirements as [`ArenaHandle::as_slice`], and additionally no
    /// other reference to the range may exist while the returned slice is
    /// alive. Handles are `Copy`, so holding `&mut self` does not by itself
    /// rule out another copy of the handle being used elsewhere.
    #[inline]
    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: validity and exclusivity are the caller's obligation.
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len as usize) }
    }
}

// SAFETY: a handle is plain data (address, length, epoch). `NonNull` opts out
// of `Send`/`Sync` only as a precaution; every dereference of the address goes
// through an `unsafe` method whose caller vouches for the access.
unsafe impl Send for ArenaHandle {}
unsafe impl Sync for ArenaHandle {}
128
129struct MemoryBlock {
135 data: NonNull<u8>,
137 size: usize,
139 offset: AtomicUsize,
141 layout: Layout,
143}
144
145impl MemoryBlock {
146 fn new(size: usize) -> Option<Self> {
148 let layout = Layout::from_size_align(size, MIN_ALIGN).ok()?;
149
150 let ptr = unsafe { alloc(layout) };
152 let data = NonNull::new(ptr)?;
153
154 Some(Self {
155 data,
156 size,
157 offset: AtomicUsize::new(0),
158 layout,
159 })
160 }
161
162 #[inline]
166 fn allocate(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
167 loop {
168 let current = self.offset.load(Ordering::Relaxed);
169
170 let aligned = (current + align - 1) & !(align - 1);
172 let new_offset = aligned + size;
173
174 if new_offset > self.size {
175 return None;
176 }
177
178 match self.offset.compare_exchange_weak(
179 current,
180 new_offset,
181 Ordering::Release,
182 Ordering::Relaxed,
183 ) {
184 Ok(_) => {
185 let ptr = unsafe { self.data.as_ptr().add(aligned) };
186 return NonNull::new(ptr);
187 }
188 Err(_) => continue, }
190 }
191 }
192
193 #[inline]
195 #[allow(dead_code)]
196 fn remaining(&self) -> usize {
197 self.size.saturating_sub(self.offset.load(Ordering::Relaxed))
198 }
199
200 #[inline]
202 fn used(&self) -> usize {
203 self.offset.load(Ordering::Relaxed)
204 }
205
206 fn reset(&self) {
208 self.offset.store(0, Ordering::Release);
209 }
210}
211
212impl Drop for MemoryBlock {
213 fn drop(&mut self) {
214 unsafe {
215 dealloc(self.data.as_ptr(), self.layout);
216 }
217 }
218}
219
220unsafe impl Send for MemoryBlock {}
222unsafe impl Sync for MemoryBlock {}
223
224pub struct EpochArena {
233 epoch: AtomicU64,
235 blocks: UnsafeCell<Vec<MemoryBlock>>,
237 active_block: AtomicUsize,
239 block_size: usize,
241 total_allocated: AtomicUsize,
243 allocation_count: AtomicUsize,
245 block_lock: std::sync::Mutex<()>,
247}
248
249impl EpochArena {
250 pub fn new(epoch: u64) -> Self {
252 Self::with_block_size(epoch, DEFAULT_BLOCK_SIZE)
253 }
254
255 pub fn with_block_size(epoch: u64, block_size: usize) -> Self {
257 let initial_block = MemoryBlock::new(block_size)
258 .expect("Failed to allocate initial block");
259
260 Self {
261 epoch: AtomicU64::new(epoch),
262 blocks: UnsafeCell::new(vec![initial_block]),
263 active_block: AtomicUsize::new(0),
264 block_size,
265 total_allocated: AtomicUsize::new(0),
266 allocation_count: AtomicUsize::new(0),
267 block_lock: std::sync::Mutex::new(()),
268 }
269 }
270
271 #[inline]
273 pub fn epoch(&self) -> u64 {
274 self.epoch.load(Ordering::Relaxed)
275 }
276
277 #[inline]
281 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
282 self.allocate_aligned(size, MIN_ALIGN)
283 }
284
285 pub fn allocate_aligned(&self, size: usize, align: usize) -> Option<ArenaHandle> {
287 if size == 0 {
288 return None;
289 }
290
291 let active_idx = self.active_block.load(Ordering::Acquire);
293 let blocks = unsafe { &*self.blocks.get() };
294
295 if active_idx < blocks.len() {
296 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
297 self.total_allocated.fetch_add(size, Ordering::Relaxed);
298 self.allocation_count.fetch_add(1, Ordering::Relaxed);
299 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
300 }
301 }
302
303 self.allocate_slow(size, align)
305 }
306
307 #[cold]
309 fn allocate_slow(&self, size: usize, align: usize) -> Option<ArenaHandle> {
310 let _guard = self.block_lock.lock().ok()?;
311
312 let active_idx = self.active_block.load(Ordering::Acquire);
314 let blocks = unsafe { &mut *self.blocks.get() };
315
316 if active_idx < blocks.len() {
317 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
318 self.total_allocated.fetch_add(size, Ordering::Relaxed);
319 self.allocation_count.fetch_add(1, Ordering::Relaxed);
320 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
321 }
322 }
323
324 let new_block_size = self.block_size.max(size + align);
326 let new_block = MemoryBlock::new(new_block_size)?;
327
328 let ptr = new_block.allocate(size, align)?;
329 blocks.push(new_block);
330 self.active_block.store(blocks.len() - 1, Ordering::Release);
331
332 self.total_allocated.fetch_add(size, Ordering::Relaxed);
333 self.allocation_count.fetch_add(1, Ordering::Relaxed);
334
335 Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) })
336 }
337
338 #[inline]
340 pub fn allocate_copy(&self, data: &[u8]) -> Option<ArenaHandle> {
341 let handle = self.allocate(data.len())?;
342 unsafe {
343 std::ptr::copy_nonoverlapping(data.as_ptr(), handle.ptr.as_ptr(), data.len());
344 }
345 Some(handle)
346 }
347
348 #[inline]
350 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
351 if key.len() > MAX_INLINE_KEY_SIZE {
352 return None;
353 }
354 self.allocate_aligned(key.len(), 16).map(|handle| {
355 unsafe {
356 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
357 }
358 handle
359 })
360 }
361
362 pub fn stats(&self) -> ArenaStats {
364 let blocks = unsafe { &*self.blocks.get() };
365
366 ArenaStats {
367 epoch: self.epoch(),
368 block_count: blocks.len(),
369 total_capacity: blocks.iter().map(|b| b.size).sum(),
370 total_used: blocks.iter().map(|b| b.used()).sum(),
371 total_allocated: self.total_allocated.load(Ordering::Relaxed),
372 allocation_count: self.allocation_count.load(Ordering::Relaxed),
373 }
374 }
375
376 pub fn reset(&self, new_epoch: u64) {
380 let _guard = self.block_lock.lock().unwrap();
381
382 let blocks = unsafe { &*self.blocks.get() };
384 for block in blocks {
385 block.reset();
386 }
387
388 self.epoch.store(new_epoch, Ordering::Release);
389 self.active_block.store(0, Ordering::Release);
390 self.total_allocated.store(0, Ordering::Relaxed);
391 self.allocation_count.store(0, Ordering::Relaxed);
392 }
393}
394
395unsafe impl Send for EpochArena {}
397unsafe impl Sync for EpochArena {}
398
/// Point-in-time counters describing one arena.
///
/// `Default` yields all-zero statistics; `PartialEq`/`Eq` allow two snapshots
/// to be compared directly.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ArenaStats {
    /// Epoch the arena was in when the snapshot was taken.
    pub epoch: u64,
    /// Number of memory blocks the arena owns.
    pub block_count: usize,
    /// Combined capacity of all blocks, in bytes.
    pub total_capacity: usize,
    /// Combined bump offset of all blocks, in bytes (alignment padding included).
    pub total_used: usize,
    /// Sum of the sizes that were requested, in bytes.
    pub total_allocated: usize,
    /// Number of successful allocations.
    pub allocation_count: usize,
}
415
416pub struct ArenaPool {
424 arenas: Vec<Arc<EpochArena>>,
426 current_epoch: AtomicU64,
428 pool_size: usize,
430 #[allow(dead_code)]
432 block_size: usize,
433}
434
435impl ArenaPool {
436 pub fn new(pool_size: usize) -> Self {
438 Self::with_block_size(pool_size, DEFAULT_BLOCK_SIZE)
439 }
440
441 pub fn with_block_size(pool_size: usize, block_size: usize) -> Self {
443 let arenas = (0..pool_size)
444 .map(|i| Arc::new(EpochArena::with_block_size(i as u64, block_size)))
445 .collect();
446
447 Self {
448 arenas,
449 current_epoch: AtomicU64::new(0),
450 pool_size,
451 block_size,
452 }
453 }
454
455 #[inline]
457 pub fn current_epoch(&self) -> u64 {
458 self.current_epoch.load(Ordering::Acquire)
459 }
460
461 #[inline]
463 pub fn current_arena(&self) -> Arc<EpochArena> {
464 let epoch = self.current_epoch();
465 let idx = (epoch as usize) % self.pool_size;
466 self.arenas[idx].clone()
467 }
468
469 #[inline]
471 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
472 self.current_arena().allocate(size)
473 }
474
475 #[inline]
477 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
478 self.current_arena().allocate_key(key)
479 }
480
481 pub fn advance_epoch(&self) -> u64 {
485 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
486
487 let next_idx = (new_epoch as usize) % self.pool_size;
489 self.arenas[next_idx].reset(new_epoch);
490
491 new_epoch
492 }
493
494 #[inline]
498 pub fn is_epoch_valid(&self, epoch: u64) -> bool {
499 let current = self.current_epoch();
500 epoch + (self.pool_size as u64) > current
501 }
502
503 pub fn stats(&self) -> Vec<ArenaStats> {
505 self.arenas.iter().map(|a| a.stats()).collect()
506 }
507}
508
509pub struct ThreadLocalArena {
515 pool: Arc<ArenaPool>,
517 cached_arena: AtomicPtr<EpochArena>,
519 cached_epoch: AtomicU64,
521}
522
523impl ThreadLocalArena {
524 pub fn new(pool: Arc<ArenaPool>) -> Self {
526 let arena = pool.current_arena();
527 let epoch = arena.epoch();
528
529 Self {
530 pool,
531 cached_arena: AtomicPtr::new(Arc::into_raw(arena) as *mut _),
532 cached_epoch: AtomicU64::new(epoch),
533 }
534 }
535
536 #[inline]
538 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
539 let current_epoch = self.pool.current_epoch();
540 let cached_epoch = self.cached_epoch.load(Ordering::Relaxed);
541
542 if current_epoch == cached_epoch {
543 let arena_ptr = self.cached_arena.load(Ordering::Acquire);
545 if !arena_ptr.is_null() {
546 let arena = unsafe { &*arena_ptr };
547 return arena.allocate(size);
548 }
549 }
550
551 self.allocate_slow(size, current_epoch)
553 }
554
555 #[cold]
556 fn allocate_slow(&self, size: usize, _current_epoch: u64) -> Option<ArenaHandle> {
557 let new_arena = self.pool.current_arena();
558 let new_epoch = new_arena.epoch();
559
560 let old_ptr = self.cached_arena.swap(
562 Arc::into_raw(new_arena.clone()) as *mut _,
563 Ordering::AcqRel,
564 );
565 self.cached_epoch.store(new_epoch, Ordering::Release);
566
567 if !old_ptr.is_null() {
569 unsafe { Arc::from_raw(old_ptr as *const EpochArena) };
570 }
571
572 new_arena.allocate(size)
573 }
574
575 #[inline]
577 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
578 if key.len() > MAX_INLINE_KEY_SIZE {
579 return None;
580 }
581 self.allocate(key.len()).map(|handle| {
582 unsafe {
583 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
584 }
585 handle
586 })
587 }
588}
589
590impl Drop for ThreadLocalArena {
591 fn drop(&mut self) {
592 let ptr = self.cached_arena.load(Ordering::Acquire);
593 if !ptr.is_null() {
594 unsafe { Arc::from_raw(ptr as *const EpochArena) };
595 }
596 }
597}
598
599unsafe impl Send for ThreadLocalArena {}
601unsafe impl Sync for ThreadLocalArena {}
602
603#[derive(Clone, Copy)]
609pub struct ArenaKey {
610 handle: ArenaHandle,
611}
612
613impl ArenaKey {
614 #[inline]
616 pub fn new(handle: ArenaHandle) -> Self {
617 Self { handle }
618 }
619
620 #[inline]
625 pub unsafe fn as_bytes(&self) -> &[u8] {
626 unsafe { self.handle.as_slice() }
627 }
628
629 #[inline]
631 pub fn epoch(&self) -> u64 {
632 self.handle.epoch()
633 }
634
635 #[inline]
637 pub fn len(&self) -> usize {
638 self.handle.len()
639 }
640
641 #[inline]
643 pub fn is_empty(&self) -> bool {
644 self.handle.is_empty()
645 }
646}
647
648unsafe impl Send for ArenaKey {}
650unsafe impl Sync for ArenaKey {}
651
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Handles report the requested sizes and the arena epoch; the arena
    /// counts each allocation.
    #[test]
    fn test_epoch_arena_basic() {
        let arena = EpochArena::new(1);

        let small = arena.allocate(16).unwrap();
        let medium = arena.allocate(32).unwrap();
        let large = arena.allocate(64).unwrap();

        assert_eq!(small.len(), 16);
        assert_eq!(medium.len(), 32);
        assert_eq!(large.len(), 64);
        assert_eq!(small.epoch(), 1);

        assert_eq!(arena.stats().allocation_count, 3);
    }

    /// `allocate_copy` stores exactly the bytes it was given.
    #[test]
    fn test_allocate_copy() {
        let arena = EpochArena::new(1);
        let payload = b"hello world";

        let handle = arena.allocate_copy(payload).unwrap();

        assert_eq!(handle.len(), payload.len());
        assert_eq!(unsafe { handle.as_slice() }, payload);
    }

    /// `allocate_key` stores exactly the key bytes.
    #[test]
    fn test_allocate_key() {
        let arena = EpochArena::new(1);
        let key = b"my_test_key";

        let handle = arena.allocate_key(key).unwrap();

        assert_eq!(unsafe { handle.as_slice() }, key);
    }

    /// `reset` installs the new epoch and clears the allocation counter.
    #[test]
    fn test_arena_reset() {
        let arena = EpochArena::new(1);

        (0..1000).for_each(|_| {
            arena.allocate(64).unwrap();
        });
        assert!(arena.stats().total_allocated > 0);

        arena.reset(2);

        let after = arena.stats();
        assert_eq!(after.epoch, 2);
        assert_eq!(after.allocation_count, 0);
    }

    /// Pool allocations carry the pool's epoch before and after an advance.
    #[test]
    fn test_arena_pool() {
        let pool = ArenaPool::new(4);

        let before = pool.allocate(16).unwrap();
        assert_eq!(before.epoch(), 0);

        pool.advance_epoch();

        let after = pool.allocate(16).unwrap();
        assert_eq!(after.epoch(), 1);

        assert!(pool.is_epoch_valid(0));
        assert!(pool.is_epoch_valid(1));
    }

    /// The caching front end serves both raw and key allocations.
    #[test]
    fn test_thread_local_arena() {
        let pool = Arc::new(ArenaPool::new(4));
        let local = ThreadLocalArena::new(pool.clone());

        let raw = local.allocate(32).unwrap();
        assert_eq!(raw.len(), 32);

        let key = local.allocate_key(b"test").unwrap();
        assert_eq!(key.len(), 4);
    }

    /// Eight threads allocating concurrently must all succeed and every
    /// allocation must be counted.
    #[test]
    fn test_concurrent_allocation() {
        let pool = Arc::new(ArenaPool::new(4));

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let pool = pool.clone();
                thread::spawn(move || {
                    for i in 0..10000 {
                        let size = (i % 64) + 8;
                        pool.allocate(size).expect("allocation failed");
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let total_allocs: usize = pool
            .stats()
            .iter()
            .map(|s| s.allocation_count)
            .sum();
        assert_eq!(total_allocs, 80000);
    }

    /// A request larger than the default block size gets a block of its own.
    #[test]
    fn test_large_allocation() {
        let arena = EpochArena::new(1);
        let large_size = 3 * 1024 * 1024;

        let handle = arena.allocate(large_size).unwrap();

        assert_eq!(handle.len(), large_size);
        assert!(arena.stats().block_count >= 2);
    }

    /// Aligned allocations return addresses that are multiples of the
    /// requested alignment.
    #[test]
    fn test_alignment() {
        let arena = EpochArena::new(1);

        let by_16 = arena.allocate_aligned(17, 16).unwrap();
        assert!((by_16.ptr.as_ptr() as usize) % 16 == 0);

        let by_64 = arena.allocate_aligned(65, 64).unwrap();
        assert!((by_64.ptr.as_ptr() as usize) % 64 == 0);
    }

    /// An `ArenaKey` mirrors the length, epoch and bytes of its handle.
    #[test]
    fn test_arena_key() {
        let arena = EpochArena::new(42);
        let key_data = b"user:12345:profile";

        let key = ArenaKey::new(arena.allocate_key(key_data).unwrap());

        assert_eq!(key.len(), key_data.len());
        assert_eq!(key.epoch(), 42);
        assert_eq!(unsafe { key.as_bytes() }, key_data);
    }

    /// Epochs advance by one per call, and only the most recent `pool_size`
    /// epochs stay valid.
    #[test]
    fn test_epoch_advancement() {
        let pool = ArenaPool::new(4);

        for expected_epoch in 1..=10 {
            assert_eq!(pool.advance_epoch(), expected_epoch);
        }

        assert!(!pool.is_epoch_valid(0));
        assert!(pool.is_epoch_valid(7));
        assert!(pool.is_epoch_valid(10));
    }
}