1use std::alloc::{Layout, alloc, dealloc};
43use std::cell::UnsafeCell;
44use std::ptr::NonNull;
45use std::sync::Arc;
46use std::sync::atomic::{AtomicPtr, AtomicU64, AtomicUsize, Ordering};
47
48const DEFAULT_BLOCK_SIZE: usize = 2 * 1024 * 1024;
50
51const MIN_ALIGN: usize = 8;
53
54const MAX_INLINE_KEY_SIZE: usize = 256;
56
57#[derive(Clone, Copy)]
65pub struct ArenaHandle {
66 ptr: NonNull<u8>,
68 len: u32,
70 epoch: u64,
72}
73
74impl ArenaHandle {
75 #[inline]
80 pub(crate) unsafe fn new(ptr: NonNull<u8>, len: usize, epoch: u64) -> Self {
81 Self {
82 ptr,
83 len: len as u32,
84 epoch,
85 }
86 }
87
88 #[inline]
90 pub fn epoch(&self) -> u64 {
91 self.epoch
92 }
93
94 #[inline]
96 pub fn len(&self) -> usize {
97 self.len as usize
98 }
99
100 #[inline]
102 pub fn is_empty(&self) -> bool {
103 self.len == 0
104 }
105
106 #[inline]
111 pub unsafe fn as_slice(&self) -> &[u8] {
112 unsafe { std::slice::from_raw_parts(self.ptr.as_ptr(), self.len as usize) }
113 }
114
115 #[inline]
120 pub unsafe fn as_mut_slice(&mut self) -> &mut [u8] {
121 unsafe { std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.len as usize) }
122 }
123}
124
125unsafe impl Send for ArenaHandle {}
127unsafe impl Sync for ArenaHandle {}
128
129struct MemoryBlock {
135 data: NonNull<u8>,
137 size: usize,
139 offset: AtomicUsize,
141 layout: Layout,
143}
144
145impl MemoryBlock {
146 fn new(size: usize) -> Option<Self> {
148 let layout = Layout::from_size_align(size, MIN_ALIGN).ok()?;
149
150 let ptr = unsafe { alloc(layout) };
152 let data = NonNull::new(ptr)?;
153
154 Some(Self {
155 data,
156 size,
157 offset: AtomicUsize::new(0),
158 layout,
159 })
160 }
161
162 #[inline]
166 fn allocate(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
167 let base = self.data.as_ptr() as usize;
168 loop {
169 let current = self.offset.load(Ordering::Relaxed);
170
171 let current_addr = base + current;
175 let aligned_addr = (current_addr + align - 1) & !(align - 1);
176 let aligned = aligned_addr - base;
177 let new_offset = aligned + size;
178
179 if new_offset > self.size {
180 return None;
181 }
182
183 match self.offset.compare_exchange_weak(
184 current,
185 new_offset,
186 Ordering::Release,
187 Ordering::Relaxed,
188 ) {
189 Ok(_) => {
190 let ptr = unsafe { self.data.as_ptr().add(aligned) };
191 return NonNull::new(ptr);
192 }
193 Err(_) => continue, }
195 }
196 }
197
198 #[inline]
200 #[allow(dead_code)]
201 fn remaining(&self) -> usize {
202 self.size
203 .saturating_sub(self.offset.load(Ordering::Relaxed))
204 }
205
206 #[inline]
208 fn used(&self) -> usize {
209 self.offset.load(Ordering::Relaxed)
210 }
211
212 fn reset(&self) {
214 self.offset.store(0, Ordering::Release);
215 }
216}
217
218impl Drop for MemoryBlock {
219 fn drop(&mut self) {
220 unsafe {
221 dealloc(self.data.as_ptr(), self.layout);
222 }
223 }
224}
225
226unsafe impl Send for MemoryBlock {}
228unsafe impl Sync for MemoryBlock {}
229
230pub struct EpochArena {
239 epoch: AtomicU64,
241 blocks: UnsafeCell<Vec<MemoryBlock>>,
243 active_block: AtomicUsize,
245 block_size: usize,
247 total_allocated: AtomicUsize,
249 allocation_count: AtomicUsize,
251 block_lock: std::sync::Mutex<()>,
253}
254
255impl EpochArena {
256 pub fn new(epoch: u64) -> Self {
258 Self::with_block_size(epoch, DEFAULT_BLOCK_SIZE)
259 }
260
261 pub fn with_block_size(epoch: u64, block_size: usize) -> Self {
263 let initial_block = MemoryBlock::new(block_size).expect("Failed to allocate initial block");
264
265 Self {
266 epoch: AtomicU64::new(epoch),
267 blocks: UnsafeCell::new(vec![initial_block]),
268 active_block: AtomicUsize::new(0),
269 block_size,
270 total_allocated: AtomicUsize::new(0),
271 allocation_count: AtomicUsize::new(0),
272 block_lock: std::sync::Mutex::new(()),
273 }
274 }
275
276 #[inline]
278 pub fn epoch(&self) -> u64 {
279 self.epoch.load(Ordering::Relaxed)
280 }
281
282 #[inline]
286 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
287 self.allocate_aligned(size, MIN_ALIGN)
288 }
289
290 pub fn allocate_aligned(&self, size: usize, align: usize) -> Option<ArenaHandle> {
292 if size == 0 {
293 return None;
294 }
295
296 let active_idx = self.active_block.load(Ordering::Acquire);
298 let blocks = unsafe { &*self.blocks.get() };
299
300 if active_idx < blocks.len() {
301 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
302 self.total_allocated.fetch_add(size, Ordering::Relaxed);
303 self.allocation_count.fetch_add(1, Ordering::Relaxed);
304 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
305 }
306 }
307
308 self.allocate_slow(size, align)
310 }
311
312 #[cold]
314 fn allocate_slow(&self, size: usize, align: usize) -> Option<ArenaHandle> {
315 let _guard = self.block_lock.lock().ok()?;
316
317 let active_idx = self.active_block.load(Ordering::Acquire);
319 let blocks = unsafe { &mut *self.blocks.get() };
320
321 if active_idx < blocks.len() {
322 if let Some(ptr) = blocks[active_idx].allocate(size, align) {
323 self.total_allocated.fetch_add(size, Ordering::Relaxed);
324 self.allocation_count.fetch_add(1, Ordering::Relaxed);
325 return Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) });
326 }
327 }
328
329 let new_block_size = self.block_size.max(size + align);
331 let new_block = MemoryBlock::new(new_block_size)?;
332
333 let ptr = new_block.allocate(size, align)?;
334 blocks.push(new_block);
335 self.active_block.store(blocks.len() - 1, Ordering::Release);
336
337 self.total_allocated.fetch_add(size, Ordering::Relaxed);
338 self.allocation_count.fetch_add(1, Ordering::Relaxed);
339
340 Some(unsafe { ArenaHandle::new(ptr, size, self.epoch()) })
341 }
342
343 #[inline]
345 pub fn allocate_copy(&self, data: &[u8]) -> Option<ArenaHandle> {
346 let handle = self.allocate(data.len())?;
347 unsafe {
348 std::ptr::copy_nonoverlapping(data.as_ptr(), handle.ptr.as_ptr(), data.len());
349 }
350 Some(handle)
351 }
352
353 #[inline]
355 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
356 if key.len() > MAX_INLINE_KEY_SIZE {
357 return None;
358 }
359 self.allocate_aligned(key.len(), 16).map(|handle| {
360 unsafe {
361 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
362 }
363 handle
364 })
365 }
366
367 pub fn stats(&self) -> ArenaStats {
369 let blocks = unsafe { &*self.blocks.get() };
370
371 ArenaStats {
372 epoch: self.epoch(),
373 block_count: blocks.len(),
374 total_capacity: blocks.iter().map(|b| b.size).sum(),
375 total_used: blocks.iter().map(|b| b.used()).sum(),
376 total_allocated: self.total_allocated.load(Ordering::Relaxed),
377 allocation_count: self.allocation_count.load(Ordering::Relaxed),
378 }
379 }
380
381 pub fn reset(&self, new_epoch: u64) {
385 let _guard = self.block_lock.lock().unwrap();
386
387 let blocks = unsafe { &*self.blocks.get() };
389 for block in blocks {
390 block.reset();
391 }
392
393 self.epoch.store(new_epoch, Ordering::Release);
394 self.active_block.store(0, Ordering::Release);
395 self.total_allocated.store(0, Ordering::Relaxed);
396 self.allocation_count.store(0, Ordering::Relaxed);
397 }
398}
399
400unsafe impl Send for EpochArena {}
402unsafe impl Sync for EpochArena {}
403
404#[derive(Debug, Clone)]
406pub struct ArenaStats {
407 pub epoch: u64,
409 pub block_count: usize,
411 pub total_capacity: usize,
413 pub total_used: usize,
415 pub total_allocated: usize,
417 pub allocation_count: usize,
419}
420
421pub struct ArenaPool {
429 arenas: Vec<Arc<EpochArena>>,
431 current_epoch: AtomicU64,
433 pool_size: usize,
435 #[allow(dead_code)]
437 block_size: usize,
438}
439
440impl ArenaPool {
441 pub fn new(pool_size: usize) -> Self {
443 Self::with_block_size(pool_size, DEFAULT_BLOCK_SIZE)
444 }
445
446 pub fn with_block_size(pool_size: usize, block_size: usize) -> Self {
448 let arenas = (0..pool_size)
449 .map(|i| Arc::new(EpochArena::with_block_size(i as u64, block_size)))
450 .collect();
451
452 Self {
453 arenas,
454 current_epoch: AtomicU64::new(0),
455 pool_size,
456 block_size,
457 }
458 }
459
460 #[inline]
462 pub fn current_epoch(&self) -> u64 {
463 self.current_epoch.load(Ordering::Acquire)
464 }
465
466 #[inline]
468 pub fn current_arena(&self) -> Arc<EpochArena> {
469 let epoch = self.current_epoch();
470 let idx = (epoch as usize) % self.pool_size;
471 self.arenas[idx].clone()
472 }
473
474 #[inline]
476 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
477 self.current_arena().allocate(size)
478 }
479
480 #[inline]
482 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
483 self.current_arena().allocate_key(key)
484 }
485
486 pub fn advance_epoch(&self) -> u64 {
490 let new_epoch = self.current_epoch.fetch_add(1, Ordering::AcqRel) + 1;
491
492 let next_idx = (new_epoch as usize) % self.pool_size;
494 self.arenas[next_idx].reset(new_epoch);
495
496 new_epoch
497 }
498
499 #[inline]
503 pub fn is_epoch_valid(&self, epoch: u64) -> bool {
504 let current = self.current_epoch();
505 epoch + (self.pool_size as u64) > current
506 }
507
508 pub fn stats(&self) -> Vec<ArenaStats> {
510 self.arenas.iter().map(|a| a.stats()).collect()
511 }
512}
513
514pub struct ThreadLocalArena {
520 pool: Arc<ArenaPool>,
522 cached_arena: AtomicPtr<EpochArena>,
524 cached_epoch: AtomicU64,
526}
527
528impl ThreadLocalArena {
529 pub fn new(pool: Arc<ArenaPool>) -> Self {
531 let arena = pool.current_arena();
532 let epoch = arena.epoch();
533
534 Self {
535 pool,
536 cached_arena: AtomicPtr::new(Arc::into_raw(arena) as *mut _),
537 cached_epoch: AtomicU64::new(epoch),
538 }
539 }
540
541 #[inline]
543 pub fn allocate(&self, size: usize) -> Option<ArenaHandle> {
544 let current_epoch = self.pool.current_epoch();
545 let cached_epoch = self.cached_epoch.load(Ordering::Relaxed);
546
547 if current_epoch == cached_epoch {
548 let arena_ptr = self.cached_arena.load(Ordering::Acquire);
550 if !arena_ptr.is_null() {
551 let arena = unsafe { &*arena_ptr };
552 return arena.allocate(size);
553 }
554 }
555
556 self.allocate_slow(size, current_epoch)
558 }
559
560 #[cold]
561 fn allocate_slow(&self, size: usize, _current_epoch: u64) -> Option<ArenaHandle> {
562 let new_arena = self.pool.current_arena();
563 let new_epoch = new_arena.epoch();
564
565 let old_ptr = self
567 .cached_arena
568 .swap(Arc::into_raw(new_arena.clone()) as *mut _, Ordering::AcqRel);
569 self.cached_epoch.store(new_epoch, Ordering::Release);
570
571 if !old_ptr.is_null() {
573 unsafe { Arc::from_raw(old_ptr as *const EpochArena) };
574 }
575
576 new_arena.allocate(size)
577 }
578
579 #[inline]
581 pub fn allocate_key(&self, key: &[u8]) -> Option<ArenaHandle> {
582 if key.len() > MAX_INLINE_KEY_SIZE {
583 return None;
584 }
585 self.allocate(key.len()).map(|handle| {
586 unsafe {
587 std::ptr::copy_nonoverlapping(key.as_ptr(), handle.ptr.as_ptr(), key.len());
588 }
589 handle
590 })
591 }
592}
593
594impl Drop for ThreadLocalArena {
595 fn drop(&mut self) {
596 let ptr = self.cached_arena.load(Ordering::Acquire);
597 if !ptr.is_null() {
598 unsafe { Arc::from_raw(ptr as *const EpochArena) };
599 }
600 }
601}
602
603unsafe impl Send for ThreadLocalArena {}
605unsafe impl Sync for ThreadLocalArena {}
606
607#[derive(Clone, Copy)]
613pub struct ArenaKey {
614 handle: ArenaHandle,
615}
616
617impl ArenaKey {
618 #[inline]
620 pub fn new(handle: ArenaHandle) -> Self {
621 Self { handle }
622 }
623
624 #[inline]
629 pub unsafe fn as_bytes(&self) -> &[u8] {
630 unsafe { self.handle.as_slice() }
631 }
632
633 #[inline]
635 pub fn epoch(&self) -> u64 {
636 self.handle.epoch()
637 }
638
639 #[inline]
641 pub fn len(&self) -> usize {
642 self.handle.len()
643 }
644
645 #[inline]
647 pub fn is_empty(&self) -> bool {
648 self.handle.is_empty()
649 }
650}
651
652unsafe impl Send for ArenaKey {}
654unsafe impl Sync for ArenaKey {}
655
656#[cfg(test)]
657mod tests {
658 use super::*;
659 use std::sync::Arc;
660 use std::thread;
661
662 #[test]
663 fn test_epoch_arena_basic() {
664 let arena = EpochArena::new(1);
665
666 let h1 = arena.allocate(16).unwrap();
667 let h2 = arena.allocate(32).unwrap();
668 let h3 = arena.allocate(64).unwrap();
669
670 assert_eq!(h1.len(), 16);
671 assert_eq!(h2.len(), 32);
672 assert_eq!(h3.len(), 64);
673 assert_eq!(h1.epoch(), 1);
674
675 let stats = arena.stats();
676 assert_eq!(stats.allocation_count, 3);
677 }
678
679 #[test]
680 fn test_allocate_copy() {
681 let arena = EpochArena::new(1);
682 let data = b"hello world";
683
684 let handle = arena.allocate_copy(data).unwrap();
685 assert_eq!(handle.len(), data.len());
686
687 let slice = unsafe { handle.as_slice() };
688 assert_eq!(slice, data);
689 }
690
691 #[test]
692 fn test_allocate_key() {
693 let arena = EpochArena::new(1);
694 let key = b"my_test_key";
695
696 let handle = arena.allocate_key(key).unwrap();
697 let slice = unsafe { handle.as_slice() };
698 assert_eq!(slice, key);
699 }
700
701 #[test]
702 fn test_arena_reset() {
703 let arena = EpochArena::new(1);
704
705 for _ in 0..1000 {
706 arena.allocate(64).unwrap();
707 }
708
709 let stats_before = arena.stats();
710 assert!(stats_before.total_allocated > 0);
711
712 arena.reset(2);
713
714 let stats_after = arena.stats();
715 assert_eq!(stats_after.epoch, 2);
716 assert_eq!(stats_after.allocation_count, 0);
717 }
718
719 #[test]
720 fn test_arena_pool() {
721 let pool = ArenaPool::new(4);
722
723 let h1 = pool.allocate(16).unwrap();
724 assert_eq!(h1.epoch(), 0);
725
726 pool.advance_epoch();
727
728 let h2 = pool.allocate(16).unwrap();
729 assert_eq!(h2.epoch(), 1);
730
731 assert!(pool.is_epoch_valid(0));
732 assert!(pool.is_epoch_valid(1));
733 }
734
735 #[test]
736 fn test_thread_local_arena() {
737 let pool = Arc::new(ArenaPool::new(4));
738 let tla = ThreadLocalArena::new(pool.clone());
739
740 let h1 = tla.allocate(32).unwrap();
741 assert_eq!(h1.len(), 32);
742
743 let h2 = tla.allocate_key(b"test").unwrap();
744 assert_eq!(h2.len(), 4);
745 }
746
747 #[test]
748 fn test_concurrent_allocation() {
749 let pool = Arc::new(ArenaPool::new(4));
750 let mut handles = vec![];
751
752 for _ in 0..8 {
753 let pool_clone = pool.clone();
754 handles.push(thread::spawn(move || {
755 for i in 0..10000 {
756 let size = (i % 64) + 8;
757 pool_clone.allocate(size).expect("allocation failed");
758 }
759 }));
760 }
761
762 for handle in handles {
763 handle.join().unwrap();
764 }
765
766 let stats = pool.stats();
767 let total_allocs: usize = stats.iter().map(|s| s.allocation_count).sum();
768 assert_eq!(total_allocs, 80000);
769 }
770
771 #[test]
772 fn test_large_allocation() {
773 let arena = EpochArena::new(1);
774
775 let large_size = 3 * 1024 * 1024;
777 let handle = arena.allocate(large_size).unwrap();
778 assert_eq!(handle.len(), large_size);
779
780 let stats = arena.stats();
781 assert!(stats.block_count >= 2); }
783
784 #[test]
785 fn test_alignment() {
786 let arena = EpochArena::new(1);
787
788 let h1 = arena.allocate_aligned(17, 16).unwrap();
790 assert!((h1.ptr.as_ptr() as usize) % 16 == 0);
791
792 let h2 = arena.allocate_aligned(65, 64).unwrap();
794 assert!((h2.ptr.as_ptr() as usize) % 64 == 0);
795 }
796
797 #[test]
798 fn test_arena_key() {
799 let arena = EpochArena::new(42);
800 let key_data = b"user:12345:profile";
801
802 let handle = arena.allocate_key(key_data).unwrap();
803 let key = ArenaKey::new(handle);
804
805 assert_eq!(key.len(), key_data.len());
806 assert_eq!(key.epoch(), 42);
807
808 let bytes = unsafe { key.as_bytes() };
809 assert_eq!(bytes, key_data);
810 }
811
812 #[test]
813 fn test_epoch_advancement() {
814 let pool = ArenaPool::new(4);
815
816 for expected_epoch in 1..=10 {
818 let new_epoch = pool.advance_epoch();
819 assert_eq!(new_epoch, expected_epoch);
820 }
821
822 assert!(!pool.is_epoch_valid(0)); assert!(pool.is_epoch_valid(7)); assert!(pool.is_epoch_valid(10)); }
827}