use crate::eval::value::Value;
use crate::jit::metrics::JitMetrics;
use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, RwLock, Mutex, Condvar, atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering}};
use std::time::{Duration, Instant};
use std::thread::{self, ThreadId};
#[derive(Debug, Clone)]
pub struct ParallelGcConfig {
pub max_collector_threads: usize,
pub young_generation_size: usize,
pub old_generation_size: usize,
pub large_object_threshold: usize,
pub target_minor_pause_ms: u64,
pub target_major_pause_ms: u64,
pub numa_aware: bool,
pub adaptive_tuning: bool,
}
impl Default for ParallelGcConfig {
fn default() -> Self {
ParallelGcConfig {
max_collector_threads: num_cpus::get(),
young_generation_size: 64 * 1024 * 1024, old_generation_size: 256 * 1024 * 1024, large_object_threshold: 32 * 1024, target_minor_pause_ms: 10, target_major_pause_ms: 50, numa_aware: true,
adaptive_tuning: true,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollectionPhase {
Idle,
MinorCollection,
MajorCollection,
IncrementalCollection,
ConcurrentMarking,
}
#[derive(Debug, Default)]
pub struct GcStatistics {
pub minor_collections: AtomicU64,
pub major_collections: AtomicU64,
pub minor_collection_time: AtomicU64,
pub major_collection_time: AtomicU64,
pub total_allocated: AtomicU64,
pub total_reclaimed: AtomicU64,
pub avg_minor_pause_ns: AtomicU64,
pub avg_major_pause_ns: AtomicU64,
pub heap_utilization: AtomicUsize,
}
impl GcStatistics {
pub fn new() -> Self {
Self::default()
}
pub fn record_minor_collection(&self, pause_time: Duration) {
self.minor_collections.fetch_add(1, Ordering::Relaxed);
let pause_ns = pause_time.as_nanos() as u64;
self.minor_collection_time.fetch_add(pause_ns, Ordering::Relaxed);
let count = self.minor_collections.load(Ordering::Relaxed);
let total_time = self.minor_collection_time.load(Ordering::Relaxed);
self.avg_minor_pause_ns.store(total_time / count, Ordering::Relaxed);
}
pub fn record_major_collection(&self, pause_time: Duration) {
self.major_collections.fetch_add(1, Ordering::Relaxed);
let pause_ns = pause_time.as_nanos() as u64;
self.major_collection_time.fetch_add(pause_ns, Ordering::Relaxed);
let count = self.major_collections.load(Ordering::Relaxed);
let total_time = self.major_collection_time.load(Ordering::Relaxed);
self.avg_major_pause_ns.store(total_time / count, Ordering::Relaxed);
}
pub fn record_allocation(&self, size: u64) {
self.total_allocated.fetch_add(size, Ordering::Relaxed);
}
pub fn record_reclamation(&self, size: u64) {
self.total_reclaimed.fetch_add(size, Ordering::Relaxed);
}
pub fn update_heap_utilization(&self, percentage: usize) {
self.heap_utilization.store(percentage, Ordering::Relaxed);
}
}
#[derive(Debug)]
pub struct SafepointCoordinator {
safepoint_requested: AtomicBool,
threads_at_safepoint: AtomicUsize,
total_threads: AtomicUsize,
safepoint_lock: Mutex<()>,
safepoint_reached: Condvar,
safepoint_released: Condvar,
}
impl SafepointCoordinator {
pub fn new() -> Self {
SafepointCoordinator {
safepoint_requested: AtomicBool::new(false),
threads_at_safepoint: AtomicUsize::new(0),
total_threads: AtomicUsize::new(0),
safepoint_lock: Mutex::new(()),
safepoint_reached: Condvar::new(),
safepoint_released: Condvar::new(),
}
}
pub fn register_thread(&self) {
self.total_threads.fetch_add(1, Ordering::Relaxed);
}
pub fn unregister_thread(&self) {
self.total_threads.fetch_sub(1, Ordering::Relaxed);
}
pub fn request_safepoint(&self) -> Result<(), String> {
self.safepoint_requested.store(true, Ordering::Relaxed);
let mut lock = self.safepoint_lock.lock().map_err(|_| "Failed to acquire safepoint lock")?;
let total = self.total_threads.load(Ordering::Relaxed);
loop {
let threads_reached = self.threads_at_safepoint.load(Ordering::Relaxed);
if threads_reached >= total {
break;
}
match self.safepoint_reached.wait_timeout(lock, Duration::from_millis(100)) {
Ok((new_lock, timeout_result)) => {
lock = new_lock;
if timeout_result.timed_out() {
let current_reached = self.threads_at_safepoint.load(Ordering::Relaxed);
if current_reached < threads_reached {
return Err("Timeout waiting for threads to reach safepoint".to_string());
}
}
}
Err(_) => return Err("Error waiting for safepoint".to_string()),
}
}
Ok(())
}
pub fn release_safepoint(&self) {
self.safepoint_requested.store(false, Ordering::Relaxed);
self.threads_at_safepoint.store(0, Ordering::Relaxed);
self.safepoint_released.notify_all();
}
pub fn check_safepoint(&self) -> bool {
if self.safepoint_requested.load(Ordering::Relaxed) {
self.reach_safepoint();
true
} else {
false
}
}
fn reach_safepoint(&self) {
self.threads_at_safepoint.fetch_add(1, Ordering::Relaxed);
self.safepoint_reached.notify_all();
let lock = self.safepoint_lock.lock().unwrap();
let _result = self.safepoint_released.wait_while(lock, |_| {
self.safepoint_requested.load(Ordering::Relaxed)
});
}
}
impl Default for SafepointCoordinator {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug)]
pub struct ParallelGc {
config: Arc<ParallelGcConfig>,
current_phase: Arc<RwLock<CollectionPhase>>,
statistics: Arc<GcStatistics>,
safepoint: Arc<SafepointCoordinator>,
jit_metrics: Option<Arc<RwLock<JitMetrics>>>,
adaptive_params: Arc<RwLock<AdaptiveTuningParams>>,
collection_requests: Arc<Mutex<VecDeque<CollectionRequest>>>,
worker_threads: Arc<RwLock<Vec<thread::JoinHandle<()>>>>,
shutdown_requested: Arc<AtomicBool>,
}
#[derive(Debug, Clone)]
pub struct AdaptiveTuningParams {
allocation_rate: f64,
pause_time_samples: VecDeque<Duration>,
heap_growth_rate: f64,
collection_frequency_multiplier: f64,
}
impl Default for AdaptiveTuningParams {
fn default() -> Self {
AdaptiveTuningParams {
allocation_rate: 0.0,
pause_time_samples: VecDeque::with_capacity(100),
heap_growth_rate: 1.0,
collection_frequency_multiplier: 1.0,
}
}
}
#[derive(Debug, Clone)]
pub enum CollectionRequest {
Minor,
Major,
IncrementalStep,
Emergency,
}
impl ParallelGc {
pub fn new(config: ParallelGcConfig) -> Self {
ParallelGc {
config: Arc::new(config),
current_phase: Arc::new(RwLock::new(CollectionPhase::Idle)),
statistics: Arc::new(GcStatistics::new()),
safepoint: Arc::new(SafepointCoordinator::new()),
jit_metrics: None,
adaptive_params: Arc::new(RwLock::new(AdaptiveTuningParams::default())),
collection_requests: Arc::new(Mutex::new(VecDeque::new())),
worker_threads: Arc::new(RwLock::new(Vec::new())),
shutdown_requested: Arc::new(AtomicBool::new(false)),
}
}
pub fn initialize(&mut self, jit_metrics: Option<Arc<RwLock<JitMetrics>>>) -> Result<(), String> {
self.jit_metrics = jit_metrics;
self.start_worker_threads()?;
Ok(())
}
fn start_worker_threads(&self) -> Result<(), String> {
let num_threads = self.config.max_collector_threads;
let mut worker_handles = self.worker_threads.write().map_err(|_| "Failed to acquire worker threads lock")?;
for thread_id in 0..num_threads {
let config = Arc::clone(&self.config);
let statistics = Arc::clone(&self.statistics);
let safepoint = Arc::clone(&self.safepoint);
let collection_requests = Arc::clone(&self.collection_requests);
let shutdown_requested = Arc::clone(&self.shutdown_requested);
let current_phase = Arc::clone(&self.current_phase);
let handle = thread::Builder::new()
.name(format!("gc-worker-{thread_id}"))
.spawn(move || {
Self::worker_thread_main(
thread_id,
config,
statistics,
safepoint,
collection_requests,
current_phase,
shutdown_requested,
);
})
.map_err(|e| format!("Failed to spawn GC worker thread: {e}"))?;
worker_handles.push(handle);
}
Ok(())
}
fn worker_thread_main(
_thread_id: usize,
_config: Arc<ParallelGcConfig>,
_statistics: Arc<GcStatistics>,
_safepoint: Arc<SafepointCoordinator>,
collection_requests: Arc<Mutex<VecDeque<CollectionRequest>>>,
_current_phase: Arc<RwLock<CollectionPhase>>,
shutdown_requested: Arc<AtomicBool>,
) {
while !shutdown_requested.load(Ordering::Relaxed) {
let request = {
let mut requests = collection_requests.lock().unwrap();
requests.pop_front()
};
match request {
Some(CollectionRequest::Minor) => {
}
Some(CollectionRequest::Major) => {
}
Some(CollectionRequest::IncrementalStep) => {
}
Some(CollectionRequest::Emergency) => {
}
None => {
thread::sleep(Duration::from_millis(1));
}
}
}
}
pub fn request_minor_collection(&self) -> Result<(), String> {
let mut requests = self.collection_requests.lock().map_err(|_| "Failed to acquire collection requests lock")?;
requests.push_back(CollectionRequest::Minor);
Ok(())
}
pub fn request_major_collection(&self) -> Result<(), String> {
let mut requests = self.collection_requests.lock().map_err(|_| "Failed to acquire collection requests lock")?;
requests.push_back(CollectionRequest::Major);
Ok(())
}
pub fn get_statistics(&self) -> &GcStatistics {
&self.statistics
}
pub fn get_current_phase(&self) -> CollectionPhase {
*self.current_phase.read().unwrap()
}
pub fn update_adaptive_tuning(&self, allocation_rate: f64, recent_pause: Duration) {
if !self.config.adaptive_tuning {
return;
}
if let Ok(mut params) = self.adaptive_params.write() {
params.allocation_rate = allocation_rate;
params.pause_time_samples.push_back(recent_pause);
if params.pause_time_samples.len() > 100 {
params.pause_time_samples.pop_front();
}
let avg_pause = if !params.pause_time_samples.is_empty() {
let total: Duration = params.pause_time_samples.iter().sum();
total / params.pause_time_samples.len() as u32
} else {
Duration::from_millis(0)
};
let target_pause = Duration::from_millis(self.config.target_minor_pause_ms);
if avg_pause > target_pause {
params.collection_frequency_multiplier = (params.collection_frequency_multiplier * 1.1).min(3.0);
} else if avg_pause < target_pause / 2 {
params.collection_frequency_multiplier = (params.collection_frequency_multiplier * 0.9).max(0.5);
}
}
}
pub fn register_mutator_thread(&self) {
self.safepoint.register_thread();
}
pub fn unregister_mutator_thread(&self) {
self.safepoint.unregister_thread();
}
pub fn check_safepoint(&self) -> bool {
self.safepoint.check_safepoint()
}
pub fn shutdown(&self) -> Result<(), String> {
self.shutdown_requested.store(true, Ordering::Relaxed);
let mut handles = self.worker_threads.write().map_err(|_| "Failed to acquire worker threads lock")?;
while let Some(handle) = handles.pop() {
handle.join().map_err(|_| "Failed to join worker thread")?;
}
Ok(())
}
}
impl Drop for ParallelGc {
fn drop(&mut self) {
let _ = self.shutdown();
}
}