Skip to main content

asupersync/observability/
cancellation_tracer.rs

1//! Structured Cancellation Trace Analyzer
2//!
3//! Deep analysis and visualization of cancellation propagation paths through the structured
4//! concurrency tree. Detects anomalies like slow propagation, stuck cancellations, or
5//! incorrect propagation patterns.
6//!
7//! This module provides real-time cancellation monitoring with minimal overhead, building
8//! on the existing observability infrastructure to provide comprehensive insights into
9//! cancellation behavior across complex structured concurrency applications.
10
11use crate::types::{CancelKind, CancelReason};
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, SystemTime};
17
/// Configuration for cancellation trace collection.
#[derive(Debug, Clone)]
pub struct CancellationTracerConfig {
    /// Master switch for recording. When `false`, `start_trace`, `record_step` and
    /// `complete_trace` do nothing (and `start_trace` returns an untracked ID).
    pub enable_tracing: bool,
    /// Propagation depth above which a step is flagged with an `ExcessiveDepth` anomaly.
    ///
    /// This is a diagnostic threshold only: deeper steps are still recorded, so it does not
    /// bound memory use.
    pub max_trace_depth: usize,
    /// Maximum number of completed traces kept in memory; the oldest are evicted first.
    pub max_traces: usize,
    /// A step whose gap since the previous step exceeds this many milliseconds is reported
    /// as slow propagation.
    pub slow_propagation_threshold_ms: u64,
    /// Time, in milliseconds, after which a cancellation is considered stuck.
    pub stuck_cancellation_timeout_ms: u64,
    /// Enable detailed timing measurements (higher overhead). On by default in debug builds.
    pub enable_timing_analysis: bool,
    /// Fraction of root cancellations to trace, in `0.0..=1.0`.
    ///
    /// The decision is derived from a hash of the root entity ID, so a given root entity is
    /// either always or never sampled for a given rate.
    pub sample_rate: f64,
}

impl Default for CancellationTracerConfig {
    /// Tracing on, every root sampled, 64-level depth threshold, 10 000 retained traces,
    /// 100 ms slow-step threshold, 5 s stuck timeout, timing analysis in debug builds only.
    fn default() -> Self {
        let timing_in_debug_only = cfg!(debug_assertions);
        Self {
            enable_tracing: true,
            sample_rate: 1.0,
            max_trace_depth: 64,
            max_traces: 10_000,
            slow_propagation_threshold_ms: 100,
            stuck_cancellation_timeout_ms: 5_000,
            enable_timing_analysis: timing_in_debug_only,
        }
    }
}
50
51/// Unique identifier for a cancellation trace.
52#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
53pub struct TraceId(u64);
54
55impl TraceId {
56    /// Creates a new trace ID.
57    pub fn new() -> Self {
58        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
59        Self(NEXT_ID.fetch_add(1, Ordering::Relaxed))
60    }
61
62    /// Returns the inner trace ID value.
63    #[must_use]
64    pub fn as_u64(&self) -> u64 {
65        self.0
66    }
67}
68
69impl Default for TraceId {
70    fn default() -> Self {
71        Self::new()
72    }
73}
74
75/// A single step in a cancellation propagation trace.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct CancellationTraceStep {
78    /// Sequence number within the trace.
79    pub step_id: u32,
80    /// Entity that received cancellation.
81    pub entity_id: String,
82    /// Type of entity (Task, Region).
83    pub entity_type: EntityType,
84    /// Cancellation reason.
85    pub cancel_reason: String,
86    /// Kind of cancellation.
87    pub cancel_kind: String,
88    /// Timestamp when cancellation was received.
89    pub timestamp: SystemTime,
90    /// Time elapsed since trace started.
91    pub elapsed_since_start: Duration,
92    /// Time elapsed since previous step.
93    pub elapsed_since_prev: Duration,
94    /// Depth in the propagation tree.
95    pub depth: u32,
96    /// Parent entity that propagated cancellation to this entity.
97    pub parent_entity: Option<String>,
98    /// Current state of the entity.
99    pub entity_state: String,
100    /// Whether this step completed propagation successfully.
101    pub propagation_completed: bool,
102}
103
104/// Type of entity in the cancellation trace.
105#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
106pub enum EntityType {
107    /// A task entity.
108    Task,
109    /// A region entity.
110    Region,
111}
112
113/// A complete cancellation propagation trace.
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct CancellationTrace {
116    /// Unique identifier for this trace.
117    pub trace_id: TraceId,
118    /// Root cancellation that started the trace.
119    pub root_cancel_reason: String,
120    /// Initial cancellation kind.
121    pub root_cancel_kind: String,
122    /// Entity that initiated the cancellation.
123    pub root_entity: String,
124    /// Type of root entity.
125    pub root_entity_type: EntityType,
126    /// Timestamp when trace started.
127    pub start_time: SystemTime,
128    /// All propagation steps in order.
129    pub steps: Vec<CancellationTraceStep>,
130    /// Whether the trace is complete (all propagation finished).
131    pub is_complete: bool,
132    /// Total propagation time (if complete).
133    pub total_propagation_time: Option<Duration>,
134    /// Maximum depth reached in propagation tree.
135    pub max_depth: u32,
136    /// Number of entities that were cancelled.
137    pub entities_cancelled: u32,
138    /// Detected anomalies in this trace.
139    pub anomalies: Vec<PropagationAnomaly>,
140}
141
142/// Types of anomalies detected during cancellation propagation.
143#[derive(Debug, Clone, Serialize, Deserialize)]
144pub enum PropagationAnomaly {
145    /// Propagation took longer than expected threshold.
146    SlowPropagation {
147        /// Step ID where slow propagation was detected.
148        step_id: u32,
149        /// ID of the entity with slow propagation.
150        entity_id: String,
151        /// Actual time elapsed during propagation.
152        elapsed: Duration,
153        /// Expected threshold duration.
154        threshold: Duration,
155    },
156    /// Cancellation appears to be stuck.
157    StuckCancellation {
158        /// ID of the entity with stuck cancellation.
159        entity_id: String,
160        /// How long the cancellation has been stuck.
161        stuck_duration: Duration,
162    },
163    /// Child was cancelled before parent.
164    IncorrectPropagationOrder {
165        /// ID of the parent entity that should have been cancelled first.
166        parent_entity: String,
167        /// ID of the child entity that was incorrectly cancelled first.
168        child_entity: String,
169        /// Step ID where parent cancellation occurred.
170        parent_step: u32,
171        /// Step ID where child cancellation occurred.
172        child_step: u32,
173    },
174    /// Unexpected propagation pattern.
175    UnexpectedPropagation {
176        /// Description of the unexpected pattern.
177        description: String,
178        /// List of entities affected by the pattern.
179        affected_entities: Vec<String>,
180    },
181    /// Propagation depth exceeded normal bounds.
182    ExcessiveDepth {
183        /// The excessive depth reached.
184        depth: u32,
185        /// ID of the entity at excessive depth.
186        entity_id: String,
187    },
188}
189
190/// Statistics about cancellation tracing.
191#[derive(Debug, Default)]
192pub struct CancellationTracerStats {
193    /// Total number of traces collected.
194    pub traces_collected: AtomicU64,
195    /// Total number of propagation steps recorded.
196    pub steps_recorded: AtomicU64,
197    /// Number of anomalies detected.
198    pub anomalies_detected: AtomicU64,
199    /// Number of slow propagations detected.
200    pub slow_propagations: AtomicU64,
201    /// Number of stuck cancellations detected.
202    pub stuck_cancellations: AtomicU64,
203    /// Number of incorrect propagation orders detected.
204    pub incorrect_orders: AtomicU64,
205    /// Average trace depth.
206    pub avg_trace_depth: AtomicU64,
207    /// Average propagation time in microseconds.
208    pub avg_propagation_time_us: AtomicU64,
209}
210
211impl CancellationTracerStats {
212    /// Gets a snapshot of current statistics.
213    pub fn snapshot(&self) -> CancellationTracerStatsSnapshot {
214        CancellationTracerStatsSnapshot {
215            traces_collected: self.traces_collected.load(Ordering::Relaxed),
216            steps_recorded: self.steps_recorded.load(Ordering::Relaxed),
217            anomalies_detected: self.anomalies_detected.load(Ordering::Relaxed),
218            slow_propagations: self.slow_propagations.load(Ordering::Relaxed),
219            stuck_cancellations: self.stuck_cancellations.load(Ordering::Relaxed),
220            incorrect_orders: self.incorrect_orders.load(Ordering::Relaxed),
221            avg_trace_depth: self.avg_trace_depth.load(Ordering::Relaxed),
222            avg_propagation_time_us: self.avg_propagation_time_us.load(Ordering::Relaxed),
223        }
224    }
225}
226
227/// Snapshot of cancellation tracer statistics.
228#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct CancellationTracerStatsSnapshot {
230    /// Total number of traces collected.
231    pub traces_collected: u64,
232    /// Total number of propagation steps recorded.
233    pub steps_recorded: u64,
234    /// Number of anomalies detected.
235    pub anomalies_detected: u64,
236    /// Number of slow propagation incidents.
237    pub slow_propagations: u64,
238    /// Number of stuck cancellation incidents.
239    pub stuck_cancellations: u64,
240    /// Number of incorrect propagation order incidents.
241    pub incorrect_orders: u64,
242    /// Average depth of cancellation traces.
243    pub avg_trace_depth: u64,
244    /// Average propagation time in microseconds.
245    pub avg_propagation_time_us: u64,
246}
247
248/// In-progress cancellation trace being built.
249#[derive(Debug)]
250struct InProgressTrace {
251    trace: CancellationTrace,
252    last_step_time: SystemTime,
253    entity_to_step: HashMap<String, u32>,
254    depth_by_entity: HashMap<String, u32>,
255}
256
257/// Structured cancellation trace analyzer.
258#[derive(Debug)]
259pub struct CancellationTracer {
260    config: CancellationTracerConfig,
261    stats: CancellationTracerStats,
262    /// In-progress traces being built.
263    in_progress: Arc<Mutex<HashMap<TraceId, InProgressTrace>>>,
264    /// Completed traces.
265    completed_traces: Arc<Mutex<VecDeque<CancellationTrace>>>,
266    /// Mapping from entity to active trace IDs.
267    entity_traces: Arc<Mutex<HashMap<String, Vec<TraceId>>>>,
268}
269
270impl CancellationTracer {
271    /// Creates a new cancellation tracer.
272    #[must_use]
273    pub fn new(config: CancellationTracerConfig) -> Self {
274        Self {
275            config,
276            stats: CancellationTracerStats::default(),
277            in_progress: Arc::new(Mutex::new(HashMap::new())),
278            completed_traces: Arc::new(Mutex::new(VecDeque::new())),
279            entity_traces: Arc::new(Mutex::new(HashMap::new())),
280        }
281    }
282
283    /// Starts a new cancellation trace from a root cancellation.
284    pub fn start_trace(
285        &self,
286        root_entity: String,
287        entity_type: EntityType,
288        cancel_reason: &CancelReason,
289        cancel_kind: CancelKind,
290    ) -> TraceId {
291        if !self.config.enable_tracing {
292            return TraceId::new(); // Return dummy ID
293        }
294
295        // Sample based on configured rate
296        if self.config.sample_rate < 1.0 {
297            let hash = self.hash_entity(&root_entity);
298            if self.sample_unit_interval(hash) > self.config.sample_rate {
299                return TraceId::new(); // Skip this trace
300            }
301        }
302
303        let trace_id = TraceId::new();
304        let now = SystemTime::now();
305
306        let trace = CancellationTrace {
307            trace_id,
308            root_cancel_reason: format!("{cancel_reason:?}"),
309            root_cancel_kind: format!("{cancel_kind:?}"),
310            root_entity: root_entity.clone(),
311            root_entity_type: entity_type,
312            start_time: now,
313            steps: Vec::new(),
314            is_complete: false,
315            total_propagation_time: None,
316            max_depth: 0,
317            entities_cancelled: 0,
318            anomalies: Vec::new(),
319        };
320
321        let in_progress_trace = InProgressTrace {
322            trace,
323            last_step_time: now,
324            entity_to_step: HashMap::new(),
325            depth_by_entity: HashMap::new(),
326        };
327
328        // Store the in-progress trace
329        if let Ok(mut in_progress) = self.in_progress.lock() {
330            in_progress.insert(trace_id, in_progress_trace);
331        }
332
333        // Track entity -> trace mapping
334        if let Ok(mut entity_traces) = self.entity_traces.lock() {
335            entity_traces.entry(root_entity).or_default().push(trace_id);
336        }
337
338        self.stats.traces_collected.fetch_add(1, Ordering::Relaxed);
339        trace_id
340    }
341
342    /// Records a cancellation propagation step.
343    pub fn record_step(
344        &self,
345        trace_id: TraceId,
346        entity_id: String,
347        entity_type: EntityType,
348        cancel_reason: &CancelReason,
349        cancel_kind: CancelKind,
350        entity_state: String,
351        parent_entity: Option<String>,
352        propagation_completed: bool,
353    ) {
354        if !self.config.enable_tracing {
355            return;
356        }
357
358        let now = SystemTime::now();
359
360        if let Ok(mut in_progress) = self.in_progress.lock() {
361            if let Some(in_progress_trace) = in_progress.get_mut(&trace_id) {
362                let elapsed_since_start = now
363                    .duration_since(in_progress_trace.trace.start_time)
364                    .unwrap_or(Duration::ZERO);
365                let elapsed_since_prev = now
366                    .duration_since(in_progress_trace.last_step_time)
367                    .unwrap_or(Duration::ZERO);
368
369                // Determine depth
370                let depth = if let Some(parent) = &parent_entity {
371                    in_progress_trace
372                        .depth_by_entity
373                        .get(parent)
374                        .copied()
375                        .unwrap_or(0)
376                        + 1
377                } else {
378                    0
379                };
380
381                let step_id = in_progress_trace.trace.steps.len() as u32;
382                let step = CancellationTraceStep {
383                    step_id,
384                    entity_id: entity_id.clone(),
385                    entity_type,
386                    cancel_reason: format!("{cancel_reason:?}"),
387                    cancel_kind: format!("{cancel_kind:?}"),
388                    timestamp: now,
389                    elapsed_since_start,
390                    elapsed_since_prev,
391                    depth,
392                    parent_entity,
393                    entity_state,
394                    propagation_completed,
395                };
396
397                // Check for anomalies
398                self.check_for_anomalies(&step, in_progress_trace);
399
400                // Update trace state
401                in_progress_trace.trace.steps.push(step);
402                in_progress_trace.last_step_time = now;
403                in_progress_trace
404                    .entity_to_step
405                    .insert(entity_id.clone(), step_id);
406                in_progress_trace.depth_by_entity.insert(entity_id, depth);
407                in_progress_trace.trace.max_depth = in_progress_trace.trace.max_depth.max(depth);
408                in_progress_trace.trace.entities_cancelled += 1;
409
410                self.stats.steps_recorded.fetch_add(1, Ordering::Relaxed);
411            }
412        }
413    }
414
415    /// Completes a cancellation trace.
416    pub fn complete_trace(&self, trace_id: TraceId) {
417        if !self.config.enable_tracing {
418            return;
419        }
420
421        if let Ok(mut in_progress) = self.in_progress.lock() {
422            if let Some(mut in_progress_trace) = in_progress.remove(&trace_id) {
423                let completion_time = SystemTime::now();
424                let total_time = completion_time
425                    .duration_since(in_progress_trace.trace.start_time)
426                    .unwrap_or(Duration::ZERO);
427
428                in_progress_trace.trace.is_complete = true;
429                in_progress_trace.trace.total_propagation_time = Some(total_time);
430
431                // Update statistics
432                self.update_completion_stats(&in_progress_trace.trace);
433
434                // Store completed trace
435                if let Ok(mut completed) = self.completed_traces.lock() {
436                    completed.push_back(in_progress_trace.trace);
437
438                    // Maintain size limit
439                    while completed.len() > self.config.max_traces {
440                        completed.pop_front();
441                    }
442                }
443
444                // Clean up entity mappings
445                if let Ok(mut entity_traces) = self.entity_traces.lock() {
446                    for traces in entity_traces.values_mut() {
447                        traces.retain(|&id| id != trace_id);
448                    }
449                }
450            }
451        }
452    }
453
454    /// Gets statistics about cancellation tracing.
455    pub fn stats(&self) -> CancellationTracerStatsSnapshot {
456        self.stats.snapshot()
457    }
458
459    /// Gets all completed traces (for analysis).
460    pub fn completed_traces(&self) -> Vec<CancellationTrace> {
461        self.completed_traces
462            .lock()
463            .unwrap_or_else(std::sync::PoisonError::into_inner)
464            .iter()
465            .cloned()
466            .collect()
467    }
468
469    /// Gets traces that are currently in progress.
470    pub fn in_progress_traces(&self) -> Vec<TraceId> {
471        self.in_progress
472            .lock()
473            .unwrap_or_else(std::sync::PoisonError::into_inner)
474            .keys()
475            .copied()
476            .collect()
477    }
478
479    /// Gets traces related to a specific entity.
480    pub fn traces_for_entity(&self, entity_id: &str) -> Vec<TraceId> {
481        self.entity_traces
482            .lock()
483            .unwrap_or_else(std::sync::PoisonError::into_inner)
484            .get(entity_id)
485            .cloned()
486            .unwrap_or_default()
487    }
488
489    /// Detects anomalies in cancellation propagation.
490    fn check_for_anomalies(&self, step: &CancellationTraceStep, trace: &mut InProgressTrace) {
491        // Check for slow propagation
492        if step.elapsed_since_prev.as_millis()
493            > u128::from(self.config.slow_propagation_threshold_ms)
494        {
495            let anomaly = PropagationAnomaly::SlowPropagation {
496                step_id: step.step_id,
497                entity_id: step.entity_id.clone(),
498                elapsed: step.elapsed_since_prev,
499                threshold: Duration::from_millis(self.config.slow_propagation_threshold_ms),
500            };
501            trace.trace.anomalies.push(anomaly);
502            self.stats.slow_propagations.fetch_add(1, Ordering::Relaxed);
503        }
504
505        // Check for excessive depth
506        if step.depth > self.config.max_trace_depth as u32 {
507            let anomaly = PropagationAnomaly::ExcessiveDepth {
508                depth: step.depth,
509                entity_id: step.entity_id.clone(),
510            };
511            trace.trace.anomalies.push(anomaly);
512        }
513
514        // Check for incorrect propagation order (simplified check)
515        if let Some(parent) = &step.parent_entity {
516            if let Some(&parent_step_id) = trace.entity_to_step.get(parent) {
517                if step.step_id < parent_step_id {
518                    let anomaly = PropagationAnomaly::IncorrectPropagationOrder {
519                        parent_entity: parent.clone(),
520                        child_entity: step.entity_id.clone(),
521                        parent_step: parent_step_id,
522                        child_step: step.step_id,
523                    };
524                    trace.trace.anomalies.push(anomaly);
525                    self.stats.incorrect_orders.fetch_add(1, Ordering::Relaxed);
526                }
527            }
528        }
529
530        if !trace.trace.anomalies.is_empty() {
531            self.stats
532                .anomalies_detected
533                .fetch_add(1, Ordering::Relaxed);
534        }
535    }
536
537    /// Updates statistics when a trace completes.
538    fn update_completion_stats(&self, trace: &CancellationTrace) {
539        if let Some(total_time) = trace.total_propagation_time {
540            self.stats
541                .avg_propagation_time_us
542                .store(total_time.as_micros() as u64, Ordering::Relaxed);
543        }
544
545        self.stats
546            .avg_trace_depth
547            .store(u64::from(trace.max_depth), Ordering::Relaxed);
548    }
549
550    /// Hash function for sampling decisions.
551    fn hash_entity(&self, entity: &str) -> u64 {
552        use std::collections::hash_map::DefaultHasher;
553        use std::hash::{Hash, Hasher};
554
555        let mut hasher = DefaultHasher::new();
556        entity.hash(&mut hasher);
557        hasher.finish()
558    }
559
560    /// Convert hash to unit interval for sampling.
561    fn sample_unit_interval(&self, hash: u64) -> f64 {
562        const TWO_POW_53_F64: f64 = 9_007_199_254_740_992.0;
563        let bits = hash >> 11;
564        bits as f64 / TWO_POW_53_F64
565    }
566}
567
568/// Analysis result for cancellation patterns.
569#[derive(Debug, Clone, Serialize, Deserialize)]
570pub struct CancellationAnalysis {
571    /// Time period analyzed.
572    pub analysis_period: Duration,
573    /// Total number of traces analyzed.
574    pub traces_analyzed: usize,
575    /// Total propagation steps.
576    pub total_steps: usize,
577    /// Average propagation depth.
578    pub avg_depth: f64,
579    /// Average propagation time.
580    pub avg_propagation_time: Duration,
581    /// Most common cancellation kinds.
582    pub common_cancel_kinds: Vec<(String, usize)>,
583    /// Entities with highest cancellation frequency.
584    pub high_cancellation_entities: Vec<(String, usize)>,
585    /// Summary of detected anomalies.
586    pub anomaly_summary: BTreeMap<String, usize>,
587    /// Performance bottlenecks identified.
588    pub bottlenecks: Vec<String>,
589    /// Recommendations for optimization.
590    pub recommendations: Vec<String>,
591}
592
593/// Analyzes cancellation traces to extract insights.
594#[must_use]
595pub fn analyze_cancellation_patterns(traces: &[CancellationTrace]) -> CancellationAnalysis {
596    if traces.is_empty() {
597        return CancellationAnalysis {
598            analysis_period: Duration::ZERO,
599            traces_analyzed: 0,
600            total_steps: 0,
601            avg_depth: 0.0,
602            avg_propagation_time: Duration::ZERO,
603            common_cancel_kinds: Vec::new(),
604            high_cancellation_entities: Vec::new(),
605            anomaly_summary: BTreeMap::new(),
606            bottlenecks: Vec::new(),
607            recommendations: Vec::new(),
608        };
609    }
610
611    let total_steps: usize = traces.iter().map(|t| t.steps.len()).sum();
612    let avg_depth: f64 =
613        traces.iter().map(|t| f64::from(t.max_depth)).sum::<f64>() / traces.len() as f64;
614
615    let avg_propagation_time = traces
616        .iter()
617        .filter_map(|t| t.total_propagation_time)
618        .map(|d| d.as_nanos() as f64)
619        .sum::<f64>()
620        / traces.len() as f64;
621
622    // Analyze common cancellation kinds
623    let mut cancel_kind_counts: HashMap<String, usize> = HashMap::new();
624    for trace in traces {
625        *cancel_kind_counts
626            .entry(trace.root_cancel_kind.clone())
627            .or_default() += 1;
628    }
629    let mut common_cancel_kinds: Vec<_> = cancel_kind_counts.into_iter().collect();
630    common_cancel_kinds.sort_by(|a, b| b.1.cmp(&a.1));
631
632    // Analyze high-cancellation entities
633    let mut entity_counts: HashMap<String, usize> = HashMap::new();
634    for trace in traces {
635        for step in &trace.steps {
636            *entity_counts.entry(step.entity_id.clone()).or_default() += 1;
637        }
638    }
639    let mut high_cancellation_entities: Vec<_> = entity_counts.into_iter().collect();
640    high_cancellation_entities.sort_by(|a, b| b.1.cmp(&a.1));
641    high_cancellation_entities.truncate(10); // Top 10
642
643    // Analyze anomalies
644    let mut anomaly_summary: BTreeMap<String, usize> = BTreeMap::new();
645    for trace in traces {
646        for anomaly in &trace.anomalies {
647            let anomaly_type = match anomaly {
648                PropagationAnomaly::SlowPropagation { .. } => "SlowPropagation",
649                PropagationAnomaly::StuckCancellation { .. } => "StuckCancellation",
650                PropagationAnomaly::IncorrectPropagationOrder { .. } => "IncorrectOrder",
651                PropagationAnomaly::UnexpectedPropagation { .. } => "UnexpectedPropagation",
652                PropagationAnomaly::ExcessiveDepth { .. } => "ExcessiveDepth",
653            };
654            *anomaly_summary.entry(anomaly_type.to_string()).or_default() += 1;
655        }
656    }
657
658    // Detect performance bottlenecks
659    let mut bottlenecks = Vec::new();
660
661    // Bottleneck 1: Entities with disproportionately high cancellation frequency
662    let total_entity_cancellations: usize = high_cancellation_entities
663        .iter()
664        .map(|(_, count)| *count)
665        .sum();
666    if total_entity_cancellations > 0 {
667        for (entity_id, count) in &high_cancellation_entities {
668            let frequency_ratio = *count as f64 / total_entity_cancellations as f64;
669            if frequency_ratio > 0.3 {
670                // Entity accounts for >30% of all cancellations
671                bottlenecks.push(format!(
672                    "High-frequency cancellation source: {} ({:.1}% of all cancellations)",
673                    entity_id,
674                    frequency_ratio * 100.0
675                ));
676            }
677        }
678    }
679
680    // Bottleneck 2: Entities frequently involved in slow propagations
681    let mut slow_propagation_entities: HashMap<String, usize> = HashMap::new();
682    for trace in traces {
683        for anomaly in &trace.anomalies {
684            if let PropagationAnomaly::SlowPropagation { entity_id, .. } = anomaly {
685                *slow_propagation_entities
686                    .entry(entity_id.clone())
687                    .or_default() += 1;
688            }
689        }
690    }
691    for (entity_id, slow_count) in slow_propagation_entities {
692        if slow_count > traces.len() / 20 {
693            // Appears in >5% of traces with slow propagation
694            bottlenecks.push(format!(
695                "Slow propagation bottleneck: {entity_id} (involved in {slow_count} slow propagations)"
696            ));
697        }
698    }
699
700    // Bottleneck 3: Entities causing stuck cancellations
701    let mut stuck_entities: HashMap<String, usize> = HashMap::new();
702    for trace in traces {
703        for anomaly in &trace.anomalies {
704            if let PropagationAnomaly::StuckCancellation { entity_id, .. } = anomaly {
705                *stuck_entities.entry(entity_id.clone()).or_default() += 1;
706            }
707        }
708    }
709    for (entity_id, stuck_count) in stuck_entities {
710        if stuck_count > 0 {
711            // Any stuck cancellation is a bottleneck
712            bottlenecks.push(format!(
713                "Stuck cancellation bottleneck: {entity_id} ({stuck_count} instances)"
714            ));
715        }
716    }
717
718    // Bottleneck 4: Deep cancellation tree origins
719    let mut depth_bottlenecks: HashMap<String, f64> = HashMap::new();
720    for trace in traces {
721        if trace.steps.len() as f64 > avg_depth * 1.5 {
722            // Traces significantly deeper than average
723            if let Some(first_step) = trace.steps.first() {
724                let current_avg = depth_bottlenecks
725                    .entry(first_step.entity_id.clone())
726                    .or_insert(0.0);
727                *current_avg = f64::midpoint(*current_avg, trace.steps.len() as f64); // Running average
728            }
729        }
730    }
731    for (entity_id, avg_depth_caused) in depth_bottlenecks {
732        if avg_depth_caused > avg_depth * 1.5 {
733            bottlenecks.push(format!(
734                "Deep cancellation tree origin: {entity_id} (avg depth: {avg_depth_caused:.1})"
735            ));
736        }
737    }
738
739    // Generate recommendations
740    let mut recommendations = Vec::new();
741    if avg_propagation_time > 10_000_000.0 {
742        // 10ms
743        recommendations.push(
744            "Consider optimizing cancellation propagation - average time is high".to_string(),
745        );
746    }
747    if avg_depth > 10.0 {
748        recommendations.push(
749            "Deep cancellation trees detected - consider flatter structured concurrency"
750                .to_string(),
751        );
752    }
753    if anomaly_summary.get("SlowPropagation").copied().unwrap_or(0) > traces.len() / 10 {
754        recommendations.push(
755            "Frequent slow propagations - investigate blocking operations in cancellation handlers"
756                .to_string(),
757        );
758    }
759    if !bottlenecks.is_empty() {
760        recommendations.push(format!(
761            "Address {} identified performance bottlenecks to improve cancellation efficiency",
762            bottlenecks.len()
763        ));
764    }
765
766    CancellationAnalysis {
767        analysis_period: Duration::from_nanos(avg_propagation_time as u64),
768        traces_analyzed: traces.len(),
769        total_steps,
770        avg_depth,
771        avg_propagation_time: Duration::from_nanos(avg_propagation_time as u64),
772        common_cancel_kinds,
773        high_cancellation_entities,
774        anomaly_summary,
775        bottlenecks,
776        recommendations,
777    }
778}
779
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a completed trace whose root entity is also its single, depth-0 propagation step.
    ///
    /// Every timing field is fixed at 1ms and the root/step cancel kind is `"User"`, so tests
    /// using this fixture only vary the entity, the step's cancel reason, and the anomalies
    /// attached to the trace.
    fn single_step_trace(
        entity: &str,
        step_reason: &str,
        anomalies: Vec<PropagationAnomaly>,
    ) -> CancellationTrace {
        CancellationTrace {
            trace_id: TraceId::new(),
            root_cancel_reason: "test".to_string(),
            root_cancel_kind: "User".to_string(),
            root_entity: entity.to_string(),
            root_entity_type: EntityType::Task,
            start_time: SystemTime::now(),
            steps: vec![CancellationTraceStep {
                step_id: 0,
                entity_id: entity.to_string(),
                entity_type: EntityType::Task,
                cancel_reason: step_reason.to_string(),
                cancel_kind: "User".to_string(),
                parent_entity: None,
                timestamp: SystemTime::now(),
                elapsed_since_start: Duration::from_millis(1),
                elapsed_since_prev: Duration::from_millis(1),
                depth: 0,
                entity_state: "Cancelled".to_string(),
                propagation_completed: true,
            }],
            is_complete: true,
            total_propagation_time: Some(Duration::from_millis(1)),
            max_depth: 1,
            entities_cancelled: 1,
            anomalies,
        }
    }

    #[test]
    fn test_tracer_creation() {
        let config = CancellationTracerConfig::default();
        let tracer = CancellationTracer::new(config);
        let stats = tracer.stats();
        assert_eq!(stats.traces_collected, 0);
        assert_eq!(stats.steps_recorded, 0);
    }

    #[test]
    fn test_trace_lifecycle() {
        let config = CancellationTracerConfig::default();
        let tracer = CancellationTracer::new(config);

        // Start a trace
        let trace_id = tracer.start_trace(
            "task-1".to_string(),
            EntityType::Task,
            &CancelReason::user("test"),
            CancelKind::User,
        );

        // Record a step
        tracer.record_step(
            trace_id,
            "region-1".to_string(),
            EntityType::Region,
            &CancelReason::user("propagation"),
            CancelKind::User,
            "Closing".to_string(),
            Some("task-1".to_string()),
            true,
        );

        // Complete the trace
        tracer.complete_trace(trace_id);

        let stats = tracer.stats();
        assert_eq!(stats.traces_collected, 1);
        assert_eq!(stats.steps_recorded, 1);

        let completed = tracer.completed_traces();
        assert_eq!(completed.len(), 1);
        assert!(completed[0].is_complete);
    }

    #[test]
    fn test_anomaly_detection() {
        // Struct-update syntax rather than mutating a defaulted value
        // (clippy::field_reassign_with_default).
        let config = CancellationTracerConfig {
            slow_propagation_threshold_ms: 1, // Very low threshold for testing
            ..CancellationTracerConfig::default()
        };
        let tracer = CancellationTracer::new(config);

        let trace_id = tracer.start_trace(
            "task-1".to_string(),
            EntityType::Task,
            &CancelReason::user("test"),
            CancelKind::User,
        );

        // Simulate slow propagation: the step below is recorded at least 5ms after the
        // trace started, well past the 1ms threshold configured above.
        std::thread::sleep(Duration::from_millis(5));

        tracer.record_step(
            trace_id,
            "region-1".to_string(),
            EntityType::Region,
            &CancelReason::user("slow"),
            CancelKind::User,
            "Closing".to_string(),
            Some("task-1".to_string()),
            true,
        );

        tracer.complete_trace(trace_id);

        let completed = tracer.completed_traces();
        assert!(!completed.is_empty());

        // Should detect slow propagation anomaly. Search the whole list instead of
        // indexing [0] so the test does not depend on the order anomalies are reported in.
        let has_slow_anomaly = completed[0]
            .anomalies
            .iter()
            .any(|anomaly| matches!(anomaly, PropagationAnomaly::SlowPropagation { .. }));
        assert!(has_slow_anomaly, "Should detect slow propagation anomaly");
    }

    #[test]
    fn test_analysis_patterns() {
        let traces = vec![
            CancellationTrace {
                trace_id: TraceId::new(),
                root_cancel_reason: "test1".to_string(),
                root_cancel_kind: "User".to_string(),
                root_entity: "task-1".to_string(),
                root_entity_type: EntityType::Task,
                start_time: SystemTime::now(),
                steps: vec![],
                is_complete: true,
                total_propagation_time: Some(Duration::from_millis(10)),
                max_depth: 3,
                entities_cancelled: 5,
                anomalies: vec![],
            },
            CancellationTrace {
                trace_id: TraceId::new(),
                root_cancel_reason: "test2".to_string(),
                root_cancel_kind: "Timeout".to_string(),
                root_entity: "task-2".to_string(),
                root_entity_type: EntityType::Task,
                start_time: SystemTime::now(),
                steps: vec![],
                is_complete: true,
                total_propagation_time: Some(Duration::from_millis(5)),
                max_depth: 2,
                entities_cancelled: 3,
                anomalies: vec![],
            },
        ];

        let analysis = analyze_cancellation_patterns(&traces);
        assert_eq!(analysis.traces_analyzed, 2);
        // (3 + 2) / 2 — exactly representable, so direct float equality is safe here.
        assert_eq!(analysis.avg_depth, 2.5);
        assert!(!analysis.common_cancel_kinds.is_empty());
    }

    #[test]
    fn test_bottleneck_detection() {
        // "bottleneck-entity" roots 2 of the 3 traces, making it a high-frequency
        // cancellation source. Its first trace also carries the slow and stuck anomalies
        // that the analyzer should surface as bottlenecks.
        let traces = vec![
            single_step_trace(
                "bottleneck-entity",
                "high frequency",
                vec![
                    PropagationAnomaly::SlowPropagation {
                        step_id: 0,
                        entity_id: "slow-entity".to_string(),
                        elapsed: Duration::from_millis(100),
                        threshold: Duration::from_millis(1),
                    },
                    PropagationAnomaly::StuckCancellation {
                        entity_id: "stuck-entity".to_string(),
                        stuck_duration: Duration::from_millis(500),
                    },
                ],
            ),
            single_step_trace("bottleneck-entity", "high frequency", vec![]),
            single_step_trace("other-entity", "normal", vec![]),
        ];

        let analysis = analyze_cancellation_patterns(&traces);

        // Should detect bottlenecks
        assert!(
            !analysis.bottlenecks.is_empty(),
            "Should detect bottlenecks"
        );

        // True when some bottleneck message names both the category and the entity.
        let reports = |category: &str, entity: &str| {
            analysis
                .bottlenecks
                .iter()
                .any(|b| b.contains(category) && b.contains(entity))
        };

        // High-frequency bottleneck (bottleneck-entity appears in 2/3 traces)
        assert!(
            reports("High-frequency cancellation source", "bottleneck-entity"),
            "Should detect high-frequency cancellation source"
        );

        // Slow propagation bottleneck
        assert!(
            reports("Slow propagation bottleneck", "slow-entity"),
            "Should detect slow propagation bottleneck"
        );

        // Stuck cancellation bottleneck
        assert!(
            reports("Stuck cancellation bottleneck", "stuck-entity"),
            "Should detect stuck cancellation bottleneck"
        );

        // Should include recommendation about addressing bottlenecks
        let has_bottleneck_recommendation = analysis
            .recommendations
            .iter()
            .any(|r| r.contains("Address") && r.contains("bottlenecks"));
        assert!(
            has_bottleneck_recommendation,
            "Should recommend addressing bottlenecks"
        );
    }
}