use crate::observability::resource_accounting::ResourceAccounting;
use crate::record::region::RegionState;
use crate::runtime::resource_monitor::{ResourceMonitor, ResourceMonitorError};
use crate::runtime::state_verifier::{StateTransitionVerifier, StateVerifierConfig};
use crate::types::{RegionId, TaskId, Time};
use crate::util::Arena;
use parking_lot::{Mutex, RwLock};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet};
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Instant, SystemTime};
use thiserror::Error;
/// Failures reported by the resource cleanup verifier defined in this module.
#[derive(Debug, Error)]
pub enum ResourceCleanupError {
    /// Region verification found resources that had not been cleaned up.
    #[error(
        "resource leak detected in region {region_id:?}: {leak_count} resources not cleaned up"
    )]
    ResourceLeak {
        /// Region whose verification failed.
        region_id: RegionId,
        /// Number of resources counted as leaked.
        leak_count: usize,
        /// Distinct types of the leaked resources, in no particular order.
        resource_types: Vec<ResourceType>,
    },

    /// The resource id has no record in the verifier.
    #[error("cannot attribute resource {resource_id:?} to any region")]
    AttributionFailed {
        /// Id that could not be found.
        resource_id: ResourceId,
    },

    /// Verification is disabled in the configuration, or the verifier is not active.
    #[error("resource cleanup verification is not enabled")]
    NotEnabled,

    /// A lifecycle change that `ResourceState::can_transition_to` rejects.
    #[error("invalid resource state transition: {resource_id:?} from {from:?} to {to:?}")]
    InvalidTransition {
        /// Resource whose state was about to change.
        resource_id: ResourceId,
        /// State the resource was in.
        from: ResourceState,
        /// State that was requested.
        to: ResourceState,
    },
}
/// Identifier handed out for every tracked resource.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct ResourceId(u64);

impl ResourceId {
    /// Returns the next id from a process-wide counter that starts at 1.
    ///
    /// The counter is bumped with `Relaxed` ordering; only the atomicity of the
    /// increment is relied upon.
    pub fn new() -> Self {
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        Self(raw)
    }
}
/// Kind of resource a record describes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceType {
    /// A file descriptor.
    FileDescriptor,
    /// A heap allocation.
    HeapAllocation,
    /// A network connection.
    NetworkConnection,
    /// An I/O operation.
    IoOperation,
    /// A timer.
    Timer,
    /// Thread-local state.
    ThreadLocal,
    /// Caller-defined kind; the meaning of the tag is not defined in this file.
    Custom(u32),
}

impl ResourceType {
    /// Returns `true` for `FileDescriptor`, `NetworkConnection` and `IoOperation`.
    pub fn requires_immediate_cleanup(self) -> bool {
        match self {
            Self::FileDescriptor | Self::NetworkConnection | Self::IoOperation => true,
            Self::HeapAllocation | Self::Timer | Self::ThreadLocal | Self::Custom(_) => false,
        }
    }

    /// Returns `true` for `HeapAllocation` and `ThreadLocal`.
    ///
    /// `Timer` and `Custom` fall into neither this category nor the
    /// immediate-cleanup one.
    pub fn allows_deferred_cleanup(self) -> bool {
        match self {
            Self::HeapAllocation | Self::ThreadLocal => true,
            Self::FileDescriptor
            | Self::NetworkConnection
            | Self::IoOperation
            | Self::Timer
            | Self::Custom(_) => false,
        }
    }
}
/// Lifecycle state of a tracked resource.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum ResourceState {
    /// Initial state of a freshly created record.
    Allocated,
    /// Entered from `Allocated`.
    Active,
    /// Entered from `Active`.
    Cleaning,
    /// Entered from `Cleaning`; no outgoing transitions.
    Cleaned,
    /// Entered from `Active` or `Cleaning`; no outgoing transitions.
    Leaked,
}

impl ResourceState {
    /// Reports whether moving from `self` to `target` is a legal lifecycle step.
    ///
    /// The legal edges are `Allocated -> Active`, `Active -> Cleaning`,
    /// `Cleaning -> Cleaned`, `Active -> Leaked` and `Cleaning -> Leaked`;
    /// every other pair, including staying in the same state, is rejected.
    pub fn can_transition_to(self, target: Self) -> bool {
        matches!(
            (self, target),
            (Self::Allocated, Self::Active)
                | (Self::Active, Self::Cleaning)
                | (Self::Cleaning, Self::Cleaned)
                | (Self::Active, Self::Leaked)
                | (Self::Cleaning, Self::Leaked)
        )
    }
}
/// Everything the verifier knows about one tracked resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceRecord {
    /// Identifier assigned when the record was created.
    pub id: ResourceId,
    /// Kind of resource.
    pub resource_type: ResourceType,
    /// Current lifecycle state.
    pub state: ResourceState,
    /// Region that owns the resource, if any.
    pub owner_region: Option<RegionId>,
    /// Task that allocated the resource, if known.
    pub allocating_task: Option<TaskId>,
    /// Wall-clock time at which the record was created.
    pub allocated_at: SystemTime,
    /// Wall-clock time of creation or of the most recent successful state change.
    pub last_updated: SystemTime,
    /// Optional free-form description; `None` on a new record.
    pub description: Option<String>,
    /// Optional file descriptor number; `None` on a new record.
    pub file_descriptor: Option<i32>,
    /// Optional size in bytes; `None` on a new record.
    pub size_bytes: Option<usize>,
}
impl ResourceRecord {
    /// Creates a record in the `Allocated` state with a freshly assigned id.
    ///
    /// `allocated_at` and `last_updated` are both set to the current wall-clock
    /// time; the optional detail fields start out as `None`.
    pub fn new(
        resource_type: ResourceType,
        owner_region: Option<RegionId>,
        allocating_task: Option<TaskId>,
    ) -> Self {
        let now = SystemTime::now();
        Self {
            id: ResourceId::new(),
            resource_type,
            state: ResourceState::Allocated,
            owner_region,
            allocating_task,
            allocated_at: now,
            last_updated: now,
            description: None,
            file_descriptor: None,
            size_bytes: None,
        }
    }

    /// Moves the record to `new_state` and refreshes `last_updated`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] and leaves the record
    /// untouched when `ResourceState::can_transition_to` rejects the step.
    pub fn transition_to(&mut self, new_state: ResourceState) -> Result<(), ResourceCleanupError> {
        if !self.state.can_transition_to(new_state) {
            return Err(ResourceCleanupError::InvalidTransition {
                resource_id: self.id,
                from: self.state,
                to: new_state,
            });
        }
        self.state = new_state;
        self.last_updated = SystemTime::now();
        Ok(())
    }

    /// Marks the record `Active` and assigns it to `region_id`.
    ///
    /// # Errors
    ///
    /// Returns [`ResourceCleanupError::InvalidTransition`] when the record is
    /// not in the `Allocated` state. In that case the owner is left unchanged.
    pub fn activate(&mut self, region_id: RegionId) -> Result<(), ResourceCleanupError> {
        // Validate the transition first so a rejected activation cannot
        // re-assign the owner of a record that is already in use or finished.
        self.transition_to(ResourceState::Active)?;
        self.owner_region = Some(region_id);
        Ok(())
    }

    /// Moves the record to `Cleaning`; see [`Self::transition_to`] for errors.
    pub fn begin_cleanup(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Cleaning)
    }

    /// Moves the record to `Cleaned`; see [`Self::transition_to`] for errors.
    pub fn complete_cleanup(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Cleaned)
    }

    /// Moves the record to `Leaked`; see [`Self::transition_to`] for errors.
    pub fn mark_leaked(&mut self) -> Result<(), ResourceCleanupError> {
        self.transition_to(ResourceState::Leaked)
    }
}
/// Settings for the resource cleanup verifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceCleanupConfig {
    /// Master switch; starting the verifier fails when this is `false`.
    pub enable_verification: bool,
    /// Not consulted by the code visible in this file.
    pub enable_detailed_tracking: bool,
    /// Not consulted by the code visible in this file.
    pub enable_allocation_traces: bool,
    /// Record count above which one `Cleaned` record is evicted per allocation.
    pub max_tracked_resources: usize,
    /// How long, in milliseconds, a resource may stay in `Cleaning` before
    /// region verification treats it as leaked.
    pub cleanup_grace_period_ms: u64,
    /// When `true`, region verification panics instead of returning a leak error.
    pub panic_on_leaks: bool,
    /// Resource types to track; an empty set means every type is tracked.
    pub tracked_resource_types: HashSet<ResourceType>,
}

impl Default for ResourceCleanupConfig {
    /// Verification on, a 10 000-record cap, a one second grace period and all
    /// resource types tracked. Detailed tracking and panicking on leaks follow
    /// `cfg!(debug_assertions)`; allocation traces are off.
    fn default() -> Self {
        let debug_build = cfg!(debug_assertions);
        Self {
            enable_verification: true,
            enable_detailed_tracking: debug_build,
            enable_allocation_traces: false,
            max_tracked_resources: 10_000,
            cleanup_grace_period_ms: 1000,
            panic_on_leaks: debug_build,
            tracked_resource_types: HashSet::new(),
        }
    }
}
/// Counters maintained by the resource cleanup verifier.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ResourceCleanupStats {
    /// Number of allocations recorded.
    pub total_allocated: u64,
    /// Number of cleanups recorded.
    pub total_cleaned: u64,
    /// Number of resources counted as leaked during region verification.
    pub total_leaked: u64,
    /// Allocations recorded minus cleanups recorded (saturating at zero).
    pub currently_tracked: u64,
    /// Highest value `currently_tracked` has reached.
    pub peak_tracked: u64,
    /// Region verifications that found no leak.
    pub clean_region_closes: u64,
    /// Region verifications that found at least one leak.
    pub leaked_region_closes: u64,
}
/// Tracks resource allocations per region and checks that they are cleaned up.
pub struct ResourceCleanupVerifier {
    /// Settings supplied at construction time.
    config: ResourceCleanupConfig,
    /// Every record currently kept, keyed by resource id.
    resources: RwLock<HashMap<ResourceId, ResourceRecord>>,
    /// Ids of the resources registered for each region.
    region_resources: RwLock<HashMap<RegionId, HashSet<ResourceId>>>,
    /// Running counters.
    stats: RwLock<ResourceCleanupStats>,
    /// Whether tracking and verification are currently switched on.
    is_active: AtomicBool,
    /// Sequence number of this verifier, used in log messages.
    instance_id: u64,
}
impl ResourceCleanupVerifier {
/// Builds an inactive verifier with empty tables and zeroed statistics.
///
/// Each verifier receives the next value of a process-wide instance counter
/// (starting at 1), which only appears in log output.
pub fn new(config: ResourceCleanupConfig) -> Self {
    static NEXT_INSTANCE_ID: AtomicU64 = AtomicU64::new(1);

    Self {
        config,
        resources: RwLock::new(HashMap::new()),
        region_resources: RwLock::new(HashMap::new()),
        stats: RwLock::new(ResourceCleanupStats::default()),
        is_active: AtomicBool::new(false),
        instance_id: NEXT_INSTANCE_ID.fetch_add(1, Ordering::Relaxed),
    }
}
/// Switches the verifier on.
///
/// # Errors
///
/// Returns [`ResourceCleanupError::NotEnabled`], without changing the active
/// flag, when `enable_verification` is `false` in the configuration.
pub fn start(&self) -> Result<(), ResourceCleanupError> {
    if self.config.enable_verification {
        self.is_active.store(true, Ordering::Release);
        crate::tracing_compat::debug!(
            "Started resource cleanup verifier instance {}",
            self.instance_id
        );
        Ok(())
    } else {
        Err(ResourceCleanupError::NotEnabled)
    }
}
/// Switches the verifier off.
///
/// Clears the active flag with `Release` ordering and emits a debug log line
/// carrying this instance's id. Records and statistics are not touched here.
pub fn stop(&self) {
self.is_active.store(false, Ordering::Release);
crate::tracing_compat::debug!(
"Stopped resource cleanup verifier instance {}",
self.instance_id
);
}
/// Reports whether the verifier is currently switched on.
///
/// Reads the active flag with `Acquire` ordering.
pub fn is_active(&self) -> bool {
self.is_active.load(Ordering::Acquire)
}
/// Records a newly allocated resource and returns its id.
///
/// When `owner_region` is given, the record is activated for that region and
/// registered in the region index; otherwise it stays in the `Allocated` state
/// and is not attached to any region.
///
/// If the configuration restricts tracking to a set of types and
/// `resource_type` is not in it, a fresh id is returned but nothing is
/// recorded and no statistics change.
///
/// Once the number of stored records exceeds `max_tracked_resources`, the
/// `Cleaned` record with the oldest `last_updated` is dropped; if there is no
/// `Cleaned` record, nothing is evicted.
///
/// # Errors
///
/// * [`ResourceCleanupError::NotEnabled`] when the verifier is not active.
/// * Any error from activating the new record is propagated.
pub fn track_allocation(
    &self,
    resource_type: ResourceType,
    owner_region: Option<RegionId>,
    allocating_task: Option<TaskId>,
) -> Result<ResourceId, ResourceCleanupError> {
    if !self.is_active() {
        return Err(ResourceCleanupError::NotEnabled);
    }

    // An empty filter means "track everything".
    let type_filter = &self.config.tracked_resource_types;
    let type_is_tracked = type_filter.is_empty() || type_filter.contains(&resource_type);
    if !type_is_tracked {
        return Ok(ResourceId::new());
    }

    let mut record = ResourceRecord::new(resource_type, owner_region, allocating_task);
    let resource_id = record.id;
    if let Some(region_id) = owner_region {
        record.activate(region_id)?;
    }

    {
        let mut tracked = self.resources.write();
        tracked.insert(resource_id, record);
        if tracked.len() > self.config.max_tracked_resources {
            // Only finished records may be evicted; live ones are always kept.
            let evictee = tracked
                .iter()
                .filter(|(_, candidate)| candidate.state == ResourceState::Cleaned)
                .min_by_key(|(_, candidate)| candidate.last_updated)
                .map(|(candidate_id, _)| *candidate_id);
            if let Some(stale_id) = evictee {
                tracked.remove(&stale_id);
            }
        }
    }

    if let Some(region_id) = owner_region {
        self.region_resources
            .write()
            .entry(region_id)
            .or_default()
            .insert(resource_id);
    }

    {
        let mut counters = self.stats.write();
        counters.total_allocated += 1;
        counters.currently_tracked += 1;
        counters.peak_tracked = counters.peak_tracked.max(counters.currently_tracked);
    }

    crate::tracing_compat::trace!(
        "Tracked allocation: resource_id={:?} type={:?} region={:?}",
        resource_id,
        resource_type,
        owner_region
    );
    Ok(resource_id)
}
pub fn track_cleanup(&self, resource_id: ResourceId) -> Result<(), ResourceCleanupError> {
if !self.is_active() {
return Err(ResourceCleanupError::NotEnabled);
}
let mut resources = self.resources.write();
if let Some(record) = resources.get_mut(&resource_id) {
record.complete_cleanup()?;
drop(resources);
let mut stats = self.stats.write();
stats.total_cleaned += 1;
stats.currently_tracked = stats.currently_tracked.saturating_sub(1);
crate::tracing_compat::trace!("Tracked cleanup: resource_id={:?}", resource_id);
Ok(())
} else {
Err(ResourceCleanupError::AttributionFailed { resource_id })
}
}
/// Checks that the resources registered for `region_id` have been cleaned up
/// and then removes the region from the region index.
///
/// Each registered resource is judged by its current state:
///
/// * `Cleaned` is fine.
/// * `Leaked` is counted as a leak.
/// * `Active` is a leak when its type `requires_immediate_cleanup()`. Any
///   other type is moved to `Cleaning` and is not counted.
/// * `Cleaning` is a leak once `cleanup_grace_period_ms` has passed since the
///   record was last updated, or when the elapsed time cannot be determined.
/// * `Allocated` is a leak.
///
/// Ids whose record no longer exists are skipped. When the verifier is not
/// active the call does nothing and returns `Ok(())`.
///
/// NOTE(review): resources moved to `Cleaning` here are not revisited by this
/// function, because the region's index entry is removed before returning.
///
/// # Errors
///
/// Returns [`ResourceCleanupError::ResourceLeak`] when at least one leak was
/// found and `panic_on_leaks` is `false`.
///
/// # Panics
///
/// Panics with the leak message when at least one leak was found and
/// `panic_on_leaks` is `true`.
pub fn verify_region_cleanup(&self, region_id: RegionId) -> Result<(), ResourceCleanupError> {
    if !self.is_active() {
        return Ok(());
    }

    // Work on a snapshot of the ids so the region index is not locked while
    // the records are inspected.
    let owned_resources = {
        let region_resources = self.region_resources.read();
        region_resources
            .get(&region_id)
            .cloned()
            .unwrap_or_default()
    };
    if owned_resources.is_empty() {
        let mut stats = self.stats.write();
        stats.clean_region_closes += 1;
        return Ok(());
    }

    let grace_period = std::time::Duration::from_millis(self.config.cleanup_grace_period_ms);
    let mut leaked_resources = Vec::new();
    let mut leaked_types = HashSet::new();
    {
        let mut resources = self.resources.write();
        for resource_id in &owned_resources {
            let record = match resources.get_mut(resource_id) {
                Some(record) => record,
                None => continue,
            };

            let leaked = match record.state {
                ResourceState::Cleaned => false,
                ResourceState::Leaked => true,
                ResourceState::Active => {
                    // Types that must be released right away count as leaked
                    // if they are still in use when the region is verified;
                    // the rest get a chance to finish cleaning up.
                    let must_be_released = record.resource_type.requires_immediate_cleanup();
                    if must_be_released || record.begin_cleanup().is_err() {
                        // `Active -> Leaked` is a legal edge, so this cannot fail.
                        let _ = record.mark_leaked();
                        true
                    } else {
                        false
                    }
                }
                ResourceState::Cleaning => {
                    // A clock that went backwards is treated as "overdue".
                    let waited = record.last_updated.elapsed().unwrap_or(grace_period);
                    if waited >= grace_period {
                        // `Cleaning -> Leaked` is a legal edge, so this cannot fail.
                        let _ = record.mark_leaked();
                        true
                    } else {
                        false
                    }
                }
                ResourceState::Allocated => {
                    // `Allocated -> Leaked` is not a legal edge. Step through
                    // `Active` so the stored state matches what is reported.
                    if record.transition_to(ResourceState::Active).is_ok() {
                        let _ = record.mark_leaked();
                    }
                    true
                }
            };

            if leaked {
                leaked_resources.push(*resource_id);
                leaked_types.insert(record.resource_type);
            }
        }
    }

    {
        let mut stats = self.stats.write();
        if leaked_resources.is_empty() {
            stats.clean_region_closes += 1;
        } else {
            stats.leaked_region_closes += 1;
            stats.total_leaked += leaked_resources.len() as u64;
        }
    }

    // The region has been judged; forget its index entry either way.
    self.region_resources.write().remove(&region_id);

    if leaked_resources.is_empty() {
        crate::tracing_compat::debug!(
            "Region cleanup verified successfully: region_id={:?} resources_cleaned={}",
            region_id,
            owned_resources.len()
        );
        return Ok(());
    }

    let error = ResourceCleanupError::ResourceLeak {
        region_id,
        leak_count: leaked_resources.len(),
        resource_types: leaked_types.into_iter().collect(),
    };
    crate::tracing_compat::error!("Resource cleanup verification failed: {}", error);
    {
        // Scoped so the read lock is released before a possible panic.
        let resources = self.resources.read();
        for resource_id in &leaked_resources {
            if let Some(record) = resources.get(resource_id) {
                crate::tracing_compat::warn!(
                    "Leaked resource: {:?} (type={:?}, allocated_at={:?})",
                    resource_id,
                    record.resource_type,
                    record.allocated_at
                );
            }
        }
    }
    if self.config.panic_on_leaks {
        panic!("Resource cleanup verification failed: {}", error);
    }
    Err(error)
}
/// Returns a copy of the current counters, taken under the statistics read lock.
pub fn get_stats(&self) -> ResourceCleanupStats {
self.stats.read().clone()
}
/// Returns a copy of every stored record, keyed by resource id.
///
/// The whole map is cloned while the read lock is held.
pub fn get_tracked_resources(&self) -> HashMap<ResourceId, ResourceRecord> {
self.resources.read().clone()
}
pub fn get_region_resources(&self, region_id: RegionId) -> Vec<ResourceRecord> {
let region_resources = self.region_resources.read();
let resource_ids = region_resources
.get(®ion_id)
.cloned()
.unwrap_or_default();
let resources = self.resources.read();
resource_ids
.into_iter()
.filter_map(|id| resources.get(&id).cloned())
.collect()
}
/// Runs region verification for every region currently in the region index
/// and returns the errors that were produced.
///
/// The region ids are collected first and the index lock is released before
/// any region is verified. Regions are visited in unspecified order.
pub fn force_global_cleanup_check(&self) -> Vec<ResourceCleanupError> {
    let pending_regions: HashSet<RegionId> =
        self.region_resources.read().keys().copied().collect();

    pending_regions
        .into_iter()
        .filter_map(|region_id| self.verify_region_cleanup(region_id).err())
        .collect()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Legal lifecycle edges are accepted and a sample of illegal ones rejected.
    #[test]
    fn test_resource_state_transitions() {
        use ResourceState::*;

        let allowed = [
            (Allocated, Active),
            (Active, Cleaning),
            (Cleaning, Cleaned),
            (Active, Leaked),
            (Cleaning, Leaked),
        ];
        for &(from, to) in &allowed {
            assert!(from.can_transition_to(to));
        }

        let rejected = [(Cleaned, Active), (Leaked, Cleaned), (Allocated, Cleaning)];
        for &(from, to) in &rejected {
            assert!(!from.can_transition_to(to));
        }
    }

    /// A record can be driven from `Allocated` all the way to `Cleaned`.
    #[test]
    fn test_resource_record_lifecycle() {
        let region_id = RegionId::new();
        let task_id = TaskId::new();
        let mut record =
            ResourceRecord::new(ResourceType::FileDescriptor, Some(region_id), Some(task_id));
        assert_eq!(record.state, ResourceState::Allocated);

        record.activate(region_id).unwrap();
        assert_eq!(record.state, ResourceState::Active);
        assert_eq!(record.owner_region, Some(region_id));

        record.begin_cleanup().unwrap();
        assert_eq!(record.state, ResourceState::Cleaning);

        record.complete_cleanup().unwrap();
        assert_eq!(record.state, ResourceState::Cleaned);
    }

    /// Allocate, clean up and verify one resource; the region closes cleanly.
    #[test]
    fn test_resource_cleanup_verifier() {
        let verifier = ResourceCleanupVerifier::new(ResourceCleanupConfig::default());
        verifier.start().unwrap();
        assert!(verifier.is_active());

        let region_id = RegionId::new();
        let resource_id = verifier
            .track_allocation(ResourceType::FileDescriptor, Some(region_id), None)
            .unwrap();

        let owned = verifier.get_region_resources(region_id);
        assert_eq!(owned.len(), 1);
        assert_eq!(owned[0].id, resource_id);

        verifier.track_cleanup(resource_id).unwrap();
        verifier.verify_region_cleanup(region_id).unwrap();

        let stats = verifier.get_stats();
        assert_eq!(stats.total_allocated, 1);
        assert_eq!(stats.total_cleaned, 1);
        assert_eq!(stats.total_leaked, 0);
        assert_eq!(stats.clean_region_closes, 1);
    }

    /// A file descriptor that is never cleaned up is reported as a leak.
    #[test]
    fn test_resource_leak_detection() {
        // Leaks must come back as an error here rather than panic.
        let config = ResourceCleanupConfig {
            panic_on_leaks: false,
            ..Default::default()
        };
        let verifier = ResourceCleanupVerifier::new(config);
        verifier.start().unwrap();

        let region_id = RegionId::new();
        let _resource_id = verifier
            .track_allocation(ResourceType::FileDescriptor, Some(region_id), None)
            .unwrap();

        let result = verifier.verify_region_cleanup(region_id);
        assert!(result.is_err());
        match result {
            Err(ResourceCleanupError::ResourceLeak {
                region_id: leaked_region,
                leak_count,
                resource_types,
            }) => {
                assert_eq!(leaked_region, region_id);
                assert_eq!(leak_count, 1);
                assert!(resource_types.contains(&ResourceType::FileDescriptor));
            }
            _ => panic!("Expected ResourceLeak error"),
        }

        let stats = verifier.get_stats();
        assert_eq!(stats.total_leaked, 1);
        assert_eq!(stats.leaked_region_closes, 1);
    }
}