use crate::types::{RegionId, TaskId, Time};
use parking_lot::RwLock;
use std::backtrace::Backtrace;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Tunable thresholds for the cancellation-debt oracle.
///
/// "Debt" is cleanup work (finalizers, region teardown, obligation discharge)
/// that has been queued but not yet completed. These limits control when the
/// oracle reports accumulated debt as a violation.
#[derive(Debug, Clone)]
pub struct CancelDebtConfig {
/// Maximum pending items per queue before `DebtThresholdExceeded` fires.
pub max_debt_items: usize,
/// Sliding-window length in nanoseconds used for rate and stall checks.
pub measurement_window_ns: u64,
/// Maximum net debt growth (items/sec) before `DebtAccumulationTooFast` fires.
pub max_debt_rate_per_sec: f64,
/// Maximum violations retained in the bounded history buffer.
pub max_violations: usize,
/// When true, a detected violation panics instead of being recorded.
pub panic_on_violation: bool,
/// When true, a backtrace is captured at each violation site.
pub capture_stack_traces: bool,
// NOTE(review): not referenced by the capture path in this file — intended
// cap on captured stack depth; confirm whether it is still consumed elsewhere.
pub max_stack_trace_depth: usize,
}
impl Default for CancelDebtConfig {
    /// Defaults: 1000-item queues, a 10-second measurement window,
    /// 100 items/sec net growth, backtrace capture on, panics off.
    fn default() -> Self {
        Self {
            max_debt_items: 1000,
            max_violations: 1000,
            measurement_window_ns: 10_000_000_000,
            max_debt_rate_per_sec: 100.0,
            panic_on_violation: false,
            capture_stack_traces: true,
            max_stack_trace_depth: 32,
        }
    }
}
/// A detected breach of the cancellation-debt thresholds.
///
/// Every variant carries the queue it was observed on, the detection time,
/// and an optional backtrace captured at the detection site.
#[derive(Debug, Clone)]
pub enum CancelDebtViolation {
/// A queue's pending-item count exceeded `CancelDebtConfig::max_debt_items`.
DebtThresholdExceeded {
current_debt: usize,
max_debt: usize,
queue_type: String,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
/// Net debt growth (additions minus completions) over the measurement
/// window exceeded `CancelDebtConfig::max_debt_rate_per_sec`.
DebtAccumulationTooFast {
current_rate: f64,
max_rate: f64,
window_size_ns: u64,
queue_type: String,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
/// A queue with pending items made no progress for longer than twice the
/// measurement window.
CleanupStall {
queue_type: String,
stall_duration_ns: u64,
pending_items: usize,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
/// A queue's estimated memory footprint crossed the pressure threshold.
ResourcePressure {
queue_type: String,
estimated_memory_bytes: usize,
current_debt: usize,
detected_at: Time,
stack_trace: Option<Arc<Backtrace>>,
},
}
impl fmt::Display for CancelDebtViolation {
    /// Renders a single-line, human-readable description of the violation.
    /// The captured backtrace is intentionally omitted from the summary.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::DebtThresholdExceeded { current_debt, max_debt, queue_type, detected_at, .. } => write!(
                f,
                "Debt threshold exceeded: {} has {} items (max: {}) at {}",
                queue_type, current_debt, max_debt, detected_at.as_nanos()
            ),
            Self::DebtAccumulationTooFast {
                current_rate, max_rate, window_size_ns, queue_type, detected_at, ..
            } => write!(
                f,
                "Debt accumulating too fast: {} at {:.1} items/sec (max: {:.1}) over {}ns window at {}",
                queue_type, current_rate, max_rate, window_size_ns, detected_at.as_nanos()
            ),
            Self::CleanupStall { queue_type, stall_duration_ns, pending_items, detected_at, .. } => write!(
                f,
                "Cleanup stall: {} stalled for {}ns with {} pending items at {}",
                queue_type, stall_duration_ns, pending_items, detected_at.as_nanos()
            ),
            Self::ResourcePressure {
                queue_type, estimated_memory_bytes, current_debt, detected_at, ..
            } => write!(
                f,
                "Resource pressure: {} using ~{} bytes for {} items at {}",
                queue_type, estimated_memory_bytes, current_debt, detected_at.as_nanos()
            ),
        }
    }
}
/// One unit of pending cleanup work tracked inside a queue.
#[derive(Debug, Clone)]
struct CleanupWorkItem {
/// Task the work belongs to, when known.
task_id: Option<TaskId>,
/// Region the work belongs to, when known.
region_id: Option<RegionId>,
work_type: CleanupWorkType,
/// Enqueue time; used for accumulation-rate and stall measurement.
created_at: Time,
/// Rough footprint from `CleanupWorkType::estimated_size_bytes`, summed
/// per queue for the resource-pressure check.
estimated_size_bytes: usize,
}
/// Category of deferred cleanup work, used for labeling and for the
/// per-item memory estimate.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CleanupWorkType {
TaskFinalization,
RegionCleanup,
ObligationDischarge,
ResourceDeallocation,
FinalizerExecution,
}
/// Immutable copy of one pending work item, exposed via
/// `CancelDebtOracle::get_queue_state_snapshots` for diagnostics.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CleanupWorkItemSnapshot {
pub task_id: Option<TaskId>,
pub region_id: Option<RegionId>,
pub work_type: CleanupWorkType,
/// Stable string form of `work_type` (see `CleanupWorkType::name`).
pub work_type_name: &'static str,
pub created_at: Time,
pub estimated_size_bytes: usize,
}
/// Point-in-time view of one queue's debt: counts, estimated memory, and
/// a copy of every pending work item.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct QueueDebtSnapshot {
pub queue_type: String,
pub pending_items: usize,
pub estimated_memory_usage: usize,
pub work_items: Vec<CleanupWorkItemSnapshot>,
}
impl CleanupWorkType {
    /// Stable snake_case identifier for this work type, used in snapshots.
    fn name(self) -> &'static str {
        match self {
            Self::FinalizerExecution => "finalizer_execution",
            Self::ResourceDeallocation => "resource_deallocation",
            Self::ObligationDischarge => "obligation_discharge",
            Self::RegionCleanup => "region_cleanup",
            Self::TaskFinalization => "task_finalization",
        }
    }

    /// Rough per-item memory footprint in bytes, fed into the
    /// resource-pressure heuristic. These are coarse estimates, not
    /// measurements.
    fn estimated_size_bytes(self) -> usize {
        match self {
            Self::FinalizerExecution => 250,
            Self::ResourceDeallocation => 100,
            Self::ObligationDischarge => 150,
            Self::RegionCleanup => 300,
            Self::TaskFinalization => 200,
        }
    }
}
/// Per-queue bookkeeping: the pending FIFO plus completion history used for
/// rate and stall measurements.
#[derive(Debug)]
struct QueueState {
queue_type: String,
/// FIFO of outstanding cleanup work; front is the oldest item.
pending_items: VecDeque<CleanupWorkItem>,
// `completion_times` records (when, how many) per completion batch, capped
// at 1000 entries; `last_completion` is the most recent batch time.
completion_times: VecDeque<(Time, usize)>, last_completion: Option<Time>,
/// Lifetime count of completed items (monotonic, never trimmed).
total_completed: u64,
}
impl QueueState {
    /// Creates empty tracking state for the named queue.
    fn new(queue_type: String) -> Self {
        Self {
            queue_type,
            pending_items: VecDeque::new(),
            completion_times: VecDeque::new(),
            last_completion: None,
            total_completed: 0,
        }
    }

    /// Enqueues one pending cleanup work item.
    fn add_work_item(&mut self, item: CleanupWorkItem) {
        self.pending_items.push_back(item);
    }

    /// Marks up to `count` items complete at `completion_time` and records
    /// the batch for later rate measurement.
    fn complete_items(&mut self, count: usize, completion_time: Time) {
        let done = count.min(self.pending_items.len());
        // Completions retire the oldest pending items (FIFO).
        self.pending_items.drain(..done);
        self.completion_times.push_back((completion_time, done));
        self.last_completion = Some(completion_time);
        self.total_completed += done as u64;
        // Keep the completion history bounded.
        while self.completion_times.len() > 1000 {
            self.completion_times.pop_front();
        }
    }

    /// Number of items currently awaiting cleanup.
    fn current_debt(&self) -> usize {
        self.pending_items.len()
    }

    /// Sum of the per-item size estimates for everything still pending.
    fn estimated_memory_usage(&self) -> usize {
        self.pending_items
            .iter()
            .fold(0, |total, item| total + item.estimated_size_bytes)
    }

    /// Completions per second over the trailing `window_ns` ending at `now`.
    #[allow(clippy::cast_precision_loss)]
    fn completion_rate_over_window(&self, window_ns: u64, now: Time) -> f64 {
        if self.completion_times.is_empty() {
            return 0.0;
        }
        let cutoff = Time::from_nanos(now.as_nanos().saturating_sub(window_ns));
        let mut completed: usize = 0;
        for &(when, batch) in &self.completion_times {
            if when >= cutoff {
                completed += batch;
            }
        }
        let window_seconds = window_ns as f64 / 1_000_000_000.0;
        completed as f64 / window_seconds
    }

    /// Net debt growth (additions minus completions) in items per second
    /// over the trailing `window_ns` ending at `now`.
    #[allow(clippy::cast_precision_loss)]
    fn debt_accumulation_rate(&self, window_ns: u64, now: Time) -> f64 {
        // Fewer than two pending items: no meaningful trend to report.
        if self.pending_items.len() < 2 {
            return 0.0;
        }
        let cutoff = Time::from_nanos(now.as_nanos().saturating_sub(window_ns));
        let added = self
            .pending_items
            .iter()
            .filter(|item| item.created_at >= cutoff)
            .count();
        let window_seconds = window_ns as f64 / 1_000_000_000.0;
        (added as f64 / window_seconds) - self.completion_rate_over_window(window_ns, now)
    }

    /// Nanoseconds the queue has gone without progress while work is
    /// pending; 0 when the queue is empty.
    fn stall_duration(&self, now: Time) -> u64 {
        match self.pending_items.front() {
            None => 0,
            Some(oldest) => {
                let oldest_ns = oldest.created_at.as_nanos();
                // The stall clock starts at the later of the last completion
                // and the oldest pending item's creation.
                let start_ns = self
                    .last_completion
                    .map_or(oldest_ns, |last| last.as_nanos().max(oldest_ns));
                now.as_nanos().saturating_sub(start_ns)
            }
        }
    }
}
/// Watchdog that tracks cleanup-work queues and flags runaway cancellation
/// debt: too many pending items, too-fast accumulation, stalls, and memory
/// pressure. All methods take `&self`; interior state is guarded by locks
/// and atomics so the oracle can be shared across threads.
#[derive(Debug)]
pub struct CancelDebtOracle {
config: CancelDebtConfig,
/// Per-queue state, keyed by queue type name.
queue_states: RwLock<HashMap<String, QueueState>>,
/// Bounded history of detected violations (capped at `max_violations`).
violations: RwLock<VecDeque<CancelDebtViolation>>,
// Monotonic counters; all updated with relaxed ordering since they are
// statistics, not synchronization.
work_items_tracked: AtomicU64,
completions_tracked: AtomicU64,
violations_detected: AtomicU64,
debt_checks_performed: AtomicU64,
}
impl Default for CancelDebtOracle {
fn default() -> Self {
Self::with_default_config()
}
}
impl CancelDebtOracle {
    /// Estimated per-queue memory footprint (bytes) above which a
    /// `ResourcePressure` violation is raised (1 MiB). Previously an inline
    /// magic number in `check_queue_violations`.
    const MEMORY_PRESSURE_THRESHOLD_BYTES: usize = 1_048_576;

    /// Creates an oracle with the given thresholds and empty state.
    #[must_use]
    pub fn new(config: CancelDebtConfig) -> Self {
        Self {
            config,
            queue_states: RwLock::new(HashMap::new()),
            violations: RwLock::new(VecDeque::new()),
            work_items_tracked: AtomicU64::new(0),
            completions_tracked: AtomicU64::new(0),
            violations_detected: AtomicU64::new(0),
            debt_checks_performed: AtomicU64::new(0),
        }
    }

    /// Creates an oracle with `CancelDebtConfig::default()` thresholds.
    #[must_use]
    pub fn with_default_config() -> Self {
        Self::new(CancelDebtConfig::default())
    }

    /// Records that one cleanup work item was enqueued on `queue_type`,
    /// creating the queue's tracking state on first use.
    pub fn on_work_item_added(
        &self,
        queue_type: &str,
        task_id: Option<TaskId>,
        region_id: Option<RegionId>,
        work_type: CleanupWorkType,
        created_at: Time,
    ) {
        self.work_items_tracked.fetch_add(1, Ordering::Relaxed);
        let item = CleanupWorkItem {
            task_id,
            region_id,
            work_type,
            created_at,
            estimated_size_bytes: work_type.estimated_size_bytes(),
        };
        let mut states = self.queue_states.write();
        states
            .entry(queue_type.to_string())
            .or_insert_with(|| QueueState::new(queue_type.to_string()))
            .add_work_item(item);
    }

    /// Records that `count` items on `queue_type` finished at
    /// `completion_time`. Unknown queues are ignored (nothing to retire),
    /// though the completion counter still advances.
    pub fn on_work_items_completed(&self, queue_type: &str, count: usize, completion_time: Time) {
        // usize -> u64 is lossless on all supported targets.
        self.completions_tracked
            .fetch_add(count as u64, Ordering::Relaxed);
        let mut states = self.queue_states.write();
        if let Some(state) = states.get_mut(queue_type) {
            state.complete_items(count, completion_time);
        }
    }

    /// Runs all violation checks against every tracked queue at time `now`.
    /// Detected violations are recorded (or panic, per config).
    pub fn check_debt_accumulation(&self, now: Time) {
        self.debt_checks_performed.fetch_add(1, Ordering::Relaxed);
        let states = self.queue_states.read();
        for state in states.values() {
            self.check_queue_violations(state, now);
        }
    }

    /// Runs the checks and returns the oldest retained violation, if any.
    ///
    /// # Errors
    ///
    /// Returns a clone of the front (oldest surviving) recorded violation.
    pub fn check(&self, now: Time) -> Result<(), CancelDebtViolation> {
        self.check_debt_accumulation(now);
        let violations = self.violations.read();
        if let Some(violation) = violations.front() {
            return Err(violation.clone());
        }
        Ok(())
    }

    /// Clears all queue state, violation history, and counters.
    pub fn reset(&self) {
        self.queue_states.write().clear();
        self.violations.write().clear();
        self.work_items_tracked.store(0, Ordering::Relaxed);
        self.completions_tracked.store(0, Ordering::Relaxed);
        self.violations_detected.store(0, Ordering::Relaxed);
        self.debt_checks_performed.store(0, Ordering::Relaxed);
    }

    /// Returns an aggregate snapshot of counters and current debt totals.
    pub fn get_statistics(&self) -> CancelDebtStatistics {
        let states = self.queue_states.read();
        let violations = self.violations.read();
        let total_debt: usize = states.values().map(QueueState::current_debt).sum();
        let total_memory_usage: usize = states
            .values()
            .map(QueueState::estimated_memory_usage)
            .sum();
        CancelDebtStatistics {
            work_items_tracked: self.work_items_tracked.load(Ordering::Relaxed),
            completions_tracked: self.completions_tracked.load(Ordering::Relaxed),
            violations_detected: self.violations_detected.load(Ordering::Relaxed),
            debt_checks_performed: self.debt_checks_performed.load(Ordering::Relaxed),
            tracked_queues: states.len(),
            total_current_debt: total_debt,
            total_estimated_memory_usage: total_memory_usage,
            total_violations: violations.len(),
        }
    }

    /// Returns up to `limit` retained violations, newest first.
    pub fn get_recent_violations(&self, limit: usize) -> Vec<CancelDebtViolation> {
        let violations = self.violations.read();
        violations.iter().rev().take(limit).cloned().collect()
    }

    /// Returns `(queue_type, pending_items, estimated_memory_bytes)` for
    /// every tracked queue, in unspecified (hash map) order.
    pub fn get_queue_states(&self) -> Vec<(String, usize, usize)> {
        let states = self.queue_states.read();
        states
            .values()
            .map(|s| {
                (
                    s.queue_type.clone(),
                    s.current_debt(),
                    s.estimated_memory_usage(),
                )
            })
            .collect()
    }

    /// Returns full per-queue snapshots (including per-item metadata),
    /// sorted by queue type for deterministic output.
    pub fn get_queue_state_snapshots(&self) -> Vec<QueueDebtSnapshot> {
        let states = self.queue_states.read();
        let mut snapshots = states
            .values()
            .map(|state| QueueDebtSnapshot {
                queue_type: state.queue_type.clone(),
                pending_items: state.current_debt(),
                estimated_memory_usage: state.estimated_memory_usage(),
                work_items: state
                    .pending_items
                    .iter()
                    .map(|item| CleanupWorkItemSnapshot {
                        task_id: item.task_id,
                        region_id: item.region_id,
                        work_type: item.work_type,
                        work_type_name: item.work_type.name(),
                        created_at: item.created_at,
                        estimated_size_bytes: item.estimated_size_bytes,
                    })
                    .collect(),
            })
            .collect::<Vec<_>>();
        snapshots.sort_by(|a, b| a.queue_type.cmp(&b.queue_type));
        snapshots
    }

    /// Applies the four debt checks (threshold, rate, stall, memory) to one
    /// queue, recording a violation for each breached limit.
    fn check_queue_violations(&self, state: &QueueState, now: Time) {
        let current_debt = state.current_debt();
        if current_debt > self.config.max_debt_items {
            let violation = CancelDebtViolation::DebtThresholdExceeded {
                current_debt,
                max_debt: self.config.max_debt_items,
                queue_type: state.queue_type.clone(),
                detected_at: now,
                stack_trace: self.capture_stack_trace(),
            };
            self.record_violation(violation);
        }
        let debt_rate = state.debt_accumulation_rate(self.config.measurement_window_ns, now);
        if debt_rate > self.config.max_debt_rate_per_sec {
            let violation = CancelDebtViolation::DebtAccumulationTooFast {
                current_rate: debt_rate,
                max_rate: self.config.max_debt_rate_per_sec,
                window_size_ns: self.config.measurement_window_ns,
                queue_type: state.queue_type.clone(),
                detected_at: now,
                stack_trace: self.capture_stack_trace(),
            };
            self.record_violation(violation);
        }
        if current_debt > 0 {
            let stall_duration = state.stall_duration(now);
            // A queue counts as stalled once it has made no progress for two
            // full measurement windows. saturating_mul guards against
            // overflow for extreme window configurations.
            if stall_duration > self.config.measurement_window_ns.saturating_mul(2) {
                let violation = CancelDebtViolation::CleanupStall {
                    queue_type: state.queue_type.clone(),
                    stall_duration_ns: stall_duration,
                    pending_items: current_debt,
                    detected_at: now,
                    stack_trace: self.capture_stack_trace(),
                };
                self.record_violation(violation);
            }
        }
        let memory_usage = state.estimated_memory_usage();
        if memory_usage > Self::MEMORY_PRESSURE_THRESHOLD_BYTES {
            let violation = CancelDebtViolation::ResourcePressure {
                queue_type: state.queue_type.clone(),
                estimated_memory_bytes: memory_usage,
                current_debt,
                detected_at: now,
                stack_trace: self.capture_stack_trace(),
            };
            self.record_violation(violation);
        }
    }

    /// Counts a violation, then either panics (when configured) or stores it
    /// in the bounded history.
    ///
    /// # Panics
    ///
    /// Panics with the violation's `Display` text when
    /// `config.panic_on_violation` is set.
    fn record_violation(&self, violation: CancelDebtViolation) {
        self.violations_detected.fetch_add(1, Ordering::Relaxed);
        // Previously expressed as `assert!(!panic_on_violation, ...)`, which
        // read as an invariant check; this is deliberate fail-fast behavior.
        if self.config.panic_on_violation {
            panic!("Cancellation debt violation detected: {violation}");
        }
        let mut violations = self.violations.write();
        violations.push_back(violation);
        // Evict the oldest entries to keep the history bounded.
        while violations.len() > self.config.max_violations {
            violations.pop_front();
        }
    }

    /// Captures a backtrace when configured to, else `None`.
    fn capture_stack_trace(&self) -> Option<Arc<Backtrace>> {
        if self.config.capture_stack_traces {
            Some(Arc::new(Backtrace::capture()))
        } else {
            None
        }
    }
}
/// Aggregate counters snapshotted from a `CancelDebtOracle` by
/// `get_statistics`.
#[derive(Debug, Clone)]
pub struct CancelDebtStatistics {
/// Total work items ever reported added (not reset by completions).
pub work_items_tracked: u64,
/// Total completions ever reported.
pub completions_tracked: u64,
/// Total violations ever detected (including any later evicted ones).
pub violations_detected: u64,
/// Number of `check_debt_accumulation` passes performed.
pub debt_checks_performed: u64,
/// Queues currently tracked.
pub tracked_queues: usize,
/// Sum of pending items across all queues right now.
pub total_current_debt: usize,
/// Sum of estimated memory (bytes) across all queues right now.
pub total_estimated_memory_usage: usize,
/// Violations currently retained in the bounded history.
pub total_violations: usize,
}
impl fmt::Display for CancelDebtStatistics {
    /// Renders a compact single-line summary suitable for log output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            work_items_tracked,
            completions_tracked,
            violations_detected,
            debt_checks_performed,
            tracked_queues,
            total_current_debt,
            total_estimated_memory_usage,
            total_violations,
        } = self;
        write!(
            f,
            "CancelDebtStats {{ work_items: {}, completions: {}, violations: {}, checks: {}, queues: {}, debt: {}, memory: {}KB, total_violations: {} }}",
            work_items_tracked,
            completions_tracked,
            violations_detected,
            debt_checks_performed,
            tracked_queues,
            total_current_debt,
            total_estimated_memory_usage / 1024,
            total_violations
        )
    }
}
// Unit tests for the cancel-debt oracle. Fixture timestamps are chosen
// relative to the configured measurement windows, so the exact constants are
// load-bearing.
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
// One item added and then completed leaves no debt and no violations.
#[test]
fn test_normal_operation_no_violations() {
init_test_logging();
let oracle = CancelDebtOracle::with_default_config();
let now = Time::ZERO;
oracle.on_work_item_added(
"finalizers",
Some(TaskId::testing_default()),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
now,
);
oracle.on_work_items_completed("finalizers", 1, Time::from_nanos(1000));
let stats = oracle.get_statistics();
assert_eq!(stats.violations_detected, 0);
assert_eq!(stats.work_items_tracked, 1);
assert_eq!(stats.completions_tracked, 1);
}
// 10 pending items against a max of 5 must raise DebtThresholdExceeded,
// and it should be the oldest recorded violation.
#[test]
fn test_debt_threshold_violation() {
init_test_logging();
let config = CancelDebtConfig {
max_debt_items: 5, ..Default::default()
};
let oracle = CancelDebtOracle::new(config);
let now = Time::ZERO;
for i in 0..10 {
oracle.on_work_item_added(
"finalizers",
Some(TaskId::new_for_test(i as u32, 0)),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::from_nanos(i * 1000),
);
}
oracle.check_debt_accumulation(now);
let stats = oracle.get_statistics();
assert!(stats.violations_detected > 0);
// get_recent_violations is newest-first; index 0 here happens to be the
// threshold violation because only one check pass ran.
let violations = oracle.get_recent_violations(5);
assert!(!violations.is_empty());
assert!(matches!(
violations[0],
CancelDebtViolation::DebtThresholdExceeded { .. }
));
}
// 10 items added over a 1s window with zero completions is a net rate of
// 10 items/sec, exceeding the 1.0/sec limit.
#[test]
fn test_debt_accumulation_rate_violation() {
init_test_logging();
let config = CancelDebtConfig {
max_debt_rate_per_sec: 1.0, measurement_window_ns: 1_000_000_000, ..Default::default()
};
let oracle = CancelDebtOracle::new(config);
for i in 0..10 {
oracle.on_work_item_added(
"finalizers",
Some(TaskId::new_for_test(i as u32, 0)),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::from_nanos(i * 100_000_000), );
}
// Check exactly at the end of the 1s window so all additions count.
let now = Time::from_nanos(1_000_000_000); oracle.check_debt_accumulation(now);
let stats = oracle.get_statistics();
assert!(stats.violations_detected > 0);
let violations = oracle.get_recent_violations(5);
assert!(!violations.is_empty());
assert!(
violations
.iter()
.any(|v| matches!(v, CancelDebtViolation::DebtAccumulationTooFast { .. }))
);
}
// A pending item with no progress for 3s exceeds the 2x-window (2s) stall
// threshold and must raise CleanupStall.
#[test]
fn test_cleanup_stall_detection() {
init_test_logging();
let config = CancelDebtConfig {
measurement_window_ns: 1_000_000_000, ..Default::default()
};
let oracle = CancelDebtOracle::new(config);
oracle.on_work_item_added(
"finalizers",
Some(TaskId::testing_default()),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::ZERO,
);
// One completion at t=0.1s establishes last_completion; the second item
// at t=0.5s then sits untouched.
oracle.on_work_items_completed("finalizers", 1, Time::from_nanos(100_000_000));
oracle.on_work_item_added(
"finalizers",
Some(TaskId::new_for_test(2, 0)),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::from_nanos(500_000_000),
);
// t=3.5s: stall duration is 3.0s (since the t=0.5s enqueue) > 2s limit.
let now = Time::from_nanos(3_500_000_000);
oracle.check_debt_accumulation(now);
let stats = oracle.get_statistics();
assert!(stats.violations_detected > 0);
let violations = oracle.get_recent_violations(5);
assert!(
violations
.iter()
.any(|v| matches!(v, CancelDebtViolation::CleanupStall { .. }))
);
}
// check() is Ok while clean and Err once debt exceeds the default
// 1000-item threshold (1500 items added here).
#[test]
fn test_oracle_check_method() {
init_test_logging();
let oracle = CancelDebtOracle::with_default_config();
let result = oracle.check(Time::ZERO);
assert!(result.is_ok());
for i in 0..1500 {
oracle.on_work_item_added(
"finalizers",
Some(TaskId::new_for_test(i as u32, 0)),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::from_nanos(i * 1000),
);
}
let result = oracle.check(Time::ZERO);
assert!(result.is_err());
}
// reset() must zero every counter and drop all tracked queue state.
#[test]
fn test_oracle_reset() {
init_test_logging();
let oracle = CancelDebtOracle::with_default_config();
oracle.on_work_item_added(
"finalizers",
Some(TaskId::testing_default()),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution,
Time::ZERO,
);
let stats_before = oracle.get_statistics();
assert!(stats_before.work_items_tracked > 0);
oracle.reset();
let stats_after = oracle.get_statistics();
assert_eq!(stats_after.work_items_tracked, 0);
assert_eq!(stats_after.completions_tracked, 0);
assert_eq!(stats_after.violations_detected, 0);
assert_eq!(stats_after.tracked_queues, 0);
assert_eq!(stats_after.total_current_debt, 0);
}
// 5000 finalizer items at 250 bytes each (~1.25 MB) exceed the 1 MiB
// memory-pressure threshold and must raise ResourcePressure.
#[test]
fn test_resource_pressure_detection() {
init_test_logging();
let oracle = CancelDebtOracle::with_default_config();
for i in 0..5000 {
oracle.on_work_item_added(
"large_finalizers",
Some(TaskId::new_for_test(i as u32, 0)),
Some(RegionId::testing_default()),
CleanupWorkType::FinalizerExecution, Time::from_nanos(i * 1000),
);
}
oracle.check_debt_accumulation(Time::ZERO);
let violations = oracle.get_recent_violations(10);
assert!(
violations
.iter()
.any(|v| matches!(v, CancelDebtViolation::ResourcePressure { .. }))
);
}
// Snapshots must faithfully expose per-item metadata (ids, type, name,
// creation time, size estimate) and consistent queue-level aggregates.
#[test]
fn test_queue_state_snapshots_expose_pending_work_metadata() {
init_test_logging();
let oracle = CancelDebtOracle::with_default_config();
let task_id = TaskId::new_for_test(11, 0);
let region_id = RegionId::testing_default();
let created_at = Time::from_nanos(42);
oracle.on_work_item_added(
"finalizers",
Some(task_id),
Some(region_id),
CleanupWorkType::FinalizerExecution,
created_at,
);
let snapshots = oracle.get_queue_state_snapshots();
assert_eq!(snapshots.len(), 1);
let queue = &snapshots[0];
assert_eq!(queue.queue_type, "finalizers");
assert_eq!(queue.pending_items, 1);
assert_eq!(queue.work_items.len(), 1);
let item = &queue.work_items[0];
assert_eq!(item.task_id, Some(task_id));
assert_eq!(item.region_id, Some(region_id));
assert_eq!(item.work_type, CleanupWorkType::FinalizerExecution);
assert_eq!(item.work_type_name, "finalizer_execution");
assert_eq!(item.created_at, created_at);
assert_eq!(
item.estimated_size_bytes,
CleanupWorkType::FinalizerExecution.estimated_size_bytes()
);
assert_eq!(queue.estimated_memory_usage, item.estimated_size_bytes);
}
}