1use crate::types::{CancelKind, CancelReason};
12use serde::{Deserialize, Serialize};
13use std::collections::{BTreeMap, HashMap, VecDeque};
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Mutex};
16use std::time::{Duration, SystemTime};
17
/// Tuning knobs for [`CancellationTracer`].
#[derive(Debug, Clone)]
pub struct CancellationTracerConfig {
    /// Master switch. When `false`, `start_trace` hands back an unregistered id and the
    /// recording methods return immediately.
    pub enable_tracing: bool,
    /// Step depth above which an `ExcessiveDepth` anomaly is recorded.
    pub max_trace_depth: usize,
    /// Maximum number of completed traces retained; the oldest are evicted first.
    pub max_traces: usize,
    /// Gap (in milliseconds) between consecutive steps above which a `SlowPropagation`
    /// anomaly is recorded.
    pub slow_propagation_threshold_ms: u64,
    /// Timeout (in milliseconds) for considering a cancellation stuck.
    /// NOTE(review): not read by `CancellationTracer` in this file — confirm intended use.
    pub stuck_cancellation_timeout_ms: u64,
    /// Whether extra timing analysis is requested (on by default in debug builds).
    /// NOTE(review): not read by `CancellationTracer` in this file.
    pub enable_timing_analysis: bool,
    /// Fraction of root entities to trace. The decision hashes the root entity id, so
    /// within one process a given entity is consistently in or out; values of `1.0`
    /// or more trace everything.
    pub sample_rate: f64,
}
36
37impl Default for CancellationTracerConfig {
38 fn default() -> Self {
39 Self {
40 enable_tracing: true,
41 max_trace_depth: 64,
42 max_traces: 10_000,
43 slow_propagation_threshold_ms: 100,
44 stuck_cancellation_timeout_ms: 5_000,
45 enable_timing_analysis: cfg!(debug_assertions),
46 sample_rate: 1.0,
47 }
48 }
49}
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
53pub struct TraceId(u64);
54
55impl TraceId {
56 pub fn new() -> Self {
58 static NEXT_ID: AtomicU64 = AtomicU64::new(1);
59 Self(NEXT_ID.fetch_add(1, Ordering::Relaxed))
60 }
61
62 #[must_use]
64 pub fn as_u64(&self) -> u64 {
65 self.0
66 }
67}
68
69impl Default for TraceId {
70 fn default() -> Self {
71 Self::new()
72 }
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct CancellationTraceStep {
78 pub step_id: u32,
80 pub entity_id: String,
82 pub entity_type: EntityType,
84 pub cancel_reason: String,
86 pub cancel_kind: String,
88 pub timestamp: SystemTime,
90 pub elapsed_since_start: Duration,
92 pub elapsed_since_prev: Duration,
94 pub depth: u32,
96 pub parent_entity: Option<String>,
98 pub entity_state: String,
100 pub propagation_completed: bool,
102}
103
104#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
106pub enum EntityType {
107 Task,
109 Region,
111}
112
113#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct CancellationTrace {
116 pub trace_id: TraceId,
118 pub root_cancel_reason: String,
120 pub root_cancel_kind: String,
122 pub root_entity: String,
124 pub root_entity_type: EntityType,
126 pub start_time: SystemTime,
128 pub steps: Vec<CancellationTraceStep>,
130 pub is_complete: bool,
132 pub total_propagation_time: Option<Duration>,
134 pub max_depth: u32,
136 pub entities_cancelled: u32,
138 pub anomalies: Vec<PropagationAnomaly>,
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub enum PropagationAnomaly {
145 SlowPropagation {
147 step_id: u32,
149 entity_id: String,
151 elapsed: Duration,
153 threshold: Duration,
155 },
156 StuckCancellation {
158 entity_id: String,
160 stuck_duration: Duration,
162 },
163 IncorrectPropagationOrder {
165 parent_entity: String,
167 child_entity: String,
169 parent_step: u32,
171 child_step: u32,
173 },
174 UnexpectedPropagation {
176 description: String,
178 affected_entities: Vec<String>,
180 },
181 ExcessiveDepth {
183 depth: u32,
185 entity_id: String,
187 },
188}
189
/// Live counters updated by [`CancellationTracer`]; read them via `snapshot()`.
#[derive(Debug, Default)]
pub struct CancellationTracerStats {
    /// Traces that were started and registered.
    pub traces_collected: AtomicU64,
    /// Steps recorded across all traces.
    pub steps_recorded: AtomicU64,
    /// Anomaly counter maintained by the tracer's anomaly checks.
    pub anomalies_detected: AtomicU64,
    /// `SlowPropagation` anomalies recorded.
    pub slow_propagations: AtomicU64,
    /// Stuck-cancellation counter.
    pub stuck_cancellations: AtomicU64,
    /// `IncorrectPropagationOrder` anomalies recorded.
    pub incorrect_orders: AtomicU64,
    /// Trace-depth summary published when a trace completes.
    pub avg_trace_depth: AtomicU64,
    /// Propagation-time summary (microseconds) published when a trace completes.
    pub avg_propagation_time_us: AtomicU64,
}
210
211impl CancellationTracerStats {
212 pub fn snapshot(&self) -> CancellationTracerStatsSnapshot {
214 CancellationTracerStatsSnapshot {
215 traces_collected: self.traces_collected.load(Ordering::Relaxed),
216 steps_recorded: self.steps_recorded.load(Ordering::Relaxed),
217 anomalies_detected: self.anomalies_detected.load(Ordering::Relaxed),
218 slow_propagations: self.slow_propagations.load(Ordering::Relaxed),
219 stuck_cancellations: self.stuck_cancellations.load(Ordering::Relaxed),
220 incorrect_orders: self.incorrect_orders.load(Ordering::Relaxed),
221 avg_trace_depth: self.avg_trace_depth.load(Ordering::Relaxed),
222 avg_propagation_time_us: self.avg_propagation_time_us.load(Ordering::Relaxed),
223 }
224 }
225}
226
227#[derive(Debug, Clone, Serialize, Deserialize)]
229pub struct CancellationTracerStatsSnapshot {
230 pub traces_collected: u64,
232 pub steps_recorded: u64,
234 pub anomalies_detected: u64,
236 pub slow_propagations: u64,
238 pub stuck_cancellations: u64,
240 pub incorrect_orders: u64,
242 pub avg_trace_depth: u64,
244 pub avg_propagation_time_us: u64,
246}
247
248#[derive(Debug)]
250struct InProgressTrace {
251 trace: CancellationTrace,
252 last_step_time: SystemTime,
253 entity_to_step: HashMap<String, u32>,
254 depth_by_entity: HashMap<String, u32>,
255}
256
257#[derive(Debug)]
259pub struct CancellationTracer {
260 config: CancellationTracerConfig,
261 stats: CancellationTracerStats,
262 in_progress: Arc<Mutex<HashMap<TraceId, InProgressTrace>>>,
264 completed_traces: Arc<Mutex<VecDeque<CancellationTrace>>>,
266 entity_traces: Arc<Mutex<HashMap<String, Vec<TraceId>>>>,
268}
269
270impl CancellationTracer {
271 #[must_use]
273 pub fn new(config: CancellationTracerConfig) -> Self {
274 Self {
275 config,
276 stats: CancellationTracerStats::default(),
277 in_progress: Arc::new(Mutex::new(HashMap::new())),
278 completed_traces: Arc::new(Mutex::new(VecDeque::new())),
279 entity_traces: Arc::new(Mutex::new(HashMap::new())),
280 }
281 }
282
283 pub fn start_trace(
285 &self,
286 root_entity: String,
287 entity_type: EntityType,
288 cancel_reason: &CancelReason,
289 cancel_kind: CancelKind,
290 ) -> TraceId {
291 if !self.config.enable_tracing {
292 return TraceId::new(); }
294
295 if self.config.sample_rate < 1.0 {
297 let hash = self.hash_entity(&root_entity);
298 if self.sample_unit_interval(hash) > self.config.sample_rate {
299 return TraceId::new(); }
301 }
302
303 let trace_id = TraceId::new();
304 let now = SystemTime::now();
305
306 let trace = CancellationTrace {
307 trace_id,
308 root_cancel_reason: format!("{cancel_reason:?}"),
309 root_cancel_kind: format!("{cancel_kind:?}"),
310 root_entity: root_entity.clone(),
311 root_entity_type: entity_type,
312 start_time: now,
313 steps: Vec::new(),
314 is_complete: false,
315 total_propagation_time: None,
316 max_depth: 0,
317 entities_cancelled: 0,
318 anomalies: Vec::new(),
319 };
320
321 let in_progress_trace = InProgressTrace {
322 trace,
323 last_step_time: now,
324 entity_to_step: HashMap::new(),
325 depth_by_entity: HashMap::new(),
326 };
327
328 if let Ok(mut in_progress) = self.in_progress.lock() {
330 in_progress.insert(trace_id, in_progress_trace);
331 }
332
333 if let Ok(mut entity_traces) = self.entity_traces.lock() {
335 entity_traces.entry(root_entity).or_default().push(trace_id);
336 }
337
338 self.stats.traces_collected.fetch_add(1, Ordering::Relaxed);
339 trace_id
340 }
341
342 pub fn record_step(
344 &self,
345 trace_id: TraceId,
346 entity_id: String,
347 entity_type: EntityType,
348 cancel_reason: &CancelReason,
349 cancel_kind: CancelKind,
350 entity_state: String,
351 parent_entity: Option<String>,
352 propagation_completed: bool,
353 ) {
354 if !self.config.enable_tracing {
355 return;
356 }
357
358 let now = SystemTime::now();
359
360 if let Ok(mut in_progress) = self.in_progress.lock() {
361 if let Some(in_progress_trace) = in_progress.get_mut(&trace_id) {
362 let elapsed_since_start = now
363 .duration_since(in_progress_trace.trace.start_time)
364 .unwrap_or(Duration::ZERO);
365 let elapsed_since_prev = now
366 .duration_since(in_progress_trace.last_step_time)
367 .unwrap_or(Duration::ZERO);
368
369 let depth = if let Some(parent) = &parent_entity {
371 in_progress_trace
372 .depth_by_entity
373 .get(parent)
374 .copied()
375 .unwrap_or(0)
376 + 1
377 } else {
378 0
379 };
380
381 let step_id = in_progress_trace.trace.steps.len() as u32;
382 let step = CancellationTraceStep {
383 step_id,
384 entity_id: entity_id.clone(),
385 entity_type,
386 cancel_reason: format!("{cancel_reason:?}"),
387 cancel_kind: format!("{cancel_kind:?}"),
388 timestamp: now,
389 elapsed_since_start,
390 elapsed_since_prev,
391 depth,
392 parent_entity,
393 entity_state,
394 propagation_completed,
395 };
396
397 self.check_for_anomalies(&step, in_progress_trace);
399
400 in_progress_trace.trace.steps.push(step);
402 in_progress_trace.last_step_time = now;
403 in_progress_trace
404 .entity_to_step
405 .insert(entity_id.clone(), step_id);
406 in_progress_trace.depth_by_entity.insert(entity_id, depth);
407 in_progress_trace.trace.max_depth = in_progress_trace.trace.max_depth.max(depth);
408 in_progress_trace.trace.entities_cancelled += 1;
409
410 self.stats.steps_recorded.fetch_add(1, Ordering::Relaxed);
411 }
412 }
413 }
414
415 pub fn complete_trace(&self, trace_id: TraceId) {
417 if !self.config.enable_tracing {
418 return;
419 }
420
421 if let Ok(mut in_progress) = self.in_progress.lock() {
422 if let Some(mut in_progress_trace) = in_progress.remove(&trace_id) {
423 let completion_time = SystemTime::now();
424 let total_time = completion_time
425 .duration_since(in_progress_trace.trace.start_time)
426 .unwrap_or(Duration::ZERO);
427
428 in_progress_trace.trace.is_complete = true;
429 in_progress_trace.trace.total_propagation_time = Some(total_time);
430
431 self.update_completion_stats(&in_progress_trace.trace);
433
434 if let Ok(mut completed) = self.completed_traces.lock() {
436 completed.push_back(in_progress_trace.trace);
437
438 while completed.len() > self.config.max_traces {
440 completed.pop_front();
441 }
442 }
443
444 if let Ok(mut entity_traces) = self.entity_traces.lock() {
446 for traces in entity_traces.values_mut() {
447 traces.retain(|&id| id != trace_id);
448 }
449 }
450 }
451 }
452 }
453
454 pub fn stats(&self) -> CancellationTracerStatsSnapshot {
456 self.stats.snapshot()
457 }
458
459 pub fn completed_traces(&self) -> Vec<CancellationTrace> {
461 self.completed_traces
462 .lock()
463 .unwrap_or_else(std::sync::PoisonError::into_inner)
464 .iter()
465 .cloned()
466 .collect()
467 }
468
469 pub fn in_progress_traces(&self) -> Vec<TraceId> {
471 self.in_progress
472 .lock()
473 .unwrap_or_else(std::sync::PoisonError::into_inner)
474 .keys()
475 .copied()
476 .collect()
477 }
478
479 pub fn traces_for_entity(&self, entity_id: &str) -> Vec<TraceId> {
481 self.entity_traces
482 .lock()
483 .unwrap_or_else(std::sync::PoisonError::into_inner)
484 .get(entity_id)
485 .cloned()
486 .unwrap_or_default()
487 }
488
489 fn check_for_anomalies(&self, step: &CancellationTraceStep, trace: &mut InProgressTrace) {
491 if step.elapsed_since_prev.as_millis()
493 > u128::from(self.config.slow_propagation_threshold_ms)
494 {
495 let anomaly = PropagationAnomaly::SlowPropagation {
496 step_id: step.step_id,
497 entity_id: step.entity_id.clone(),
498 elapsed: step.elapsed_since_prev,
499 threshold: Duration::from_millis(self.config.slow_propagation_threshold_ms),
500 };
501 trace.trace.anomalies.push(anomaly);
502 self.stats.slow_propagations.fetch_add(1, Ordering::Relaxed);
503 }
504
505 if step.depth > self.config.max_trace_depth as u32 {
507 let anomaly = PropagationAnomaly::ExcessiveDepth {
508 depth: step.depth,
509 entity_id: step.entity_id.clone(),
510 };
511 trace.trace.anomalies.push(anomaly);
512 }
513
514 if let Some(parent) = &step.parent_entity {
516 if let Some(&parent_step_id) = trace.entity_to_step.get(parent) {
517 if step.step_id < parent_step_id {
518 let anomaly = PropagationAnomaly::IncorrectPropagationOrder {
519 parent_entity: parent.clone(),
520 child_entity: step.entity_id.clone(),
521 parent_step: parent_step_id,
522 child_step: step.step_id,
523 };
524 trace.trace.anomalies.push(anomaly);
525 self.stats.incorrect_orders.fetch_add(1, Ordering::Relaxed);
526 }
527 }
528 }
529
530 if !trace.trace.anomalies.is_empty() {
531 self.stats
532 .anomalies_detected
533 .fetch_add(1, Ordering::Relaxed);
534 }
535 }
536
537 fn update_completion_stats(&self, trace: &CancellationTrace) {
539 if let Some(total_time) = trace.total_propagation_time {
540 self.stats
541 .avg_propagation_time_us
542 .store(total_time.as_micros() as u64, Ordering::Relaxed);
543 }
544
545 self.stats
546 .avg_trace_depth
547 .store(u64::from(trace.max_depth), Ordering::Relaxed);
548 }
549
550 fn hash_entity(&self, entity: &str) -> u64 {
552 use std::collections::hash_map::DefaultHasher;
553 use std::hash::{Hash, Hasher};
554
555 let mut hasher = DefaultHasher::new();
556 entity.hash(&mut hasher);
557 hasher.finish()
558 }
559
560 fn sample_unit_interval(&self, hash: u64) -> f64 {
562 const TWO_POW_53_F64: f64 = 9_007_199_254_740_992.0;
563 let bits = hash >> 11;
564 bits as f64 / TWO_POW_53_F64
565 }
566}
567
568#[derive(Debug, Clone, Serialize, Deserialize)]
570pub struct CancellationAnalysis {
571 pub analysis_period: Duration,
573 pub traces_analyzed: usize,
575 pub total_steps: usize,
577 pub avg_depth: f64,
579 pub avg_propagation_time: Duration,
581 pub common_cancel_kinds: Vec<(String, usize)>,
583 pub high_cancellation_entities: Vec<(String, usize)>,
585 pub anomaly_summary: BTreeMap<String, usize>,
587 pub bottlenecks: Vec<String>,
589 pub recommendations: Vec<String>,
591}
592
593#[must_use]
595pub fn analyze_cancellation_patterns(traces: &[CancellationTrace]) -> CancellationAnalysis {
596 if traces.is_empty() {
597 return CancellationAnalysis {
598 analysis_period: Duration::ZERO,
599 traces_analyzed: 0,
600 total_steps: 0,
601 avg_depth: 0.0,
602 avg_propagation_time: Duration::ZERO,
603 common_cancel_kinds: Vec::new(),
604 high_cancellation_entities: Vec::new(),
605 anomaly_summary: BTreeMap::new(),
606 bottlenecks: Vec::new(),
607 recommendations: Vec::new(),
608 };
609 }
610
611 let total_steps: usize = traces.iter().map(|t| t.steps.len()).sum();
612 let avg_depth: f64 =
613 traces.iter().map(|t| f64::from(t.max_depth)).sum::<f64>() / traces.len() as f64;
614
615 let avg_propagation_time = traces
616 .iter()
617 .filter_map(|t| t.total_propagation_time)
618 .map(|d| d.as_nanos() as f64)
619 .sum::<f64>()
620 / traces.len() as f64;
621
622 let mut cancel_kind_counts: HashMap<String, usize> = HashMap::new();
624 for trace in traces {
625 *cancel_kind_counts
626 .entry(trace.root_cancel_kind.clone())
627 .or_default() += 1;
628 }
629 let mut common_cancel_kinds: Vec<_> = cancel_kind_counts.into_iter().collect();
630 common_cancel_kinds.sort_by(|a, b| b.1.cmp(&a.1));
631
632 let mut entity_counts: HashMap<String, usize> = HashMap::new();
634 for trace in traces {
635 for step in &trace.steps {
636 *entity_counts.entry(step.entity_id.clone()).or_default() += 1;
637 }
638 }
639 let mut high_cancellation_entities: Vec<_> = entity_counts.into_iter().collect();
640 high_cancellation_entities.sort_by(|a, b| b.1.cmp(&a.1));
641 high_cancellation_entities.truncate(10); let mut anomaly_summary: BTreeMap<String, usize> = BTreeMap::new();
645 for trace in traces {
646 for anomaly in &trace.anomalies {
647 let anomaly_type = match anomaly {
648 PropagationAnomaly::SlowPropagation { .. } => "SlowPropagation",
649 PropagationAnomaly::StuckCancellation { .. } => "StuckCancellation",
650 PropagationAnomaly::IncorrectPropagationOrder { .. } => "IncorrectOrder",
651 PropagationAnomaly::UnexpectedPropagation { .. } => "UnexpectedPropagation",
652 PropagationAnomaly::ExcessiveDepth { .. } => "ExcessiveDepth",
653 };
654 *anomaly_summary.entry(anomaly_type.to_string()).or_default() += 1;
655 }
656 }
657
658 let mut bottlenecks = Vec::new();
660
661 let total_entity_cancellations: usize = high_cancellation_entities
663 .iter()
664 .map(|(_, count)| *count)
665 .sum();
666 if total_entity_cancellations > 0 {
667 for (entity_id, count) in &high_cancellation_entities {
668 let frequency_ratio = *count as f64 / total_entity_cancellations as f64;
669 if frequency_ratio > 0.3 {
670 bottlenecks.push(format!(
672 "High-frequency cancellation source: {} ({:.1}% of all cancellations)",
673 entity_id,
674 frequency_ratio * 100.0
675 ));
676 }
677 }
678 }
679
680 let mut slow_propagation_entities: HashMap<String, usize> = HashMap::new();
682 for trace in traces {
683 for anomaly in &trace.anomalies {
684 if let PropagationAnomaly::SlowPropagation { entity_id, .. } = anomaly {
685 *slow_propagation_entities
686 .entry(entity_id.clone())
687 .or_default() += 1;
688 }
689 }
690 }
691 for (entity_id, slow_count) in slow_propagation_entities {
692 if slow_count > traces.len() / 20 {
693 bottlenecks.push(format!(
695 "Slow propagation bottleneck: {entity_id} (involved in {slow_count} slow propagations)"
696 ));
697 }
698 }
699
700 let mut stuck_entities: HashMap<String, usize> = HashMap::new();
702 for trace in traces {
703 for anomaly in &trace.anomalies {
704 if let PropagationAnomaly::StuckCancellation { entity_id, .. } = anomaly {
705 *stuck_entities.entry(entity_id.clone()).or_default() += 1;
706 }
707 }
708 }
709 for (entity_id, stuck_count) in stuck_entities {
710 if stuck_count > 0 {
711 bottlenecks.push(format!(
713 "Stuck cancellation bottleneck: {entity_id} ({stuck_count} instances)"
714 ));
715 }
716 }
717
718 let mut depth_bottlenecks: HashMap<String, f64> = HashMap::new();
720 for trace in traces {
721 if trace.steps.len() as f64 > avg_depth * 1.5 {
722 if let Some(first_step) = trace.steps.first() {
724 let current_avg = depth_bottlenecks
725 .entry(first_step.entity_id.clone())
726 .or_insert(0.0);
727 *current_avg = f64::midpoint(*current_avg, trace.steps.len() as f64); }
729 }
730 }
731 for (entity_id, avg_depth_caused) in depth_bottlenecks {
732 if avg_depth_caused > avg_depth * 1.5 {
733 bottlenecks.push(format!(
734 "Deep cancellation tree origin: {entity_id} (avg depth: {avg_depth_caused:.1})"
735 ));
736 }
737 }
738
739 let mut recommendations = Vec::new();
741 if avg_propagation_time > 10_000_000.0 {
742 recommendations.push(
744 "Consider optimizing cancellation propagation - average time is high".to_string(),
745 );
746 }
747 if avg_depth > 10.0 {
748 recommendations.push(
749 "Deep cancellation trees detected - consider flatter structured concurrency"
750 .to_string(),
751 );
752 }
753 if anomaly_summary.get("SlowPropagation").copied().unwrap_or(0) > traces.len() / 10 {
754 recommendations.push(
755 "Frequent slow propagations - investigate blocking operations in cancellation handlers"
756 .to_string(),
757 );
758 }
759 if !bottlenecks.is_empty() {
760 recommendations.push(format!(
761 "Address {} identified performance bottlenecks to improve cancellation efficiency",
762 bottlenecks.len()
763 ));
764 }
765
766 CancellationAnalysis {
767 analysis_period: Duration::from_nanos(avg_propagation_time as u64),
768 traces_analyzed: traces.len(),
769 total_steps,
770 avg_depth,
771 avg_propagation_time: Duration::from_nanos(avg_propagation_time as u64),
772 common_cancel_kinds,
773 high_cancellation_entities,
774 anomaly_summary,
775 bottlenecks,
776 recommendations,
777 }
778}
779
#[cfg(test)]
mod tests {
    use super::*;

    /// Starts a user-initiated trace rooted at task `task-1`.
    fn start_root_trace(tracer: &CancellationTracer) -> TraceId {
        tracer.start_trace(
            "task-1".to_string(),
            EntityType::Task,
            &CancelReason::user("test"),
            CancelKind::User,
        )
    }

    /// Records `region-1` closing as a child of `task-1`.
    fn record_region_step(tracer: &CancellationTracer, trace_id: TraceId, reason: &'static str) {
        tracer.record_step(
            trace_id,
            "region-1".to_string(),
            EntityType::Region,
            &CancelReason::user(reason),
            CancelKind::User,
            "Closing".to_string(),
            Some("task-1".to_string()),
            true,
        );
    }

    /// A completed trace without steps, for aggregate-statistics tests.
    fn stepless_trace(
        reason: &str,
        kind: &str,
        root: &str,
        propagation_ms: u64,
        max_depth: u32,
        entities_cancelled: u32,
    ) -> CancellationTrace {
        CancellationTrace {
            trace_id: TraceId::new(),
            root_cancel_reason: reason.to_string(),
            root_cancel_kind: kind.to_string(),
            root_entity: root.to_string(),
            root_entity_type: EntityType::Task,
            start_time: SystemTime::now(),
            steps: Vec::new(),
            is_complete: true,
            total_propagation_time: Some(Duration::from_millis(propagation_ms)),
            max_depth,
            entities_cancelled,
            anomalies: Vec::new(),
        }
    }

    /// A completed trace whose only step cancels its own root `entity`.
    fn single_step_trace(
        entity: &str,
        step_reason: &str,
        anomalies: Vec<PropagationAnomaly>,
    ) -> CancellationTrace {
        let one_ms = Duration::from_millis(1);
        let step = CancellationTraceStep {
            step_id: 0,
            entity_id: entity.to_string(),
            entity_type: EntityType::Task,
            cancel_reason: step_reason.to_string(),
            cancel_kind: "User".to_string(),
            parent_entity: None,
            timestamp: SystemTime::now(),
            elapsed_since_start: one_ms,
            elapsed_since_prev: one_ms,
            depth: 0,
            entity_state: "Cancelled".to_string(),
            propagation_completed: true,
        };
        CancellationTrace {
            trace_id: TraceId::new(),
            root_cancel_reason: "test".to_string(),
            root_cancel_kind: "User".to_string(),
            root_entity: entity.to_string(),
            root_entity_type: EntityType::Task,
            start_time: SystemTime::now(),
            steps: vec![step],
            is_complete: true,
            total_propagation_time: Some(one_ms),
            max_depth: 1,
            entities_cancelled: 1,
            anomalies,
        }
    }

    /// True when some line contains both `a` and `b`.
    fn any_contains(lines: &[String], a: &str, b: &str) -> bool {
        lines.iter().any(|line| line.contains(a) && line.contains(b))
    }

    #[test]
    fn test_tracer_creation() {
        let stats = CancellationTracer::new(CancellationTracerConfig::default()).stats();
        assert_eq!(stats.traces_collected, 0);
        assert_eq!(stats.steps_recorded, 0);
    }

    #[test]
    fn test_trace_lifecycle() {
        let tracer = CancellationTracer::new(CancellationTracerConfig::default());
        let trace_id = start_root_trace(&tracer);
        record_region_step(&tracer, trace_id, "propagation");
        tracer.complete_trace(trace_id);

        let stats = tracer.stats();
        assert_eq!(stats.traces_collected, 1);
        assert_eq!(stats.steps_recorded, 1);

        let completed = tracer.completed_traces();
        assert_eq!(completed.len(), 1);
        assert!(completed[0].is_complete);
    }

    #[test]
    fn test_anomaly_detection() {
        let config = CancellationTracerConfig {
            // Any measurable gap between the trace start and the first step counts as slow.
            slow_propagation_threshold_ms: 1,
            ..CancellationTracerConfig::default()
        };
        let tracer = CancellationTracer::new(config);
        let trace_id = start_root_trace(&tracer);

        std::thread::sleep(Duration::from_millis(5));
        record_region_step(&tracer, trace_id, "slow");
        tracer.complete_trace(trace_id);

        let completed = tracer.completed_traces();
        assert!(!completed.is_empty());
        assert!(matches!(
            completed[0].anomalies.first(),
            Some(PropagationAnomaly::SlowPropagation { .. })
        ));
    }

    #[test]
    fn test_analysis_patterns() {
        let traces = [
            stepless_trace("test1", "User", "task-1", 10, 3, 5),
            stepless_trace("test2", "Timeout", "task-2", 5, 2, 3),
        ];

        let analysis = analyze_cancellation_patterns(&traces);
        assert_eq!(analysis.traces_analyzed, 2);
        assert_eq!(analysis.avg_depth, 2.5);
        assert!(!analysis.common_cancel_kinds.is_empty());
    }

    #[test]
    fn test_bottleneck_detection() {
        let noisy_anomalies = vec![
            PropagationAnomaly::SlowPropagation {
                step_id: 0,
                entity_id: "slow-entity".to_string(),
                elapsed: Duration::from_millis(100),
                threshold: Duration::from_millis(1),
            },
            PropagationAnomaly::StuckCancellation {
                entity_id: "stuck-entity".to_string(),
                stuck_duration: Duration::from_millis(500),
            },
        ];
        // Two of the three steps hit the same entity; the first trace also carries one
        // slow and one stuck anomaly.
        let traces = [
            single_step_trace("bottleneck-entity", "high frequency", noisy_anomalies),
            single_step_trace("bottleneck-entity", "high frequency", Vec::new()),
            single_step_trace("other-entity", "normal", Vec::new()),
        ];

        let analysis = analyze_cancellation_patterns(&traces);
        let found = &analysis.bottlenecks;

        assert!(!found.is_empty(), "Should detect bottlenecks");
        assert!(
            any_contains(found, "High-frequency cancellation source", "bottleneck-entity"),
            "Should detect high-frequency cancellation source"
        );
        assert!(
            any_contains(found, "Slow propagation bottleneck", "slow-entity"),
            "Should detect slow propagation bottleneck"
        );
        assert!(
            any_contains(found, "Stuck cancellation bottleneck", "stuck-entity"),
            "Should detect stuck cancellation bottleneck"
        );
        assert!(
            any_contains(&analysis.recommendations, "Address", "bottlenecks"),
            "Should recommend addressing bottlenecks"
        );
    }
}