use crate::sync::ContendedMutex;
use crate::types::{RegionId, TaskId};
use crossbeam_queue::SegQueue;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize, Ordering};
use std::thread;
use std::time::{Duration, Instant};
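/// Monotonic global epoch counter, advanced at most once per
/// `advance_interval`. The counter starts at epoch 1.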
#[derive(Debug)]
pub struct EpochCounter {
global: AtomicU64,
last_advance: ContendedMutex<Instant>,
advance_interval: Duration,
}
impl Default for EpochCounter {
fn default() -> Self {
Self::new(Duration::from_millis(100))
}
}
impl EpochCounter {
#[must_use]
pub fn new(advance_interval: Duration) -> Self {
Self {
            global: AtomicU64::new(1),
            last_advance: ContendedMutex::new("epoch_gc.last_advance", Instant::now()),
advance_interval,
}
}
pub fn current(&self) -> u64 {
self.global.load(Ordering::Acquire)
}
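    /// Advances the global epoch if at least `advance_interval` has elapsed
    /// since the last advance, returning the new epoch; returns `None` when
    /// called again within the interval.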
pub fn try_advance(&self) -> Option<u64> {
let now = Instant::now();
let mut last_advance = self
.last_advance
.lock()
.expect("epoch_gc last_advance mutex poisoned");
if now.duration_since(*last_advance) >= self.advance_interval {
let new_epoch = self.global.fetch_add(1, Ordering::AcqRel) + 1;
*last_advance = now;
Some(new_epoch)
} else {
None
}
}
#[cfg(any(test, feature = "test-internals"))]
pub fn force_advance(&self) -> u64 {
let mut last_advance = self
.last_advance
.lock()
.expect("epoch_gc last_advance mutex poisoned");
let new_epoch = self.global.fetch_add(1, Ordering::AcqRel) + 1;
*last_advance = Instant::now();
new_epoch
}
}
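/// Per-thread view of the global epoch. A thread is "behind" until it calls
/// [`sync_to_global`](LocalEpoch::sync_to_global); the owning thread's id is
/// recorded only for `Debug` output.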
#[derive(Debug)]
#[allow(dead_code)]
pub struct LocalEpoch {
local: AtomicU64,
thread_id: thread::ThreadId,
}
impl LocalEpoch {
#[must_use]
pub fn new() -> Self {
Self {
            local: AtomicU64::new(0),
            thread_id: thread::current().id(),
}
}
pub fn current(&self) -> u64 {
self.local.load(Ordering::Acquire)
}
pub fn sync_to_global(&self, global_epoch: u64) {
self.local.store(global_epoch, Ordering::Release);
}
pub fn is_behind(&self, global_epoch: u64) -> bool {
self.current() < global_epoch
}
}
impl Default for LocalEpoch {
fn default() -> Self {
Self::new()
}
}
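/// A unit of cleanup work whose execution is deferred until the global epoch
/// has advanced past the epoch it was enqueued under.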
#[derive(Debug, Clone)]
pub enum CleanupWork {
Obligation {
id: u64,
metadata: Vec<u8>,
},
WakerCleanup {
waker_id: u64,
source: String,
},
RegionCleanup {
region_id: RegionId,
task_ids: Vec<TaskId>,
},
TimerCleanup {
timer_id: u64,
timer_type: String,
},
ChannelCleanup {
channel_id: u64,
cleanup_type: String,
data: Vec<u8>,
},
}
impl CleanupWork {
#[must_use]
pub fn description(&self) -> String {
match self {
Self::Obligation { id, .. } => format!("obligation:{id}"),
Self::WakerCleanup { waker_id, source } => format!("waker:{source}:{waker_id}"),
Self::RegionCleanup {
region_id,
task_ids,
} => {
format!("region:{}:tasks:{}", region_id, task_ids.len())
}
Self::TimerCleanup {
timer_id,
timer_type,
} => {
format!("timer:{timer_type}:{timer_id}")
}
Self::ChannelCleanup {
channel_id,
cleanup_type,
..
} => {
format!("channel:{cleanup_type}:{channel_id}")
}
}
}
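    /// Approximates the memory held by this work item: the enum's own size
    /// plus its heap-allocated payload (metadata, strings, or task-id
    /// buffers).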
#[must_use]
pub fn memory_usage(&self) -> usize {
let base_size = std::mem::size_of::<Self>();
match self {
Self::Obligation { metadata, .. } => base_size + metadata.len(),
Self::WakerCleanup { source, .. } => base_size + source.len(),
Self::RegionCleanup { task_ids, .. } => {
base_size + task_ids.len() * std::mem::size_of::<TaskId>()
}
Self::TimerCleanup { timer_type, .. } => base_size + timer_type.len(),
Self::ChannelCleanup {
cleanup_type, data, ..
} => base_size + cleanup_type.len() + data.len(),
}
}
}
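/// A queued work item tagged with the epoch it was enqueued under.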
#[derive(Debug)]
#[allow(dead_code)]
struct EpochWork {
epoch: u64,
work: CleanupWork,
enqueued_at: Instant,
}
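/// Tuning knobs for the deferred-cleanup queue: a hard queue-size cap,
/// batch-size bounds, a per-batch time budget, and toggles for fallback
/// behavior and logging.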
#[derive(Debug, Clone)]
pub struct CleanupConfig {
pub max_queue_size: usize,
pub min_batch_size: usize,
pub max_batch_size: usize,
pub max_batch_time: Duration,
pub enable_fallback: bool,
pub enable_logging: bool,
}
impl Default for CleanupConfig {
fn default() -> Self {
Self {
max_queue_size: 10_000,
min_batch_size: 10,
max_batch_size: 100,
max_batch_time: Duration::from_millis(5),
enable_fallback: true,
enable_logging: false,
}
}
}
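/// Lock-free queue of epoch-tagged cleanup work. Producers enqueue work under
/// the current epoch; a consumer later drains items whose epoch is strictly
/// older than a supplied safe epoch.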
#[derive(Debug)]
pub struct DeferredCleanupQueue {
queue: SegQueue<EpochWork>,
size: AtomicUsize,
config: CleanupConfig,
stats: CleanupStats,
}
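/// Counters describing queue activity. All fields use relaxed atomics, so the
/// values are advisory rather than a consistent snapshot.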
#[derive(Debug, Default)]
pub struct CleanupStats {
pub total_enqueued: AtomicU64,
pub total_processed: AtomicU64,
pub total_dropped: AtomicU64,
pub total_batches: AtomicU64,
    /// Total time spent processing cleanup batches, in microseconds.
    pub total_cleanup_time: AtomicU64,
    pub current_queue_size: AtomicUsize,
pub peak_queue_size: AtomicUsize,
}
impl CleanupStats {
pub fn efficiency_percent(&self) -> f64 {
let enqueued = self.total_enqueued.load(Ordering::Relaxed) as f64;
let processed = self.total_processed.load(Ordering::Relaxed) as f64;
if enqueued > 0.0 {
(processed / enqueued) * 100.0
} else {
100.0
}
}
pub fn average_batch_size(&self) -> f64 {
let processed = self.total_processed.load(Ordering::Relaxed) as f64;
let batches = self.total_batches.load(Ordering::Relaxed) as f64;
if batches > 0.0 {
processed / batches
} else {
0.0
}
}
pub fn average_batch_time_us(&self) -> f64 {
let total_time = self.total_cleanup_time.load(Ordering::Relaxed) as f64;
let batches = self.total_batches.load(Ordering::Relaxed) as f64;
if batches > 0.0 {
total_time / batches
} else {
0.0
}
}
}
impl Default for DeferredCleanupQueue {
fn default() -> Self {
Self::with_config(CleanupConfig::default())
}
}
impl DeferredCleanupQueue {
#[must_use]
pub fn new() -> Self {
Self::with_config(CleanupConfig::default())
}
#[must_use]
pub fn with_config(config: CleanupConfig) -> Self {
Self {
queue: SegQueue::new(),
size: AtomicUsize::new(0),
config,
stats: CleanupStats::default(),
}
}
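    /// Drains and returns every item whose epoch is older than `safe_epoch`,
    /// re-enqueueing the rest. Survivors are pushed to the back of the queue,
    /// so relative order is not preserved.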
pub fn collect_expired(&self, safe_epoch: u64) -> Vec<CleanupWork> {
let mut drained = Vec::new();
let mut held_back = Vec::new();
while let Some(epoch_work) = self.queue.pop() {
self.size.fetch_sub(1, Ordering::Relaxed);
if epoch_work.epoch < safe_epoch {
drained.push(epoch_work.work);
} else {
held_back.push(epoch_work);
}
}
for epoch_work in held_back {
self.size.fetch_add(1, Ordering::Relaxed);
self.queue.push(epoch_work);
}
drained
}
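    /// Enqueues `work` tagged with `current_epoch`, or returns it to the
    /// caller as `Err` when the queue is at capacity so the caller can fall
    /// back to inline cleanup. The capacity check is racy under concurrent
    /// producers, so the bound is approximate.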
pub fn enqueue(&self, work: CleanupWork, current_epoch: u64) -> Result<(), CleanupWork> {
let current_size = self.size.load(Ordering::Relaxed);
if current_size >= self.config.max_queue_size {
self.stats.total_dropped.fetch_add(1, Ordering::Relaxed);
return Err(work);
}
let epoch_work = EpochWork {
epoch: current_epoch,
work,
enqueued_at: Instant::now(),
};
self.queue.push(epoch_work);
let new_size = self.size.fetch_add(1, Ordering::Relaxed) + 1;
self.stats.total_enqueued.fetch_add(1, Ordering::Relaxed);
self.stats
.current_queue_size
.store(new_size, Ordering::Relaxed);
        // fetch_max keeps peak tracking accurate under concurrent enqueues.
        self.stats
            .peak_queue_size
            .fetch_max(new_size, Ordering::Relaxed);
Ok(())
}
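    /// Pops and processes items older than `safe_epoch`, stopping at the
    /// first item that is still live, at `max_batch_size` items, or when the
    /// `max_batch_time` budget is spent. Returns the number of items
    /// processed.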
pub fn process_safe_epochs(&self, safe_epoch: u64) -> usize {
let start_time = Instant::now();
let mut processed_count = 0;
let mut batch = Vec::new();
while processed_count < self.config.max_batch_size {
if let Some(epoch_work) = self.queue.pop() {
self.size.fetch_sub(1, Ordering::Relaxed);
if epoch_work.epoch < safe_epoch {
batch.push(epoch_work);
processed_count += 1;
if start_time.elapsed() >= self.config.max_batch_time {
break;
}
} else {
self.queue.push(epoch_work);
self.size.fetch_add(1, Ordering::Relaxed);
break;
}
} else {
                break;
            }
}
if !batch.is_empty() {
self.process_cleanup_batch(batch);
self.stats
.total_processed
.fetch_add(processed_count as u64, Ordering::Relaxed);
self.stats.total_batches.fetch_add(1, Ordering::Relaxed);
let batch_time_us = start_time.elapsed().as_micros() as u64;
self.stats
.total_cleanup_time
.fetch_add(batch_time_us, Ordering::Relaxed);
if self.config.enable_logging && processed_count >= self.config.min_batch_size {
#[cfg(feature = "tracing-integration")]
tracing::debug!(
processed = processed_count,
safe_epoch = safe_epoch,
batch_time_us = batch_time_us,
"Processed epoch GC cleanup batch"
);
}
}
let current_size = self.size.load(Ordering::Relaxed);
self.stats
.current_queue_size
.store(current_size, Ordering::Relaxed);
processed_count
}
fn process_cleanup_batch(&self, batch: Vec<EpochWork>) {
for epoch_work in batch {
self.process_single_work_item(&epoch_work.work);
}
}
fn process_single_work_item(&self, work: &CleanupWork) {
if self.config.enable_logging {
#[cfg(feature = "tracing-integration")]
tracing::trace!(
work_description = work.description(),
"Processing cleanup work item"
);
}
match work {
CleanupWork::Obligation { id, .. } => {
self.cleanup_obligation(*id);
}
CleanupWork::WakerCleanup { waker_id, source } => {
self.cleanup_waker(*waker_id, source);
}
CleanupWork::RegionCleanup {
region_id,
task_ids,
} => {
self.cleanup_region(*region_id, task_ids);
}
CleanupWork::TimerCleanup {
timer_id,
timer_type,
} => {
self.cleanup_timer(*timer_id, timer_type);
}
CleanupWork::ChannelCleanup {
channel_id,
cleanup_type,
..
} => {
self.cleanup_channel(*channel_id, cleanup_type);
}
}
}
fn cleanup_obligation(&self, obligation_id: u64) {
#[cfg(not(feature = "tracing-integration"))]
let _ = obligation_id;
#[cfg(feature = "tracing-integration")]
tracing::debug!(
obligation_id = obligation_id,
"Cleaning up obligation in deferred cleanup"
);
}
fn cleanup_waker(&self, waker_id: u64, source: &str) {
#[cfg(not(feature = "tracing-integration"))]
let _ = waker_id;
#[cfg(feature = "tracing-integration")]
tracing::debug!(
waker_id = waker_id,
source = source,
"Cleaning up waker in deferred cleanup"
);
        match source {
            // Recognized reactor sources currently require no source-specific
            // teardown here.
            "epoll" | "kqueue" | "iocp" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(source = source, "Unknown waker source for cleanup");
            }
        }
}
fn cleanup_region(&self, region_id: RegionId, task_ids: &[TaskId]) {
#[cfg(not(feature = "tracing-integration"))]
let _ = region_id;
#[cfg(feature = "tracing-integration")]
tracing::debug!(
region_id = region_id.as_u64(),
task_count = task_ids.len(),
"Cleaning up region state in deferred cleanup"
);
for &task_id in task_ids {
#[cfg(not(feature = "tracing-integration"))]
let _ = task_id;
#[cfg(feature = "tracing-integration")]
tracing::trace!(
task_id = task_id.as_u64(),
region_id = region_id.as_u64(),
"Cleaning up task in region cleanup"
);
}
}
fn cleanup_timer(&self, timer_id: u64, timer_type: &str) {
#[cfg(not(feature = "tracing-integration"))]
let _ = timer_id;
#[cfg(feature = "tracing-integration")]
tracing::debug!(
timer_id = timer_id,
timer_type = timer_type,
"Cleaning up timer in deferred cleanup"
);
        match timer_type {
            // Recognized timer kinds currently require no type-specific
            // teardown here.
            "sleep" | "timeout" | "interval" | "deadline" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(timer_type = timer_type, "Unknown timer type for cleanup");
            }
        }
}
fn cleanup_channel(&self, channel_id: u64, cleanup_type: &str) {
#[cfg(not(feature = "tracing-integration"))]
let _ = channel_id;
#[cfg(feature = "tracing-integration")]
tracing::debug!(
channel_id = channel_id,
cleanup_type = cleanup_type,
"Cleaning up channel state in deferred cleanup"
);
        match cleanup_type {
            // Recognized channel cleanup kinds currently require no
            // type-specific teardown here.
            "waker" | "buffer" | "mpsc_sender" | "mpsc_receiver" | "oneshot"
            | "broadcast" | "watch" | "session" => {}
            _ => {
                #[cfg(feature = "tracing-integration")]
                tracing::warn!(cleanup_type = cleanup_type, "Unknown channel cleanup type");
            }
        }
}
pub fn stats(&self) -> &CleanupStats {
&self.stats
}
pub fn config(&self) -> &CleanupConfig {
&self.config
}
pub fn len(&self) -> usize {
self.size.load(Ordering::Relaxed)
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn is_near_capacity(&self) -> bool {
let current_size = self.len();
let threshold = (self.config.max_queue_size as f64 * 0.8) as usize;
current_size >= threshold
}
}
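/// Epoch-based deferred cleanup: an [`EpochCounter`] paired with a
/// [`DeferredCleanupQueue`] behind a runtime enable switch.
///
/// A minimal usage sketch (marked `ignore` because it assumes the enclosing
/// crate's paths):
///
/// ```ignore
/// let gc = EpochGC::new();
/// let work = CleanupWork::Obligation { id: 1, metadata: vec![] };
/// if let Err(work) = gc.defer_cleanup(work) {
///     // Queue full or GC disabled: the caller gets the work back and can
///     // clean up inline instead.
///     drop(work);
/// }
/// // Call periodically; processes deferred work once the epoch advances.
/// let _processed = gc.try_advance_and_cleanup();
/// ```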
#[derive(Debug)]
pub struct EpochGC {
epoch_counter: Arc<EpochCounter>,
cleanup_queue: Arc<DeferredCleanupQueue>,
    enabled: AtomicBool,
}
impl EpochGC {
#[must_use]
pub fn new() -> Self {
Self::with_config(CleanupConfig::default())
}
#[must_use]
pub fn with_config(config: CleanupConfig) -> Self {
Self {
epoch_counter: Arc::new(EpochCounter::default()),
cleanup_queue: Arc::new(DeferredCleanupQueue::with_config(config)),
            enabled: AtomicBool::new(true),
        }
}
#[must_use]
pub fn disabled() -> Self {
let system = Self::new();
        system.enabled.store(false, Ordering::Relaxed);
system
}
pub fn is_enabled(&self) -> bool {
        self.enabled.load(Ordering::Relaxed)
}
pub fn enable(&self) {
        self.enabled.store(true, Ordering::Relaxed);
}
pub fn disable(&self) {
        self.enabled.store(false, Ordering::Relaxed);
}
pub fn current_epoch(&self) -> u64 {
self.epoch_counter.current()
}
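    /// Defers `work` until its epoch expires. Returns the work as `Err` when
    /// the GC is disabled or the queue is full; the caller is expected to
    /// perform the cleanup inline in that case.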
pub fn defer_cleanup(&self, work: CleanupWork) -> Result<(), CleanupWork> {
if !self.is_enabled() {
            return Err(work);
        }
let current_epoch = self.current_epoch();
self.cleanup_queue.enqueue(work, current_epoch)
}
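    /// Attempts to advance the epoch and, on success, processes one batch of
    /// work from epochs older than the previous epoch. Returns the number of
    /// items processed (0 if the epoch did not advance or the GC is
    /// disabled).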
pub fn try_advance_and_cleanup(&self) -> usize {
if !self.is_enabled() {
return 0;
}
if let Some(new_epoch) = self.epoch_counter.try_advance() {
let safe_epoch = new_epoch.saturating_sub(1);
self.cleanup_queue.process_safe_epochs(safe_epoch)
} else {
0
}
}
    #[cfg(any(test, feature = "test-internals"))]
pub fn force_advance_and_cleanup(&self) -> usize {
if !self.is_enabled() {
return 0;
}
let new_epoch = self.epoch_counter.force_advance();
let safe_epoch = new_epoch.saturating_sub(1);
self.cleanup_queue.process_safe_epochs(safe_epoch)
}
pub fn stats(&self) -> &CleanupStats {
self.cleanup_queue.stats()
}
pub fn config(&self) -> &CleanupConfig {
self.cleanup_queue.config()
}
pub fn is_cleanup_queue_near_capacity(&self) -> bool {
self.cleanup_queue.is_near_capacity()
}
pub fn epoch_counter(&self) -> &Arc<EpochCounter> {
&self.epoch_counter
}
}
impl Default for EpochGC {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Arc;
use std::thread;
use std::time::Duration;
#[test]
fn epoch_counter_advances_over_time() {
let counter = EpochCounter::new(Duration::from_millis(10));
let initial = counter.current();
assert_eq!(counter.try_advance(), None);
assert_eq!(counter.current(), initial);
thread::sleep(Duration::from_millis(15));
let new_epoch = counter.try_advance().expect("Should advance");
assert_eq!(new_epoch, initial + 1);
assert_eq!(counter.current(), new_epoch);
}
#[test]
fn local_epoch_tracks_behind_state() {
let local = LocalEpoch::new();
assert_eq!(local.current(), 0);
assert!(local.is_behind(1));
assert!(!local.is_behind(0));
local.sync_to_global(5);
assert_eq!(local.current(), 5);
assert!(!local.is_behind(5));
assert!(local.is_behind(6));
}
#[test]
fn cleanup_work_memory_usage_calculation() {
let obligation_work = CleanupWork::Obligation {
id: 123,
metadata: vec![1, 2, 3, 4, 5],
};
assert!(obligation_work.memory_usage() > std::mem::size_of::<CleanupWork>());
let waker_work = CleanupWork::WakerCleanup {
waker_id: 456,
source: "epoll".to_string(),
};
assert!(waker_work.memory_usage() > std::mem::size_of::<CleanupWork>());
}
#[test]
fn cleanup_queue_enqueue_and_process() {
let config = CleanupConfig {
max_queue_size: 10,
min_batch_size: 1,
max_batch_size: 5,
..CleanupConfig::default()
};
let queue = DeferredCleanupQueue::with_config(config);
let work1 = CleanupWork::Obligation {
id: 1,
metadata: vec![],
};
let work2 = CleanupWork::WakerCleanup {
waker_id: 2,
source: "kqueue".to_string(),
};
assert!(queue.enqueue(work1, 1).is_ok());
assert!(queue.enqueue(work2, 1).is_ok());
assert_eq!(queue.len(), 2);
let processed = queue.process_safe_epochs(2);
assert_eq!(processed, 2);
assert_eq!(queue.len(), 0);
let stats = queue.stats();
assert_eq!(stats.total_enqueued.load(Ordering::Relaxed), 2);
assert_eq!(stats.total_processed.load(Ordering::Relaxed), 2);
assert_eq!(stats.total_batches.load(Ordering::Relaxed), 1);
}
#[test]
fn cleanup_queue_respects_epoch_safety() {
let config = CleanupConfig::default();
let queue = DeferredCleanupQueue::with_config(config);
let work1 = CleanupWork::Obligation {
id: 1,
metadata: vec![],
};
let work2 = CleanupWork::Obligation {
id: 2,
metadata: vec![],
};
let work3 = CleanupWork::Obligation {
id: 3,
metadata: vec![],
};
assert!(queue.enqueue(work1, 1).is_ok());
assert!(queue.enqueue(work2, 2).is_ok());
assert!(queue.enqueue(work3, 3).is_ok());
assert_eq!(queue.len(), 3);
let processed = queue.process_safe_epochs(2);
assert_eq!(processed, 1);
assert_eq!(queue.len(), 2);
let processed = queue.process_safe_epochs(3);
assert_eq!(processed, 1);
assert_eq!(queue.len(), 1);
let processed = queue.process_safe_epochs(4);
assert_eq!(processed, 1);
assert_eq!(queue.len(), 0);
}
#[test]
fn cleanup_queue_handles_overflow() {
let config = CleanupConfig {
max_queue_size: 2,
..CleanupConfig::default()
};
let queue = DeferredCleanupQueue::with_config(config);
let work = CleanupWork::Obligation {
id: 1,
metadata: vec![],
};
assert!(queue.enqueue(work.clone(), 1).is_ok());
assert!(queue.enqueue(work.clone(), 1).is_ok());
assert!(queue.enqueue(work, 1).is_err());
let stats = queue.stats();
assert_eq!(stats.total_dropped.load(Ordering::Relaxed), 1);
}
#[test]
fn epoch_gc_system_integration() {
let gc = EpochGC::new();
assert!(gc.is_enabled());
let work = CleanupWork::Obligation {
id: 123,
metadata: vec![],
};
assert!(gc.defer_cleanup(work).is_ok());
let processed = gc.force_advance_and_cleanup();
assert_eq!(processed, 1);
let stats = gc.stats();
assert_eq!(stats.total_processed.load(Ordering::Relaxed), 1);
}
#[test]
fn epoch_gc_disabled_fallback() {
let gc = EpochGC::disabled();
assert!(!gc.is_enabled());
let work = CleanupWork::Obligation {
id: 123,
metadata: vec![],
};
assert!(gc.defer_cleanup(work).is_err());
let processed = gc.try_advance_and_cleanup();
assert_eq!(processed, 0);
}
#[test]
fn cleanup_stats_calculations() {
let config = CleanupConfig::default();
let queue = DeferredCleanupQueue::with_config(config);
for i in 0..10 {
let work = CleanupWork::Obligation {
id: i,
metadata: vec![],
};
let _ = queue.enqueue(work, 1);
}
queue.process_safe_epochs(2);
let stats = queue.stats();
assert_eq!(stats.efficiency_percent(), 100.0);
assert!(stats.average_batch_size() > 0.0);
assert!(stats.average_batch_time_us() >= 0.0);
}
#[test]
fn stress_test_concurrent_enqueue_dequeue() {
        let config = CleanupConfig {
            max_queue_size: 100_000,
            max_batch_size: 1000,
            ..CleanupConfig::default()
        };
        let epoch_gc = Arc::new(EpochGC::with_config(config));
const NUM_THREADS: usize = 8;
const WORK_ITEMS_PER_THREAD: usize = 1000;
let mut handles = Vec::new();
for thread_id in 0..NUM_THREADS {
let gc = Arc::clone(&epoch_gc);
let handle = thread::spawn(move || {
for i in 0..WORK_ITEMS_PER_THREAD {
let work = CleanupWork::Obligation {
id: (thread_id * WORK_ITEMS_PER_THREAD + i) as u64,
metadata: vec![thread_id as u8; 10],
};
while gc.defer_cleanup(work.clone()).is_err() {
thread::sleep(Duration::from_micros(1));
}
}
});
handles.push(handle);
}
let gc_consumer = Arc::clone(&epoch_gc);
let consumer_handle = thread::spawn(move || {
let mut total_processed = 0;
let start = Instant::now();
while start.elapsed() < Duration::from_secs(10)
&& total_processed < NUM_THREADS * WORK_ITEMS_PER_THREAD
{
let processed = gc_consumer.force_advance_and_cleanup();
total_processed += processed;
if processed == 0 {
thread::sleep(Duration::from_millis(1));
}
}
total_processed
});
for handle in handles {
handle.join().unwrap();
}
let total_processed = consumer_handle.join().unwrap();
        assert!(
            total_processed >= NUM_THREADS * WORK_ITEMS_PER_THREAD * 9 / 10,
            "Should process at least 90% of work items, got {total_processed}"
        );
let stats = epoch_gc.stats();
assert!(stats.efficiency_percent() >= 90.0);
assert_eq!(stats.total_dropped.load(Ordering::Relaxed), 0);
}
#[test]
fn stress_test_memory_usage_extended_operation() {
        // Queue length serves as the proxy for retained memory here: as long
        // as cleanup keeps pace with enqueueing, deferred work stays bounded.
let config = CleanupConfig {
max_queue_size: 10_000,
max_batch_size: 100,
max_batch_time: Duration::from_millis(1),
..CleanupConfig::default()
};
let epoch_gc = Arc::new(EpochGC::with_config(config));
for iteration in 0..1000 {
for i in 0..100 {
let work = CleanupWork::RegionCleanup {
region_id: RegionId::new_for_test(iteration * 100 + i, 1),
                    task_ids: vec![TaskId::new_for_test(i, 1); 10],
                };
let _ = epoch_gc.defer_cleanup(work);
}
if iteration % 10 == 0 {
epoch_gc.force_advance_and_cleanup();
}
if iteration % 100 == 0 {
epoch_gc.force_advance_and_cleanup();
assert!(
epoch_gc.cleanup_queue.len() < 1000,
"Queue size {} too large at iteration {}",
epoch_gc.cleanup_queue.len(),
iteration
);
}
}
for _ in 0..10 {
epoch_gc.force_advance_and_cleanup();
}
let stats = epoch_gc.stats();
assert!(stats.efficiency_percent() >= 95.0);
}
#[test]
fn performance_benchmark_deferred_vs_direct() {
const NUM_OPERATIONS: usize = 10_000;
let start = Instant::now();
        for i in 0..NUM_OPERATIONS {
            // Simulate the cost of performing each cleanup inline.
            let _ = format!("cleanup_{i}");
            thread::sleep(Duration::from_nanos(100));
        }
let direct_duration = start.elapsed();
let epoch_gc = EpochGC::new();
let start = Instant::now();
for i in 0..NUM_OPERATIONS {
let work = CleanupWork::Obligation {
id: i as u64,
metadata: vec![],
};
let _ = epoch_gc.defer_cleanup(work);
if i % 100 == 0 {
epoch_gc.force_advance_and_cleanup();
}
}
        while !epoch_gc.cleanup_queue.is_empty() {
epoch_gc.force_advance_and_cleanup();
}
        let deferred_duration = start.elapsed();
        // Wall-clock timings are environment-dependent, so they are observed
        // but intentionally not asserted.
        let _ = (direct_duration, deferred_duration);
let stats = epoch_gc.stats();
assert!(stats.total_processed.load(Ordering::Relaxed) as usize >= NUM_OPERATIONS);
assert!(stats.efficiency_percent() >= 95.0);
        assert!(stats.average_batch_size() > 1.0);
    }
#[test]
fn correctness_test_no_work_lost() {
        let epoch_gc = Arc::new(EpochGC::new());
        const NUM_WORK_ITEMS: u64 = 5000;
        // Loss is detected via the processed/dropped counters checked below.
let mut handles = Vec::new();
for thread_id in 0..4 {
let gc = Arc::clone(&epoch_gc);
let start_id = thread_id * NUM_WORK_ITEMS / 4;
let end_id = (thread_id + 1) * NUM_WORK_ITEMS / 4;
let handle = thread::spawn(move || {
for id in start_id..end_id {
let work = CleanupWork::Obligation {
id,
metadata: vec![],
};
while gc.defer_cleanup(work.clone()).is_err() {
thread::sleep(Duration::from_micros(10));
}
}
});
handles.push(handle);
}
let gc_processor = Arc::clone(&epoch_gc);
let processor_handle = thread::spawn(move || {
let mut total_processed = 0;
while total_processed < NUM_WORK_ITEMS {
let processed = gc_processor.force_advance_and_cleanup();
total_processed += processed as u64;
if processed == 0 {
thread::sleep(Duration::from_millis(1));
}
}
});
for handle in handles {
handle.join().unwrap();
}
processor_handle.join().unwrap();
let stats = epoch_gc.stats();
assert_eq!(
stats.total_processed.load(Ordering::Relaxed),
NUM_WORK_ITEMS
);
assert_eq!(stats.total_dropped.load(Ordering::Relaxed), 0);
assert_eq!(stats.efficiency_percent(), 100.0);
}
#[test]
fn backpressure_prevents_unbounded_growth() {
let config = CleanupConfig {
max_queue_size: 100,
..CleanupConfig::default()
};
let queue = DeferredCleanupQueue::with_config(config);
let work = CleanupWork::Obligation {
id: 1,
metadata: vec![0; 1000],
};
let mut enqueued = 0;
let mut rejected = 0;
for _ in 0..200 {
match queue.enqueue(work.clone(), 1) {
Ok(()) => enqueued += 1,
Err(_) => rejected += 1,
}
}
assert!(
enqueued <= 100,
"Should not enqueue more than max_queue_size"
);
assert!(rejected > 0, "Should reject some items when full");
assert!(queue.is_near_capacity(), "Should detect near capacity");
let stats = queue.stats();
assert_eq!(
stats.total_dropped.load(Ordering::Relaxed) as usize,
rejected
);
}
#[test]
fn epoch_safety_prevents_premature_cleanup() {
let gc = EpochGC::with_config(CleanupConfig {
max_batch_size: 1000,
..CleanupConfig::default()
});
for i in 0..10 {
let work = CleanupWork::Obligation {
id: i,
metadata: vec![],
};
gc.defer_cleanup(work).unwrap();
}
let processed = gc.try_advance_and_cleanup();
assert_eq!(processed, 0, "Should not process work from current epoch");
let processed = gc.force_advance_and_cleanup();
assert_eq!(
processed, 10,
"Should process all work after forced advance"
);
}
}