use crate::eval::value::Value;
use crate::runtime::gc::generation::{GenerationManager, ObjectHeader, GenerationId};
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock, Mutex, atomic::{AtomicUsize, AtomicU64, AtomicBool, Ordering}};
use std::thread::{self, ThreadId};
use std::time::{Duration, Instant};
/// A thread-local allocation buffer (TLAB): a fixed-size heap region that hands out memory by
/// bumping a cursor from `start` towards `end`.
///
/// The cursor is advanced with a compare-and-swap loop, so even when a `Tlab` is shared between
/// threads, concurrent `try_allocate` calls never return overlapping ranges.
#[derive(Debug)]
pub struct Tlab {
    /// First byte of the backing allocation.
    start: *mut u8,
    /// Address of the next unreserved byte.
    current: AtomicUsize,
    /// One-past-the-end pointer of the backing allocation.
    end: *mut u8,
    /// Thread this buffer was created for.
    owner_thread: ThreadId,
    /// Capacity in bytes (also the size of the backing allocation's layout).
    size: usize,
    /// Number of successful `try_allocate` calls.
    objects_allocated: AtomicUsize,
    /// Sum of the sizes passed to successful `try_allocate` calls (alignment padding excluded).
    bytes_allocated: AtomicUsize,
    /// Cleared by `retire`; a retired buffer refuses further allocations.
    active: AtomicBool,
}

// SAFETY: the raw pointers only delimit an allocation owned exclusively by this value and are
// never dereferenced by it; the mutable bookkeeping lives in atomics, and each byte range is
// handed out at most once by `try_allocate`.
unsafe impl Send for Tlab {}
unsafe impl Sync for Tlab {}

impl Tlab {
    /// Alignment of the backing allocation; `Drop` must rebuild the layout with the same value.
    const BUFFER_ALIGN: usize = 8;

    /// Allocates a new `size`-byte buffer for `owner_thread`.
    ///
    /// # Errors
    /// Returns an error if `size` is zero, if no valid layout exists for `size`, or if the
    /// global allocator returns null.
    pub fn new(size: usize, owner_thread: ThreadId) -> Result<Self, String> {
        // `std::alloc::alloc` with a zero-sized layout is undefined behaviour.
        if size == 0 {
            return Err("TLAB size must be non-zero".to_string());
        }
        let layout = std::alloc::Layout::from_size_align(size, Self::BUFFER_ALIGN)
            .map_err(|e| format!("Failed to create TLAB layout: {e}"))?;
        // SAFETY: `layout` has a non-zero size (checked above).
        let start = unsafe { std::alloc::alloc(layout) };
        if start.is_null() {
            return Err("Failed to allocate TLAB memory".to_string());
        }
        // SAFETY: `start` heads an allocation of exactly `size` bytes, so `start + size` is its
        // one-past-the-end pointer.
        let end = unsafe { start.add(size) };
        Ok(Tlab {
            start,
            current: AtomicUsize::new(start as usize),
            end,
            owner_thread,
            size,
            objects_allocated: AtomicUsize::new(0),
            bytes_allocated: AtomicUsize::new(0),
            active: AtomicBool::new(true),
        })
    }

    /// Reserves `size` bytes aligned to `alignment` and returns a pointer to the reserved range.
    ///
    /// Returns `None` in three cases:
    /// - the buffer has been retired;
    /// - `alignment` is not a power of two (this includes zero);
    /// - the request does not fit in the remaining space.
    ///
    /// Alignment padding consumes buffer space but is not counted by [`used_space`](Self::used_space).
    pub fn try_allocate(&self, size: usize, alignment: usize) -> Option<*mut u8> {
        if !self.active.load(Ordering::Relaxed) || !alignment.is_power_of_two() {
            return None;
        }
        let mask = alignment - 1;
        let limit = self.end as usize;
        let mut current = self.current.load(Ordering::Relaxed);
        loop {
            // Checked arithmetic: an oversized request must fail rather than wrap around and
            // slip past the bounds check.
            let aligned = current.checked_add(mask)? & !mask;
            let new_current = aligned.checked_add(size)?;
            if new_current > limit {
                return None;
            }
            match self.current.compare_exchange_weak(
                current,
                new_current,
                Ordering::Relaxed,
                Ordering::Relaxed,
            ) {
                Ok(_) => {
                    self.objects_allocated.fetch_add(1, Ordering::Relaxed);
                    self.bytes_allocated.fetch_add(size, Ordering::Relaxed);
                    // Derive the result from `start` so it keeps the allocation's provenance;
                    // `aligned >= current >= start` because the cursor only moves forward.
                    return Some(self.start.wrapping_add(aligned - self.start as usize));
                }
                // Another caller moved the cursor (or the weak CAS failed spuriously): retry
                // from the value actually observed.
                Err(observed) => current = observed,
            }
        }
    }

    /// Bytes between the cursor and the end of the buffer.
    pub fn remaining_space(&self) -> usize {
        let current = self.current.load(Ordering::Relaxed);
        (self.end as usize).saturating_sub(current)
    }

    /// Sum of the sizes of all successful allocations (excluding alignment padding).
    pub fn used_space(&self) -> usize {
        self.bytes_allocated.load(Ordering::Relaxed)
    }

    /// `used_space` as a percentage of capacity (0.0 for a zero-capacity buffer).
    pub fn utilization(&self) -> f64 {
        if self.size == 0 {
            0.0
        } else {
            self.used_space() as f64 / self.size as f64 * 100.0
        }
    }

    /// Marks the buffer as retired; subsequent `try_allocate` calls return `None`.
    pub fn retire(&self) {
        self.active.store(false, Ordering::Relaxed);
    }

    /// Whether this buffer was created for `thread_id`.
    pub fn belongs_to_thread(&self, thread_id: ThreadId) -> bool {
        self.owner_thread == thread_id
    }
}

impl Drop for Tlab {
    /// Returns the backing allocation to the global allocator. Memory handed out by
    /// `try_allocate` is released without running any destructors.
    fn drop(&mut self) {
        // `new` never stores a null `start`; this check is purely defensive.
        if self.start.is_null() {
            return;
        }
        let layout = std::alloc::Layout::from_size_align(self.size, Self::BUFFER_ALIGN)
            .expect("TLAB layout was valid when the buffer was allocated");
        // SAFETY: `start` was returned by `alloc` with this same layout and is freed only here.
        unsafe { std::alloc::dealloc(self.start, layout) };
    }
}
/// Hands out one `Tlab` per thread, replacing retired or full ones, and keeps TLAB statistics.
#[derive(Debug)]
pub struct TlabManager {
/// Current TLAB for each thread, keyed by the thread's `ThreadId`.
tlabs: Arc<RwLock<HashMap<ThreadId, Arc<Tlab>>>>,
/// Base TLAB size in bytes; adaptive sizing may halve or double it.
default_tlab_size: usize,
/// Cap applied when adaptive sizing doubles the TLAB size.
max_tlab_size: usize,
/// Creation/retirement counters; also the input to adaptive sizing.
statistics: TlabStatistics,
/// When true, new TLAB sizes are chosen from the average utilization of retired TLABs.
adaptive_sizing: bool,
}
/// Counters describing TLAB creation, retirement and utilization.
///
/// All counters use relaxed atomics: they are diagnostics, not synchronization.
#[derive(Debug, Default)]
pub struct TlabStatistics {
    /// Number of TLABs created.
    pub tlabs_created: AtomicU64,
    /// Number of TLABs retired.
    pub tlabs_retired: AtomicU64,
    /// Total capacity, in bytes, of all TLABs created.
    pub tlab_allocated_bytes: AtomicU64,
    /// Total unused capacity, in bytes, of all retired TLABs.
    pub tlab_waste_bytes: AtomicU64,
    /// Running mean utilization of retired TLABs, in hundredths of a percent
    /// (10000 == fully used). Read it as a percentage via `average_utilization`.
    pub avg_utilization: AtomicU64,
}

impl TlabStatistics {
    /// Creates a zeroed set of counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records the creation of a TLAB with a capacity of `size` bytes.
    pub fn record_tlab_creation(&self, size: usize) {
        self.tlabs_created.fetch_add(1, Ordering::Relaxed);
        self.tlab_allocated_bytes.fetch_add(size as u64, Ordering::Relaxed);
    }

    /// Records the retirement of a TLAB that used `used` of its `total` bytes, and folds its
    /// utilization into the running mean.
    pub fn record_tlab_retirement(&self, used: usize, total: usize) {
        // Use `fetch_add`'s return value so this call sees its own increment, even while other
        // threads are retiring TLABs concurrently.
        let retired_count = self.tlabs_retired.fetch_add(1, Ordering::Relaxed) + 1;
        let waste = total.saturating_sub(used);
        self.tlab_waste_bytes.fetch_add(waste as u64, Ordering::Relaxed);
        let utilization = if total > 0 {
            (used as f64 / total as f64 * 10000.0) as u64
        } else {
            0
        };
        // Update the mean atomically so concurrent retirements are not lost. Widen to u128
        // so `avg * (n - 1)` cannot overflow; the quotient never exceeds max(avg, utilization)
        // and therefore fits back into u64.
        let _ = self
            .avg_utilization
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |avg| {
                let n = u128::from(retired_count);
                let sum = u128::from(avg) * (n - 1) + u128::from(utilization);
                Some((sum / n) as u64)
            });
    }

    /// Mean utilization of retired TLABs, as a percentage.
    pub fn average_utilization(&self) -> f64 {
        self.avg_utilization.load(Ordering::Relaxed) as f64 / 100.0
    }

    /// Unused capacity of retired TLABs as a percentage of the capacity of all TLABs created
    /// (0.0 if none were created).
    pub fn waste_percentage(&self) -> f64 {
        let allocated = self.tlab_allocated_bytes.load(Ordering::Relaxed);
        let waste = self.tlab_waste_bytes.load(Ordering::Relaxed);
        if allocated > 0 {
            waste as f64 / allocated as f64 * 100.0
        } else {
            0.0
        }
    }
}
impl TlabManager {
    /// Lower bound applied when adaptive sizing halves the default TLAB size.
    const MIN_ADAPTIVE_TLAB_SIZE: usize = 4096;

    /// Creates a manager with no TLABs and adaptive sizing enabled.
    pub fn new(default_tlab_size: usize, max_tlab_size: usize) -> Self {
        TlabManager {
            tlabs: Arc::new(RwLock::new(HashMap::new())),
            default_tlab_size,
            max_tlab_size,
            statistics: TlabStatistics::new(),
            adaptive_sizing: true,
        }
    }

    /// Returns the calling thread's TLAB. A new one is created when the thread has none, or
    /// when its current TLAB is retired or has no space left at all.
    ///
    /// A TLAB with *some* space left is returned even if that space is too small for the
    /// caller's next request. Callers that fail to allocate from it should retire it (see
    /// [`retire_tlab`](Self::retire_tlab)) before asking again.
    ///
    /// # Errors
    /// Returns an error if the TLAB map's lock is poisoned or a new TLAB cannot be created.
    pub fn get_tlab(&self) -> Result<Arc<Tlab>, String> {
        let thread_id = thread::current().id();
        {
            let tlabs = self.tlabs.read().map_err(|_| "Failed to read TLABs")?;
            if let Some(tlab) = tlabs.get(&thread_id) {
                if tlab.active.load(Ordering::Relaxed) && tlab.remaining_space() > 0 {
                    return Ok(Arc::clone(tlab));
                }
            }
        }
        self.create_tlab_for_thread(thread_id)
    }

    /// Creates a TLAB for `thread_id` and installs it in the map. Any TLAB it replaces is
    /// retired and recorded in the statistics.
    fn create_tlab_for_thread(&self, thread_id: ThreadId) -> Result<Arc<Tlab>, String> {
        let tlab_size = if self.adaptive_sizing {
            self.calculate_adaptive_tlab_size(thread_id)
        } else {
            self.default_tlab_size
        };
        // A zero-byte TLAB can never satisfy a request, and would ask the global allocator
        // for a zero-sized block.
        if tlab_size == 0 {
            return Err("TLAB size must be non-zero".to_string());
        }
        let tlab = Arc::new(Tlab::new(tlab_size, thread_id)?);
        self.statistics.record_tlab_creation(tlab_size);
        {
            let mut tlabs = self.tlabs.write().map_err(|_| "Failed to write TLABs")?;
            if let Some(old_tlab) = tlabs.insert(thread_id, Arc::clone(&tlab)) {
                old_tlab.retire();
                self.statistics
                    .record_tlab_retirement(old_tlab.used_space(), old_tlab.size);
            }
        }
        Ok(tlab)
    }

    /// Picks the size of the next TLAB from the mean utilization of retired TLABs:
    /// - above 80%, the default is doubled (capped at `max_tlab_size`);
    /// - below 40%, it is halved (floored at `MIN_ADAPTIVE_TLAB_SIZE`);
    /// - otherwise the default is used.
    fn calculate_adaptive_tlab_size(&self, _thread_id: ThreadId) -> usize {
        // Before any TLAB has been retired, the average is still its initial 0. That would
        // otherwise be read as "badly under-used" and halve every early TLAB.
        if self.statistics.tlabs_retired.load(Ordering::Relaxed) == 0 {
            return self.default_tlab_size;
        }
        let avg_utilization = self.statistics.average_utilization();
        if avg_utilization > 80.0 {
            self.default_tlab_size
                .saturating_mul(2)
                .min(self.max_tlab_size)
        } else if avg_utilization < 40.0 {
            (self.default_tlab_size / 2).max(Self::MIN_ADAPTIVE_TLAB_SIZE)
        } else {
            self.default_tlab_size
        }
    }

    /// Removes and retires `thread_id`'s TLAB, if it has one.
    ///
    /// # Errors
    /// Returns an error if the TLAB map's lock is poisoned.
    pub fn retire_tlab(&self, thread_id: ThreadId) -> Result<(), String> {
        let mut tlabs = self.tlabs.write().map_err(|_| "Failed to write TLABs")?;
        if let Some(tlab) = tlabs.remove(&thread_id) {
            tlab.retire();
            self.statistics.record_tlab_retirement(tlab.used_space(), tlab.size);
        }
        Ok(())
    }

    /// TLAB creation/retirement counters.
    pub fn get_statistics(&self) -> &TlabStatistics {
        &self.statistics
    }

    /// Drops every TLAB that has already been retired (`active == false`) from the map and
    /// records its retirement.
    ///
    /// Despite the name, this does not detect exited threads. A TLAB belonging to a thread
    /// that ended without retiring it stays in the map.
    ///
    /// # Errors
    /// Returns an error if the TLAB map's lock is poisoned.
    pub fn cleanup_dead_threads(&self) -> Result<(), String> {
        let mut tlabs = self.tlabs.write().map_err(|_| "Failed to write TLABs")?;
        let statistics = &self.statistics;
        tlabs.retain(|_, tlab| {
            let keep = tlab.active.load(Ordering::Relaxed);
            if !keep {
                statistics.record_tlab_retirement(tlab.used_space(), tlab.size);
            }
            keep
        });
        Ok(())
    }
}
#[derive(Debug)]
pub struct AllocationSampler {
sample_rate: usize,
allocation_count: AtomicUsize,
samples: Arc<Mutex<Vec<AllocationSample>>>,
max_samples: usize,
}
#[derive(Debug, Clone)]
pub struct AllocationSample {
pub size: usize,
pub thread_id: ThreadId,
pub timestamp: Instant,
pub generation: GenerationId,
pub allocation_site: String,
}
impl AllocationSampler {
pub fn new(sample_rate: usize, max_samples: usize) -> Self {
AllocationSampler {
sample_rate,
allocation_count: AtomicUsize::new(0),
samples: Arc::new(Mutex::new(Vec::new())),
max_samples,
}
}
pub fn maybe_sample(&self, size: usize, generation: GenerationId, allocation_site: String) {
let count = self.allocation_count.fetch_add(1, Ordering::Relaxed);
if count % self.sample_rate == 0 {
let sample = AllocationSample {
size,
thread_id: thread::current().id(),
timestamp: Instant::now(),
generation,
allocation_site,
};
if let Ok(mut samples) = self.samples.lock() {
samples.push(sample);
let samples_len = samples.len();
if samples_len > self.max_samples {
samples.drain(0..samples_len - self.max_samples);
}
}
}
}
pub fn allocation_rate(&self) -> f64 {
if let Ok(samples) = self.samples.lock() {
if samples.len() < 2 {
return 0.0;
}
let oldest = samples.first().unwrap().timestamp;
let newest = samples.last().unwrap().timestamp;
let duration = newest.duration_since(oldest);
if duration.as_secs_f64() > 0.0 {
(samples.len() * self.sample_rate) as f64 / duration.as_secs_f64()
} else {
0.0
}
} else {
0.0
}
}
pub fn get_recent_samples(&self, limit: usize) -> Vec<AllocationSample> {
if let Ok(samples) = self.samples.lock() {
if samples.len() <= limit {
samples.clone()
} else {
samples[samples.len() - limit..].to_vec()
}
} else {
Vec::new()
}
}
pub fn clear_samples(&self) {
if let Ok(mut samples) = self.samples.lock() {
samples.clear();
}
}
}
/// Front door for object allocation. Requests of at least `large_object_threshold` bytes go
/// to the generation manager. Smaller requests are tried in the calling thread's TLAB first,
/// then fall back to the generation manager.
#[derive(Debug)]
pub struct AllocationCoordinator {
/// Backing allocator for large objects and for small objects the TLAB path cannot serve.
generation_manager: Arc<GenerationManager>,
/// Per-thread TLABs used for small objects.
tlab_manager: Arc<TlabManager>,
/// Samples a fraction of allocation requests for rate estimation.
allocation_sampler: Arc<AllocationSampler>,
/// Success/failure counters for `allocate`.
statistics: AllocationStatistics,
/// Size in bytes at or above which an allocation is treated as a large object.
large_object_threshold: usize,
}
/// Counters describing allocation requests handled by `AllocationCoordinator`.
///
/// All counters use relaxed atomics and are intended for diagnostics.
#[derive(Debug, Default)]
pub struct AllocationStatistics {
    /// Number of successful allocations.
    pub total_allocations: AtomicU64,
    /// Sum of the sizes of successful allocations, in bytes.
    pub total_allocated_bytes: AtomicU64,
    /// Successful allocations attributed to the young generation.
    pub young_allocations: AtomicU64,
    /// Successful allocations attributed to the old generation.
    pub old_allocations: AtomicU64,
    /// Successful allocations attributed to the large-object space.
    pub large_allocations: AtomicU64,
    /// Number of failed allocation attempts.
    pub failed_allocations: AtomicU64,
    /// `total_allocated_bytes / total_allocations`, refreshed on every successful allocation.
    pub avg_allocation_size: AtomicU64,
}

impl AllocationStatistics {
    /// Creates a zeroed set of counters.
    pub fn new() -> Self {
        Self::default()
    }

    /// Records one successful allocation of `size` bytes attributed to `generation`.
    pub fn record_allocation(&self, size: usize, generation: GenerationId) {
        let size = size as u64;
        // Derive the new totals from the `fetch_add` results rather than re-loading, so the
        // average reflects this call's own update.
        let total = self.total_allocations.fetch_add(1, Ordering::Relaxed) + 1;
        let total_bytes = self
            .total_allocated_bytes
            .fetch_add(size, Ordering::Relaxed)
            .wrapping_add(size);
        match generation {
            GenerationId::Young => {
                self.young_allocations.fetch_add(1, Ordering::Relaxed);
            }
            GenerationId::Old => {
                self.old_allocations.fetch_add(1, Ordering::Relaxed);
            }
            GenerationId::LargeObject => {
                self.large_allocations.fetch_add(1, Ordering::Relaxed);
            }
            _ => {}
        }
        self.avg_allocation_size
            .store(total_bytes / total, Ordering::Relaxed);
    }

    /// Records one failed allocation attempt.
    pub fn record_failed_allocation(&self) {
        self.failed_allocations.fetch_add(1, Ordering::Relaxed);
    }

    /// Failed attempts as a percentage of all attempts (successful + failed). Returns 0.0
    /// when nothing has been attempted.
    pub fn failure_rate(&self) -> f64 {
        let succeeded = self.total_allocations.load(Ordering::Relaxed);
        let failed = self.failed_allocations.load(Ordering::Relaxed);
        // Guard on attempts, not successes: when every attempt failed the rate is 100%,
        // not 0%.
        let attempts = succeeded.saturating_add(failed);
        if attempts == 0 {
            0.0
        } else {
            failed as f64 / attempts as f64 * 100.0
        }
    }
}
impl AllocationCoordinator {
pub fn new(
generation_manager: Arc<GenerationManager>,
default_tlab_size: usize,
max_tlab_size: usize,
large_object_threshold: usize,
) -> Self {
let tlab_manager = Arc::new(TlabManager::new(default_tlab_size, max_tlab_size));
let allocation_sampler = Arc::new(AllocationSampler::new(1000, 10000));
AllocationCoordinator {
generation_manager,
tlab_manager,
allocation_sampler,
statistics: AllocationStatistics::new(),
large_object_threshold,
}
}
pub fn allocate(&self, value: Value, size: usize) -> Result<Arc<ObjectHeader>, String> {
let allocation_site = format!("{}:{}", file!(), line!());
let generation = self.choose_generation(size);
self.allocation_sampler.maybe_sample(size, generation, allocation_site);
let result = if size >= self.large_object_threshold {
self.allocate_large_object(value, size)
} else {
self.allocate_small_object(value, size)
};
match &result {
Ok(_) => {
self.statistics.record_allocation(size, generation);
}
Err(_) => {
self.statistics.record_failed_allocation();
}
}
result
}
fn choose_generation(&self, size: usize) -> GenerationId {
if size >= self.large_object_threshold {
GenerationId::LargeObject
} else {
GenerationId::Young }
}
fn allocate_small_object(&self, value: Value, size: usize) -> Result<Arc<ObjectHeader>, String> {
if let Ok(tlab) = self.tlab_manager.get_tlab() {
if let Some(ptr) = tlab.try_allocate(size, 8) {
let header = ObjectHeader::new(value, size, GenerationId::Young);
unsafe {
std::ptr::write(ptr as *mut ObjectHeader, header.clone());
}
return Ok(Arc::new(header));
}
}
self.generation_manager.allocate(value, size)
}
fn allocate_large_object(&self, value: Value, size: usize) -> Result<Arc<ObjectHeader>, String> {
self.generation_manager.allocate(value, size)
}
pub fn allocation_rate(&self) -> f64 {
self.allocation_sampler.allocation_rate()
}
pub fn get_statistics(&self) -> &AllocationStatistics {
&self.statistics
}
pub fn get_tlab_statistics(&self) -> &TlabStatistics {
self.tlab_manager.get_statistics()
}
pub fn cleanup(&self) -> Result<(), String> {
self.tlab_manager.cleanup_dead_threads()
}
pub fn retire_current_thread_tlab(&self) -> Result<(), String> {
let thread_id = thread::current().id();
self.tlab_manager.retire_tlab(thread_id)
}
}