1use crate::observability::cancellation_tracer::{
7 CancellationTrace, CancellationTraceStep, EntityType, PropagationAnomaly, TraceId,
8};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::time::Duration;
12
/// Options controlling how a cancellation visualizer renders traces.
#[derive(Debug, Clone)]
pub struct VisualizerConfig {
    /// Include elapsed-time annotations in rendered output.
    pub show_timing: bool,
    /// Intended nesting-depth limit for tree rendering.
    pub max_depth: u32,
    /// Mark steps associated with anomalies with a warning glyph.
    pub highlight_anomalies: bool,
    /// Emit an extra state / depth / kind line per step in timelines.
    pub show_step_details: bool,
    /// Unit used when formatting durations.
    pub timing_format: TimingFormat,
}

impl Default for VisualizerConfig {
    /// Timing shown, anomalies highlighted, step details hidden, a depth
    /// limit of 20, and durations formatted in milliseconds.
    fn default() -> Self {
        let timing_format = TimingFormat::Milliseconds;
        let max_depth = 20;
        Self {
            timing_format,
            max_depth,
            show_step_details: false,
            highlight_anomalies: true,
            show_timing: true,
        }
    }
}

/// Unit selection for rendered durations.
#[derive(Debug, Clone, Copy)]
pub enum TimingFormat {
    /// Whole nanoseconds, e.g. `123ns`.
    Nanoseconds,
    /// Microseconds with one decimal place.
    Microseconds,
    /// Milliseconds with one decimal place.
    Milliseconds,
    /// Seconds with three decimal places.
    Seconds,
    /// Picks ns, μs, ms or s according to the magnitude of the duration.
    Auto,
}
54
55#[derive(Debug, Clone, Serialize, Deserialize)]
57pub struct CancellationTreeNode {
58 pub entity_id: String,
60 pub entity_type: EntityType,
62 pub depth: u32,
64 pub timing: Option<Duration>,
66 pub propagation_delay: Option<Duration>,
68 pub anomalies: Vec<String>,
70 pub children: Vec<Self>,
72 pub completed: bool,
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct CancellationDashboard {
79 pub snapshot_time: std::time::SystemTime,
81 pub active_traces: usize,
83 pub completed_traces_period: usize,
85 pub avg_propagation_latency: Duration,
87 pub p95_propagation_latency: Duration,
89 pub current_bottlenecks: Vec<BottleneckInfo>,
91 pub recent_anomalies: Vec<AnomalyInfo>,
93 pub entity_throughput: HashMap<String, ThroughputStats>,
95}
96
97#[derive(Debug, Clone, Serialize, Deserialize)]
99pub struct BottleneckInfo {
100 pub entity_id: String,
102 pub entity_type: EntityType,
104 pub avg_delay: Duration,
106 pub queue_depth: usize,
108 pub impact_score: f64,
110}
111
112#[derive(Debug, Clone, Serialize, Deserialize)]
114pub struct AnomalyInfo {
115 pub trace_id: TraceId,
117 pub anomaly_type: String,
119 pub severity: AnomalySeverity,
121 pub description: String,
123 pub detected_at: std::time::SystemTime,
125}
126
127#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
129pub enum AnomalySeverity {
130 Low,
132 Medium,
134 High,
136 Critical,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct ThroughputStats {
143 pub cancellations_per_second: f64,
145 pub avg_processing_time: Duration,
147 pub queue_depth: usize,
149 pub success_rate: f64,
151}
152
153pub struct CancellationVisualizer {
155 config: VisualizerConfig,
156}
157
158impl CancellationVisualizer {
159 #[must_use]
161 pub fn new(config: VisualizerConfig) -> Self {
162 Self { config }
163 }
164
165 #[must_use]
167 pub fn default() -> Self {
168 Self::new(VisualizerConfig::default())
169 }
170
171 #[must_use]
173 pub fn visualize_trace_tree(&self, trace: &CancellationTrace) -> String {
174 let tree = self.build_tree(trace);
175 self.format_tree(&tree, 0)
176 }
177
178 #[must_use]
180 pub fn visualize_timeline(&self, trace: &CancellationTrace) -> String {
181 let mut output = String::new();
182 output.push_str(&format!(
183 "=== Cancellation Timeline (Trace {}) ===\n",
184 trace.trace_id.as_u64()
185 ));
186 output.push_str(&format!(
187 "Root: {} ({})\n",
188 trace.root_entity, trace.root_cancel_reason
189 ));
190 output.push_str(&format!("Start: {:?}\n", trace.start_time));
191
192 if trace.steps.is_empty() {
193 output.push_str("No propagation steps recorded.\n");
194 return output;
195 }
196
197 output.push_str("\nPropagation Timeline:\n");
198
199 for (i, step) in trace.steps.iter().enumerate() {
200 let timing = if self.config.show_timing {
201 format!(" [+{}]", self.format_duration(step.elapsed_since_start))
202 } else {
203 String::new()
204 };
205
206 let parent_info = match &step.parent_entity {
207 Some(parent) => format!(" ← {parent}"),
208 None => String::new(),
209 };
210
211 let anomaly_marker = if self.config.highlight_anomalies
212 && trace
213 .anomalies
214 .iter()
215 .any(|a| self.step_has_anomaly(step, a))
216 {
217 " ⚠️"
218 } else {
219 ""
220 };
221
222 output.push_str(&format!(
223 " {}: {}{}{}{}\n",
224 i + 1,
225 step.entity_id,
226 parent_info,
227 timing,
228 anomaly_marker
229 ));
230
231 if self.config.show_step_details {
232 output.push_str(&format!(
233 " State: {} | Depth: {} | Kind: {}\n",
234 step.entity_state, step.depth, step.cancel_kind
235 ));
236 }
237 }
238
239 if let Some(total_time) = &trace.total_propagation_time {
240 output.push_str(&format!(
241 "\nTotal propagation time: {}\n",
242 self.format_duration(*total_time)
243 ));
244 }
245
246 output.push_str(&format!(
247 "Entities cancelled: {}\n",
248 trace.entities_cancelled
249 ));
250 output.push_str(&format!("Max depth: {}\n", trace.max_depth));
251
252 if !trace.anomalies.is_empty() {
253 output.push_str(&format!(
254 "\n⚠️ {} anomalies detected:\n",
255 trace.anomalies.len()
256 ));
257 for anomaly in &trace.anomalies {
258 output.push_str(&format!(" - {}\n", self.format_anomaly(anomaly)));
259 }
260 }
261
262 output
263 }
264
265 #[must_use]
267 pub fn generate_dot_graph(&self, traces: &[CancellationTrace]) -> String {
268 let mut output = String::new();
269 output.push_str("digraph cancellation_traces {\n");
270 output.push_str(" rankdir=TB;\n");
271 output.push_str(" node [shape=box];\n\n");
272
273 for trace in traces {
274 output.push_str(&format!(" // Trace {}\n", trace.trace_id.as_u64()));
275
276 output.push_str(&format!(
278 " \"{}\" [label=\"{}\\n{}\" style=filled fillcolor=lightblue];\n",
279 trace.root_entity, trace.root_entity, trace.root_cancel_reason
280 ));
281
282 for step in &trace.steps {
284 let color = if trace
285 .anomalies
286 .iter()
287 .any(|a| self.step_has_anomaly(step, a))
288 {
289 "red"
290 } else {
291 "black"
292 };
293
294 if let Some(parent) = &step.parent_entity {
295 output.push_str(&format!(
296 " \"{}\" -> \"{}\" [label=\"{:.1}ms\" color={}];\n",
297 parent,
298 step.entity_id,
299 step.elapsed_since_prev.as_secs_f64() * 1000.0,
300 color
301 ));
302 }
303 }
304
305 output.push('\n');
306 }
307
308 output.push_str("}\n");
309 output
310 }
311
312 #[must_use]
314 pub fn generate_dashboard(&self, traces: &[CancellationTrace]) -> CancellationDashboard {
315 let now = std::time::SystemTime::now();
316 let active_traces = traces.iter().filter(|t| !t.is_complete).count();
317 let completed_traces = traces.iter().filter(|t| t.is_complete).count();
318
319 let propagation_times: Vec<Duration> = traces
320 .iter()
321 .filter_map(|t| t.total_propagation_time)
322 .collect();
323
324 let avg_propagation_latency = if propagation_times.is_empty() {
325 Duration::ZERO
326 } else {
327 let total: u64 = propagation_times.iter().map(|d| d.as_nanos() as u64).sum();
328 Duration::from_nanos(total / propagation_times.len() as u64)
329 };
330
331 let mut sorted_times = propagation_times;
332 sorted_times.sort();
333 let p95_propagation_latency = if sorted_times.is_empty() {
334 Duration::ZERO
335 } else {
336 let index = (sorted_times.len() as f64 * 0.95) as usize;
337 sorted_times[index.min(sorted_times.len() - 1)]
338 };
339
340 let bottlenecks = self.identify_bottlenecks(traces);
342
343 let recent_anomalies: Vec<AnomalyInfo> = traces
345 .iter()
346 .flat_map(|trace| {
347 trace.anomalies.iter().map(|anomaly| AnomalyInfo {
348 trace_id: trace.trace_id,
349 anomaly_type: match anomaly {
350 PropagationAnomaly::SlowPropagation { .. } => "SlowPropagation".to_string(),
351 PropagationAnomaly::StuckCancellation { .. } => {
352 "StuckCancellation".to_string()
353 }
354 PropagationAnomaly::IncorrectPropagationOrder { .. } => {
355 "IncorrectPropagationOrder".to_string()
356 }
357 PropagationAnomaly::UnexpectedPropagation { .. } => {
358 "UnexpectedPropagation".to_string()
359 }
360 PropagationAnomaly::ExcessiveDepth { .. } => "ExcessiveDepth".to_string(),
361 },
362 severity: self.anomaly_severity(anomaly),
363 description: self.format_anomaly(anomaly),
364 detected_at: now,
365 })
366 })
367 .collect();
368
369 let entity_throughput = self.calculate_entity_throughput(traces);
371
372 CancellationDashboard {
373 snapshot_time: now,
374 active_traces,
375 completed_traces_period: completed_traces,
376 avg_propagation_latency,
377 p95_propagation_latency,
378 current_bottlenecks: bottlenecks,
379 recent_anomalies,
380 entity_throughput,
381 }
382 }
383
384 fn identify_bottlenecks(&self, traces: &[CancellationTrace]) -> Vec<BottleneckInfo> {
386 let mut entity_delays: HashMap<String, Vec<Duration>> = HashMap::new();
387
388 for trace in traces {
389 for step in &trace.steps {
390 entity_delays
391 .entry(step.entity_id.clone())
392 .or_default()
393 .push(step.elapsed_since_prev);
394 }
395 }
396
397 let mut bottlenecks = Vec::new();
398
399 for (entity_id, delays) in entity_delays {
400 if delays.len() < 2 {
401 continue;
402 }
403
404 let avg_delay = Duration::from_nanos(
405 delays.iter().map(|d| d.as_nanos() as u64).sum::<u64>() / delays.len() as u64,
406 );
407
408 let threshold = Duration::from_millis(10);
410 if avg_delay > threshold {
411 let impact_score = avg_delay.as_secs_f64() * delays.len() as f64;
412
413 bottlenecks.push(BottleneckInfo {
414 entity_id: entity_id.clone(),
415 entity_type: EntityType::Task, avg_delay,
417 queue_depth: delays.len(),
418 impact_score,
419 });
420 }
421 }
422
423 bottlenecks.sort_by(|a, b| {
425 b.impact_score
426 .partial_cmp(&a.impact_score)
427 .unwrap_or(std::cmp::Ordering::Equal)
428 });
429 bottlenecks
430 }
431
432 fn calculate_entity_throughput(
434 &self,
435 traces: &[CancellationTrace],
436 ) -> HashMap<String, ThroughputStats> {
437 let mut stats = HashMap::new();
438
439 for trace in traces {
441 for step in &trace.steps {
442 stats
443 .entry(step.entity_id.clone())
444 .or_insert(ThroughputStats {
445 cancellations_per_second: 1.0, avg_processing_time: step.elapsed_since_prev,
447 queue_depth: 0, success_rate: if step.propagation_completed { 1.0 } else { 0.0 },
449 });
450 }
451 }
452
453 stats
454 }
455
456 fn build_tree(&self, trace: &CancellationTrace) -> CancellationTreeNode {
458 let mut root = CancellationTreeNode {
459 entity_id: trace.root_entity.clone(),
460 entity_type: trace.root_entity_type,
461 depth: 0,
462 timing: trace.total_propagation_time,
463 propagation_delay: None,
464 anomalies: Vec::new(),
465 children: Vec::new(),
466 completed: trace.is_complete,
467 };
468
469 let mut parent_map: HashMap<String, &mut CancellationTreeNode> = HashMap::new();
471 parent_map.insert(root.entity_id.clone(), &mut root);
472
473 for _step in &trace.steps {
475 }
478
479 root
480 }
481
482 fn format_tree(&self, node: &CancellationTreeNode, indent: usize) -> String {
484 let mut output = String::new();
485 let prefix = " ".repeat(indent);
486
487 let timing = if let Some(timing) = node.timing {
488 format!(" [{}]", self.format_duration(timing))
489 } else {
490 String::new()
491 };
492
493 let anomaly_marker = if !node.anomalies.is_empty() && self.config.highlight_anomalies {
494 " ⚠️"
495 } else {
496 ""
497 };
498
499 output.push_str(&format!(
500 "{}├─ {}{}{}\n",
501 prefix, node.entity_id, timing, anomaly_marker
502 ));
503
504 for child in &node.children {
505 output.push_str(&self.format_tree(child, indent + 1));
506 }
507
508 output
509 }
510
511 fn format_duration(&self, duration: Duration) -> String {
513 match self.config.timing_format {
514 TimingFormat::Nanoseconds => format!("{}ns", duration.as_nanos()),
515 TimingFormat::Microseconds => format!("{:.1}μs", duration.as_secs_f64() * 1_000_000.0),
516 TimingFormat::Milliseconds => format!("{:.1}ms", duration.as_secs_f64() * 1000.0),
517 TimingFormat::Seconds => format!("{:.3}s", duration.as_secs_f64()),
518 TimingFormat::Auto => {
519 let nanos = duration.as_nanos();
520 if nanos < 1_000 {
521 format!("{nanos}ns")
522 } else if nanos < 1_000_000 {
523 format!("{:.1}μs", nanos as f64 / 1_000.0)
524 } else if nanos < 1_000_000_000 {
525 format!("{:.1}ms", nanos as f64 / 1_000_000.0)
526 } else {
527 format!("{:.3}s", nanos as f64 / 1_000_000_000.0)
528 }
529 }
530 }
531 }
532
533 fn format_anomaly(&self, anomaly: &PropagationAnomaly) -> String {
535 match anomaly {
536 PropagationAnomaly::SlowPropagation {
537 elapsed, threshold, ..
538 } => {
539 format!(
540 "Slow propagation: {} (threshold: {})",
541 self.format_duration(*elapsed),
542 self.format_duration(*threshold)
543 )
544 }
545 PropagationAnomaly::StuckCancellation { stuck_duration, .. } => {
546 format!(
547 "Stuck cancellation: timeout after {}",
548 self.format_duration(*stuck_duration)
549 )
550 }
551 PropagationAnomaly::IncorrectPropagationOrder {
552 parent_entity,
553 child_entity,
554 ..
555 } => {
556 format!("Incorrect ordering: parent {parent_entity} before child {child_entity}")
557 }
558 PropagationAnomaly::UnexpectedPropagation { description, .. } => {
559 format!("Unexpected propagation: {description}")
560 }
561 PropagationAnomaly::ExcessiveDepth { depth, entity_id } => {
562 format!("Excessive depth: {depth} levels for entity {entity_id}")
563 }
564 }
565 }
566
567 fn anomaly_severity(&self, anomaly: &PropagationAnomaly) -> AnomalySeverity {
569 match anomaly {
570 PropagationAnomaly::SlowPropagation { elapsed, .. } => {
571 if elapsed.as_millis() > 1000 {
572 AnomalySeverity::High
573 } else if elapsed.as_millis() > 100 {
574 AnomalySeverity::Medium
575 } else {
576 AnomalySeverity::Low
577 }
578 }
579 PropagationAnomaly::StuckCancellation { .. } => AnomalySeverity::Critical,
580 PropagationAnomaly::IncorrectPropagationOrder { .. } => AnomalySeverity::High,
581 PropagationAnomaly::UnexpectedPropagation { .. } => AnomalySeverity::Medium,
582 PropagationAnomaly::ExcessiveDepth { .. } => AnomalySeverity::Medium,
583 }
584 }
585
586 fn step_has_anomaly(&self, step: &CancellationTraceStep, anomaly: &PropagationAnomaly) -> bool {
588 match anomaly {
590 PropagationAnomaly::SlowPropagation { elapsed, .. } => {
591 step.elapsed_since_prev >= *elapsed
592 }
593 _ => false, }
595 }
596}
597
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a visualizer that differs from the default only in its timing unit.
    fn visualizer_with(timing_format: TimingFormat) -> CancellationVisualizer {
        CancellationVisualizer::new(VisualizerConfig {
            timing_format,
            ..VisualizerConfig::default()
        })
    }

    #[test]
    fn test_visualizer_creation() {
        let config = VisualizerConfig::default();
        assert!(config.show_timing);
        assert_eq!(config.max_depth, 20);
        assert!(config.highlight_anomalies);
        assert!(!config.show_step_details);
        assert!(matches!(config.timing_format, TimingFormat::Milliseconds));

        let visualizer = CancellationVisualizer::new(config);
        assert_eq!(visualizer.format_duration(Duration::from_millis(1)), "1.0ms");
    }

    #[test]
    fn test_duration_formatting() {
        let visualizer = CancellationVisualizer::default();
        assert_eq!(visualizer.format_duration(Duration::from_millis(123)), "123.0ms");
    }

    #[test]
    fn test_duration_formatting_fixed_units() {
        let duration = Duration::from_millis(1234);
        assert_eq!(
            visualizer_with(TimingFormat::Nanoseconds).format_duration(duration),
            "1234000000ns"
        );
        assert_eq!(
            visualizer_with(TimingFormat::Microseconds).format_duration(duration),
            "1234000.0μs"
        );
        assert_eq!(
            visualizer_with(TimingFormat::Milliseconds).format_duration(duration),
            "1234.0ms"
        );
        assert_eq!(
            visualizer_with(TimingFormat::Seconds).format_duration(duration),
            "1.234s"
        );
    }

    #[test]
    fn test_duration_formatting_auto_picks_unit() {
        let visualizer = visualizer_with(TimingFormat::Auto);
        assert_eq!(visualizer.format_duration(Duration::from_nanos(999)), "999ns");
        assert_eq!(visualizer.format_duration(Duration::from_nanos(1_000)), "1.0μs");
        assert_eq!(visualizer.format_duration(Duration::from_nanos(1_500)), "1.5μs");
        assert_eq!(visualizer.format_duration(Duration::from_micros(2_500)), "2.5ms");
        assert_eq!(visualizer.format_duration(Duration::from_millis(1_000)), "1.000s");
        assert_eq!(visualizer.format_duration(Duration::from_secs(3)), "3.000s");
    }
}