use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant, SystemTime};
use crate::types::{Budget, Outcome, RegionId, TaskId};
/// Tunable thresholds and switches for [`RegionLeakOracle`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionLeakConfig {
/// Threshold applied to regions sitting in the `Created` state.
pub max_creation_delay: Duration,
/// Threshold applied to regions in the `Closing` state.
pub max_closing_time: Duration,
/// Threshold applied to regions in the `Finalizing` state.
pub max_finalizing_time: Duration,
/// Longest time a task that has not finished may exist before it is reported.
pub max_task_lifetime: Duration,
/// Longest time an `Active` region may go without recorded activity.
pub max_idle_time: Duration,
/// When `true`, the event hooks run a violation check after updating state.
pub continuous_checking: bool,
/// When `true`, `check_for_violations` returns `Err` if it finds anything.
pub fail_fast_mode: bool,
/// Upper bound on retained violations; the oldest are dropped first.
pub max_violations_tracked: usize,
/// When `true`, region violation contexts carry a stack-trace string
/// (currently only a placeholder message).
pub include_stack_traces: bool,
}
impl Default for RegionLeakConfig {
fn default() -> Self {
Self {
max_creation_delay: Duration::from_millis(100),
max_closing_time: Duration::from_secs(5),
max_finalizing_time: Duration::from_secs(10),
max_task_lifetime: Duration::from_secs(30),
max_idle_time: Duration::from_secs(60),
continuous_checking: true,
fail_fast_mode: false,
max_violations_tracked: 100,
include_stack_traces: true,
}
}
}
/// Bookkeeping the oracle keeps for one tracked region.
#[derive(Debug, Clone)]
pub struct RegionState {
/// Identifier of this region.
pub region_id: RegionId,
/// Parent region supplied at creation, if any.
pub parent_id: Option<RegionId>,
/// Current lifecycle phase.
pub state: RegionLifecycleState,
/// Instant at which the creation event was recorded.
pub creation_time: Instant,
/// Instant of the most recent event that touched this region.
pub last_activity: Instant,
/// Tasks spawned into this region that have not been reported complete.
pub active_tasks: HashSet<TaskId>,
/// Child regions registered under this region and not yet reported closed.
pub child_regions: HashSet<RegionId>,
/// Finalizer count announced when the region began closing.
pub expected_finalizers: u32,
/// Number of finalizer-completion events received so far.
pub completed_finalizers: u32,
/// Optional caller-supplied description of where the region was created.
pub creation_context: Option<String>,
/// Budget supplied at creation.
pub budget: Budget,
}
/// Lifecycle phases the oracle tracks for a region.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionLifecycleState {
/// Recorded as created; not yet activated and no task spawned into it.
Created,
/// Activated explicitly, or implicitly by the first task spawn.
Active,
/// A closing event was received.
Closing,
/// Set from the finalizer-completion hook while finalizers remain outstanding.
Finalizing,
/// A closed event was received.
Closed,
}
/// A single problem reported by the oracle.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionViolation {
/// Category of the problem.
pub violation_type: ViolationType,
/// Region the problem is attributed to.
pub region_id: RegionId,
/// Wall-clock time at which the violation was constructed.
pub detected_at: SystemTime,
/// Measured duration for timeout-style violations; zero for structural ones.
pub duration: Duration,
/// Human-readable summary.
pub description: String,
/// Snapshot of surrounding state at detection time.
pub context: ViolationContext,
/// Short hint on where to look for the cause.
pub suggested_fix: String,
}
/// Categories of violation the oracle can report.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ViolationType {
/// Region exceeded the creation-delay threshold while still `Created`.
StuckCreation,
/// Region exceeded the closing-time threshold while `Closing`.
StuckClosing,
/// Region exceeded the finalizing-time threshold while `Finalizing`.
StuckFinalizing,
/// `Active` region exceeded the idle threshold.
IdleRegion,
/// Unfinished task exceeded the task-lifetime threshold.
LongRunningTask,
/// Region is not closed although its parent is.
OrphanedChildren,
/// Region is closed but still lists active tasks.
OrphanedTasks,
/// Region is closed with fewer completed than expected finalizers.
FinalizersIncomplete,
/// Not emitted by the code in this file.
ResourceLeak,
/// In this file, emitted only when a region id is created twice.
CircularDependency,
}
/// Diagnostic snapshot attached to a [`RegionViolation`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ViolationContext {
/// Tasks relevant to the violation.
pub active_tasks: Vec<TaskId>,
/// Child regions of the offending region at detection time.
pub child_regions: Vec<RegionId>,
/// Parent of the offending region, if recorded.
pub parent_region: Option<RegionId>,
/// Free-form text describing the last observed activity.
pub last_activity_description: String,
/// Finalizers expected but not yet completed.
pub outstanding_finalizers: u32,
/// Budget details rendered as strings.
pub budget_info: BudgetInfo,
/// Optional stack-trace text.
pub stack_trace: Option<String>,
/// Regions with related violations (left empty by the code in this file).
pub related_violations: Vec<RegionId>,
}
/// String-rendered budget details carried inside a [`ViolationContext`].
///
/// The code in this file fills `budget_type` with the `Debug` rendering of the
/// region's budget (or `"Unknown"` for task violations) and the remaining
/// fields with `"Unknown"` or an empty string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BudgetInfo {
/// Description of the budget.
pub budget_type: String,
/// Initial budget amount, as text.
pub initial_amount: String,
/// Remaining budget amount, as text.
pub remaining_amount: String,
/// Exhaustion status, as text.
pub exhaustion_state: String,
}
/// Bookkeeping the oracle keeps for one tracked task.
#[derive(Debug, Clone)]
pub struct TaskState {
/// Identifier of this task.
pub task_id: TaskId,
/// Region the task was spawned into.
pub region_id: RegionId,
/// Instant at which the spawn event was recorded.
pub spawn_time: Instant,
/// Instant of the most recent poll event; `None` until the first poll.
pub last_poll_time: Option<Instant>,
/// Current lifecycle phase.
pub state: TaskLifecycleState,
/// Optional caller-supplied description of where the task was spawned.
pub spawn_context: Option<String>,
}
/// Lifecycle phases the oracle tracks for a task.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskLifecycleState {
/// Spawn recorded; no poll seen yet.
Spawned,
/// At least one poll event recorded.
Running,
/// Finished with an `Ok` or `Err` outcome.
Completed,
/// Finished with a cancelled outcome.
Cancelled,
/// Finished with a panicked outcome.
Panicked,
}
/// Tracks region and task lifecycle events and reports leak-style violations.
///
/// State is fed in through the `on_*` hooks; `check_for_violations` evaluates
/// the accumulated state against the configured thresholds.
#[derive(Debug)]
pub struct RegionLeakOracle {
// Thresholds and switches controlling detection.
config: RegionLeakConfig,
// Every region seen so far, keyed by id (entries are not removed on close).
regions: HashMap<RegionId, RegionState>,
// Every task seen so far, keyed by id (entries are not removed on completion).
tasks: HashMap<TaskId, TaskState>,
// Recorded violations, oldest first, bounded by `config.max_violations_tracked`.
violations: VecDeque<RegionViolation>,
// When this oracle was created or last reset.
start_time: Instant,
// When `check_for_violations` last ran.
last_check_time: Instant,
// Monotonic event counters exposed through `statistics`.
total_regions_created: u64,
total_regions_closed: u64,
total_tasks_spawned: u64,
total_tasks_completed: u64,
}
impl RegionLeakOracle {
#[must_use]
pub fn new(config: RegionLeakConfig) -> Self {
let now = Instant::now();
Self {
config,
regions: HashMap::new(),
tasks: HashMap::new(),
violations: VecDeque::new(),
start_time: now,
last_check_time: now,
total_regions_created: 0,
total_regions_closed: 0,
total_tasks_spawned: 0,
total_tasks_completed: 0,
}
}
/// Creates an oracle using `RegionLeakConfig::default()`.
#[must_use]
pub fn with_defaults() -> Self {
Self::new(RegionLeakConfig::default())
}
/// Creates an oracle with much tighter thresholds than the defaults and
/// fail-fast mode enabled; all other settings keep their default values.
#[must_use]
pub fn with_strict_timeouts() -> Self {
    let strict = RegionLeakConfig {
        fail_fast_mode: true,
        max_idle_time: Duration::from_secs(2),
        max_task_lifetime: Duration::from_secs(1),
        max_finalizing_time: Duration::from_millis(500),
        max_closing_time: Duration::from_millis(100),
        max_creation_delay: Duration::from_millis(10),
        ..RegionLeakConfig::default()
    };
    Self::new(strict)
}
/// Records the creation of a region.
///
/// * `region_id` - id of the new region.
/// * `parent_id` - parent region, if any; when the parent is tracked, the
///   new region is added to its children and its activity time is refreshed.
/// * `context` - optional description stored as the creation context.
/// * `budget` - budget stored with the region.
///
/// If `region_id` is already tracked, a `CircularDependency` violation
/// ("created twice") is recorded and the existing entry is left untouched.
/// With continuous checking enabled a violation check runs afterwards; its
/// result is intentionally discarded because this hook has no return value
/// (any findings are still recorded).
pub fn on_region_created(
    &mut self,
    region_id: RegionId,
    parent_id: Option<RegionId>,
    context: Option<String>,
    budget: Budget,
) {
    let now = Instant::now();

    if self.regions.contains_key(&region_id) {
        self.record_violation(RegionViolation {
            violation_type: ViolationType::CircularDependency,
            region_id,
            detected_at: SystemTime::now(),
            duration: Duration::from_secs(0),
            description: format!("Region {region_id} created twice"),
            context: ViolationContext::empty(),
            suggested_fix: "Check for duplicate region creation logic".to_string(),
        });
        return;
    }

    // Link into the parent only if the parent itself is tracked.
    if let Some(parent) = parent_id {
        if let Some(parent_state) = self.regions.get_mut(&parent) {
            parent_state.child_regions.insert(region_id);
            parent_state.last_activity = now;
        }
    }

    let region_state = RegionState {
        region_id,
        parent_id,
        state: RegionLifecycleState::Created,
        creation_time: now,
        last_activity: now,
        active_tasks: HashSet::new(),
        child_regions: HashSet::new(),
        expected_finalizers: 0,
        completed_finalizers: 0,
        creation_context: context,
        budget,
    };
    self.regions.insert(region_id, region_state);
    self.total_regions_created += 1;

    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Marks a tracked region as `Active` and refreshes its activity time.
///
/// Unknown region ids are ignored. With continuous checking enabled a
/// violation check runs afterwards; its result is discarded.
pub fn on_region_activated(&mut self, region_id: RegionId) {
    if let Some(region) = self.regions.get_mut(&region_id) {
        region.state = RegionLifecycleState::Active;
        region.last_activity = Instant::now();
    }
    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Records that a task was spawned into a region.
///
/// * `task_id` - id of the new task.
/// * `region_id` - region that owns the task.
/// * `context` - optional description stored as the spawn context.
///
/// When the region is tracked, the task joins its active set, its activity
/// time is refreshed, and a region still in `Created` is promoted to
/// `Active`. The task itself is tracked even if the region is unknown.
/// With continuous checking enabled a violation check runs afterwards; its
/// result is discarded.
pub fn on_task_spawned(
    &mut self,
    task_id: TaskId,
    region_id: RegionId,
    context: Option<String>,
) {
    let now = Instant::now();

    if let Some(region) = self.regions.get_mut(&region_id) {
        region.active_tasks.insert(task_id);
        region.last_activity = now;
        // The first spawn implicitly activates a freshly created region.
        if region.state == RegionLifecycleState::Created {
            region.state = RegionLifecycleState::Active;
        }
    }

    let task_state = TaskState {
        task_id,
        region_id,
        spawn_time: now,
        last_poll_time: None,
        state: TaskLifecycleState::Spawned,
        spawn_context: context,
    };
    self.tasks.insert(task_id, task_state);
    self.total_tasks_spawned += 1;

    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Records a poll of a tracked task.
///
/// Stamps the task's last poll time, marks it `Running`, and refreshes the
/// activity time of its owning region when that region is tracked.
/// Unknown task ids are ignored. No violation check is triggered here.
pub fn on_task_polled(&mut self, task_id: TaskId) {
    let polled_at = Instant::now();

    let task = match self.tasks.get_mut(&task_id) {
        Some(task) => task,
        None => return,
    };
    task.last_poll_time = Some(polled_at);
    task.state = TaskLifecycleState::Running;

    if let Some(owner) = self.regions.get_mut(&task.region_id) {
        owner.last_activity = polled_at;
    }
}
/// Records that a task finished with the given outcome.
///
/// `Ok` and `Err` outcomes both map to `Completed`; cancelled and panicked
/// outcomes map to `Cancelled` and `Panicked`. The task is removed from its
/// region's active set and the region's activity time is refreshed, when
/// the region is tracked.
///
/// The completion counter is bumped only for tracked tasks, and only the
/// first time they reach a terminal state. This keeps the counter from
/// exceeding the number of spawned tasks when an unknown id or a duplicate
/// completion event is reported.
///
/// With continuous checking enabled a violation check runs afterwards; its
/// result is discarded.
pub fn on_task_completed(&mut self, task_id: TaskId, outcome: Outcome<(), String>) {
    let now = Instant::now();

    if let Some(task) = self.tasks.get_mut(&task_id) {
        let already_finished = matches!(
            task.state,
            TaskLifecycleState::Completed
                | TaskLifecycleState::Cancelled
                | TaskLifecycleState::Panicked
        );

        task.state = match outcome {
            Outcome::Ok(()) | Outcome::Err(_) => TaskLifecycleState::Completed,
            Outcome::Cancelled(_) => TaskLifecycleState::Cancelled,
            Outcome::Panicked(_) => TaskLifecycleState::Panicked,
        };

        if let Some(region) = self.regions.get_mut(&task.region_id) {
            region.active_tasks.remove(&task_id);
            region.last_activity = now;
        }

        if !already_finished {
            self.total_tasks_completed += 1;
        }
    }

    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Marks a tracked region as `Closing`.
///
/// * `region_id` - region that started closing.
/// * `expected_finalizers` - number of finalizers expected to run; stored
///   on the region, replacing any earlier value.
///
/// Also refreshes the region's activity time. Unknown region ids are
/// ignored. With continuous checking enabled a violation check runs
/// afterwards; its result is discarded.
pub fn on_region_closing(&mut self, region_id: RegionId, expected_finalizers: u32) {
    if let Some(region) = self.regions.get_mut(&region_id) {
        region.state = RegionLifecycleState::Closing;
        region.expected_finalizers = expected_finalizers;
        region.last_activity = Instant::now();
    }
    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Records that one finalizer of a tracked region completed.
///
/// Increments the region's completed-finalizer count and refreshes its
/// activity time. If the region then has no child regions, no active
/// tasks, and still fewer completed than expected finalizers, it is moved
/// to `Finalizing`. Unknown region ids are ignored. With continuous
/// checking enabled a violation check runs afterwards; its result is
/// discarded.
pub fn on_finalizer_completed(&mut self, region_id: RegionId) {
    if let Some(region) = self.regions.get_mut(&region_id) {
        region.completed_finalizers += 1;
        region.last_activity = Instant::now();

        let finalizers_outstanding = region.completed_finalizers < region.expected_finalizers;
        if region.child_regions.is_empty()
            && region.active_tasks.is_empty()
            && finalizers_outstanding
        {
            region.state = RegionLifecycleState::Finalizing;
        }
    }
    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
/// Marks a tracked region as `Closed` and detaches it from its parent.
///
/// The region's activity time is refreshed, and if its parent is tracked
/// the region is removed from the parent's child set and the parent's
/// activity time is refreshed as well.
///
/// The closed counter is bumped only when a tracked region transitions
/// into `Closed`. Unknown ids and repeated close events therefore do not
/// inflate it past the number of regions created.
///
/// With continuous checking enabled a violation check runs afterwards; its
/// result is discarded.
pub fn on_region_closed(&mut self, region_id: RegionId) {
    let now = Instant::now();
    let mut parent_id = None;

    if let Some(region) = self.regions.get_mut(&region_id) {
        if region.state != RegionLifecycleState::Closed {
            self.total_regions_closed += 1;
        }
        region.state = RegionLifecycleState::Closed;
        region.last_activity = now;
        parent_id = region.parent_id;
    }

    if let Some(parent) = parent_id {
        if let Some(parent_region) = self.regions.get_mut(&parent) {
            parent_region.child_regions.remove(&region_id);
            parent_region.last_activity = now;
        }
    }

    if self.config.continuous_checking {
        let _ = self.check_for_violations();
    }
}
pub fn check_for_violations(&mut self) -> Result<Vec<RegionViolation>, String> {
let now = Instant::now();
self.last_check_time = now;
let mut new_violations = Vec::new();
for region in self.regions.values() {
if let Some(violation) = self.check_region_violations(region, now) {
new_violations.push(violation);
}
}
for task in self.tasks.values() {
if let Some(violation) = self.check_task_violations(task, now) {
new_violations.push(violation);
}
}
new_violations.extend(self.check_structural_violations(now));
for violation in &new_violations {
self.record_violation(violation.clone());
}
if new_violations.is_empty() {
Ok(vec![])
} else {
if self.config.fail_fast_mode {
return Err(format!("Region leak detected: {:?}", new_violations[0]));
}
Ok(new_violations)
}
}
/// Returns the retained violation history, oldest first.
#[must_use]
pub fn violations(&self) -> &VecDeque<RegionViolation> {
&self.violations
}
#[must_use]
pub fn statistics(&self) -> RegionLeakStatistics {
RegionLeakStatistics {
total_regions_created: self.total_regions_created,
total_regions_closed: self.total_regions_closed,
total_tasks_spawned: self.total_tasks_spawned,
total_tasks_completed: self.total_tasks_completed,
active_regions: self.regions.len() as u64,
active_tasks: self.tasks.len() as u64,
total_violations: self.violations.len() as u64,
monitoring_duration: self.last_check_time.duration_since(self.start_time),
}
}
/// Discards the retained violation history; tracked regions, tasks, and
/// counters are left untouched.
pub fn clear_violations(&mut self) {
self.violations.clear();
}
/// Returns the oracle to its freshly constructed state while keeping the
/// configuration: all tracked regions, tasks, and violations are dropped,
/// counters return to zero, and both timestamps are set to now.
pub fn reset(&mut self) {
    // Tracked state.
    self.violations.clear();
    self.tasks.clear();
    self.regions.clear();

    // Counters.
    self.total_tasks_completed = 0;
    self.total_tasks_spawned = 0;
    self.total_regions_closed = 0;
    self.total_regions_created = 0;

    // Timestamps.
    let restarted_at = Instant::now();
    self.last_check_time = restarted_at;
    self.start_time = restarted_at;
}
/// Appends a violation to the history, then evicts the oldest entries
/// until the history fits within `max_violations_tracked`.
fn record_violation(&mut self, violation: RegionViolation) {
    self.violations.push_back(violation);

    let capacity = self.config.max_violations_tracked;
    let overflow = self.violations.len().saturating_sub(capacity);
    for _ in 0..overflow {
        self.violations.pop_front();
    }
}
/// Checks one region against the threshold for its current state.
///
/// * `Created` is measured from the region's creation time.
/// * `Closing`, `Finalizing`, and `Active` are measured from the region's
///   last recorded activity.
/// * `Closed` regions are never reported here.
///
/// Closing and finalizing were previously measured from creation time, so
/// a region that had simply lived longer than the closing threshold was
/// flagged the instant it began closing. `RegionState` stores no per-state
/// entry timestamp. The hooks that move a region into `Closing` or
/// `Finalizing` refresh `last_activity`, so it is used as the reference
/// point instead. Later activity in the region (task polls, task
/// completions, finalizer completions) restarts the clock, so these checks
/// report lack of progress rather than total time spent in the state.
///
/// Returns `Some` violation when the measured duration strictly exceeds the
/// limit, otherwise `None`.
fn check_region_violations(
    &self,
    region: &RegionState,
    now: Instant,
) -> Option<RegionViolation> {
    let since_creation = now.duration_since(region.creation_time);
    let since_activity = now.duration_since(region.last_activity);

    let (violation_type, limit, duration, condition, suggested_fix) = match region.state {
        RegionLifecycleState::Created => (
            ViolationType::StuckCreation,
            self.config.max_creation_delay,
            since_creation,
            "stuck in Created state",
            "Check region activation logic",
        ),
        RegionLifecycleState::Closing => (
            ViolationType::StuckClosing,
            self.config.max_closing_time,
            since_activity,
            "stuck in Closing state",
            "Check for hanging child tasks or finalizers",
        ),
        RegionLifecycleState::Finalizing => (
            ViolationType::StuckFinalizing,
            self.config.max_finalizing_time,
            since_activity,
            "stuck in Finalizing state",
            "Check for hanging finalizer logic",
        ),
        RegionLifecycleState::Active => (
            ViolationType::IdleRegion,
            self.config.max_idle_time,
            since_activity,
            "idle",
            "Check for deadlocked or infinite-loop tasks",
        ),
        RegionLifecycleState::Closed => return None,
    };

    if duration <= limit {
        return None;
    }

    Some(RegionViolation {
        violation_type,
        region_id: region.region_id,
        detected_at: SystemTime::now(),
        duration,
        description: format!(
            "Region {} {} for {:?}",
            region.region_id, condition, duration
        ),
        context: self.build_violation_context(region),
        suggested_fix: suggested_fix.to_string(),
    })
}
/// Checks one task against the task-lifetime threshold.
///
/// Tasks in a terminal state (`Completed`, `Cancelled`, `Panicked`) are
/// skipped. For the rest, the time since spawn is compared with
/// `max_task_lifetime`; exceeding it yields a `LongRunningTask` violation
/// attributed to the task's region, with a context listing only this task.
fn check_task_violations(&self, task: &TaskState, now: Instant) -> Option<RegionViolation> {
    let finished = matches!(
        task.state,
        TaskLifecycleState::Completed
            | TaskLifecycleState::Cancelled
            | TaskLifecycleState::Panicked
    );
    if finished {
        return None;
    }

    let age = now.duration_since(task.spawn_time);
    if age <= self.config.max_task_lifetime {
        return None;
    }

    // A task that was never polled reports its spawn time as the last poll.
    let last_poll = task.last_poll_time.unwrap_or(task.spawn_time);
    let unknown = || "Unknown".to_string();

    let context = ViolationContext {
        active_tasks: vec![task.task_id],
        child_regions: Vec::new(),
        parent_region: None,
        last_activity_description: format!(
            "Task spawned at {:?}, last poll: {:?}",
            task.spawn_time, last_poll
        ),
        outstanding_finalizers: 0,
        budget_info: BudgetInfo {
            budget_type: unknown(),
            initial_amount: unknown(),
            remaining_amount: unknown(),
            exhaustion_state: unknown(),
        },
        stack_trace: None,
        related_violations: Vec::new(),
    };

    Some(RegionViolation {
        violation_type: ViolationType::LongRunningTask,
        region_id: task.region_id,
        detected_at: SystemTime::now(),
        duration: age,
        description: format!(
            "Task {} running for {:?} in region {}",
            task.task_id, age, task.region_id
        ),
        context,
        suggested_fix: "Check for infinite loops or blocking operations".to_string(),
    })
}
fn check_structural_violations(&self, _now: Instant) -> Vec<RegionViolation> {
let mut violations = Vec::new();
for region in self.regions.values() {
if let Some(parent_id) = region.parent_id {
if let Some(parent) = self.regions.get(&parent_id) {
if parent.state == RegionLifecycleState::Closed
&& region.state != RegionLifecycleState::Closed
{
violations.push(RegionViolation {
violation_type: ViolationType::OrphanedChildren,
region_id: region.region_id,
detected_at: SystemTime::now(),
duration: Duration::from_secs(0),
description: format!(
"Region {} orphaned by closed parent {}",
region.region_id, parent_id
),
context: self.build_violation_context(region),
suggested_fix: "Ensure parent waits for all children to close"
.to_string(),
});
}
}
}
if region.state == RegionLifecycleState::Closed && !region.active_tasks.is_empty() {
violations.push(RegionViolation {
violation_type: ViolationType::OrphanedTasks,
region_id: region.region_id,
detected_at: SystemTime::now(),
duration: Duration::from_secs(0),
description: format!(
"Region {} closed with {} active tasks",
region.region_id,
region.active_tasks.len()
),
context: self.build_violation_context(region),
suggested_fix: "Ensure all tasks complete before region closes".to_string(),
});
}
if region.state == RegionLifecycleState::Closed
&& region.completed_finalizers < region.expected_finalizers
{
violations.push(RegionViolation {
violation_type: ViolationType::FinalizersIncomplete,
region_id: region.region_id,
detected_at: SystemTime::now(),
duration: Duration::from_secs(0),
description: format!(
"Region {} closed with {}/{} finalizers completed",
region.region_id, region.completed_finalizers, region.expected_finalizers
),
context: self.build_violation_context(region),
suggested_fix: "Ensure all finalizers run to completion".to_string(),
});
}
}
violations
}
/// Builds the diagnostic context attached to a region violation.
///
/// Copies the region's active tasks, child regions, and parent, and renders
/// its last activity time and budget with `Debug`. The stack-trace field
/// holds a placeholder message when `include_stack_traces` is set, since
/// capture is not implemented.
///
/// The outstanding-finalizer count is computed with a saturating
/// subtraction. Finalizer completions are counted unconditionally, so more
/// may be reported than were announced (for example before the region
/// starts closing, while the expected count is still zero). A plain
/// subtraction would then panic in debug builds and wrap in release builds.
fn build_violation_context(&self, region: &RegionState) -> ViolationContext {
    let outstanding_finalizers = region
        .expected_finalizers
        .saturating_sub(region.completed_finalizers);

    let stack_trace = if self.config.include_stack_traces {
        Some("Stack trace capture not implemented".to_string())
    } else {
        None
    };

    ViolationContext {
        active_tasks: region.active_tasks.iter().copied().collect(),
        child_regions: region.child_regions.iter().copied().collect(),
        parent_region: region.parent_id,
        last_activity_description: format!("Last activity: {:?}", region.last_activity),
        outstanding_finalizers,
        budget_info: BudgetInfo {
            budget_type: format!("{:?}", region.budget),
            initial_amount: "Unknown".to_string(),
            remaining_amount: "Unknown".to_string(),
            exhaustion_state: "Unknown".to_string(),
        },
        stack_trace,
        related_violations: vec![],
    }
}
}
impl ViolationContext {
fn empty() -> Self {
Self {
active_tasks: vec![],
child_regions: vec![],
parent_region: None,
last_activity_description: String::new(),
outstanding_finalizers: 0,
budget_info: BudgetInfo {
budget_type: String::new(),
initial_amount: String::new(),
remaining_amount: String::new(),
exhaustion_state: String::new(),
},
stack_trace: None,
related_violations: vec![],
}
}
}
/// Snapshot of oracle counters produced by `RegionLeakOracle::statistics`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionLeakStatistics {
/// Region creation events accepted since start or last reset.
pub total_regions_created: u64,
/// Region close events counted since start or last reset.
pub total_regions_closed: u64,
/// Task spawn events recorded since start or last reset.
pub total_tasks_spawned: u64,
/// Task completion events counted since start or last reset.
pub total_tasks_completed: u64,
/// Number of regions reported as active at snapshot time.
pub active_regions: u64,
/// Number of tasks reported as active at snapshot time.
pub active_tasks: u64,
/// Number of violations retained in the history at snapshot time.
pub total_violations: u64,
/// Time between start (or last reset) and the most recent violation check.
pub monitoring_duration: Duration,
}
// Unit tests for the oracle's lifecycle tracking and violation detection.
#[cfg(test)]
mod tests {
use super::*;
// A freshly built oracle has no recorded violations.
#[test]
fn test_oracle_creation() {
let oracle = RegionLeakOracle::with_defaults();
assert_eq!(oracle.violations().len(), 0);
}
// A region walked promptly through its whole lifecycle yields no
// violations, even under the strict thresholds.
#[test]
fn test_region_lifecycle_tracking() {
let mut oracle = RegionLeakOracle::with_strict_timeouts();
let region_id = RegionId::new_for_test(1, 0);
oracle.on_region_created(region_id, None, None, Budget::INFINITE);
oracle.on_region_activated(region_id);
oracle.on_region_closing(region_id, 0);
oracle.on_region_closed(region_id);
let violations = oracle.check_for_violations().unwrap();
assert!(violations.is_empty());
}
// A region left in `Created` past the strict creation delay is reported.
#[test]
fn test_stuck_region_detection() {
let mut oracle = RegionLeakOracle::with_strict_timeouts();
let region_id = RegionId::new_for_test(1, 0);
oracle.on_region_created(region_id, None, None, Budget::INFINITE);
// Exceed the 10 ms strict creation delay.
std::thread::sleep(Duration::from_millis(50));
// Strict mode is fail-fast, so the check returns `Err`; inspect the
// recorded history instead of the return value.
let _ = oracle.check_for_violations();
let recorded: Vec<_> = oracle.violations().iter().cloned().collect();
assert!(!recorded.is_empty());
assert!(matches!(
recorded[0].violation_type,
ViolationType::StuckCreation
));
}
// A task that completes before its region closes leaves no violations
// and is reflected in the spawn/completion counters.
#[test]
fn test_task_tracking() {
let mut oracle = RegionLeakOracle::with_defaults();
let region_id = RegionId::new_for_test(1, 0);
let task_id = TaskId::new_for_test(100, 0);
oracle.on_region_created(region_id, None, None, Budget::INFINITE);
oracle.on_task_spawned(task_id, region_id, None);
oracle.on_task_completed(task_id, Outcome::Ok(()));
oracle.on_region_closing(region_id, 0);
oracle.on_region_closed(region_id);
let violations = oracle.check_for_violations().unwrap();
assert!(violations.is_empty());
let stats = oracle.statistics();
assert_eq!(stats.total_tasks_spawned, 1);
assert_eq!(stats.total_tasks_completed, 1);
}
// Closing a region while a task is still active is reported as
// `OrphanedTasks`.
#[test]
fn test_orphaned_task_detection() {
let mut oracle = RegionLeakOracle::with_defaults();
let region_id = RegionId::new_for_test(1, 0);
let task_id = TaskId::new_for_test(100, 0);
oracle.on_region_created(region_id, None, None, Budget::INFINITE);
oracle.on_task_spawned(task_id, region_id, None);
oracle.on_region_closing(region_id, 0);
oracle.on_region_closed(region_id);
let violations = oracle.check_for_violations().unwrap();
assert!(!violations.is_empty());
assert!(matches!(
violations[0].violation_type,
ViolationType::OrphanedTasks
));
}
}
impl Default for RegionLeakOracle {
/// Equivalent to `RegionLeakOracle::with_defaults()`.
fn default() -> Self {
Self::with_defaults()
}
}
impl std::fmt::Display for RegionViolation {
/// Formats the violation as `<violation type>: <description>`.
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.violation_type, self.description)
}
}
impl std::fmt::Display for ViolationType {
    /// Writes the human-readable, space-separated name of the variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::StuckCreation => "Stuck Creation",
            Self::StuckClosing => "Stuck Closing",
            Self::StuckFinalizing => "Stuck Finalizing",
            Self::IdleRegion => "Idle Region",
            Self::LongRunningTask => "Long Running Task",
            Self::OrphanedChildren => "Orphaned Children",
            Self::OrphanedTasks => "Orphaned Tasks",
            Self::FinalizersIncomplete => "Finalizers Incomplete",
            Self::ResourceLeak => "Resource Leak",
            Self::CircularDependency => "Circular Dependency",
        };
        f.write_str(label)
    }
}