Skip to main content

asupersync/observability/
cancellation_visualizer.rs

1//! Cancellation Trace Visualizer
2//!
3//! Real-time visualization tools for cancellation propagation trees and analysis.
4//! Provides multiple output formats for different debugging scenarios.
5
6use crate::observability::cancellation_tracer::{
7    CancellationTrace, CancellationTraceStep, EntityType, PropagationAnomaly, TraceId,
8};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::Duration;
12
/// Configuration for visualization output.
///
/// A [`CancellationVisualizer`] is constructed from one of these and keeps it for
/// all subsequent rendering calls.
#[derive(Debug, Clone)]
pub struct VisualizerConfig {
    /// Include timing information in visualizations.
    pub show_timing: bool,
    /// Maximum depth to visualize (prevents overwhelming output).
    pub max_depth: u32,
    /// Highlight anomalies in visual output (entries are tagged with a `⚠️` marker
    /// in the text renderings).
    pub highlight_anomalies: bool,
    /// Include detailed step information (an extra `State | Depth | Kind` line under
    /// each timeline step).
    pub show_step_details: bool,
    /// Unit used when durations are rendered as text.
    pub timing_format: TimingFormat,
}
27
28impl Default for VisualizerConfig {
29    fn default() -> Self {
30        Self {
31            show_timing: true,
32            max_depth: 20,
33            highlight_anomalies: true,
34            show_step_details: false,
35            timing_format: TimingFormat::Milliseconds,
36        }
37    }
38}
39
/// Format for displaying timing information.
///
/// Selects the unit (and with it the precision) used when a duration is turned
/// into text by the visualizer.
#[derive(Debug, Clone, Copy)]
pub enum TimingFormat {
    /// Display timing in nanoseconds (whole number, `ns` suffix).
    Nanoseconds,
    /// Display timing in microseconds (one decimal place, `μs` suffix).
    Microseconds,
    /// Display timing in milliseconds (one decimal place, `ms` suffix).
    Milliseconds,
    /// Display timing in seconds (three decimal places, `s` suffix).
    Seconds,
    /// Automatically choose the most appropriate unit: nanoseconds below 1 μs,
    /// microseconds below 1 ms, milliseconds below 1 s, seconds otherwise.
    Auto,
}
54
/// A tree node in the cancellation propagation visualization.
///
/// Nodes own their children directly, so a whole tree is a single value that can
/// be cloned or (de)serialized as a unit.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationTreeNode {
    /// Unique identifier for the entity (task, region, etc.).
    pub entity_id: String,
    /// Type of the entity represented by this node.
    pub entity_type: EntityType,
    /// Depth level in the cancellation tree (the root is built with depth 0).
    pub depth: u32,
    /// Total time for cancellation to complete for this entity, if known.
    pub timing: Option<Duration>,
    /// Delay between parent cancellation and this entity's cancellation start, if known.
    pub propagation_delay: Option<Duration>,
    /// List of detected anomalies or issues during cancellation, as display text.
    pub anomalies: Vec<String>,
    /// Child nodes in the cancellation tree.
    pub children: Vec<Self>,
    /// Whether cancellation has completed for this entity.
    pub completed: bool,
}
75
/// Real-time cancellation statistics for monitoring dashboards.
///
/// A snapshot is derived from whatever set of traces the caller hands to the
/// visualizer; "period" below therefore means "the supplied traces".
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationDashboard {
    /// Current time of snapshot.
    pub snapshot_time: std::time::SystemTime,
    /// Active traces being tracked (traces not yet marked complete).
    pub active_traces: usize,
    /// Completed traces in the last period.
    pub completed_traces_period: usize,
    /// Average propagation latency over the traces that report a total time.
    pub avg_propagation_latency: Duration,
    /// 95th percentile propagation latency over the traces that report a total time.
    pub p95_propagation_latency: Duration,
    /// Current bottlenecks detected.
    pub current_bottlenecks: Vec<BottleneckInfo>,
    /// Anomalies detected in the last period.
    pub recent_anomalies: Vec<AnomalyInfo>,
    /// Entity throughput statistics, keyed by entity id.
    pub entity_throughput: HashMap<String, ThroughputStats>,
}
96
/// Information about a detected bottleneck.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BottleneckInfo {
    /// ID of the entity causing the bottleneck.
    pub entity_id: String,
    /// Type of the entity causing the bottleneck.
    ///
    /// NOTE(review): the detector in this module always reports `EntityType::Task`
    /// because it does not track per-entity types.
    pub entity_type: EntityType,
    /// Average delay caused by this bottleneck.
    pub avg_delay: Duration,
    /// Current queue depth at this bottleneck.
    ///
    /// NOTE(review): the detector in this module fills this with the number of delay
    /// samples recorded for the entity, not with a measured queue length.
    pub queue_depth: usize,
    /// Impact score indicating severity; larger means more impactful.
    ///
    /// As computed in this module (average delay in seconds multiplied by the number
    /// of samples) the value is non-negative but is NOT bounded to `0.0..=1.0`.
    pub impact_score: f64,
}
111
/// Information about a detected anomaly.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnomalyInfo {
    /// Trace ID associated with the anomaly.
    pub trace_id: TraceId,
    /// Type or category of the anomaly (the dashboard uses the name of the
    /// `PropagationAnomaly` variant, e.g. `"SlowPropagation"`).
    pub anomaly_type: String,
    /// Severity level of the anomaly.
    pub severity: AnomalySeverity,
    /// Human-readable description of the anomaly.
    pub description: String,
    /// When the anomaly was detected.
    ///
    /// NOTE(review): the dashboard generator stamps every entry with the snapshot
    /// time rather than with the moment the anomaly originally occurred.
    pub detected_at: std::time::SystemTime,
}
126
/// Severity level of an anomaly.
///
/// Variants are declared from least to most severe.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub enum AnomalySeverity {
    /// Low severity anomaly, informational.
    Low,
    /// Medium severity anomaly, monitor.
    Medium,
    /// High severity anomaly, investigate.
    High,
    /// Critical severity anomaly, immediate attention required.
    Critical,
}
139
/// Throughput statistics for an entity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputStats {
    /// Number of cancellations processed per second.
    ///
    /// NOTE(review): this module currently fills in a fixed placeholder of `1.0`.
    pub cancellations_per_second: f64,
    /// Average time to process a cancellation.
    pub avg_processing_time: Duration,
    /// Current depth of the processing queue.
    ///
    /// NOTE(review): this module currently reports `0`; no queue tracking exists here.
    pub queue_depth: usize,
    /// Success rate for cancellation processing (0.0 to 1.0).
    pub success_rate: f64,
}
152
153/// Cancellation trace visualizer.
154pub struct CancellationVisualizer {
155    config: VisualizerConfig,
156}
157
158impl CancellationVisualizer {
159    /// Creates a new visualizer with the given configuration.
160    #[must_use]
161    pub fn new(config: VisualizerConfig) -> Self {
162        Self { config }
163    }
164
165    /// Creates a visualizer with default configuration.
166    #[must_use]
167    pub fn default() -> Self {
168        Self::new(VisualizerConfig::default())
169    }
170
171    /// Generate a tree visualization of a cancellation trace.
172    #[must_use]
173    pub fn visualize_trace_tree(&self, trace: &CancellationTrace) -> String {
174        let tree = self.build_tree(trace);
175        self.format_tree(&tree, 0)
176    }
177
178    /// Generate a timeline visualization showing propagation order.
179    #[must_use]
180    pub fn visualize_timeline(&self, trace: &CancellationTrace) -> String {
181        let mut output = String::new();
182        output.push_str(&format!(
183            "=== Cancellation Timeline (Trace {}) ===\n",
184            trace.trace_id.as_u64()
185        ));
186        output.push_str(&format!(
187            "Root: {} ({})\n",
188            trace.root_entity, trace.root_cancel_reason
189        ));
190        output.push_str(&format!("Start: {:?}\n", trace.start_time));
191
192        if trace.steps.is_empty() {
193            output.push_str("No propagation steps recorded.\n");
194            return output;
195        }
196
197        output.push_str("\nPropagation Timeline:\n");
198
199        for (i, step) in trace.steps.iter().enumerate() {
200            let timing = if self.config.show_timing {
201                format!(" [+{}]", self.format_duration(step.elapsed_since_start))
202            } else {
203                String::new()
204            };
205
206            let parent_info = match &step.parent_entity {
207                Some(parent) => format!(" ← {parent}"),
208                None => String::new(),
209            };
210
211            let anomaly_marker = if self.config.highlight_anomalies
212                && trace
213                    .anomalies
214                    .iter()
215                    .any(|a| self.step_has_anomaly(step, a))
216            {
217                " ⚠️"
218            } else {
219                ""
220            };
221
222            output.push_str(&format!(
223                "  {}: {}{}{}{}\n",
224                i + 1,
225                step.entity_id,
226                parent_info,
227                timing,
228                anomaly_marker
229            ));
230
231            if self.config.show_step_details {
232                output.push_str(&format!(
233                    "     State: {} | Depth: {} | Kind: {}\n",
234                    step.entity_state, step.depth, step.cancel_kind
235                ));
236            }
237        }
238
239        if let Some(total_time) = &trace.total_propagation_time {
240            output.push_str(&format!(
241                "\nTotal propagation time: {}\n",
242                self.format_duration(*total_time)
243            ));
244        }
245
246        output.push_str(&format!(
247            "Entities cancelled: {}\n",
248            trace.entities_cancelled
249        ));
250        output.push_str(&format!("Max depth: {}\n", trace.max_depth));
251
252        if !trace.anomalies.is_empty() {
253            output.push_str(&format!(
254                "\n⚠️  {} anomalies detected:\n",
255                trace.anomalies.len()
256            ));
257            for anomaly in &trace.anomalies {
258                output.push_str(&format!("  - {}\n", self.format_anomaly(anomaly)));
259            }
260        }
261
262        output
263    }
264
265    /// Generate a dot graph for use with graphviz.
266    #[must_use]
267    pub fn generate_dot_graph(&self, traces: &[CancellationTrace]) -> String {
268        let mut output = String::new();
269        output.push_str("digraph cancellation_traces {\n");
270        output.push_str("  rankdir=TB;\n");
271        output.push_str("  node [shape=box];\n\n");
272
273        for trace in traces {
274            output.push_str(&format!("  // Trace {}\n", trace.trace_id.as_u64()));
275
276            // Root node
277            output.push_str(&format!(
278                "  \"{}\" [label=\"{}\\n{}\" style=filled fillcolor=lightblue];\n",
279                trace.root_entity, trace.root_entity, trace.root_cancel_reason
280            ));
281
282            // Steps as edges
283            for step in &trace.steps {
284                let color = if trace
285                    .anomalies
286                    .iter()
287                    .any(|a| self.step_has_anomaly(step, a))
288                {
289                    "red"
290                } else {
291                    "black"
292                };
293
294                if let Some(parent) = &step.parent_entity {
295                    output.push_str(&format!(
296                        "  \"{}\" -> \"{}\" [label=\"{:.1}ms\" color={}];\n",
297                        parent,
298                        step.entity_id,
299                        step.elapsed_since_prev.as_secs_f64() * 1000.0,
300                        color
301                    ));
302                }
303            }
304
305            output.push('\n');
306        }
307
308        output.push_str("}\n");
309        output
310    }
311
312    /// Generate a real-time dashboard view.
313    #[must_use]
314    pub fn generate_dashboard(&self, traces: &[CancellationTrace]) -> CancellationDashboard {
315        let now = std::time::SystemTime::now();
316        let active_traces = traces.iter().filter(|t| !t.is_complete).count();
317        let completed_traces = traces.iter().filter(|t| t.is_complete).count();
318
319        let propagation_times: Vec<Duration> = traces
320            .iter()
321            .filter_map(|t| t.total_propagation_time)
322            .collect();
323
324        let avg_propagation_latency = if propagation_times.is_empty() {
325            Duration::ZERO
326        } else {
327            let total: u64 = propagation_times.iter().map(|d| d.as_nanos() as u64).sum();
328            Duration::from_nanos(total / propagation_times.len() as u64)
329        };
330
331        let mut sorted_times = propagation_times;
332        sorted_times.sort();
333        let p95_propagation_latency = if sorted_times.is_empty() {
334            Duration::ZERO
335        } else {
336            let index = (sorted_times.len() as f64 * 0.95) as usize;
337            sorted_times[index.min(sorted_times.len() - 1)]
338        };
339
340        // Detect bottlenecks
341        let bottlenecks = self.identify_bottlenecks(traces);
342
343        // Collect recent anomalies
344        let recent_anomalies: Vec<AnomalyInfo> = traces
345            .iter()
346            .flat_map(|trace| {
347                trace.anomalies.iter().map(|anomaly| AnomalyInfo {
348                    trace_id: trace.trace_id,
349                    anomaly_type: match anomaly {
350                        PropagationAnomaly::SlowPropagation { .. } => "SlowPropagation".to_string(),
351                        PropagationAnomaly::StuckCancellation { .. } => {
352                            "StuckCancellation".to_string()
353                        }
354                        PropagationAnomaly::IncorrectPropagationOrder { .. } => {
355                            "IncorrectPropagationOrder".to_string()
356                        }
357                        PropagationAnomaly::UnexpectedPropagation { .. } => {
358                            "UnexpectedPropagation".to_string()
359                        }
360                        PropagationAnomaly::ExcessiveDepth { .. } => "ExcessiveDepth".to_string(),
361                    },
362                    severity: self.anomaly_severity(anomaly),
363                    description: self.format_anomaly(anomaly),
364                    detected_at: now,
365                })
366            })
367            .collect();
368
369        // Calculate entity throughput
370        let entity_throughput = self.calculate_entity_throughput(traces);
371
372        CancellationDashboard {
373            snapshot_time: now,
374            active_traces,
375            completed_traces_period: completed_traces,
376            avg_propagation_latency,
377            p95_propagation_latency,
378            current_bottlenecks: bottlenecks,
379            recent_anomalies,
380            entity_throughput,
381        }
382    }
383
384    /// Identify performance bottlenecks in the traces.
385    fn identify_bottlenecks(&self, traces: &[CancellationTrace]) -> Vec<BottleneckInfo> {
386        let mut entity_delays: HashMap<String, Vec<Duration>> = HashMap::new();
387
388        for trace in traces {
389            for step in &trace.steps {
390                entity_delays
391                    .entry(step.entity_id.clone())
392                    .or_default()
393                    .push(step.elapsed_since_prev);
394            }
395        }
396
397        let mut bottlenecks = Vec::new();
398
399        for (entity_id, delays) in entity_delays {
400            if delays.len() < 2 {
401                continue;
402            }
403
404            let avg_delay = Duration::from_nanos(
405                delays.iter().map(|d| d.as_nanos() as u64).sum::<u64>() / delays.len() as u64,
406            );
407
408            // Consider it a bottleneck if average delay is above threshold
409            let threshold = Duration::from_millis(10);
410            if avg_delay > threshold {
411                let impact_score = avg_delay.as_secs_f64() * delays.len() as f64;
412
413                bottlenecks.push(BottleneckInfo {
414                    entity_id: entity_id.clone(),
415                    entity_type: EntityType::Task, // Would need type tracking to be accurate
416                    avg_delay,
417                    queue_depth: delays.len(),
418                    impact_score,
419                });
420            }
421        }
422
423        // Sort by impact score
424        bottlenecks.sort_by(|a, b| {
425            b.impact_score
426                .partial_cmp(&a.impact_score)
427                .unwrap_or(std::cmp::Ordering::Equal)
428        });
429        bottlenecks
430    }
431
432    /// Calculate throughput statistics for entities.
433    fn calculate_entity_throughput(
434        &self,
435        traces: &[CancellationTrace],
436    ) -> HashMap<String, ThroughputStats> {
437        let mut stats = HashMap::new();
438
439        // Simple implementation - would need more data for full metrics
440        for trace in traces {
441            for step in &trace.steps {
442                stats
443                    .entry(step.entity_id.clone())
444                    .or_insert(ThroughputStats {
445                        cancellations_per_second: 1.0, // Placeholder
446                        avg_processing_time: step.elapsed_since_prev,
447                        queue_depth: 0, // Would need queue tracking
448                        success_rate: if step.propagation_completed { 1.0 } else { 0.0 },
449                    });
450            }
451        }
452
453        stats
454    }
455
456    /// Build a tree structure from a trace for visualization.
457    fn build_tree(&self, trace: &CancellationTrace) -> CancellationTreeNode {
458        let mut root = CancellationTreeNode {
459            entity_id: trace.root_entity.clone(),
460            entity_type: trace.root_entity_type,
461            depth: 0,
462            timing: trace.total_propagation_time,
463            propagation_delay: None,
464            anomalies: Vec::new(),
465            children: Vec::new(),
466            completed: trace.is_complete,
467        };
468
469        // Build child nodes from steps
470        let mut parent_map: HashMap<String, &mut CancellationTreeNode> = HashMap::new();
471        parent_map.insert(root.entity_id.clone(), &mut root);
472
473        // This is a simplified tree building - in practice would need more complex logic
474        for _step in &trace.steps {
475            // Add as child of parent or root
476            // Implementation would be more complex in practice
477        }
478
479        root
480    }
481
482    /// Format a tree node for display.
483    fn format_tree(&self, node: &CancellationTreeNode, indent: usize) -> String {
484        let mut output = String::new();
485        let prefix = "  ".repeat(indent);
486
487        let timing = if let Some(timing) = node.timing {
488            format!(" [{}]", self.format_duration(timing))
489        } else {
490            String::new()
491        };
492
493        let anomaly_marker = if !node.anomalies.is_empty() && self.config.highlight_anomalies {
494            " ⚠️"
495        } else {
496            ""
497        };
498
499        output.push_str(&format!(
500            "{}├─ {}{}{}\n",
501            prefix, node.entity_id, timing, anomaly_marker
502        ));
503
504        for child in &node.children {
505            output.push_str(&self.format_tree(child, indent + 1));
506        }
507
508        output
509    }
510
511    /// Format a duration according to the configured format.
512    fn format_duration(&self, duration: Duration) -> String {
513        match self.config.timing_format {
514            TimingFormat::Nanoseconds => format!("{}ns", duration.as_nanos()),
515            TimingFormat::Microseconds => format!("{:.1}μs", duration.as_secs_f64() * 1_000_000.0),
516            TimingFormat::Milliseconds => format!("{:.1}ms", duration.as_secs_f64() * 1000.0),
517            TimingFormat::Seconds => format!("{:.3}s", duration.as_secs_f64()),
518            TimingFormat::Auto => {
519                let nanos = duration.as_nanos();
520                if nanos < 1_000 {
521                    format!("{nanos}ns")
522                } else if nanos < 1_000_000 {
523                    format!("{:.1}μs", nanos as f64 / 1_000.0)
524                } else if nanos < 1_000_000_000 {
525                    format!("{:.1}ms", nanos as f64 / 1_000_000.0)
526                } else {
527                    format!("{:.3}s", nanos as f64 / 1_000_000_000.0)
528                }
529            }
530        }
531    }
532
533    /// Format an anomaly for display.
534    fn format_anomaly(&self, anomaly: &PropagationAnomaly) -> String {
535        match anomaly {
536            PropagationAnomaly::SlowPropagation {
537                elapsed, threshold, ..
538            } => {
539                format!(
540                    "Slow propagation: {} (threshold: {})",
541                    self.format_duration(*elapsed),
542                    self.format_duration(*threshold)
543                )
544            }
545            PropagationAnomaly::StuckCancellation { stuck_duration, .. } => {
546                format!(
547                    "Stuck cancellation: timeout after {}",
548                    self.format_duration(*stuck_duration)
549                )
550            }
551            PropagationAnomaly::IncorrectPropagationOrder {
552                parent_entity,
553                child_entity,
554                ..
555            } => {
556                format!("Incorrect ordering: parent {parent_entity} before child {child_entity}")
557            }
558            PropagationAnomaly::UnexpectedPropagation { description, .. } => {
559                format!("Unexpected propagation: {description}")
560            }
561            PropagationAnomaly::ExcessiveDepth { depth, entity_id } => {
562                format!("Excessive depth: {depth} levels for entity {entity_id}")
563            }
564        }
565    }
566
567    /// Determine the severity of an anomaly.
568    fn anomaly_severity(&self, anomaly: &PropagationAnomaly) -> AnomalySeverity {
569        match anomaly {
570            PropagationAnomaly::SlowPropagation { elapsed, .. } => {
571                if elapsed.as_millis() > 1000 {
572                    AnomalySeverity::High
573                } else if elapsed.as_millis() > 100 {
574                    AnomalySeverity::Medium
575                } else {
576                    AnomalySeverity::Low
577                }
578            }
579            PropagationAnomaly::StuckCancellation { .. } => AnomalySeverity::Critical,
580            PropagationAnomaly::IncorrectPropagationOrder { .. } => AnomalySeverity::High,
581            PropagationAnomaly::UnexpectedPropagation { .. } => AnomalySeverity::Medium,
582            PropagationAnomaly::ExcessiveDepth { .. } => AnomalySeverity::Medium,
583        }
584    }
585
586    /// Check if a step is associated with a specific anomaly.
587    fn step_has_anomaly(&self, step: &CancellationTraceStep, anomaly: &PropagationAnomaly) -> bool {
588        // Simple check - could be more sophisticated
589        match anomaly {
590            PropagationAnomaly::SlowPropagation { elapsed, .. } => {
591                step.elapsed_since_prev >= *elapsed
592            }
593            _ => false, // Would need entity tracking for other anomaly types
594        }
595    }
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a visualizer that differs from the default only in its timing format.
    fn visualizer_with_format(timing_format: TimingFormat) -> CancellationVisualizer {
        CancellationVisualizer::new(VisualizerConfig {
            timing_format,
            ..VisualizerConfig::default()
        })
    }

    #[test]
    fn test_visualizer_creation() {
        let config = VisualizerConfig {
            max_depth: 7,
            show_step_details: true,
            ..VisualizerConfig::default()
        };
        let visualizer = CancellationVisualizer::new(config);

        // The visualizer must keep exactly the configuration it was given.
        assert_eq!(visualizer.config.max_depth, 7);
        assert!(visualizer.config.show_step_details);
        assert!(visualizer.config.show_timing);
        assert!(visualizer.config.highlight_anomalies);
    }

    #[test]
    fn test_duration_formatting() {
        let visualizer = CancellationVisualizer::default();

        // The default configuration renders milliseconds with one decimal place.
        let duration = Duration::from_millis(123);
        let formatted = visualizer.format_duration(duration);
        assert_eq!(formatted, "123.0ms");
    }

    #[test]
    fn test_duration_formatting_fixed_units() {
        let duration = Duration::from_millis(123);

        let nanos = visualizer_with_format(TimingFormat::Nanoseconds).format_duration(duration);
        assert_eq!(nanos, "123000000ns");

        let micros = visualizer_with_format(TimingFormat::Microseconds).format_duration(duration);
        assert!(micros.starts_with("123000.0"), "unexpected output: {micros}");

        let secs = visualizer_with_format(TimingFormat::Seconds).format_duration(duration);
        assert_eq!(secs, "0.123s");
    }

    #[test]
    fn test_duration_formatting_auto_picks_unit() {
        let visualizer = visualizer_with_format(TimingFormat::Auto);

        assert_eq!(visualizer.format_duration(Duration::from_nanos(500)), "500ns");
        assert!(visualizer
            .format_duration(Duration::from_nanos(1_500))
            .starts_with("1.5"));
        assert_eq!(
            visualizer.format_duration(Duration::from_millis(123)),
            "123.0ms"
        );
        assert_eq!(visualizer.format_duration(Duration::from_secs(2)), "2.000s");
    }
}
615        let duration = Duration::from_millis(123);
616        let formatted = visualizer.format_duration(duration);
617        assert!(formatted.contains("123"));
618    }
619}