use crate::types::{RegionId, TaskId};
use parking_lot::RwLock;
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::time::{Duration, SystemTime};
use thiserror::Error;
/// Errors produced while tracking resources and verifying region cleanup.
#[derive(Debug, Error)]
pub enum ResourceCleanupError {
    /// A region was verified while at least one of its resources was not cleaned up.
    ///
    /// Returned by `ResourceCleanupVerifier::verify_region_cleanup`.
    #[error(
        "resource leak detected in region {region_id:?}: {leak_count} resources not cleaned up"
    )]
    ResourceLeak {
        /// Region whose verification failed.
        region_id: RegionId,
        /// Number of resources reported as leaked.
        leak_count: usize,
        /// Distinct types of the leaked resources (built from a set, so the order is unspecified).
        resource_types: Vec<ResourceType>,
    },
    /// The given resource id is not known to the verifier.
    ///
    /// Returned by `ResourceCleanupVerifier::track_cleanup` when no record exists for the id.
    #[error("cannot attribute resource {resource_id:?} to any region")]
    AttributionFailed {
        /// Id that could not be resolved to a record.
        resource_id: ResourceId,
    },
    /// Verification is disabled in the configuration, or the verifier has not been started.
    #[error("resource cleanup verification is not enabled")]
    NotEnabled,
    /// A state change was requested that `ResourceState::can_transition_to` rejects.
    #[error("invalid resource state transition: {resource_id:?} from {from:?} to {to:?}")]
    InvalidTransition {
        /// Resource whose transition was rejected.
        resource_id: ResourceId,
        /// State the resource was in.
        from: ResourceState,
        /// State that was requested.
        to: ResourceState,
    },
    /// A region was verified while some resources were still in `Cleaning` state and inside
    /// the configured grace period, and nothing had leaked.
    #[error(
        "resource cleanup still pending in region {region_id:?}: {pending_count} resources not yet cleaned"
    )]
    CleanupPending {
        /// Region whose verification is not yet conclusive.
        region_id: RegionId,
        /// Number of resources still being cleaned.
        pending_count: usize,
        /// Distinct types of the pending resources (built from a set, so the order is unspecified).
        resource_types: Vec<ResourceType>,
    },
}
/// Opaque identifier of a tracked resource.
///
/// The wrapped value `0` is reserved as the "untracked" sentinel (see `ResourceId::untracked`);
/// ids handed out by `ResourceId::new` start at `1`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ResourceId(u64);
impl ResourceId {
    /// Sentinel id returned for resources that are deliberately not tracked.
    const UNTRACKED: Self = Self(0);

    /// Hands out the next id from a process-wide counter that starts at `1`.
    pub fn new() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(1);
        let raw = COUNTER.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }

    /// Returns the sentinel id that stands for "not tracked".
    pub fn untracked() -> Self {
        Self::UNTRACKED
    }

    /// Returns `true` unless this is the untracked sentinel.
    pub fn is_tracked(self) -> bool {
        self.0 != Self::UNTRACKED.0
    }
}
/// Category of a tracked resource.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceType {
    /// A file descriptor.
    FileDescriptor,
    /// A heap allocation.
    HeapAllocation,
    /// A network connection.
    NetworkConnection,
    /// An I/O operation.
    IoOperation,
    /// A timer.
    Timer,
    /// Thread-local state.
    ThreadLocal,
    /// Caller-defined category; the meaning of the `u32` tag is not defined in this file.
    Custom(u32),
}
impl ResourceType {
    /// Returns `true` for file descriptors, network connections and I/O operations.
    pub fn requires_immediate_cleanup(self) -> bool {
        match self {
            Self::FileDescriptor | Self::NetworkConnection | Self::IoOperation => true,
            Self::HeapAllocation | Self::Timer | Self::ThreadLocal | Self::Custom(_) => false,
        }
    }

    /// Returns `true` for heap allocations and thread-local state.
    pub fn allows_deferred_cleanup(self) -> bool {
        match self {
            Self::HeapAllocation | Self::ThreadLocal => true,
            Self::FileDescriptor
            | Self::NetworkConnection
            | Self::IoOperation
            | Self::Timer
            | Self::Custom(_) => false,
        }
    }
}
/// Lifecycle state of a tracked resource.
///
/// The permitted moves between states are defined by `ResourceState::can_transition_to`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceState {
    /// Initial state of a freshly created `ResourceRecord`.
    Allocated,
    /// The resource has been activated for an owning region.
    Active,
    /// Cleanup has started but not yet finished.
    Cleaning,
    /// Cleanup has finished; the only permitted transition from here is to `Cleaned` again.
    Cleaned,
    /// The resource was found not cleaned up; no transition leaves this state.
    Leaked,
}
impl ResourceState {
pub fn can_transition_to(self, target: Self) -> bool {
use ResourceState::{Active, Allocated, Cleaned, Cleaning, Leaked};
match (self, target) {
(Allocated, Active) => true,
(Allocated, Cleaned) => true,
(Active, Cleaning) => true,
(Active, Cleaned) => true,
(Cleaning, Cleaned) => true,
(Cleaned, Cleaned) => true,
(Active, Leaked) => true,
(Allocated, Leaked) => true,
(Cleaning, Leaked) => true,
_ => false,
}
}
}
/// Everything the verifier records about one tracked resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRecord {
    /// Unique id assigned when the record is created.
    pub id: ResourceId,
    /// Category of the resource.
    pub resource_type: ResourceType,
    /// Current lifecycle state; starts as `ResourceState::Allocated`.
    pub state: ResourceState,
    /// Region that owns the resource, if any.
    pub owner_region: Option<RegionId>,
    /// Task that performed the allocation, if known.
    pub allocating_task: Option<TaskId>,
    /// Wall-clock time at which the record was created.
    pub allocated_at: SystemTime,
    /// Wall-clock time of the most recent successful state transition
    /// (equal to `allocated_at` until the first transition).
    pub last_updated: SystemTime,
    /// Optional free-form description; `ResourceRecord::new` leaves it `None`.
    pub description: Option<String>,
    /// Optional descriptor number; `ResourceRecord::new` leaves it `None`.
    /// NOTE(review): presumably the OS file descriptor for `FileDescriptor` resources — confirm.
    pub file_descriptor: Option<i32>,
    /// Optional size in bytes; `ResourceRecord::new` leaves it `None`.
    /// NOTE(review): presumably the allocation size for `HeapAllocation` resources — confirm.
    pub size_bytes: Option<usize>,
}
impl ResourceRecord {
    /// Creates a record in `ResourceState::Allocated` with a freshly generated id.
    ///
    /// `allocated_at` and `last_updated` are both set to the current system time; the
    /// optional `description`, `file_descriptor` and `size_bytes` fields start as `None`.
    pub fn new(
        resource_type: ResourceType,
        owner_region: Option<RegionId>,
        allocating_task: Option<TaskId>,
    ) -> Self {
        let now = SystemTime::now();
        Self {
            id: ResourceId::new(),
            resource_type,
            state: ResourceState::Allocated,
            owner_region,
            allocating_task,
            allocated_at: now,
            last_updated: now,
            description: None,
            file_descriptor: None,
            size_bytes: None,
        }
    }

    /// Moves the record to `new_state` and refreshes `last_updated`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] — leaving the record untouched —
    /// when `ResourceState::can_transition_to` rejects the move.
    pub fn transition_to(&mut self, new_state: ResourceState) -> Result<(), ResourceCleanupError> {
        if !self.state.can_transition_to(new_state) {
            return Err(ResourceCleanupError::InvalidTransition {
                resource_id: self.id,
                from: self.state,
                to: new_state,
            });
        }
        self.state = new_state;
        self.last_updated = SystemTime::now();
        Ok(())
    }

    /// Marks the record `Active` and assigns it to `region_id`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] if the record cannot become
    /// `Active` from its current state. In that case the record, including its
    /// `owner_region`, is left unchanged.
    pub fn activate(&mut self, region_id: RegionId) -> Result<(), ResourceCleanupError> {
        // Validate the transition first so a rejected activation cannot silently
        // re-attribute the resource to a different region.
        self.transition_to(ResourceState::Active)?;
        self.owner_region = Some(region_id);
        Ok(())
    }

    /// Moves the record to `Cleaning`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] if the move is not permitted.
    pub fn begin_cleanup(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Cleaning)
    }

    /// Moves the record to `Cleaned`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] if the move is not permitted.
    pub fn complete_cleanup(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Cleaned)
    }

    /// Moves the record to `Leaked`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] if the move is not permitted.
    pub fn mark_leaked(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Leaked)
    }
}
/// Settings for `ResourceCleanupVerifier`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceCleanupConfig {
    /// Master switch: when `false`, `ResourceCleanupVerifier::start` fails with `NotEnabled`.
    pub enable_verification: bool,
    /// NOTE(review): not read anywhere in this file; presumably enables richer per-resource
    /// bookkeeping elsewhere — confirm.
    pub enable_detailed_tracking: bool,
    /// NOTE(review): not read anywhere in this file; presumably enables capture of
    /// allocation backtraces elsewhere — confirm.
    pub enable_allocation_traces: bool,
    /// Soft cap on stored records: when an insertion pushes the record map above this size,
    /// the cleaned record with the oldest `last_updated` (if any) is evicted.
    pub max_tracked_resources: usize,
    /// How long, in milliseconds, a resource may stay in `Cleaning` state before region
    /// verification treats it as leaked rather than pending.
    pub cleanup_grace_period_ms: u64,
    /// When `true`, a detected leak makes `verify_region_cleanup` panic instead of
    /// returning the error.
    pub panic_on_leaks: bool,
    /// Resource types to track. An empty set means every type is tracked; otherwise
    /// allocations of unlisted types receive the untracked id.
    pub tracked_resource_types: HashSet<ResourceType>,
}
impl Default for ResourceCleanupConfig {
    /// Verification on, all resource types tracked, 10 000 record cap, one-second grace
    /// period; detailed tracking and panic-on-leak follow whether debug assertions are enabled.
    fn default() -> Self {
        let debug_build = cfg!(debug_assertions);
        Self {
            enable_verification: true,
            enable_detailed_tracking: debug_build,
            enable_allocation_traces: false,
            max_tracked_resources: 10_000,
            cleanup_grace_period_ms: 1_000,
            panic_on_leaks: debug_build,
            // Empty filter: no type is excluded from tracking.
            tracked_resource_types: HashSet::default(),
        }
    }
}
/// Counters maintained by `ResourceCleanupVerifier`; all start at zero.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceCleanupStats {
    /// Number of allocations that were actually tracked.
    pub total_allocated: u64,
    /// Number of resources whose cleanup was recorded.
    pub total_cleaned: u64,
    /// Number of resources counted as leaked during region verification.
    pub total_leaked: u64,
    /// Resources tracked but neither cleaned nor counted as leaked yet.
    pub currently_tracked: u64,
    /// Highest value `currently_tracked` has reached.
    pub peak_tracked: u64,
    /// Region verifications that found nothing leaked and nothing pending.
    pub clean_region_closes: u64,
    /// Region verifications that found at least one leaked resource.
    pub leaked_region_closes: u64,
}
/// Tracks resources per owning region and checks that a region's resources were cleaned up.
///
/// State lives behind separate locks so every method takes `&self`.
pub struct ResourceCleanupVerifier {
    /// Settings supplied at construction; never modified afterwards.
    config: ResourceCleanupConfig,
    /// Every stored resource record, keyed by id.
    resources: RwLock<HashMap<ResourceId, ResourceRecord>>,
    /// Index from a region to the ids of the resources it owns.
    region_resources: RwLock<HashMap<RegionId, HashSet<ResourceId>>>,
    /// Running counters.
    stats: RwLock<ResourceCleanupStats>,
    /// Set by `start`, cleared by `stop`; a fresh verifier is inactive.
    is_active: AtomicBool,
    /// Per-instance number used only in log messages.
    #[cfg(feature = "tracing-integration")]
    instance_id: u64,
}
impl ResourceCleanupVerifier {
    /// Creates an inactive verifier with empty tracking tables and zeroed statistics.
    ///
    /// Call [`start`](Self::start) before tracking anything.
    pub fn new(config: ResourceCleanupConfig) -> Self {
        #[cfg(feature = "tracing-integration")]
        let instance_id = {
            static NEXT_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);
            NEXT_INSTANCE_ID.fetch_add(1, Ordering::Relaxed)
        };
        Self {
            config,
            resources: RwLock::new(HashMap::new()),
            region_resources: RwLock::new(HashMap::new()),
            stats: RwLock::new(ResourceCleanupStats::default()),
            is_active: AtomicBool::new(false),
            #[cfg(feature = "tracing-integration")]
            instance_id,
        }
    }

    /// Activates the verifier.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::NotEnabled`] when `enable_verification` is `false`
    /// in the configuration; the verifier then stays inactive.
    pub fn start(&self) -> Result<(), ResourceCleanupError> {
        if !self.config.enable_verification {
            return Err(ResourceCleanupError::NotEnabled);
        }
        self.is_active.store(true, Ordering::Release);
        #[cfg(feature = "tracing-integration")]
        crate::tracing_compat::debug!(
            "Started resource cleanup verifier instance {}",
            self.instance_id
        );
        Ok(())
    }

    /// Deactivates the verifier. Already recorded data is kept.
    pub fn stop(&self) {
        self.is_active.store(false, Ordering::Release);
        #[cfg(feature = "tracing-integration")]
        crate::tracing_compat::debug!(
            "Stopped resource cleanup verifier instance {}",
            self.instance_id
        );
    }

    /// Returns `true` between a successful [`start`](Self::start) and the next
    /// [`stop`](Self::stop).
    pub fn is_active(&self) -> bool {
        self.is_active.load(Ordering::Acquire)
    }

    /// Records a new resource and returns its id.
    ///
    /// When `owner_region` is given, the record is activated for that region and added to
    /// the region index; otherwise it stays in `Allocated` state without an owner.
    /// If the configured type filter is non-empty and does not contain `resource_type`,
    /// nothing is recorded and [`ResourceId::untracked`] is returned.
    ///
    /// If the insertion pushes the record map above `max_tracked_resources`, the cleaned
    /// record with the oldest `last_updated` is evicted (together with its region index
    /// entry). When no cleaned record exists, nothing is evicted.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::NotEnabled`] when the verifier is not active, and
    /// propagates any error from activating the new record.
    pub fn track_allocation(
        &self,
        resource_type: ResourceType,
        owner_region: Option<RegionId>,
        allocating_task: Option<TaskId>,
    ) -> Result<ResourceId, ResourceCleanupError> {
        if !self.is_active() {
            return Err(ResourceCleanupError::NotEnabled);
        }
        // An empty filter means "track every type".
        let tracked_types = &self.config.tracked_resource_types;
        if !tracked_types.is_empty() && !tracked_types.contains(&resource_type) {
            return Ok(ResourceId::untracked());
        }

        let mut record = ResourceRecord::new(resource_type, owner_region, allocating_task);
        let resource_id = record.id;
        if let Some(region_id) = owner_region {
            record.activate(region_id)?;
        }

        // Insert the record and, if over capacity, evict the stalest cleaned one. The
        // evicted (region, id) pair is carried out so the region index can be fixed up
        // after the record lock has been released.
        let evicted_region_resource = {
            let mut resources = self.resources.write();
            resources.insert(resource_id, record);
            if resources.len() > self.config.max_tracked_resources {
                let oldest_cleaned = resources
                    .iter()
                    .filter(|(_, candidate)| candidate.state == ResourceState::Cleaned)
                    .min_by_key(|(_, candidate)| candidate.last_updated)
                    .map(|(id, _)| *id);
                oldest_cleaned.and_then(|id_to_evict| {
                    resources
                        .remove(&id_to_evict)
                        .and_then(|evicted| evicted.owner_region)
                        .map(|region_id| (region_id, id_to_evict))
                })
            } else {
                None
            }
        };

        // Apply both region-index changes under a single write lock.
        if evicted_region_resource.is_some() || owner_region.is_some() {
            let mut region_resources = self.region_resources.write();
            if let Some((region_id, evicted_resource_id)) = evicted_region_resource {
                if let Some(resource_ids) = region_resources.get_mut(&region_id) {
                    resource_ids.remove(&evicted_resource_id);
                    if resource_ids.is_empty() {
                        region_resources.remove(&region_id);
                    }
                }
            }
            if let Some(region_id) = owner_region {
                region_resources
                    .entry(region_id)
                    .or_default()
                    .insert(resource_id);
            }
        }

        {
            let mut stats = self.stats.write();
            stats.total_allocated += 1;
            stats.currently_tracked += 1;
            stats.peak_tracked = stats.peak_tracked.max(stats.currently_tracked);
        }
        crate::tracing_compat::trace!(
            "Tracked allocation: resource_id={:?} type={:?} region={:?}",
            resource_id,
            resource_type,
            owner_region
        );
        Ok(resource_id)
    }

    /// Records that `resource_id` has been cleaned up.
    ///
    /// The untracked sentinel id and an already cleaned resource are both accepted as
    /// no-ops, so repeated cleanup reports do not distort the statistics.
    ///
    /// # Errors
    ///
    /// - [`ResourceCleanupError::NotEnabled`] when the verifier is not active.
    /// - [`ResourceCleanupError::AttributionFailed`] when no record exists for the id.
    /// - [`ResourceCleanupError::InvalidTransition`] when the record cannot move to
    ///   `Cleaned` from its current state (for example, it was already marked leaked).
    pub fn track_cleanup(&self, resource_id: ResourceId) -> Result<(), ResourceCleanupError> {
        if !self.is_active() {
            return Err(ResourceCleanupError::NotEnabled);
        }
        if !resource_id.is_tracked() {
            return Ok(());
        }
        let mut resources = self.resources.write();
        let Some(record) = resources.get_mut(&resource_id) else {
            return Err(ResourceCleanupError::AttributionFailed { resource_id });
        };
        if record.state == ResourceState::Cleaned {
            return Ok(());
        }
        record.complete_cleanup()?;
        // Release the record lock before touching the statistics lock.
        drop(resources);

        let mut stats = self.stats.write();
        stats.total_cleaned += 1;
        stats.currently_tracked = stats.currently_tracked.saturating_sub(1);
        crate::tracing_compat::trace!("Tracked cleanup: resource_id={:?}", resource_id);
        Ok(())
    }

    /// Checks that every resource attributed to `region_id` has been cleaned up.
    ///
    /// Resources still `Allocated` or `Active`, and resources that have been `Cleaning`
    /// for at least the configured grace period, are marked leaked. Resources in
    /// `Cleaning` state within the grace period are pending. If the elapsed time cannot be
    /// computed (the system clock moved backwards), it is treated as zero.
    ///
    /// The region's index entry is removed unless the result is `CleanupPending`, so a
    /// pending region can be verified again later. An inactive verifier always returns
    /// `Ok(())` without checking anything.
    ///
    /// # Errors
    ///
    /// - [`ResourceCleanupError::ResourceLeak`] when at least one resource is leaked.
    /// - [`ResourceCleanupError::CleanupPending`] when nothing leaked but some resources
    ///   are still being cleaned.
    ///
    /// # Panics
    ///
    /// Panics instead of returning `ResourceLeak` when `panic_on_leaks` is set.
    pub fn verify_region_cleanup(&self, region_id: RegionId) -> Result<(), ResourceCleanupError> {
        if !self.is_active() {
            return Ok(());
        }
        // Snapshot the region's ids so the index lock is not held during classification.
        let owned_resources = {
            let region_resources = self.region_resources.read();
            region_resources
                .get(&region_id)
                .cloned()
                .unwrap_or_default()
        };
        if owned_resources.is_empty() {
            self.stats.write().clean_region_closes += 1;
            return Ok(());
        }

        let grace_period = Duration::from_millis(self.config.cleanup_grace_period_ms);
        let mut leaked_resources = Vec::new();
        let mut leaked_types = HashSet::new();
        let mut pending_resources = Vec::new();
        let mut pending_types = HashSet::new();
        // Records moved to `Leaked` by this call. Records that were already `Leaked` are
        // still reported, but they were counted when they were first marked and must not
        // be added to the statistics a second time.
        let mut newly_leaked: u64 = 0;
        {
            let mut resources = self.resources.write();
            for resource_id in &owned_resources {
                // Ids whose record has been evicted are skipped.
                let Some(record) = resources.get_mut(resource_id) else {
                    continue;
                };
                match record.state {
                    ResourceState::Cleaned => {}
                    ResourceState::Cleaning
                        if record.last_updated.elapsed().unwrap_or_default() < grace_period =>
                    {
                        pending_resources.push(*resource_id);
                        pending_types.insert(record.resource_type);
                    }
                    ResourceState::Leaked => {
                        leaked_resources.push(*resource_id);
                        leaked_types.insert(record.resource_type);
                    }
                    ResourceState::Allocated | ResourceState::Active | ResourceState::Cleaning => {
                        if record.mark_leaked().is_ok() {
                            newly_leaked += 1;
                        }
                        leaked_resources.push(*resource_id);
                        leaked_types.insert(record.resource_type);
                    }
                }
            }
        }

        let has_leaks = !leaked_resources.is_empty();
        let has_pending = !pending_resources.is_empty();
        {
            let mut stats = self.stats.write();
            if has_leaks {
                stats.leaked_region_closes += 1;
                stats.total_leaked += newly_leaked;
                stats.currently_tracked = stats.currently_tracked.saturating_sub(newly_leaked);
            } else if !has_pending {
                stats.clean_region_closes += 1;
            }
        }

        if has_pending && !has_leaks {
            let error = ResourceCleanupError::CleanupPending {
                region_id,
                pending_count: pending_resources.len(),
                resource_types: pending_types.into_iter().collect(),
            };
            crate::tracing_compat::debug!("Resource cleanup verification pending: {}", error);
            return Err(error);
        }

        // Past this point the outcome is final (clean or leaked), so the region is closed.
        self.region_resources.write().remove(&region_id);

        if has_leaks {
            let error = ResourceCleanupError::ResourceLeak {
                region_id,
                leak_count: leaked_resources.len(),
                resource_types: leaked_types.into_iter().collect(),
            };
            crate::tracing_compat::error!("Resource cleanup verification failed: {}", error);
            #[cfg(feature = "tracing-integration")]
            {
                let resources = self.resources.read();
                for resource_id in &leaked_resources {
                    if let Some(record) = resources.get(resource_id) {
                        crate::tracing_compat::warn!(
                            "Leaked resource: {:?} (type={:?}, allocated_at={:?})",
                            resource_id,
                            record.resource_type,
                            record.allocated_at
                        );
                    }
                }
            }
            assert!(
                !self.config.panic_on_leaks,
                "Resource cleanup verification failed: {}",
                error
            );
            return Err(error);
        }

        crate::tracing_compat::debug!(
            "Region cleanup verified successfully: region_id={:?} resources_cleaned={}",
            region_id,
            owned_resources.len()
        );
        Ok(())
    }

    /// Returns a snapshot of the statistics counters.
    pub fn get_stats(&self) -> ResourceCleanupStats {
        self.stats.read().clone()
    }

    /// Returns a copy of every stored resource record, keyed by id.
    pub fn get_tracked_resources(&self) -> HashMap<ResourceId, ResourceRecord> {
        self.resources.read().clone()
    }

    /// Returns copies of the records currently attributed to `region_id`.
    ///
    /// The result is empty for an unknown region. Ids whose record has been evicted are
    /// skipped. The order of the returned records is unspecified.
    pub fn get_region_resources(&self, region_id: RegionId) -> Vec<ResourceRecord> {
        // Copy the ids and release the index lock before taking the record lock, as
        // `verify_region_cleanup` does, so the two locks are never held together.
        let resource_ids = {
            let region_resources = self.region_resources.read();
            region_resources
                .get(&region_id)
                .cloned()
                .unwrap_or_default()
        };
        let resources = self.resources.read();
        resource_ids
            .iter()
            .filter_map(|id| resources.get(id).cloned())
            .collect()
    }

    /// Runs [`verify_region_cleanup`](Self::verify_region_cleanup) for every region that
    /// currently has an index entry and collects the errors.
    ///
    /// The order of the returned errors is unspecified.
    ///
    /// # Panics
    ///
    /// Panics on the first detected leak when `panic_on_leaks` is set.
    pub fn force_global_cleanup_check(&self) -> Vec<ResourceCleanupError> {
        // Snapshot the keys first: verification takes the index lock itself.
        let region_ids: Vec<RegionId> = {
            let region_resources = self.region_resources.read();
            region_resources.keys().copied().collect()
        };
        region_ids
            .into_iter()
            .filter_map(|region_id| self.verify_region_cleanup(region_id).err())
            .collect()
    }
}
// Unit tests for the state machine, the record lifecycle and the verifier.
#[cfg(test)]
mod tests {
    // Lint relaxations that apply to test code only.
    #![allow(
        clippy::pedantic,
        clippy::nursery,
        clippy::expect_fun_call,
        clippy::map_unwrap_or,
        clippy::cast_possible_wrap,
        clippy::future_not_send
    )]
    use super::*;

    // Pins the transition table: every allowed move plus a few representative rejections.
    #[test]
    fn test_resource_state_transitions() {
        use ResourceState::*;
        assert!(Allocated.can_transition_to(Active));
        assert!(Allocated.can_transition_to(Cleaned));
        assert!(Active.can_transition_to(Cleaning));
        assert!(Active.can_transition_to(Cleaned));
        assert!(Cleaning.can_transition_to(Cleaned));
        assert!(Cleaned.can_transition_to(Cleaned));
        assert!(Active.can_transition_to(Leaked));
        assert!(Allocated.can_transition_to(Leaked));
        assert!(Cleaning.can_transition_to(Leaked));
        assert!(!Cleaned.can_transition_to(Active));
        assert!(!Leaked.can_transition_to(Cleaned));
        assert!(!Allocated.can_transition_to(Cleaning));
    }

    // Walks one record through Allocated -> Active -> Cleaning -> Cleaned.
    #[test]
    fn test_resource_record_lifecycle() -> Result<(), ResourceCleanupError> {
        let region_id = RegionId::new_ephemeral();
        let task_id = TaskId::new_ephemeral();
        let mut record =
            ResourceRecord::new(ResourceType::FileDescriptor, Some(region_id), Some(task_id));
        assert_eq!(record.state, ResourceState::Allocated);
        record.activate(region_id)?;
        assert_eq!(record.state, ResourceState::Active);
        assert_eq!(record.owner_region, Some(region_id));
        record.begin_cleanup()?;
        assert_eq!(record.state, ResourceState::Cleaning);
        record.complete_cleanup()?;
        assert_eq!(record.state, ResourceState::Cleaned);
        Ok(())
    }

    // Happy path: allocate, clean up, verify the region, then check the counters.
    #[test]
    fn test_resource_cleanup_verifier() -> Result<(), ResourceCleanupError> {
        let config = ResourceCleanupConfig::default();
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        assert!(verifier.is_active());
        let region_id = RegionId::new_ephemeral();
        let resource_id =
            verifier.track_allocation(ResourceType::FileDescriptor, Some(region_id), None)?;
        let region_resources = verifier.get_region_resources(region_id);
        assert_eq!(region_resources.len(), 1);
        assert_eq!(region_resources[0].id, resource_id);
        verifier.track_cleanup(resource_id)?;
        verifier.verify_region_cleanup(region_id)?;
        let stats = verifier.get_stats();
        assert_eq!(stats.total_allocated, 1);
        assert_eq!(stats.total_cleaned, 1);
        assert_eq!(stats.total_leaked, 0);
        assert_eq!(stats.clean_region_closes, 1);
        Ok(())
    }

    // A type excluded by the filter yields the untracked id, and cleaning that id up
    // succeeds without touching any counter.
    #[test]
    fn filtered_resource_cleanup_is_a_noop() -> Result<(), ResourceCleanupError> {
        let mut tracked_resource_types = HashSet::new();
        tracked_resource_types.insert(ResourceType::FileDescriptor);
        let config = ResourceCleanupConfig {
            tracked_resource_types,
            ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        let region_id = RegionId::new_ephemeral();
        let resource_id = verifier.track_allocation(ResourceType::Timer, Some(region_id), None)?;
        assert!(!resource_id.is_tracked());
        assert!(verifier.get_region_resources(region_id).is_empty());
        verifier.track_cleanup(resource_id)?;
        let stats = verifier.get_stats();
        assert_eq!(stats.total_allocated, 0);
        assert_eq!(stats.total_cleaned, 0);
        assert_eq!(stats.currently_tracked, 0);
        Ok(())
    }

    // Reporting the same cleanup twice must count it only once.
    #[test]
    fn duplicate_cleanup_does_not_corrupt_statistics() -> Result<(), ResourceCleanupError> {
        let verifier = ResourceCleanupVerifier::new(ResourceCleanupConfig::default());
        verifier.start()?;
        let region_id = RegionId::new_ephemeral();
        let resource_id =
            verifier.track_allocation(ResourceType::FileDescriptor, Some(region_id), None)?;
        verifier.track_cleanup(resource_id)?;
        verifier.track_cleanup(resource_id)?;
        let stats = verifier.get_stats();
        assert_eq!(stats.total_allocated, 1);
        assert_eq!(stats.total_cleaned, 1);
        assert_eq!(stats.currently_tracked, 0);
        Ok(())
    }

    // With a capacity of one, the second allocation evicts the first (cleaned) record;
    // the first region's index entry must disappear with it.
    #[test]
    fn evicting_cleaned_resource_removes_region_index() -> Result<(), ResourceCleanupError> {
        let config = ResourceCleanupConfig {
            max_tracked_resources: 1,
            ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        let first_region = RegionId::new_ephemeral();
        let first_resource =
            verifier.track_allocation(ResourceType::HeapAllocation, Some(first_region), None)?;
        verifier.track_cleanup(first_resource)?;
        let second_region = RegionId::new_ephemeral();
        let _second_resource =
            verifier.track_allocation(ResourceType::FileDescriptor, Some(second_region), None)?;
        assert!(
            verifier.get_region_resources(first_region).is_empty(),
            "evicted cleaned resources must not leave stale region ownership entries",
        );
        assert_eq!(verifier.get_region_resources(second_region).len(), 1);
        Ok(())
    }

    // A resource that is never cleaned up is reported as a leak; panicking is disabled so
    // the error can be inspected.
    #[test]
    fn test_resource_leak_detection() -> Result<(), ResourceCleanupError> {
        let config = ResourceCleanupConfig {
            panic_on_leaks: false, ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        let region_id = RegionId::new_ephemeral();
        let _resource_id =
            verifier.track_allocation(ResourceType::FileDescriptor, Some(region_id), None)?;
        let result = verifier.verify_region_cleanup(region_id);
        assert!(matches!(
            result,
            Err(ResourceCleanupError::ResourceLeak { .. })
        ));
        // The assertion above already guarantees this pattern matches; the `else` branch
        // exists only to satisfy `let ... else`.
        let Err(ResourceCleanupError::ResourceLeak {
            region_id: leaked_region,
            leak_count,
            resource_types,
        }) = result
        else {
            return Ok(());
        };
        assert_eq!(leaked_region, region_id);
        assert_eq!(leak_count, 1);
        assert!(resource_types.contains(&ResourceType::FileDescriptor));
        let stats = verifier.get_stats();
        assert_eq!(stats.total_leaked, 1);
        assert_eq!(stats.leaked_region_closes, 1);
        Ok(())
    }

    // A resource still inside its grace period yields `CleanupPending` and keeps its
    // region mapping, so a later verification can succeed once cleanup completes.
    #[test]
    fn test_pending_cleanup_does_not_close_region_mapping() -> Result<(), ResourceCleanupError> {
        let config = ResourceCleanupConfig {
            panic_on_leaks: false,
            cleanup_grace_period_ms: 60_000,
            ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        let region_id = RegionId::new_ephemeral();
        let resource_id =
            verifier.track_allocation(ResourceType::NetworkConnection, Some(region_id), None)?;
        // Move the record into `Cleaning` directly; the verifier has no public method for it.
        {
            let mut resources = verifier.resources.write();
            let Some(record) = resources.get_mut(&resource_id) else {
                return Err(ResourceCleanupError::AttributionFailed { resource_id });
            };
            record.begin_cleanup()?;
        }
        let result = verifier.verify_region_cleanup(region_id);
        assert!(matches!(
            result,
            Err(ResourceCleanupError::CleanupPending {
                pending_count: 1,
                ..
            })
        ));
        assert_eq!(
            verifier.get_region_resources(region_id).len(),
            1,
            "pending cleanup must remain attributed for a later recheck"
        );
        verifier.track_cleanup(resource_id)?;
        verifier.verify_region_cleanup(region_id)?;
        assert!(verifier.get_region_resources(region_id).is_empty());
        Ok(())
    }

    // A `last_updated` timestamp in the future (clock moved backwards) must be treated as
    // "no time elapsed", i.e. still pending, not as an expired grace period.
    #[test]
    fn pending_cleanup_tolerates_system_clock_skew() -> Result<(), ResourceCleanupError> {
        let config = ResourceCleanupConfig {
            panic_on_leaks: false,
            cleanup_grace_period_ms: 10,
            ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start()?;
        let region_id = RegionId::new_ephemeral();
        let resource_id =
            verifier.track_allocation(ResourceType::NetworkConnection, Some(region_id), None)?;
        {
            let mut resources = verifier.resources.write();
            let Some(record) = resources.get_mut(&resource_id) else {
                return Err(ResourceCleanupError::AttributionFailed { resource_id });
            };
            record.begin_cleanup()?;
            // Simulate skew by placing the last update one minute in the future.
            record.last_updated = SystemTime::now() + Duration::from_secs(60);
        }
        let result = verifier.verify_region_cleanup(region_id);
        assert!(matches!(
            result,
            Err(ResourceCleanupError::CleanupPending {
                pending_count: 1,
                ..
            })
        ));
        let stats = verifier.get_stats();
        assert_eq!(stats.total_leaked, 0);
        assert_eq!(stats.leaked_region_closes, 0);
        assert_eq!(verifier.get_region_resources(region_id).len(), 1);
        Ok(())
    }
}