use crate::observability::{
cancellation_analyzer::{CancellationAnalyzer, PerformanceAnalysis},
cancellation_tracer::{CancellationTrace, CancellationTracer, CancellationTracerConfig},
cancellation_visualizer::{CancellationDashboard, CancellationVisualizer, VisualizerConfig},
};
use crate::types::{CancelKind, CancelReason, Time};
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::time::Duration;
/// Settings for [`StructuredCancellationAnalyzer`].
#[derive(Debug, Clone)]
pub struct StructuredCancellationConfig {
/// Configuration cloned into the `CancellationTracer` at construction.
pub tracer_config: CancellationTracerConfig,
/// Configuration cloned into the `CancellationVisualizer` at construction.
pub visualizer_config: VisualizerConfig,
/// When `true`, every recorded step runs the per-entity alert checks.
pub enable_real_time_alerts: bool,
/// Slow-propagation threshold; interpreted as milliseconds by the alert check.
pub performance_alert_threshold: u64,
/// Budget (in MB) against which the estimated trace memory usage is expressed
/// as a percentage. A value of `0` makes the reported usage 100%.
pub max_memory_usage_mb: usize,
/// NOTE(review): not read anywhere in this file; presumably the intended
/// maximum age of retained traces — confirm against other users.
pub trace_retention_duration: Duration,
/// When `true`, trace lifecycle events and alerts are emitted through the
/// crate's tracing macros.
pub enable_structured_logging: bool,
}
impl Default for StructuredCancellationConfig {
    /// Baseline configuration: alerts and structured logging enabled, default
    /// tracer/visualizer settings, a 1000 ms slow-propagation threshold, a
    /// 100 MB memory budget, and one hour of trace retention.
    fn default() -> Self {
        let tracer_config = CancellationTracerConfig::default();
        let visualizer_config = VisualizerConfig::default();
        Self {
            tracer_config,
            visualizer_config,
            enable_real_time_alerts: true,
            // Read as milliseconds by the slow-propagation alert check.
            performance_alert_threshold: 1_000,
            max_memory_usage_mb: 100,
            // One hour.
            trace_retention_duration: Duration::from_secs(60 * 60),
            enable_structured_logging: true,
        }
    }
}
/// A single alert raised by the analyzer's checks.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CancellationAlert {
/// Category of the detected condition.
pub alert_type: AlertType,
/// How serious the condition is considered.
pub severity: AlertSeverity,
/// Human-readable description of the alert.
pub message: String,
/// Entity the alert refers to; `None` for alerts not tied to one entity
/// (the memory-usage alert is raised this way).
pub entity_id: Option<String>,
/// Observed value of the metric that triggered the alert.
pub metric_value: f64,
/// Threshold the metric was compared against.
pub threshold: f64,
/// Wall-clock time at which the alert was created.
pub triggered_at: std::time::SystemTime,
/// Suggested follow-up actions, as free-form text.
pub remediation_suggestions: Vec<String>,
}
/// Category of condition reported by a [`CancellationAlert`].
///
/// Equality and hashing are derived so alert types can be compared in
/// assertions, de-duplicated, or used as map keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AlertType {
    /// A majority of an entity's recent completed traces exceeded the
    /// configured propagation-time threshold.
    SlowPropagation,
    /// Not raised anywhere in this file.
    StuckCancellation,
    /// Not raised anywhere in this file.
    HighLatency,
    /// Not raised anywhere in this file.
    BottleneckDetected,
    /// Estimated trace memory usage exceeded the maintenance threshold.
    ResourceLeakRisk,
    /// An entity's recent completed traces average more than one anomaly each.
    AnomalySpike,
    /// Not raised anywhere in this file.
    PerformanceRegression,
}
/// Severity attached to a [`CancellationAlert`].
///
/// Variants are declared from least to most severe, so the derived ordering
/// satisfies `Info < Warning < Error < Critical` and can be used to filter
/// alerts by a minimum severity.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
    /// Informational only.
    Info,
    /// Worth attention but not yet a failure.
    Warning,
    /// A problem that should be investigated.
    Error,
    /// Highest severity level.
    Critical,
}
/// Rolling statistics maintained by [`StructuredCancellationAnalyzer`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RealTimeStats {
/// Refreshed from the tracer's `traces_collected` counter whenever a trace
/// is started.
pub active_traces: usize,
/// Incremented once per completed trace.
/// NOTE(review): nothing in this file resets it, so it is not windowed to
/// one minute here.
pub traces_completed_last_minute: usize,
/// Mean total propagation time over the timed traces among the ten most
/// recently completed ones.
pub current_avg_latency: Duration,
/// Incremented once per raised alert.
/// NOTE(review): nothing in this file decrements or resets it.
pub alerts_last_hour: usize,
/// Estimated trace memory usage as a percentage of the configured budget,
/// capped at 100.
pub memory_usage_percentage: f64,
/// Initialised empty; not populated anywhere in this file.
pub top_entities: Vec<String>,
}
/// Facade that combines a cancellation tracer, visualizer, and performance
/// analyzer with alert bookkeeping and rolling statistics.
pub struct StructuredCancellationAnalyzer {
/// Settings captured at construction; read for feature flags and thresholds.
config: StructuredCancellationConfig,
/// Records trace lifecycle events and supplies the completed traces.
tracer: CancellationTracer,
/// Renders dashboards, trace trees, timelines, and DOT graphs.
visualizer: CancellationVisualizer,
/// Computes performance analyses over completed traces.
analyzer: CancellationAnalyzer,
/// Alert history, oldest first; `trigger_alert` caps it at 1000 entries.
alerts: Arc<Mutex<Vec<CancellationAlert>>>,
/// Current statistics; cloned out by `get_real_time_stats`.
stats: Arc<Mutex<RealTimeStats>>,
/// Wall-clock time of the last periodic maintenance pass.
last_cleanup: Arc<Mutex<std::time::SystemTime>>,
}
impl StructuredCancellationAnalyzer {
/// Creates an analyzer whose tracer and visualizer are configured from
/// `config`, with an empty alert history and zeroed statistics.
///
/// The maintenance timestamp starts at the current wall-clock time, so the
/// first periodic maintenance pass happens no earlier than one interval after
/// construction.
#[must_use]
pub fn new(config: StructuredCancellationConfig) -> Self {
    let initial_stats = RealTimeStats {
        active_traces: 0,
        traces_completed_last_minute: 0,
        current_avg_latency: Duration::ZERO,
        alerts_last_hour: 0,
        memory_usage_percentage: 0.0,
        top_entities: Vec::new(),
    };
    Self {
        tracer: CancellationTracer::new(config.tracer_config.clone()),
        visualizer: CancellationVisualizer::new(config.visualizer_config.clone()),
        analyzer: CancellationAnalyzer::default(),
        alerts: Arc::new(Mutex::new(Vec::new())),
        stats: Arc::new(Mutex::new(initial_stats)),
        last_cleanup: Arc::new(Mutex::new(std::time::SystemTime::now())),
        // Moved last: the sub-configs above are cloned out of it first.
        config,
    }
}
/// Creates an analyzer from [`StructuredCancellationConfig::default`].
///
/// This is an inherent associated function, not an implementation of the
/// [`Default`] trait, so generic `T: Default` code cannot use it.
#[must_use]
pub fn default() -> Self {
    let baseline = StructuredCancellationConfig::default();
    Self::new(baseline)
}
/// Starts a cancellation trace rooted at `entity_id` and returns its id.
///
/// Delegates to the tracer, refreshes the trace-count and memory statistics,
/// and logs a `trace_started` event when structured logging is enabled.
pub fn start_trace(
    &self,
    entity_id: String,
    entity_type: crate::observability::EntityType,
    cancel_reason: &CancelReason,
    cancel_kind: CancelKind,
) -> crate::observability::TraceId {
    let new_trace = self
        .tracer
        .start_trace(entity_id, entity_type, cancel_reason, cancel_kind);
    self.update_active_traces_count();

    if !self.config.enable_structured_logging {
        return new_trace;
    }
    Self::log_trace_event("trace_started", new_trace, None);
    new_trace
}
/// Records one propagation step for `trace_id`.
///
/// All arguments are forwarded to the tracer unchanged. Afterwards, if
/// real-time alerts are enabled, the alert checks run for `entity_id`; if
/// structured logging is enabled, a `step_recorded` event is logged.
pub fn record_step(
    &self,
    trace_id: crate::observability::TraceId,
    entity_id: String,
    entity_type: crate::observability::EntityType,
    cancel_reason: &CancelReason,
    cancel_kind: CancelKind,
    entity_state: String,
    parent_entity: Option<String>,
    propagation_completed: bool,
) {
    let settings = &self.config;

    // The tracer takes ownership of its copy; the original is kept for the
    // alert check and the log event below.
    self.tracer.record_step(
        trace_id,
        entity_id.clone(),
        entity_type,
        cancel_reason,
        cancel_kind,
        entity_state,
        parent_entity,
        propagation_completed,
    );

    let entity: &str = &entity_id;
    if settings.enable_real_time_alerts {
        self.check_real_time_alerts(entity);
    }
    if settings.enable_structured_logging {
        Self::log_trace_event("step_recorded", trace_id, Some(entity));
    }
}
/// Marks `trace_id` as completed.
///
/// Updates the completion statistics, logs a `trace_completed` event when
/// structured logging is enabled, and then runs the rate-limited maintenance
/// check.
pub fn complete_trace(&self, trace_id: crate::observability::TraceId) {
    self.tracer.complete_trace(trace_id);
    self.update_completed_traces_count();

    let logging_enabled = self.config.enable_structured_logging;
    if logging_enabled {
        Self::log_trace_event("trace_completed", trace_id, None);
    }

    self.maybe_cleanup_old_traces();
}
/// Builds a dashboard from the tracer's currently completed traces.
pub fn get_dashboard(&self) -> CancellationDashboard {
    self.visualizer
        .generate_dashboard(&self.tracer.completed_traces())
}
/// Runs the performance analyzer over the tracer's currently completed traces.
pub fn analyze_performance(&self) -> PerformanceAnalysis {
    self.analyzer
        .analyze_performance(&self.tracer.completed_traces())
}
/// Renders the tree view of the completed trace with id `trace_id`.
///
/// Returns `None` when no completed trace has that id.
pub fn visualize_trace(&self, trace_id: crate::observability::TraceId) -> Option<String> {
    let completed = self.tracer.completed_traces();
    let wanted = completed
        .iter()
        .find(|candidate| candidate.trace_id == trace_id)?;
    Some(self.visualizer.visualize_trace_tree(wanted))
}
/// Renders the timeline view of the completed trace with id `trace_id`.
///
/// Returns `None` when no completed trace has that id.
pub fn visualize_timeline(&self, trace_id: crate::observability::TraceId) -> Option<String> {
    let completed = self.tracer.completed_traces();
    let wanted = completed
        .iter()
        .find(|candidate| candidate.trace_id == trace_id)?;
    Some(self.visualizer.visualize_timeline(wanted))
}
/// Produces a DOT graph description of the tracer's completed traces.
pub fn export_dot_graph(&self) -> String {
    self.visualizer
        .generate_dot_graph(&self.tracer.completed_traces())
}
/// Returns clones of up to `limit` alerts, most recently raised first.
///
/// # Panics
///
/// Panics if the alert mutex has been poisoned.
pub fn get_recent_alerts(&self, limit: usize) -> Vec<CancellationAlert> {
    let history = self.alerts.lock().unwrap();
    // Alerts are appended in arrival order, so the newest `limit` entries are
    // the tail of the vector.
    let tail_start = history.len().saturating_sub(limit);
    history[tail_start..].iter().rev().cloned().collect()
}
/// Drops every stored alert that was triggered `max_age` or longer ago.
///
/// If `max_age` reaches further back than the platform can represent as a
/// `SystemTime` (for example `Duration::MAX`), no alert can be that old, so
/// the history is left untouched.
///
/// # Panics
///
/// Panics if the alert mutex has been poisoned.
pub fn clear_old_alerts(&self, max_age: Duration) {
    // `SystemTime - Duration` panics when the result is unrepresentable;
    // `checked_sub` reports that case as `None` instead.
    let cutoff = match std::time::SystemTime::now().checked_sub(max_age) {
        Some(cutoff) => cutoff,
        None => return,
    };
    let mut alerts = self.alerts.lock().unwrap();
    alerts.retain(|alert| alert.triggered_at > cutoff);
}
/// Returns a copy of the current statistics.
///
/// # Panics
///
/// Panics if the statistics mutex has been poisoned.
pub fn get_real_time_stats(&self) -> RealTimeStats {
    self.stats.lock().unwrap().clone()
}
/// Returns the underlying tracer's statistics snapshot, unmodified.
pub fn get_tracer_stats(
&self,
) -> crate::observability::cancellation_tracer::CancellationTracerStatsSnapshot {
self.tracer.stats()
}
/// Refreshes `active_traces` and the estimated memory-usage percentage from
/// the tracer's statistics.
///
/// Memory is estimated with a fixed per-trace heuristic and expressed as a
/// percentage of `max_memory_usage_mb`, capped at 100. A budget of `0` is
/// reported as 100% used. The estimate is computed in floating point so the
/// percentage moves with every trace rather than in whole-megabyte steps, and
/// so the intermediate product cannot overflow.
///
/// NOTE(review): `active_traces` mirrors the tracer's `traces_collected`
/// counter; the tests in this file observe that counter at 1 after the only
/// trace has completed, so it may not represent in-flight traces — confirm.
fn update_active_traces_count(&self) {
    // Heuristic carried over from the original estimate (10 + 2 units per
    // trace, 1024 units per MB).
    const ESTIMATED_KIB_PER_TRACE: f64 = 12.0;
    const KIB_PER_MB: f64 = 1024.0;

    let tracer_stats = self.tracer.stats();
    let estimated_mb = tracer_stats.traces_collected as f64 * ESTIMATED_KIB_PER_TRACE / KIB_PER_MB;
    let budget_mb = self.config.max_memory_usage_mb;
    let usage_percentage = if budget_mb > 0 {
        (estimated_mb / budget_mb as f64 * 100.0).min(100.0)
    } else {
        100.0
    };

    let mut real_time_stats = self.stats.lock().unwrap();
    real_time_stats.active_traces = tracer_stats.traces_collected as usize;
    real_time_stats.memory_usage_percentage = usage_percentage;
}
/// Bumps the completed-trace counter and refreshes the average latency.
///
/// The average is taken over the traces that carry a total propagation time
/// among the ten most recently completed ones; if none of them is timed, the
/// previous average is kept.
///
/// The tracer is queried and the average computed *before* the statistics
/// mutex is taken, so the lock is held only for the two field updates and is
/// never held across a call into the tracer. Durations are summed with
/// saturating arithmetic instead of a truncating nanosecond cast.
///
/// NOTE(review): `traces_completed_last_minute` is only ever incremented in
/// this file; nothing windows it to one minute.
fn update_completed_traces_count(&self) {
    const LATENCY_WINDOW: usize = 10;

    let completed = self.tracer.completed_traces();
    let (total, samples) = completed
        .iter()
        .rev()
        .take(LATENCY_WINDOW)
        .filter_map(|trace| trace.total_propagation_time)
        .fold((Duration::ZERO, 0u32), |(sum, count), elapsed| {
            (sum.saturating_add(elapsed), count + 1)
        });
    // `samples` is at most LATENCY_WINDOW, so the division cannot be by zero
    // once guarded and the counter cannot overflow.
    let average = if samples > 0 {
        Some(total / samples)
    } else {
        None
    };

    let mut real_time_stats = self.stats.lock().unwrap();
    real_time_stats.traces_completed_last_minute += 1;
    if let Some(average) = average {
        real_time_stats.current_avg_latency = average;
    }
}
/// Evaluates the alert conditions for `entity_id` against its most recent
/// completed traces.
///
/// Only completed traces whose `root_entity` equals `entity_id` are
/// considered, newest first, up to five. With no such trace the function
/// returns without raising anything. Two independent checks follow:
///
/// * **Slow propagation** (`Warning`): more than half of the sampled traces
///   have a total propagation time above `performance_alert_threshold`
///   milliseconds. Traces without a timing count towards the sample size but
///   never as slow. The metric is the slow share in percent.
/// * **Anomaly spike** (`Error`): the sampled traces carry more anomalies in
///   total than there are traces. The metric is anomalies per trace.
fn check_real_time_alerts(&self, entity_id: &str) {
    const RECENT_TRACE_WINDOW: usize = 5;
    const SLOW_SHARE_THRESHOLD_PCT: f64 = 50.0;
    const ANOMALIES_PER_TRACE_THRESHOLD: f64 = 1.0;

    let traces = self.tracer.completed_traces();
    let entity_traces: Vec<&CancellationTrace> = traces
        .iter()
        .rev()
        .filter(|t| t.root_entity == entity_id)
        .take(RECENT_TRACE_WINDOW)
        .collect();
    if entity_traces.is_empty() {
        return;
    }
    // Non-zero from here on, so the ratios below need no further guard.
    let sample_size = entity_traces.len();

    let slow_threshold = Duration::from_millis(self.config.performance_alert_threshold);
    let slow_count = entity_traces
        .iter()
        .filter_map(|t| t.total_propagation_time)
        .filter(|&duration| duration > slow_threshold)
        .count();
    if slow_count > sample_size / 2 {
        self.trigger_alert(&CancellationAlert {
            alert_type: AlertType::SlowPropagation,
            severity: AlertSeverity::Warning,
            message: format!(
                "Entity {entity_id} showing consistently slow cancellation propagation"
            ),
            entity_id: Some(entity_id.to_string()),
            metric_value: slow_count as f64 / sample_size as f64 * 100.0,
            threshold: SLOW_SHARE_THRESHOLD_PCT,
            triggered_at: std::time::SystemTime::now(),
            remediation_suggestions: vec![
                "Check for blocking operations in cancellation handlers".to_string(),
                "Consider optimizing cleanup logic".to_string(),
            ],
        });
    }

    let total_anomalies: usize = entity_traces.iter().map(|t| t.anomalies.len()).sum();
    if total_anomalies > sample_size {
        self.trigger_alert(&CancellationAlert {
            alert_type: AlertType::AnomalySpike,
            severity: AlertSeverity::Error,
            message: format!("High anomaly rate detected for entity {entity_id}"),
            entity_id: Some(entity_id.to_string()),
            metric_value: total_anomalies as f64 / sample_size as f64,
            threshold: ANOMALIES_PER_TRACE_THRESHOLD,
            triggered_at: std::time::SystemTime::now(),
            remediation_suggestions: vec![
                "Investigate cancellation protocol violations".to_string(),
                "Review structured concurrency patterns".to_string(),
            ],
        });
    }
}
/// Stores a copy of `alert`, bumps the alert counter, and logs the alert when
/// structured logging is enabled.
///
/// The history keeps at most 1000 alerts; the oldest entries are discarded
/// first. The history and statistics locks are taken one after the other,
/// never together.
fn trigger_alert(&self, alert: &CancellationAlert) {
    const MAX_RETAINED_ALERTS: usize = 1000;

    {
        let mut history = self.alerts.lock().unwrap();
        history.push(alert.clone());
        let overflow = history.len().saturating_sub(MAX_RETAINED_ALERTS);
        if overflow > 0 {
            history.drain(..overflow);
        }
    }

    self.stats.lock().unwrap().alerts_last_hour += 1;

    if self.config.enable_structured_logging {
        Self::log_alert(alert);
    }
}
/// Emits a debug-level "cancellation trace event" record carrying the event
/// type, the numeric trace id, and the optional entity id.
// NOTE(review): the `unused_variables` allowance presumably covers builds in
// which the `tracing_compat` macro expands to nothing — confirm.
#[allow(unused_variables)]
fn log_trace_event(
event_type: &str,
trace_id: crate::observability::TraceId,
entity_id: Option<&str>,
) {
crate::tracing_compat::debug!(
event_type = event_type,
trace_id = trace_id.as_u64(),
entity_id = ?entity_id,
"cancellation trace event"
);
}
/// Emits a warn-level "cancellation alert" record carrying the alert's type,
/// severity, entity, metric value, threshold, trigger time, and message.
// NOTE(review): the `unused_variables` allowance presumably covers builds in
// which the `tracing_compat` macro expands to nothing — confirm.
#[allow(unused_variables)]
fn log_alert(alert: &CancellationAlert) {
crate::tracing_compat::warn!(
alert_type = ?alert.alert_type,
severity = ?alert.severity,
entity_id = ?alert.entity_id,
metric_value = alert.metric_value,
threshold = alert.threshold,
triggered_at = ?alert.triggered_at,
message = %alert.message,
"cancellation alert"
);
}
/// Rate-limited maintenance check, run after each completed trace.
///
/// Does nothing if fewer than 300 seconds have passed since the previous pass
/// (a wall clock that moved backwards counts as zero elapsed time). Otherwise
/// it records the pass and raises a `ResourceLeakRisk` warning when the
/// estimated memory usage is above 80%.
///
/// NOTE(review): despite its name, this function does not remove any traces;
/// it only raises the alert. The alert message therefore recommends a cleanup
/// rather than claiming that one happened.
fn maybe_cleanup_old_traces(&self) {
    const MAINTENANCE_INTERVAL: Duration = Duration::from_secs(300);
    const MEMORY_ALERT_THRESHOLD_PCT: f64 = 80.0;

    let now = std::time::SystemTime::now();
    {
        let mut last_cleanup = self.last_cleanup.lock().unwrap();
        let elapsed = now.duration_since(*last_cleanup).unwrap_or(Duration::ZERO);
        if elapsed < MAINTENANCE_INTERVAL {
            return;
        }
        *last_cleanup = now;
    }

    // The guard is a statement temporary, so the statistics lock is released
    // before `trigger_alert` takes it again.
    let memory_usage_percentage = self.stats.lock().unwrap().memory_usage_percentage;
    if memory_usage_percentage > MEMORY_ALERT_THRESHOLD_PCT {
        self.trigger_alert(&CancellationAlert {
            alert_type: AlertType::ResourceLeakRisk,
            severity: AlertSeverity::Warning,
            message: "High memory usage detected - old traces should be cleaned up".to_string(),
            entity_id: None,
            metric_value: memory_usage_percentage,
            threshold: MEMORY_ALERT_THRESHOLD_PCT,
            triggered_at: now,
            remediation_suggestions: vec![
                "Consider reducing trace retention duration".to_string(),
                "Monitor for memory leaks in cancellation handling".to_string(),
            ],
        });
    }
}
}
/// Wrapper that pairs a [`StructuredCancellationAnalyzer`] with a manually
/// advanced clock for scenario runs.
pub struct LabRuntimeIntegration {
/// Analyzer driven by the scenarios.
analyzer: StructuredCancellationAnalyzer,
/// Manually advanced time value, starting at `Time::ZERO`.
/// NOTE(review): written by `advance_time` but not read anywhere in this file.
deterministic_time: Arc<Mutex<Time>>,
}
impl LabRuntimeIntegration {
    /// Creates an integration whose analyzer is built from `config` and whose
    /// clock starts at `Time::ZERO`.
    #[must_use]
    pub fn new(config: StructuredCancellationConfig) -> Self {
        let analyzer = StructuredCancellationAnalyzer::new(config);
        let deterministic_time = Arc::new(Mutex::new(Time::ZERO));
        Self {
            analyzer,
            deterministic_time,
        }
    }

    /// Moves the manually driven clock forward by `delta`.
    ///
    /// # Panics
    ///
    /// Panics if the clock mutex has been poisoned.
    pub fn advance_time(&self, delta: Duration) {
        let mut clock = self.deterministic_time.lock().unwrap();
        let advanced = *clock + delta;
        *clock = advanced;
    }

    /// Borrows the wrapped analyzer.
    pub fn analyzer(&self) -> &StructuredCancellationAnalyzer {
        &self.analyzer
    }

    /// Runs `scenario` against the wrapped analyzer, then returns the
    /// performance analysis of whatever traces have completed.
    pub fn run_scenario<F>(&self, scenario: F) -> PerformanceAnalysis
    where
        F: FnOnce(&StructuredCancellationAnalyzer),
    {
        let subject = &self.analyzer;
        scenario(subject);
        subject.analyze_performance()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::EntityType;
    use crate::types::{CancelKind, CancelReason};

    /// A freshly constructed analyzer reports no active traces.
    #[test]
    fn test_analyzer_creation() {
        let analyzer =
            StructuredCancellationAnalyzer::new(StructuredCancellationConfig::default());
        assert_eq!(analyzer.get_real_time_stats().active_traces, 0);
    }

    /// Start, one step, and completion are visible in both the tracer
    /// statistics and the dashboard.
    #[test]
    fn test_trace_lifecycle_integration() {
        let analyzer = StructuredCancellationAnalyzer::default();

        let root_reason = CancelReason::user("test");
        let trace_id = analyzer.start_trace(
            "test-task".to_string(),
            EntityType::Task,
            &root_reason,
            CancelKind::User,
        );

        let step_reason = CancelReason::user("propagation");
        analyzer.record_step(
            trace_id,
            "child-region".to_string(),
            EntityType::Region,
            &step_reason,
            CancelKind::User,
            "Closing".to_string(),
            Some("test-task".to_string()),
            true,
        );

        analyzer.complete_trace(trace_id);

        let tracer_stats = analyzer.get_tracer_stats();
        assert_eq!(tracer_stats.traces_collected, 1);

        let dashboard = analyzer.get_dashboard();
        assert_eq!(dashboard.completed_traces_period, 1);
    }

    /// A scenario that starts and completes one trace yields an analysis
    /// covering exactly that trace.
    #[test]
    fn test_lab_runtime_integration() {
        let lab_integration =
            LabRuntimeIntegration::new(StructuredCancellationConfig::default());

        let analysis = lab_integration.run_scenario(|analyzer| {
            let reason = CancelReason::user("scenario");
            let trace_id = analyzer.start_trace(
                "scenario-task".to_string(),
                EntityType::Task,
                &reason,
                CancelKind::User,
            );
            analyzer.complete_trace(trace_id);
        });

        assert_eq!(analysis.traces_analyzed, 1);
    }
}