1use std::alloc::{alloc, dealloc, Layout};
40use std::cell::UnsafeCell;
41use std::ptr::NonNull;
42use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
43use std::sync::Arc;
44
/// Size in bytes of an arena memory block when the caller does not choose one (2 MiB).
const DEFAULT_BLOCK_SIZE: usize = 2 << 20;

/// Alignment in bytes of block storage and of plain `allocate` requests.
const MIN_ALIGN: usize = 8;

/// Longest key, in bytes, accepted by the `allocate_key` helpers.
const MAX_INLINE_KEY_SIZE: usize = 256;
53
/// Copyable, non-owning reference to a byte range handed out by an arena.
///
/// A handle is just a raw pointer, a length and the epoch under which the range
/// was allocated. It neither keeps the memory alive nor tracks whether the arena
/// has been reset since; that is why the slice accessors are `unsafe`.
#[derive(Clone, Copy)]
pub struct ArenaHandle {
    /// First byte of the range.
    ptr: NonNull<u8>,
    /// Length of the range in bytes.
    ///
    /// Stored as `usize` so that large requests are never truncated. On 64-bit
    /// targets this costs nothing: a 32-bit length would be followed by padding
    /// in front of the 8-byte `epoch` anyway.
    len: usize,
    /// Epoch reported by the arena when the range was allocated.
    epoch: u64,
}

impl ArenaHandle {
    /// Builds a handle from its raw parts.
    ///
    /// # Safety
    ///
    /// `ptr` must be valid for reads and writes of `len` bytes for as long as
    /// any copy of the handle is turned into a slice.
    #[inline]
    pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, epoch: u64) -> Self {
        Self { ptr, len, epoch }
    }

    /// Epoch recorded when the handle was created.
    #[inline]
    pub fn epoch(&self) -> u64 {
        self.epoch
    }

    /// Number of bytes the handle refers to.
    #[inline]
    pub fn len(&self) -> usize {
        self.len
    }

    /// `true` when the handle refers to zero bytes.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Views the referenced bytes as a shared slice.
    ///
    /// # Safety
    ///
    /// The memory must still be valid (the owning arena must be alive and must
    /// not have reused the range), and nobody may be writing to it while the
    /// returned slice exists.
    #[inline]
    pub unsafe fn as_slice(&self) -> &[u8] {
        // SAFETY: the caller guarantees `ptr` is still valid for `len` bytes
        // and not being mutated.
        unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len) }
    }

    /// Views the referenced bytes as a mutable slice.
    ///
    /// # Safety
    ///
    /// Same requirements as [`ArenaHandle::as_slice`]. In addition, because the
    /// handle is `Copy`, the caller must make sure no other copy of it is used
    /// to read or write the range while the returned slice exists.
    #[inline]
    pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
        // SAFETY: the caller guarantees validity and exclusive access.
        unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len) }
    }
}

// SAFETY: a handle is plain data (pointer, length, epoch). All dereferences go
// through `unsafe` accessors whose callers take responsibility for synchronization.
unsafe impl Send for ArenaHandle {}
unsafe impl Sync for ArenaHandle {}
125
126struct MemoryBlock {
132 data: NonNull<u8>,
134 size: usize,
136 offset: AtomicUsize,
138 layout: Layout,
140}
141
142impl MemoryBlock {
143 fn new(size: usize) -> Option<Self> {
145 let layout = Layout::from_size_align(size, MIN_ALIGN).ok()?;
146
147 let ptr = unsafe { alloc(layout) };
149 let data = NonNull::new(ptr)?;
150
151 Some(Self {
152 data,
153 size,
154 offset: AtomicUsize::new(0),
155 layout,
156 })
157 }
158
159 #[inline]
163 fn allocate(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
164 loop {
165 let current = self.offset.load(Ordering::Relaxed);
166
167 let aligned = (current + align - 1) & !(align - 1);
169 let new_offset = aligned + size;
170
171 if new_offset > self.size {
172 return None;
173 }
174
175 match self.offset.compare_exchange_weak(
176 current,
177 new_offset,
178 Ordering::Release,
179 Ordering::Relaxed,
180 ) {
181 Ok(_) => {
182 let ptr = unsafe { self.data.as_ptr().add(aligned) };
183 return NonNull::new(ptr);
184 }
185 Err(_) => continue, }
187 }
188 }
189
190 #[inline]
192 #[allow(dead_code)]
193 fn remaining(&self) -> usize {
194 self.size.saturating_sub(self.offset.load(Ordering::Relaxed))
195 }
196
197 #[inline]
199 fn used(&self) -> usize {
200 self.offset.load(Ordering::Relaxed)
201 }
202
203 fn reset(&self) {
205 self.offset.store(0, Ordering::Release);
206 }
207}
208
209impl Drop for MemoryBlock {
210 fn drop(&mut self) {
211 unsafe {
212 dealloc(self.data.as_ptr(), self.layout);
213 }
214 }
215}
216
217unsafe impl Send for MemoryBlock {}
219unsafe impl Sync for MemoryBlock {}
220
221pub struct EpochArena {
230 epoch: AtomicU64,
232 blocks: UnsafeCell<Vec<MemoryBlock>>,
234 active_block: AtomicUsize,
236 block_size: usize,
238 total_allocated: AtomicUsize,
240 allocation_count: AtomicUsize,
242 block_lock: std::sync::Mutex<()>,
244}
245
246impl EpochArena {
247 pub fn new(epoch: u64) -> Self {
249 Self::with_block_size(epoch, DEFAULT_BLOCK_SIZE)
250 }
251
252 pub fn with_block_size(epoch: u64, block_size: usize) -> Self {
254 let initial_block = MemoryBlock::new(block_size)
255 .expect("Failed to allocate initial block");
256
257 Self {
258 epoch: AtomicU64::new(epoch),
259 blocks: UnsafeCell::new(vec![initial_block]),
260 active_block: AtomicUsize::new(0),
261 block_size,
262 total_allocated: AtomicUsize::new(0),
263 allocation_count: AtomicUsize::new(0),
264 block_lock: std::sync::Mutex::new(()),
265 }
266 }
267
268 #[inline]
270 pub fn epoch(&self) -> u64 {
271 self.epoch.load(Ordering::Relaxed)
272 }
273
274 #[inline]
278 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
279 self.allocate_aligned(size, MIN_ALIGN)
280 }
281
282 pub fn allocate_aligned(&self, size: usize, align: usize) -> Option<ArenaHandle> {
284 if size == 0 {
285 return None;
286 }
287
288 let active_idx = self.active_block.load(Ordering::Acquire);
290 let blocks = unsafe { &*self.blocks.get() };
291
292 if active_idx < blocks.len() {
293 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
294 self.total_allocated.fetch_add(size, Ordering::Relaxed);
295 self.allocation_count.fetch_add(1, Ordering::Relaxed);
296 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
297 }
298 }
299
300 self.allocate_slow(size, align)
302 }
303
304 #[cold]
306 fn allocate_slow(&self, size: usize, align: usize) -> Option<ArenaHandle> {
307 let _guard = self.block_lock.lock().ok()?;
308
309 let active_idx = self.active_block.load(Ordering::Acquire);
311 let blocks = unsafe { &mut *self.blocks.get() };
312
313 if active_idx < blocks.len() {
314 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
315 self.total_allocated.fetch_add(size, Ordering::Relaxed);
316 self.allocation_count.fetch_add(1, Ordering::Relaxed);
317 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
318 }
319 }
320
321 let new_block_size = self.block_size.max(size + align);
323 let new_block = MemoryBlock::new(new_block_size)?;
324
325 let ptr = new_block.allocate(size, align)?;
326 blocks.push(new_block);
327 self.active_block.store(blocks.len() - 1, Ordering::Release);
328
329 self.total_allocated.fetch_add(size, Ordering::Relaxed);
330 self.allocation_count.fetch_add(1, Ordering::Relaxed);
331
332 Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) })
333 }
334
335 #[inline]
337 pub fn allocate_copy(&self, data: &[u8]) -> Option<ArenaHandle> {
338 let handle = self.allocate(data.len())?;
339 unsafe {
340 std::ptr::copy_nonoverlapping(data.as_ptr(), handle.ptr.as_ptr(), data.len());
341 }
342 Some(handle)
343 }
344
345 #[inline]
347 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
348 if key.len() > MAX_INLINE_KEY_SIZE {
349 return None;
350 }
351 self.allocate_aligned(key.len(), 16).map(|handle| {
352 unsafe {
353 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
354 }
355 handle
356 })
357 }
358
359 pub fn stats(&self) -> ArenaStats {
361 let blocks = unsafe { &*self.blocks.get() };
362
363 ArenaStats {
364 epoch: self.epoch(),
365 block_count: blocks.len(),
366 total_capacity: blocks.iter().map(|b| b.size).sum(),
367 total_used: blocks.iter().map(|b| b.used()).sum(),
368 total_allocated: self.total_allocated.load(Ordering::Relaxed),
369 allocation_count: self.allocation_count.load(Ordering::Relaxed),
370 }
371 }
372
373 pub fn reset(&self, new_epoch: u64) {
377 let _guard = self.block_lock.lock().unwrap();
378
379 let blocks = unsafe { &*self.blocks.get() };
381 for block in blocks {
382 block.reset();
383 }
384
385 self.epoch.store(new_epoch, Ordering::Release);
386 self.active_block.store(0, Ordering::Release);
387 self.total_allocated.store(0, Ordering::Relaxed);
388 self.allocation_count.store(0, Ordering::Relaxed);
389 }
390}
391
392unsafe impl Send for EpochArena {}
394unsafe impl Sync for EpochArena {}
395
/// Point-in-time usage figures for a single [`EpochArena`].
#[derive(Clone, Debug)]
pub struct ArenaStats {
    /// Epoch the arena reported when the snapshot was taken.
    pub epoch: u64,
    /// Number of memory blocks the arena currently owns.
    pub block_count: usize,
    /// Combined capacity of all blocks, in bytes.
    pub total_capacity: usize,
    /// Combined bump offsets of all blocks, in bytes (alignment padding included).
    pub total_used: usize,
    /// Sum of the sizes requested by successful allocations since the last reset.
    pub total_allocated: usize,
    /// Number of successful allocations since the last reset.
    pub allocation_count: usize,
}
412
413pub struct ArenaPool {
421 arenas: Vec<Arc<EpochArena>>,
423 current_epoch: AtomicU64,
425 pool_size: usize,
427 #[allow(dead_code)]
429 block_size: usize,
430}
431
432impl ArenaPool {
433 pub fn new(pool_size: usize) -> Self {
435 Self::with_block_size(pool_size, DEFAULT_BLOCK_SIZE)
436 }
437
438 pub fn with_block_size(pool_size: usize, block_size: usize) -> Self {
440 let arenas = (0..pool_size)
441 .map(|i| Arc::new(EpochArena::with_block_size(i as u64, block_size)))
442 .collect();
443
444 Self {
445 arenas,
446 current_epoch: AtomicU64::new(0),
447 pool_size,
448 block_size,
449 }
450 }
451
452 #[inline]
454 pub fn current_epoch(&self) -> u64 {
455 self.current_epoch.load(Ordering::Acquire)
456 }
457
458 #[inline]
460 pub fn current_arena(&self) -> Arc<EpochArena> {
461 let epoch = self.current_epoch();
462 let idx = (epoch as usize) % self.pool_size;
463 self.arenas[idx].clone()
464 }
465
466 #[inline]
468 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
469 self.current_arena().allocate(size)
470 }
471
472 #[inline]
474 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
475 self.current_arena().allocate_key(key)
476 }
477
478 pub fn advance_epoch(&self) -> u64 {
482 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
483
484 let next_idx = (new_epoch as usize) % self.pool_size;
486 self.arenas[next_idx].reset(new_epoch);
487
488 new_epoch
489 }
490
491 #[inline]
495 pub fn is_epoch_valid(&self, epoch: u64) -> bool {
496 let current = self.current_epoch();
497 epoch + (self.pool_size as u64) > current
498 }
499
500 pub fn stats(&self) -> Vec<ArenaStats> {
502 self.arenas.iter().map(|a| a.stats()).collect()
503 }
504}
505
506pub struct ThreadLocalArena {
512 pool: Arc<ArenaPool>,
514 cached_arena: AtomicPtr<EpochArena>,
516 cached_epoch: AtomicU64,
518}
519
520impl ThreadLocalArena {
521 pub fn new(pool: Arc<ArenaPool>) -> Self {
523 let arena = pool.current_arena();
524 let epoch = arena.epoch();
525
526 Self {
527 pool,
528 cached_arena: AtomicPtr::new(Arc::into_raw(arena) as *mut _),
529 cached_epoch: AtomicU64::new(epoch),
530 }
531 }
532
533 #[inline]
535 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
536 let current_epoch = self.pool.current_epoch();
537 let cached_epoch = self.cached_epoch.load(Ordering::Relaxed);
538
539 if current_epoch == cached_epoch {
540 let arena_ptr = self.cached_arena.load(Ordering::Acquire);
542 if !arena_ptr.is_null() {
543 let arena = unsafe { &*arena_ptr };
544 return arena.allocate(size);
545 }
546 }
547
548 self.allocate_slow(size, current_epoch)
550 }
551
552 #[cold]
553 fn allocate_slow(&self, size: usize, _current_epoch: u64) -> Option<ArenaHandle> {
554 let new_arena = self.pool.current_arena();
555 let new_epoch = new_arena.epoch();
556
557 let old_ptr = self.cached_arena.swap(
559 Arc::into_raw(new_arena.clone()) as *mut _,
560 Ordering::AcqRel,
561 );
562 self.cached_epoch.store(new_epoch, Ordering::Release);
563
564 if !old_ptr.is_null() {
566 unsafe { Arc::from_raw(old_ptr as *const EpochArena) };
567 }
568
569 new_arena.allocate(size)
570 }
571
572 #[inline]
574 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
575 if key.len() > MAX_INLINE_KEY_SIZE {
576 return None;
577 }
578 self.allocate(key.len()).map(|handle| {
579 unsafe {
580 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
581 }
582 handle
583 })
584 }
585}
586
587impl Drop for ThreadLocalArena {
588 fn drop(&mut self) {
589 let ptr = self.cached_arena.load(Ordering::Acquire);
590 if !ptr.is_null() {
591 unsafe { Arc::from_raw(ptr as *const EpochArena) };
592 }
593 }
594}
595
596unsafe impl Send for ThreadLocalArena {}
598unsafe impl Sync for ThreadLocalArena {}
599
600#[derive(Clone, Copy)]
606pub struct ArenaKey {
607 handle: ArenaHandle,
608}
609
610impl ArenaKey {
611 #[inline]
613 pub fn new(handle: ArenaHandle) -> Self {
614 Self { handle }
615 }
616
617 #[inline]
622 pub unsafe fn as_bytes(&self) -> &[u8] {
623 unsafe { self.handle.as_slice() }
624 }
625
626 #[inline]
628 pub fn epoch(&self) -> u64 {
629 self.handle.epoch()
630 }
631
632 #[inline]
634 pub fn len(&self) -> usize {
635 self.handle.len()
636 }
637
638 #[inline]
640 pub fn is_empty(&self) -> bool {
641 self.handle.is_empty()
642 }
643}
644
645unsafe impl Send for ArenaKey {}
647unsafe impl Sync for ArenaKey {}
648
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;
    use std::thread;

    /// Allocations report the requested length and the arena's epoch, and are counted.
    #[test]
    fn test_epoch_arena_basic() {
        let arena = EpochArena::new(1);

        let requested = [16usize, 32, 64];
        let handles: Vec<ArenaHandle> = requested
            .iter()
            .map(|&size| arena.allocate(size).unwrap())
            .collect();

        for (handle, &size) in handles.iter().zip(requested.iter()) {
            assert_eq!(handle.len(), size);
        }
        assert_eq!(handles[0].epoch(), 1);

        assert_eq!(arena.stats().allocation_count, 3);
    }

    /// `allocate_copy` stores an exact copy of the input.
    #[test]
    fn test_allocate_copy() {
        let arena = EpochArena::new(1);
        let payload = b"hello world";

        let copied = arena.allocate_copy(payload).unwrap();

        assert_eq!(copied.len(), payload.len());
        assert_eq!(unsafe { copied.as_slice() }, payload);
    }

    /// `allocate_key` stores an exact copy of the key.
    #[test]
    fn test_allocate_key() {
        let arena = EpochArena::new(1);
        let key_bytes = b"my_test_key";

        let stored = arena.allocate_key(key_bytes).unwrap();

        assert_eq!(unsafe { stored.as_slice() }, key_bytes);
    }

    /// Resetting switches the epoch and clears the allocation counter.
    #[test]
    fn test_arena_reset() {
        let arena = EpochArena::new(1);

        (0..1000).for_each(|_| {
            arena.allocate(64).unwrap();
        });
        assert!(arena.stats().total_allocated > 0);

        arena.reset(2);

        let after = arena.stats();
        assert_eq!(after.epoch, 2);
        assert_eq!(after.allocation_count, 0);
    }

    /// Pool allocations carry the pool's epoch before and after advancing it.
    #[test]
    fn test_arena_pool() {
        let pool = ArenaPool::new(4);

        let before = pool.allocate(16).unwrap();
        assert_eq!(before.epoch(), 0);

        pool.advance_epoch();

        let after = pool.allocate(16).unwrap();
        assert_eq!(after.epoch(), 1);

        for epoch in 0..=1 {
            assert!(pool.is_epoch_valid(epoch));
        }
    }

    /// The caching front-end serves plain and key allocations.
    #[test]
    fn test_thread_local_arena() {
        let pool = Arc::new(ArenaPool::new(4));
        let local = ThreadLocalArena::new(pool.clone());

        let plain = local.allocate(32).unwrap();
        assert_eq!(plain.len(), 32);

        let key = local.allocate_key(b"test").unwrap();
        assert_eq!(key.len(), 4);
    }

    /// Eight threads allocating concurrently are all accounted for.
    #[test]
    fn test_concurrent_allocation() {
        let pool = Arc::new(ArenaPool::new(4));

        let workers: Vec<_> = (0..8)
            .map(|_| {
                let pool_clone = pool.clone();
                thread::spawn(move || {
                    for i in 0..10000 {
                        let size = (i % 64) + 8;
                        pool_clone.allocate(size).expect("allocation failed");
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().unwrap();
        }

        let total_allocs: usize = pool
            .stats()
            .iter()
            .map(|arena_stats| arena_stats.allocation_count)
            .sum();
        assert_eq!(total_allocs, 80000);
    }

    /// A request larger than the default block size forces a second block.
    #[test]
    fn test_large_allocation() {
        let arena = EpochArena::new(1);
        let large_size = 3 * 1024 * 1024;

        let handle = arena.allocate(large_size).unwrap();

        assert_eq!(handle.len(), large_size);
        assert!(arena.stats().block_count >= 2);
    }

    /// Aligned allocations return suitably aligned addresses.
    #[test]
    fn test_alignment() {
        let arena = EpochArena::new(1);

        let sixteen = arena.allocate_aligned(17, 16).unwrap();
        assert!((sixteen.ptr.as_ptr() as usize) % 16 == 0);

        let sixty_four = arena.allocate_aligned(65, 64).unwrap();
        assert!((sixty_four.ptr.as_ptr() as usize) % 64 == 0);
    }

    /// `ArenaKey` exposes the length, epoch and bytes of the wrapped handle.
    #[test]
    fn test_arena_key() {
        let arena = EpochArena::new(42);
        let key_data = b"user:12345:profile";

        let key = ArenaKey::new(arena.allocate_key(key_data).unwrap());

        assert_eq!(key.len(), key_data.len());
        assert_eq!(key.epoch(), 42);
        assert_eq!(unsafe { key.as_bytes() }, key_data);
    }

    /// Advancing ten times yields epochs 1..=10 and slides the validity window.
    #[test]
    fn test_epoch_advancement() {
        let pool = ArenaPool::new(4);

        let mut expected_epoch = 1;
        while expected_epoch <= 10 {
            assert_eq!(pool.advance_epoch(), expected_epoch);
            expected_epoch += 1;
        }

        assert!(!pool.is_epoch_valid(0));
        assert!(pool.is_epoch_valid(7));
        assert!(pool.is_epoch_valid(10));
    }
}