use crate::types::{CancelKind, CancelReason};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime};
#[derive(Debug, Clone)]
pub struct CancellationTracerConfig {
pub enable_tracing: bool,
pub max_trace_depth: usize,
pub max_traces: usize,
pub slow_propagation_threshold_ms: u64,
pub stuck_cancellation_timeout_ms: u64,
pub enable_timing_analysis: bool,
pub sample_rate: f64,
}
impl Default for CancellationTracerConfig {
fn default() -> Self {
Self {
enable_tracing: true,
max_trace_depth: 64,
max_traces: 10_000,
slow_propagation_threshold_ms: 100,
stuck_cancellation_timeout_ms: 5_000,
enable_timing_analysis: cfg!(debug_assertions),
sample_rate: 1.0,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub struct TraceId(u64);
impl TraceId {
pub fn new() -> Self {
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
Self(NEXT_ID.fetch_add(1, Ordering::Relaxed))
}
#[must_use]
pub fn as_u64(&self) -> u64 {
self.0
}
}
impl Default for TraceId {
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationTraceStep {
pub step_id: u32,
pub entity_id: String,
pub entity_type: EntityType,
pub cancel_reason: String,
pub cancel_kind: String,
pub timestamp: SystemTime,
pub elapsed_since_start: Duration,
pub elapsed_since_prev: Duration,
pub depth: u32,
pub parent_entity: Option<String>,
pub entity_state: String,
pub propagation_completed: bool,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EntityType {
Task,
Region,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationTrace {
pub trace_id: TraceId,
pub root_cancel_reason: String,
pub root_cancel_kind: String,
pub root_entity: String,
pub root_entity_type: EntityType,
pub start_time: SystemTime,
pub steps: Vec<CancellationTraceStep>,
pub is_complete: bool,
pub total_propagation_time: Option<Duration>,
pub max_depth: u32,
pub entities_cancelled: u32,
pub anomalies: Vec<PropagationAnomaly>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PropagationAnomaly {
SlowPropagation {
step_id: u32,
entity_id: String,
elapsed: Duration,
threshold: Duration,
},
StuckCancellation {
entity_id: String,
stuck_duration: Duration,
},
IncorrectPropagationOrder {
parent_entity: String,
child_entity: String,
parent_step: u32,
child_step: u32,
},
UnexpectedPropagation {
description: String,
affected_entities: Vec<String>,
},
ExcessiveDepth {
depth: u32,
entity_id: String,
},
}
#[derive(Debug, Default)]
pub struct CancellationTracerStats {
pub traces_collected: AtomicU64,
pub steps_recorded: AtomicU64,
pub anomalies_detected: AtomicU64,
pub slow_propagations: AtomicU64,
pub stuck_cancellations: AtomicU64,
pub incorrect_orders: AtomicU64,
pub avg_trace_depth: AtomicU64,
pub avg_propagation_time_us: AtomicU64,
}
impl CancellationTracerStats {
pub fn snapshot(&self) -> CancellationTracerStatsSnapshot {
CancellationTracerStatsSnapshot {
traces_collected: self.traces_collected.load(Ordering::Relaxed),
steps_recorded: self.steps_recorded.load(Ordering::Relaxed),
anomalies_detected: self.anomalies_detected.load(Ordering::Relaxed),
slow_propagations: self.slow_propagations.load(Ordering::Relaxed),
stuck_cancellations: self.stuck_cancellations.load(Ordering::Relaxed),
incorrect_orders: self.incorrect_orders.load(Ordering::Relaxed),
avg_trace_depth: self.avg_trace_depth.load(Ordering::Relaxed),
avg_propagation_time_us: self.avg_propagation_time_us.load(Ordering::Relaxed),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationTracerStatsSnapshot {
pub traces_collected: u64,
pub steps_recorded: u64,
pub anomalies_detected: u64,
pub slow_propagations: u64,
pub stuck_cancellations: u64,
pub incorrect_orders: u64,
pub avg_trace_depth: u64,
pub avg_propagation_time_us: u64,
}
#[derive(Debug)]
struct InProgressTrace {
trace: CancellationTrace,
last_step_time: SystemTime,
entity_to_step: HashMap<String, u32>,
depth_by_entity: HashMap<String, u32>,
}
#[derive(Debug)]
pub struct CancellationTracer {
config: CancellationTracerConfig,
stats: CancellationTracerStats,
in_progress: Arc<Mutex<HashMap<TraceId, InProgressTrace>>>,
completed_traces: Arc<Mutex<VecDeque<CancellationTrace>>>,
entity_traces: Arc<Mutex<HashMap<String, Vec<TraceId>>>>,
}
impl CancellationTracer {
#[must_use]
pub fn new(config: CancellationTracerConfig) -> Self {
Self {
config,
stats: CancellationTracerStats::default(),
in_progress: Arc::new(Mutex::new(HashMap::new())),
completed_traces: Arc::new(Mutex::new(VecDeque::new())),
entity_traces: Arc::new(Mutex::new(HashMap::new())),
}
}
pub fn start_trace(
&self,
root_entity: String,
entity_type: EntityType,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
) -> TraceId {
if !self.config.enable_tracing {
return TraceId::new(); }
if self.config.sample_rate < 1.0 {
let hash = self.hash_entity(&root_entity);
if self.sample_unit_interval(hash) > self.config.sample_rate {
return TraceId::new(); }
}
let trace_id = TraceId::new();
let now = SystemTime::now();
let trace = CancellationTrace {
trace_id,
root_cancel_reason: format!("{cancel_reason:?}"),
root_cancel_kind: format!("{cancel_kind:?}"),
root_entity: root_entity.clone(),
root_entity_type: entity_type,
start_time: now,
steps: Vec::new(),
is_complete: false,
total_propagation_time: None,
max_depth: 0,
entities_cancelled: 0,
anomalies: Vec::new(),
};
let in_progress_trace = InProgressTrace {
trace,
last_step_time: now,
entity_to_step: HashMap::new(),
depth_by_entity: HashMap::new(),
};
if let Ok(mut in_progress) = self.in_progress.lock() {
in_progress.insert(trace_id, in_progress_trace);
}
if let Ok(mut entity_traces) = self.entity_traces.lock() {
entity_traces.entry(root_entity).or_default().push(trace_id);
}
self.stats.traces_collected.fetch_add(1, Ordering::Relaxed);
trace_id
}
pub fn record_step(
&self,
trace_id: TraceId,
entity_id: String,
entity_type: EntityType,
cancel_reason: &CancelReason,
cancel_kind: CancelKind,
entity_state: String,
parent_entity: Option<String>,
propagation_completed: bool,
) {
if !self.config.enable_tracing {
return;
}
let now = SystemTime::now();
if let Ok(mut in_progress) = self.in_progress.lock() {
if let Some(in_progress_trace) = in_progress.get_mut(&trace_id) {
let elapsed_since_start = now
.duration_since(in_progress_trace.trace.start_time)
.unwrap_or(Duration::ZERO);
let elapsed_since_prev = now
.duration_since(in_progress_trace.last_step_time)
.unwrap_or(Duration::ZERO);
let depth = if let Some(parent) = &parent_entity {
in_progress_trace
.depth_by_entity
.get(parent)
.copied()
.unwrap_or(0)
+ 1
} else {
0
};
let step_id = in_progress_trace.trace.steps.len() as u32;
let step = CancellationTraceStep {
step_id,
entity_id: entity_id.clone(),
entity_type,
cancel_reason: format!("{cancel_reason:?}"),
cancel_kind: format!("{cancel_kind:?}"),
timestamp: now,
elapsed_since_start,
elapsed_since_prev,
depth,
parent_entity,
entity_state,
propagation_completed,
};
self.check_for_anomalies(&step, in_progress_trace);
in_progress_trace.trace.steps.push(step);
in_progress_trace.last_step_time = now;
in_progress_trace
.entity_to_step
.insert(entity_id.clone(), step_id);
in_progress_trace.depth_by_entity.insert(entity_id, depth);
in_progress_trace.trace.max_depth = in_progress_trace.trace.max_depth.max(depth);
in_progress_trace.trace.entities_cancelled += 1;
self.stats.steps_recorded.fetch_add(1, Ordering::Relaxed);
}
}
}
pub fn complete_trace(&self, trace_id: TraceId) {
if !self.config.enable_tracing {
return;
}
if let Ok(mut in_progress) = self.in_progress.lock() {
if let Some(mut in_progress_trace) = in_progress.remove(&trace_id) {
let completion_time = SystemTime::now();
let total_time = completion_time
.duration_since(in_progress_trace.trace.start_time)
.unwrap_or(Duration::ZERO);
in_progress_trace.trace.is_complete = true;
in_progress_trace.trace.total_propagation_time = Some(total_time);
self.update_completion_stats(&in_progress_trace.trace);
if let Ok(mut completed) = self.completed_traces.lock() {
completed.push_back(in_progress_trace.trace);
while completed.len() > self.config.max_traces {
completed.pop_front();
}
}
if let Ok(mut entity_traces) = self.entity_traces.lock() {
for traces in entity_traces.values_mut() {
traces.retain(|&id| id != trace_id);
}
}
}
}
}
pub fn stats(&self) -> CancellationTracerStatsSnapshot {
self.stats.snapshot()
}
pub fn completed_traces(&self) -> Vec<CancellationTrace> {
self.completed_traces
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.iter()
.cloned()
.collect()
}
pub fn in_progress_traces(&self) -> Vec<TraceId> {
self.in_progress
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.keys()
.copied()
.collect()
}
pub fn traces_for_entity(&self, entity_id: &str) -> Vec<TraceId> {
self.entity_traces
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner)
.get(entity_id)
.cloned()
.unwrap_or_default()
}
fn check_for_anomalies(&self, step: &CancellationTraceStep, trace: &mut InProgressTrace) {
if step.elapsed_since_prev.as_millis()
> u128::from(self.config.slow_propagation_threshold_ms)
{
let anomaly = PropagationAnomaly::SlowPropagation {
step_id: step.step_id,
entity_id: step.entity_id.clone(),
elapsed: step.elapsed_since_prev,
threshold: Duration::from_millis(self.config.slow_propagation_threshold_ms),
};
trace.trace.anomalies.push(anomaly);
self.stats.slow_propagations.fetch_add(1, Ordering::Relaxed);
}
if step.depth > self.config.max_trace_depth as u32 {
let anomaly = PropagationAnomaly::ExcessiveDepth {
depth: step.depth,
entity_id: step.entity_id.clone(),
};
trace.trace.anomalies.push(anomaly);
}
if let Some(parent) = &step.parent_entity {
if let Some(&parent_step_id) = trace.entity_to_step.get(parent) {
if step.step_id < parent_step_id {
let anomaly = PropagationAnomaly::IncorrectPropagationOrder {
parent_entity: parent.clone(),
child_entity: step.entity_id.clone(),
parent_step: parent_step_id,
child_step: step.step_id,
};
trace.trace.anomalies.push(anomaly);
self.stats.incorrect_orders.fetch_add(1, Ordering::Relaxed);
}
}
}
if !trace.trace.anomalies.is_empty() {
self.stats
.anomalies_detected
.fetch_add(1, Ordering::Relaxed);
}
}
fn update_completion_stats(&self, trace: &CancellationTrace) {
if let Some(total_time) = trace.total_propagation_time {
self.stats
.avg_propagation_time_us
.store(total_time.as_micros() as u64, Ordering::Relaxed);
}
self.stats
.avg_trace_depth
.store(u64::from(trace.max_depth), Ordering::Relaxed);
}
fn hash_entity(&self, entity: &str) -> u64 {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
entity.hash(&mut hasher);
hasher.finish()
}
fn sample_unit_interval(&self, hash: u64) -> f64 {
const TWO_POW_53_F64: f64 = 9_007_199_254_740_992.0;
let bits = hash >> 11;
bits as f64 / TWO_POW_53_F64
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationAnalysis {
pub analysis_period: Duration,
pub traces_analyzed: usize,
pub total_steps: usize,
pub avg_depth: f64,
pub avg_propagation_time: Duration,
pub common_cancel_kinds: Vec<(String, usize)>,
pub high_cancellation_entities: Vec<(String, usize)>,
pub anomaly_summary: BTreeMap<String, usize>,
pub bottlenecks: Vec<String>,
pub recommendations: Vec<String>,
}
#[must_use]
pub fn analyze_cancellation_patterns(traces: &[CancellationTrace]) -> CancellationAnalysis {
if traces.is_empty() {
return CancellationAnalysis {
analysis_period: Duration::ZERO,
traces_analyzed: 0,
total_steps: 0,
avg_depth: 0.0,
avg_propagation_time: Duration::ZERO,
common_cancel_kinds: Vec::new(),
high_cancellation_entities: Vec::new(),
anomaly_summary: BTreeMap::new(),
bottlenecks: Vec::new(),
recommendations: Vec::new(),
};
}
let total_steps: usize = traces.iter().map(|t| t.steps.len()).sum();
let avg_depth: f64 =
traces.iter().map(|t| f64::from(t.max_depth)).sum::<f64>() / traces.len() as f64;
let avg_propagation_time = traces
.iter()
.filter_map(|t| t.total_propagation_time)
.map(|d| d.as_nanos() as f64)
.sum::<f64>()
/ traces.len() as f64;
let mut cancel_kind_counts: HashMap<String, usize> = HashMap::new();
for trace in traces {
*cancel_kind_counts
.entry(trace.root_cancel_kind.clone())
.or_default() += 1;
}
let mut common_cancel_kinds: Vec<_> = cancel_kind_counts.into_iter().collect();
common_cancel_kinds.sort_by(|a, b| b.1.cmp(&a.1));
let mut entity_counts: HashMap<String, usize> = HashMap::new();
for trace in traces {
for step in &trace.steps {
*entity_counts.entry(step.entity_id.clone()).or_default() += 1;
}
}
let mut high_cancellation_entities: Vec<_> = entity_counts.into_iter().collect();
high_cancellation_entities.sort_by(|a, b| b.1.cmp(&a.1));
high_cancellation_entities.truncate(10);
let mut anomaly_summary: BTreeMap<String, usize> = BTreeMap::new();
for trace in traces {
for anomaly in &trace.anomalies {
let anomaly_type = match anomaly {
PropagationAnomaly::SlowPropagation { .. } => "SlowPropagation",
PropagationAnomaly::StuckCancellation { .. } => "StuckCancellation",
PropagationAnomaly::IncorrectPropagationOrder { .. } => "IncorrectOrder",
PropagationAnomaly::UnexpectedPropagation { .. } => "UnexpectedPropagation",
PropagationAnomaly::ExcessiveDepth { .. } => "ExcessiveDepth",
};
*anomaly_summary.entry(anomaly_type.to_string()).or_default() += 1;
}
}
let mut bottlenecks = Vec::new();
let total_entity_cancellations: usize = high_cancellation_entities
.iter()
.map(|(_, count)| *count)
.sum();
if total_entity_cancellations > 0 {
for (entity_id, count) in &high_cancellation_entities {
let frequency_ratio = *count as f64 / total_entity_cancellations as f64;
if frequency_ratio > 0.3 {
bottlenecks.push(format!(
"High-frequency cancellation source: {} ({:.1}% of all cancellations)",
entity_id,
frequency_ratio * 100.0
));
}
}
}
let mut slow_propagation_entities: HashMap<String, usize> = HashMap::new();
for trace in traces {
for anomaly in &trace.anomalies {
if let PropagationAnomaly::SlowPropagation { entity_id, .. } = anomaly {
*slow_propagation_entities
.entry(entity_id.clone())
.or_default() += 1;
}
}
}
for (entity_id, slow_count) in slow_propagation_entities {
if slow_count > traces.len() / 20 {
bottlenecks.push(format!(
"Slow propagation bottleneck: {entity_id} (involved in {slow_count} slow propagations)"
));
}
}
let mut stuck_entities: HashMap<String, usize> = HashMap::new();
for trace in traces {
for anomaly in &trace.anomalies {
if let PropagationAnomaly::StuckCancellation { entity_id, .. } = anomaly {
*stuck_entities.entry(entity_id.clone()).or_default() += 1;
}
}
}
for (entity_id, stuck_count) in stuck_entities {
if stuck_count > 0 {
bottlenecks.push(format!(
"Stuck cancellation bottleneck: {entity_id} ({stuck_count} instances)"
));
}
}
let mut depth_bottlenecks: HashMap<String, f64> = HashMap::new();
for trace in traces {
if trace.steps.len() as f64 > avg_depth * 1.5 {
if let Some(first_step) = trace.steps.first() {
let current_avg = depth_bottlenecks
.entry(first_step.entity_id.clone())
.or_insert(0.0);
*current_avg = f64::midpoint(*current_avg, trace.steps.len() as f64); }
}
}
for (entity_id, avg_depth_caused) in depth_bottlenecks {
if avg_depth_caused > avg_depth * 1.5 {
bottlenecks.push(format!(
"Deep cancellation tree origin: {entity_id} (avg depth: {avg_depth_caused:.1})"
));
}
}
let mut recommendations = Vec::new();
if avg_propagation_time > 10_000_000.0 {
recommendations.push(
"Consider optimizing cancellation propagation - average time is high".to_string(),
);
}
if avg_depth > 10.0 {
recommendations.push(
"Deep cancellation trees detected - consider flatter structured concurrency"
.to_string(),
);
}
if anomaly_summary.get("SlowPropagation").copied().unwrap_or(0) > traces.len() / 10 {
recommendations.push(
"Frequent slow propagations - investigate blocking operations in cancellation handlers"
.to_string(),
);
}
if !bottlenecks.is_empty() {
recommendations.push(format!(
"Address {} identified performance bottlenecks to improve cancellation efficiency",
bottlenecks.len()
));
}
CancellationAnalysis {
analysis_period: Duration::from_nanos(avg_propagation_time as u64),
traces_analyzed: traces.len(),
total_steps,
avg_depth,
avg_propagation_time: Duration::from_nanos(avg_propagation_time as u64),
common_cancel_kinds,
high_cancellation_entities,
anomaly_summary,
bottlenecks,
recommendations,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_tracer_creation() {
let config = CancellationTracerConfig::default();
let tracer = CancellationTracer::new(config);
let stats = tracer.stats();
assert_eq!(stats.traces_collected, 0);
assert_eq!(stats.steps_recorded, 0);
}
#[test]
fn test_trace_lifecycle() {
let config = CancellationTracerConfig::default();
let tracer = CancellationTracer::new(config);
let trace_id = tracer.start_trace(
"task-1".to_string(),
EntityType::Task,
&CancelReason::user("test"),
CancelKind::User,
);
tracer.record_step(
trace_id,
"region-1".to_string(),
EntityType::Region,
&CancelReason::user("propagation"),
CancelKind::User,
"Closing".to_string(),
Some("task-1".to_string()),
true,
);
tracer.complete_trace(trace_id);
let stats = tracer.stats();
assert_eq!(stats.traces_collected, 1);
assert_eq!(stats.steps_recorded, 1);
let completed = tracer.completed_traces();
assert_eq!(completed.len(), 1);
assert!(completed[0].is_complete);
}
#[test]
fn test_anomaly_detection() {
let mut config = CancellationTracerConfig::default();
config.slow_propagation_threshold_ms = 1; let tracer = CancellationTracer::new(config);
let trace_id = tracer.start_trace(
"task-1".to_string(),
EntityType::Task,
&CancelReason::user("test"),
CancelKind::User,
);
std::thread::sleep(Duration::from_millis(5));
tracer.record_step(
trace_id,
"region-1".to_string(),
EntityType::Region,
&CancelReason::user("slow"),
CancelKind::User,
"Closing".to_string(),
Some("task-1".to_string()),
true,
);
tracer.complete_trace(trace_id);
let completed = tracer.completed_traces();
assert!(!completed.is_empty());
assert!(!completed[0].anomalies.is_empty());
assert!(matches!(
completed[0].anomalies[0],
PropagationAnomaly::SlowPropagation { .. }
));
}
#[test]
fn test_analysis_patterns() {
let traces = vec![
CancellationTrace {
trace_id: TraceId::new(),
root_cancel_reason: "test1".to_string(),
root_cancel_kind: "User".to_string(),
root_entity: "task-1".to_string(),
root_entity_type: EntityType::Task,
start_time: SystemTime::now(),
steps: vec![],
is_complete: true,
total_propagation_time: Some(Duration::from_millis(10)),
max_depth: 3,
entities_cancelled: 5,
anomalies: vec![],
},
CancellationTrace {
trace_id: TraceId::new(),
root_cancel_reason: "test2".to_string(),
root_cancel_kind: "Timeout".to_string(),
root_entity: "task-2".to_string(),
root_entity_type: EntityType::Task,
start_time: SystemTime::now(),
steps: vec![],
is_complete: true,
total_propagation_time: Some(Duration::from_millis(5)),
max_depth: 2,
entities_cancelled: 3,
anomalies: vec![],
},
];
let analysis = analyze_cancellation_patterns(&traces);
assert_eq!(analysis.traces_analyzed, 2);
assert_eq!(analysis.avg_depth, 2.5);
assert!(!analysis.common_cancel_kinds.is_empty());
}
#[test]
fn test_bottleneck_detection() {
let traces = vec![
CancellationTrace {
trace_id: TraceId::new(),
root_cancel_reason: "test".to_string(),
root_cancel_kind: "User".to_string(),
root_entity: "bottleneck-entity".to_string(),
root_entity_type: EntityType::Task,
start_time: SystemTime::now(),
steps: vec![CancellationTraceStep {
step_id: 0,
entity_id: "bottleneck-entity".to_string(),
entity_type: EntityType::Task,
cancel_reason: "high frequency".to_string(),
cancel_kind: "User".to_string(),
parent_entity: None,
timestamp: SystemTime::now(),
elapsed_since_start: Duration::from_millis(1),
elapsed_since_prev: Duration::from_millis(1),
depth: 0,
entity_state: "Cancelled".to_string(),
propagation_completed: true,
}],
is_complete: true,
total_propagation_time: Some(Duration::from_millis(1)),
max_depth: 1,
entities_cancelled: 1,
anomalies: vec![
PropagationAnomaly::SlowPropagation {
step_id: 0,
entity_id: "slow-entity".to_string(),
elapsed: Duration::from_millis(100),
threshold: Duration::from_millis(1),
},
PropagationAnomaly::StuckCancellation {
entity_id: "stuck-entity".to_string(),
stuck_duration: Duration::from_millis(500),
},
],
},
CancellationTrace {
trace_id: TraceId::new(),
root_cancel_reason: "test".to_string(),
root_cancel_kind: "User".to_string(),
root_entity: "bottleneck-entity".to_string(),
root_entity_type: EntityType::Task,
start_time: SystemTime::now(),
steps: vec![CancellationTraceStep {
step_id: 0,
entity_id: "bottleneck-entity".to_string(),
entity_type: EntityType::Task,
cancel_reason: "high frequency".to_string(),
cancel_kind: "User".to_string(),
parent_entity: None,
timestamp: SystemTime::now(),
elapsed_since_start: Duration::from_millis(1),
elapsed_since_prev: Duration::from_millis(1),
depth: 0,
entity_state: "Cancelled".to_string(),
propagation_completed: true,
}],
is_complete: true,
total_propagation_time: Some(Duration::from_millis(1)),
max_depth: 1,
entities_cancelled: 1,
anomalies: vec![],
},
CancellationTrace {
trace_id: TraceId::new(),
root_cancel_reason: "test".to_string(),
root_cancel_kind: "User".to_string(),
root_entity: "other-entity".to_string(),
root_entity_type: EntityType::Task,
start_time: SystemTime::now(),
steps: vec![CancellationTraceStep {
step_id: 0,
entity_id: "other-entity".to_string(),
entity_type: EntityType::Task,
cancel_reason: "normal".to_string(),
cancel_kind: "User".to_string(),
parent_entity: None,
timestamp: SystemTime::now(),
elapsed_since_start: Duration::from_millis(1),
elapsed_since_prev: Duration::from_millis(1),
depth: 0,
entity_state: "Cancelled".to_string(),
propagation_completed: true,
}],
is_complete: true,
total_propagation_time: Some(Duration::from_millis(1)),
max_depth: 1,
entities_cancelled: 1,
anomalies: vec![],
},
];
let analysis = analyze_cancellation_patterns(&traces);
assert!(
!analysis.bottlenecks.is_empty(),
"Should detect bottlenecks"
);
let has_high_freq_bottleneck = analysis.bottlenecks.iter().any(|b| {
b.contains("High-frequency cancellation source") && b.contains("bottleneck-entity")
});
assert!(
has_high_freq_bottleneck,
"Should detect high-frequency cancellation source"
);
let has_slow_bottleneck = analysis
.bottlenecks
.iter()
.any(|b| b.contains("Slow propagation bottleneck") && b.contains("slow-entity"));
assert!(
has_slow_bottleneck,
"Should detect slow propagation bottleneck"
);
let has_stuck_bottleneck = analysis
.bottlenecks
.iter()
.any(|b| b.contains("Stuck cancellation bottleneck") && b.contains("stuck-entity"));
assert!(
has_stuck_bottleneck,
"Should detect stuck cancellation bottleneck"
);
let has_bottleneck_recommendation = analysis
.recommendations
.iter()
.any(|r| r.contains("Address") && r.contains("bottlenecks"));
assert!(
has_bottleneck_recommendation,
"Should recommend addressing bottlenecks"
);
}
}