#![allow(dead_code)]
use crate::protocol::PartitionId;
use parking_lot::{Mutex, RwLock};
use std::alloc::{alloc, dealloc, Layout};
use std::collections::HashMap;
use std::ptr::NonNull;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
#[derive(Clone, Debug)]
pub struct ArenaConfig {
pub arena_size: usize,
pub arenas_per_partition: usize,
pub java_batch_size_target: usize,
pub max_message_size: usize,
pub enable_metrics: bool,
}
impl Default for ArenaConfig {
fn default() -> Self {
Self {
arena_size: 1024 * 1024, arenas_per_partition: 4, java_batch_size_target: 65536, max_message_size: 1024 * 1024, enable_metrics: true,
}
}
}
#[repr(align(64))] pub struct ArenaRegion {
start: NonNull<u8>,
position: AtomicUsize,
size: usize,
arena_id: u64,
allocation_count: AtomicU64,
bytes_allocated: AtomicU64,
}
unsafe impl Send for ArenaRegion {}
unsafe impl Sync for ArenaRegion {}
impl ArenaRegion {
pub fn new(size: usize, arena_id: u64) -> Result<Self, &'static str> {
if size == 0 {
return Err("Arena size must be greater than 0");
}
let layout = Layout::from_size_align(size, 64).map_err(|_| "Invalid arena layout")?;
let ptr = unsafe { alloc(layout) };
let start = NonNull::new(ptr).ok_or("Failed to allocate arena memory")?;
Ok(Self {
start,
position: AtomicUsize::new(0),
size,
arena_id,
allocation_count: AtomicU64::new(0),
bytes_allocated: AtomicU64::new(0),
})
}
pub fn allocate(&self, size: usize, align: usize) -> Option<NonNull<u8>> {
if size == 0 {
return None;
}
let align = std::cmp::max(align, 8);
let mut current_pos = self.position.load(Ordering::Relaxed);
loop {
let aligned_pos = (current_pos + align - 1) & !(align - 1);
let new_pos = aligned_pos + size;
if new_pos > self.size {
return None; }
match self.position.compare_exchange_weak(
current_pos,
new_pos,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => {
self.allocation_count.fetch_add(1, Ordering::Relaxed);
self.bytes_allocated
.fetch_add(size as u64, Ordering::Relaxed);
let ptr =
unsafe { NonNull::new_unchecked(self.start.as_ptr().add(aligned_pos)) };
return Some(ptr);
}
Err(actual) => {
current_pos = actual;
}
}
}
}
pub fn allocate_java_batch(
&self,
message_count: usize,
avg_message_size: usize,
) -> Option<JavaBatchAllocation> {
let total_size = message_count * avg_message_size + 1024; let ptr = self.allocate(total_size, 64)?;
Some(JavaBatchAllocation {
ptr,
size: total_size,
message_count,
arena_id: self.arena_id,
})
}
pub fn reset(&self) {
self.position.store(0, Ordering::Relaxed);
self.allocation_count.store(0, Ordering::Relaxed);
self.bytes_allocated.store(0, Ordering::Relaxed);
}
pub fn utilization(&self) -> f64 {
let pos = self.position.load(Ordering::Relaxed);
(pos as f64 / self.size as f64) * 100.0
}
pub fn has_space(&self, size: usize) -> bool {
let current = self.position.load(Ordering::Relaxed);
current + size <= self.size
}
pub fn stats(&self) -> ArenaStats {
ArenaStats {
arena_id: self.arena_id,
total_size: self.size,
used_size: self.position.load(Ordering::Relaxed),
allocation_count: self.allocation_count.load(Ordering::Relaxed),
bytes_allocated: self.bytes_allocated.load(Ordering::Relaxed),
utilization_percent: self.utilization(),
}
}
}
impl Drop for ArenaRegion {
fn drop(&mut self) {
let layout = Layout::from_size_align(self.size, 64).unwrap();
unsafe {
dealloc(self.start.as_ptr(), layout);
}
}
}
#[derive(Debug)]
pub struct JavaBatchAllocation {
pub ptr: NonNull<u8>,
pub size: usize,
pub message_count: usize,
pub arena_id: u64,
}
impl JavaBatchAllocation {
pub unsafe fn as_slice(&self) -> &mut [u8] {
std::slice::from_raw_parts_mut(self.ptr.as_ptr(), self.size)
}
}
pub struct ArenaAllocator {
config: ArenaConfig,
partitions: Arc<RwLock<HashMap<(String, PartitionId), Vec<Arc<ArenaRegion>>>>>,
current_arena: Arc<Mutex<HashMap<(String, PartitionId), usize>>>,
next_arena_id: AtomicU64,
metrics: ArenaMetrics,
}
impl ArenaAllocator {
pub fn new() -> Self {
Self::with_config(ArenaConfig::default())
}
pub fn with_config(config: ArenaConfig) -> Self {
Self {
config,
partitions: Arc::new(RwLock::new(HashMap::new())),
current_arena: Arc::new(Mutex::new(HashMap::new())),
next_arena_id: AtomicU64::new(1),
metrics: ArenaMetrics::new(),
}
}
pub fn allocate_java_batch(
&self,
topic: &str,
partition: PartitionId,
message_count: usize,
avg_message_size: usize,
) -> Result<JavaBatchAllocation, ArenaError> {
let allocation_start = Instant::now();
let batch_size = message_count * avg_message_size;
if batch_size > self.config.max_message_size {
self.metrics
.large_batch_fallbacks
.fetch_add(1, Ordering::Relaxed);
return Err(ArenaError::BatchTooLarge(batch_size));
}
let arena = self.get_available_arena(topic, partition, batch_size)?;
match arena.allocate_java_batch(message_count, avg_message_size) {
Some(allocation) => {
let allocation_duration = allocation_start.elapsed();
self.metrics
.total_allocations
.fetch_add(1, Ordering::Relaxed);
self.metrics
.total_bytes_allocated
.fetch_add(batch_size as u64, Ordering::Relaxed);
if allocation_duration.as_nanos() < 1000 {
self.metrics
.fast_allocations
.fetch_add(1, Ordering::Relaxed);
}
Ok(allocation)
}
None => {
self.rotate_arena(topic, partition)?;
let next_arena = self.get_available_arena(topic, partition, batch_size)?;
next_arena
.allocate_java_batch(message_count, avg_message_size)
.ok_or(ArenaError::AllArenasFull)
}
}
}
fn get_available_arena(
&self,
topic: &str,
partition: PartitionId,
required_size: usize,
) -> Result<Arc<ArenaRegion>, ArenaError> {
let key = (topic.to_string(), partition);
{
let partitions = self.partitions.read();
if let Some(arenas) = partitions.get(&key) {
let current_index = {
let current_map = self.current_arena.lock();
*current_map.get(&key).unwrap_or(&0)
};
if current_index < arenas.len() {
let arena = &arenas[current_index];
if arena.has_space(required_size) {
return Ok(Arc::clone(arena));
}
}
}
}
self.create_partition_arenas(topic, partition)?;
let partitions = self.partitions.read();
let arenas = partitions.get(&key).unwrap();
let current_index = {
let current_map = self.current_arena.lock();
*current_map.get(&key).unwrap_or(&0)
};
Ok(Arc::clone(&arenas[current_index]))
}
fn create_partition_arenas(
&self,
topic: &str,
partition: PartitionId,
) -> Result<(), ArenaError> {
let key = (topic.to_string(), partition);
let mut partitions = self.partitions.write();
let mut current_map = self.current_arena.lock();
if partitions.contains_key(&key) {
return Ok(());
}
let mut arenas = Vec::with_capacity(self.config.arenas_per_partition);
for _ in 0..self.config.arenas_per_partition {
let arena_id = self.next_arena_id.fetch_add(1, Ordering::Relaxed);
let arena = ArenaRegion::new(self.config.arena_size, arena_id)
.map_err(|_| ArenaError::AllocationFailed)?;
arenas.push(Arc::new(arena));
}
partitions.insert(key.clone(), arenas);
current_map.insert(key, 0);
self.metrics
.partitions_created
.fetch_add(1, Ordering::Relaxed);
Ok(())
}
fn rotate_arena(&self, topic: &str, partition: PartitionId) -> Result<(), ArenaError> {
let key = (topic.to_string(), partition);
let mut current_map = self.current_arena.lock();
if let Some(current_index) = current_map.get_mut(&key) {
let partitions = self.partitions.read();
if let Some(arenas) = partitions.get(&key) {
*current_index = (*current_index + 1) % arenas.len();
let prev_index = if *current_index == 0 {
arenas.len() - 1
} else {
*current_index - 1
};
arenas[prev_index].reset();
self.metrics.arena_rotations.fetch_add(1, Ordering::Relaxed);
}
}
Ok(())
}
pub fn reset_partition_arenas(&self, topic: &str, partition: PartitionId) {
let key = (topic.to_string(), partition);
let partitions = self.partitions.read();
if let Some(arenas) = partitions.get(&key) {
for arena in arenas {
arena.reset();
}
let mut current_map = self.current_arena.lock();
current_map.insert(key, 0);
}
}
pub fn get_stats(&self) -> ArenaAllocatorStats {
let partitions = self.partitions.read();
let mut total_arenas = 0;
let mut total_utilization = 0.0;
let mut arena_stats = Vec::new();
for arenas in partitions.values() {
total_arenas += arenas.len();
for arena in arenas {
let stats = arena.stats();
total_utilization += stats.utilization_percent;
arena_stats.push(stats);
}
}
let avg_utilization = if total_arenas > 0 {
total_utilization / total_arenas as f64
} else {
0.0
};
ArenaAllocatorStats {
total_partitions: partitions.len(),
total_arenas,
average_utilization_percent: avg_utilization,
arena_stats,
metrics: self.metrics.snapshot(),
}
}
}
#[derive(Debug, Clone)]
pub enum ArenaError {
AllocationFailed,
BatchTooLarge(usize),
AllArenasFull,
PartitionNotFound,
}
impl std::fmt::Display for ArenaError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::AllocationFailed => write!(f, "Arena allocation failed"),
Self::BatchTooLarge(size) => write!(f, "Batch size {} exceeds maximum", size),
Self::AllArenasFull => write!(f, "All arenas are full"),
Self::PartitionNotFound => write!(f, "Partition not found"),
}
}
}
impl std::error::Error for ArenaError {}
#[derive(Debug)]
pub struct ArenaMetrics {
pub total_allocations: AtomicU64,
pub fast_allocations: AtomicU64, pub total_bytes_allocated: AtomicU64,
pub arena_rotations: AtomicU64,
pub partitions_created: AtomicU64,
pub large_batch_fallbacks: AtomicU64,
}
impl ArenaMetrics {
pub fn new() -> Self {
Self {
total_allocations: AtomicU64::new(0),
fast_allocations: AtomicU64::new(0),
total_bytes_allocated: AtomicU64::new(0),
arena_rotations: AtomicU64::new(0),
partitions_created: AtomicU64::new(0),
large_batch_fallbacks: AtomicU64::new(0),
}
}
pub fn snapshot(&self) -> ArenaMetricsSnapshot {
ArenaMetricsSnapshot {
total_allocations: self.total_allocations.load(Ordering::Relaxed),
fast_allocations: self.fast_allocations.load(Ordering::Relaxed),
total_bytes_allocated: self.total_bytes_allocated.load(Ordering::Relaxed),
arena_rotations: self.arena_rotations.load(Ordering::Relaxed),
partitions_created: self.partitions_created.load(Ordering::Relaxed),
large_batch_fallbacks: self.large_batch_fallbacks.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone)]
pub struct ArenaStats {
pub arena_id: u64,
pub total_size: usize,
pub used_size: usize,
pub allocation_count: u64,
pub bytes_allocated: u64,
pub utilization_percent: f64,
}
#[derive(Debug, Clone)]
pub struct ArenaAllocatorStats {
pub total_partitions: usize,
pub total_arenas: usize,
pub average_utilization_percent: f64,
pub arena_stats: Vec<ArenaStats>,
pub metrics: ArenaMetricsSnapshot,
}
#[derive(Debug, Clone)]
pub struct ArenaMetricsSnapshot {
pub total_allocations: u64,
pub fast_allocations: u64,
pub total_bytes_allocated: u64,
pub arena_rotations: u64,
pub partitions_created: u64,
pub large_batch_fallbacks: u64,
}
impl ArenaAllocatorStats {
pub fn report(&self) -> String {
let fast_allocation_rate = if self.metrics.total_allocations > 0 {
(self.metrics.fast_allocations as f64 / self.metrics.total_allocations as f64) * 100.0
} else {
0.0
};
format!(
"Arena Allocator Performance Report:\n\
Partitions: {}, Arenas: {}\n\
Allocations: {} total ({:.1}% <1μs)\n\
Bytes Allocated: {:.1} MB\n\
Arena Rotations: {}\n\
Average Utilization: {:.1}%\n\
Large Batch Fallbacks: {}",
self.total_partitions,
self.total_arenas,
self.metrics.total_allocations,
fast_allocation_rate,
self.metrics.total_bytes_allocated as f64 / 1_000_000.0,
self.metrics.arena_rotations,
self.average_utilization_percent,
self.metrics.large_batch_fallbacks
)
}
pub fn java_client_optimization_status(&self) -> String {
let fast_allocation_rate = if self.metrics.total_allocations > 0 {
(self.metrics.fast_allocations as f64 / self.metrics.total_allocations as f64) * 100.0
} else {
0.0
};
if fast_allocation_rate >= 95.0 && self.average_utilization_percent < 80.0 {
"✅ OPTIMAL: Arena allocator performing excellently for Java clients".to_string()
} else if fast_allocation_rate >= 80.0 {
"⚠️ GOOD: Arena allocator performing well, minor optimization possible".to_string()
} else {
"❌ NEEDS OPTIMIZATION: Arena allocator may be causing Java client timeouts".to_string()
}
}
}
pub fn estimate_java_batch_size(message_count: usize) -> usize {
let avg_message_size = match message_count {
1..=10 => 2048, 11..=50 => 1024, 51..=100 => 512, _ => 256, };
message_count * avg_message_size
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_arena_region_creation() {
let arena = ArenaRegion::new(1024, 1).unwrap();
assert_eq!(arena.size, 1024);
assert_eq!(arena.position.load(Ordering::Relaxed), 0);
}
#[test]
fn test_arena_allocation() {
let arena = ArenaRegion::new(1024, 1).unwrap();
let ptr1 = arena.allocate(100, 8).unwrap();
let ptr2 = arena.allocate(200, 8).unwrap();
assert_ne!(ptr1, ptr2);
assert_eq!(arena.position.load(Ordering::Relaxed), 300);
}
#[test]
fn test_java_batch_allocation() {
let allocator = ArenaAllocator::new();
let batch = allocator
.allocate_java_batch("test-topic", 0, 100, 512)
.unwrap();
assert!(batch.size >= 100 * 512);
assert_eq!(batch.message_count, 100);
}
#[test]
fn test_arena_reset() {
let arena = ArenaRegion::new(1024, 1).unwrap();
arena.allocate(500, 8).unwrap();
assert!(arena.position.load(Ordering::Relaxed) > 0);
arena.reset();
assert_eq!(arena.position.load(Ordering::Relaxed), 0);
}
#[test]
fn test_estimate_java_batch_size() {
assert_eq!(estimate_java_batch_size(5), 5 * 2048); assert_eq!(estimate_java_batch_size(25), 25 * 1024); assert_eq!(estimate_java_batch_size(75), 75 * 512); assert_eq!(estimate_java_batch_size(200), 200 * 256); }
}