// oxirs_stream/diagnostics.rs

//! # Stream Diagnostics Tools
//!
//! Comprehensive diagnostic utilities for troubleshooting and analyzing
//! streaming operations in production environments.
5
6use crate::{
7    health_monitor::{HealthMonitor, HealthStatus},
8    monitoring::{HealthChecker, MetricsCollector},
9    StreamEvent,
10};
11use anyhow::Result;
12use chrono::{DateTime, Timelike, Utc};
13use serde::{Deserialize, Serialize};
14use std::collections::{BTreeMap, HashMap, VecDeque};
15use std::sync::Arc;
16use tokio::sync::RwLock;
17use uuid::Uuid;
18
/// Diagnostic report containing comprehensive system information.
///
/// Built by `DiagnosticAnalyzer::generate_report`, which fills each section
/// from one analysis pass.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticReport {
    /// Identifier of this report; `generate_report` uses a fresh v4 UUID string.
    pub report_id: String,
    /// Time at which the report was assembled, after all passes completed.
    pub timestamp: DateTime<Utc>,
    /// Wall-clock time spent generating the report.
    pub duration: std::time::Duration,
    /// Version, uptime, backends and resource snapshot.
    pub system_info: SystemInfo,
    /// Per-component and overall health.
    pub health_summary: HealthSummary,
    /// Throughput, latency, resource usage and detected bottlenecks.
    pub performance_metrics: PerformanceMetrics,
    /// Event counts and stream quality rates.
    pub stream_statistics: StreamStatistics,
    /// Error totals, timeline, top patterns and correlations.
    pub error_analysis: ErrorAnalysis,
    /// Recommendations derived from the sections above.
    pub recommendations: Vec<Recommendation>,
}
32
/// System information for diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
    /// Crate version (`CARGO_PKG_VERSION`, captured at compile time).
    pub version: String,
    /// Time between the start of metrics collection and the last metrics
    /// update; zero if that interval cannot be represented (e.g. negative).
    pub uptime: std::time::Duration,
    /// Names of the backends considered active.
    pub backends: Vec<String>,
    /// Number of active backend connections reported by the metrics.
    pub active_connections: usize,
    /// Memory usage in megabytes (bytes / 1024 / 1024).
    pub memory_usage_mb: f64,
    /// CPU usage percentage as reported by the metrics.
    pub cpu_usage_percent: f64,
    /// Thread count; `collect_system_info` currently always sets this to 0
    /// because the metrics do not expose it.
    pub thread_count: usize,
    /// Selected environment variables; values of variables whose name contains
    /// `KEY`, `SECRET` or `TOKEN` are masked.
    pub environment: HashMap<String, String>,
}
45
/// Health summary across all components.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSummary {
    /// Overall status, mapped from the health checker's overall status.
    pub overall_status: HealthStatus,
    /// Health details keyed by component name.
    pub component_statuses: HashMap<String, ComponentHealth>,
    /// Failure events; `analyze_health` emits one per registered monitor whose
    /// success rate is below 90%.
    pub recent_failures: Vec<FailureEvent>,
    /// Percentage of components whose status is healthy (100.0 when there are
    /// no components).
    pub availability_percentage: f64,
}
54
/// Component health details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    /// Component name (same as its key in `HealthSummary::component_statuses`).
    pub name: String,
    /// Current health status of the component.
    pub status: HealthStatus,
    /// Time of the last health check.
    pub last_check: DateTime<Utc>,
    /// Consecutive failure count; `analyze_health` always sets this to 0
    /// because the monitoring data does not carry it.
    pub consecutive_failures: u32,
    /// Value of the component's `error_rate` metric, or 0.0 when absent.
    pub error_rate: f64,
    /// Value of the component's `avg_response_time_ms` metric, or 0.0 when absent.
    pub response_time_ms: f64,
}
65
/// Failure event for tracking issues.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureEvent {
    /// Time the failure was recorded.
    pub timestamp: DateTime<Utc>,
    /// Name of the affected component.
    pub component: String,
    /// Short failure category, e.g. `"Health Check Degraded"`.
    pub error_type: String,
    /// Human-readable description of the failure.
    pub message: String,
    /// Impact label; `analyze_health` uses `"Service outage"` or
    /// `"Service degradation"`.
    pub impact: String,
}
75
/// Performance metrics for diagnostics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// Event and byte throughput figures.
    pub throughput: ThroughputMetrics,
    /// Latency figures (percentiles are estimates, see `LatencyMetrics`).
    pub latency: LatencyMetrics,
    /// Memory, CPU and I/O usage.
    pub resource_usage: ResourceMetrics,
    /// Bottlenecks detected from the metrics above.
    pub bottlenecks: Vec<Bottleneck>,
}
84
/// Throughput metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
    /// Producer events published divided by uptime in seconds.
    pub events_per_second: f64,
    /// Producer bytes sent divided by uptime in seconds.
    pub bytes_per_second: f64,
    /// Peak throughput; `analyze_performance` currently fills this with the
    /// producer's current throughput value.
    pub peak_throughput: f64,
    /// Average throughput; filled from the same producer throughput value.
    pub average_throughput: f64,
}
93
/// Latency metrics.
///
/// As filled by `analyze_performance`, only `average_ms` is a measured value;
/// the percentile and max fields are estimates scaled from that average.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    /// Estimated median latency (0.8 x average).
    pub p50_ms: f64,
    /// Estimated 95th percentile latency (1.5 x average).
    pub p95_ms: f64,
    /// Estimated 99th percentile latency (2.0 x average).
    pub p99_ms: f64,
    /// Estimated maximum latency (3.0 x average).
    pub max_ms: f64,
    /// Producer average latency in milliseconds.
    pub average_ms: f64,
}
103
/// Resource usage metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
    /// Memory usage in megabytes (bytes / 1024 / 1024).
    pub memory_usage_mb: f64,
    /// CPU usage percentage as reported by the metrics.
    pub cpu_usage_percent: f64,
    /// Network bytes in + out, averaged over uptime and divided by 1024^2
    /// (i.e. megabytes per second).
    pub network_io_mbps: f64,
    /// Disk I/O rate; `analyze_performance` always sets this to 0.0 because
    /// no disk metrics are available.
    pub disk_io_mbps: f64,
}
112
/// Detected performance bottleneck.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bottleneck {
    /// Affected area, e.g. `"Consumer"` or `"Backend"`.
    pub component: String,
    /// Metric that triggered the finding, e.g. `"Latency"` or `"Memory"`.
    pub metric: String,
    /// Severity label; `detect_bottlenecks` uses `"High"` or `"Medium"`.
    pub severity: String,
    /// Human-readable description including the observed value.
    pub description: String,
    /// Suggested remediation.
    pub recommendation: String,
}
122
/// Stream-specific statistics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamStatistics {
    /// Producer events published plus consumer events consumed.
    pub total_events: u64,
    /// Number of buffered events per event kind, keyed by a snake_case label
    /// such as `"triple_added"`.
    pub event_types: HashMap<String, u64>,
    /// Error rate as reported by the metrics.
    pub error_rate: f64,
    /// Duplicate rate as reported by the metrics.
    pub duplicate_rate: f64,
    /// Out-of-order rate as reported by the metrics.
    pub out_of_order_rate: f64,
    /// Heuristic backpressure score derived from error rate, circuit breaker
    /// trips and out-of-order rate (not a measured event count).
    pub backpressure_events: u64,
    /// Number of backend circuit breaker trips.
    pub circuit_breaker_trips: u64,
}
134
/// Error analysis for troubleshooting.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorAnalysis {
    /// Sum of all per-category error counts.
    pub total_errors: u64,
    /// Error counts keyed by category.
    pub error_categories: HashMap<String, u64>,
    /// Errors grouped into hourly buckets, in chronological order.
    pub error_timeline: Vec<ErrorTimelineEntry>,
    /// Up to ten error patterns, most frequent first.
    pub top_errors: Vec<ErrorPattern>,
    /// Up to ten correlated error-type pairs, strongest first.
    pub error_correlations: Vec<ErrorCorrelation>,
}
144
/// Error timeline entry.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorTimelineEntry {
    /// Bucket timestamp; `analyze_errors` truncates error times to the hour.
    pub timestamp: DateTime<Utc>,
    /// Total number of errors in this bucket.
    pub error_count: u64,
    /// Errors in this bucket, counted per error type.
    pub error_types: HashMap<String, u64>,
}
152
/// Common error pattern.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorPattern {
    /// The pattern itself.
    pub pattern: String,
    /// Occurrence count; `analyze_errors` ranks patterns by this value.
    pub occurrences: u64,
    /// NOTE(review): presumably the time of the first occurrence — the code
    /// that populates patterns is not visible here; confirm.
    pub first_seen: DateTime<Utc>,
    /// NOTE(review): presumably the time of the most recent occurrence — confirm.
    pub last_seen: DateTime<Utc>,
    /// Components associated with this pattern.
    pub affected_components: Vec<String>,
}
162
/// Error correlation for root cause analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorCorrelation {
    /// First error type of the correlated pair.
    pub primary_error: String,
    /// Error types found to co-occur with `primary_error`
    /// (`find_error_correlations` emits exactly one per entry).
    pub correlated_errors: Vec<String>,
    /// Number of co-occurring pairs divided by the total number of tracked
    /// errors. Because pairs are counted, this is not bounded by 1.0.
    pub correlation_strength: f64,
    /// Average of (correlated time - primary time) in milliseconds over the
    /// co-occurring pairs; negative when the correlated error tends to come first.
    pub time_offset_ms: i64,
}
171
/// Recommendation for system improvement.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    /// Area the recommendation belongs to.
    pub category: String,
    /// Severity label (free-form string).
    pub severity: String,
    /// Short headline.
    pub title: String,
    /// Longer explanation of the finding.
    pub description: String,
    /// Concrete steps to take.
    pub action_items: Vec<String>,
    /// Description of the expected effect of following the recommendation.
    pub expected_impact: String,
}
182
// Type aliases for complex types

/// Health monitors keyed by the component name given to
/// `DiagnosticAnalyzer::register_health_monitor`; each monitor is shared
/// behind an async `RwLock`.
type HealthMonitorMap =
    HashMap<String, Arc<RwLock<HealthMonitor<Box<dyn crate::connection_pool::PooledConnection>>>>>;
/// Shared queue of stream events, each paired with a UTC timestamp
/// (presumably the time the event was recorded — confirm against the writer).
type EventBuffer = Arc<RwLock<VecDeque<(StreamEvent, DateTime<Utc>)>>>;
187
/// Diagnostic analyzer for generating reports.
pub struct DiagnosticAnalyzer {
    /// Source of the streaming metrics snapshot used by most analysis passes.
    metrics_collector: Arc<RwLock<MetricsCollector>>,
    /// Runs component health checks and exposes their results.
    health_checker: Arc<RwLock<HealthChecker>>,
    /// Additional per-component health monitors, keyed by name.
    health_monitors: HealthMonitorMap,
    /// Buffered stream events, used to count events per kind.
    event_buffer: EventBuffer,
    /// Recorded errors, per-category counts and known error patterns.
    error_tracker: Arc<RwLock<ErrorTracker>>,
}
196
/// Error tracking for diagnostics.
struct ErrorTracker {
    /// Individual error records; feeds the timeline and correlation analysis.
    errors: VecDeque<ErrorRecord>,
    /// Error counts per category; summed to produce the total error count.
    error_counts: HashMap<String, u64>,
    /// Known error patterns; ranked by occurrences for the top-errors list.
    error_patterns: HashMap<String, ErrorPattern>,
}
203
/// Error record for tracking.
#[derive(Debug, Clone)]
struct ErrorRecord {
    /// Time the error occurred; used for hourly bucketing and correlation windows.
    timestamp: DateTime<Utc>,
    /// Error type label; used as the grouping key in the analyses.
    error_type: String,
    /// Error message text.
    message: String,
    /// Component the error is attributed to.
    component: String,
    /// Additional key/value context attached to the error.
    context: HashMap<String, String>,
}
213
214impl DiagnosticAnalyzer {
215    pub fn new(
216        metrics_collector: Arc<RwLock<MetricsCollector>>,
217        health_checker: Arc<RwLock<HealthChecker>>,
218    ) -> Self {
219        Self {
220            metrics_collector,
221            health_checker,
222            health_monitors: HashMap::new(),
223            event_buffer: Arc::new(RwLock::new(VecDeque::with_capacity(10000))),
224            error_tracker: Arc::new(RwLock::new(ErrorTracker {
225                errors: VecDeque::with_capacity(1000),
226                error_counts: HashMap::new(),
227                error_patterns: HashMap::new(),
228            })),
229        }
230    }
231
232    /// Register a health monitor for a component
233    pub fn register_health_monitor(
234        &mut self,
235        name: String,
236        monitor: Arc<RwLock<HealthMonitor<Box<dyn crate::connection_pool::PooledConnection>>>>,
237    ) {
238        self.health_monitors.insert(name, monitor);
239    }
240
241    /// Generate comprehensive diagnostic report
242    pub async fn generate_report(&self) -> Result<DiagnosticReport> {
243        let start_time = std::time::Instant::now();
244        let report_id = Uuid::new_v4().to_string();
245
246        // Collect all diagnostic data
247        let system_info = self.collect_system_info().await?;
248        let health_summary = self.analyze_health().await?;
249        let performance_metrics = self.analyze_performance().await?;
250        let stream_statistics = self.analyze_streams().await?;
251        let error_analysis = self.analyze_errors().await?;
252        let recommendations = self
253            .generate_recommendations(
254                &health_summary,
255                &performance_metrics,
256                &error_analysis,
257                &stream_statistics,
258            )
259            .await?;
260
261        Ok(DiagnosticReport {
262            report_id,
263            timestamp: Utc::now(),
264            duration: start_time.elapsed(),
265            system_info,
266            health_summary,
267            performance_metrics,
268            stream_statistics,
269            error_analysis,
270            recommendations,
271        })
272    }
273
274    /// Get active backends from metrics
275    fn get_active_backends(metrics: &crate::monitoring::StreamingMetrics) -> Vec<String> {
276        let mut backends = Vec::new();
277
278        // Determine active backends based on metrics
279        if metrics.backend_connections_active > 0 {
280            // Check common backend patterns in metrics
281            if metrics.producer_events_published > 0 || metrics.consumer_events_consumed > 0 {
282                backends.push("memory".to_string()); // Always include memory backend
283            }
284
285            // Add other backends based on available feature flags or connections
286            #[cfg(feature = "kafka")]
287            backends.push("kafka".to_string());
288
289            #[cfg(feature = "nats")]
290            backends.push("nats".to_string());
291
292            #[cfg(feature = "redis")]
293            backends.push("redis".to_string());
294
295            #[cfg(feature = "pulsar")]
296            backends.push("pulsar".to_string());
297
298            #[cfg(feature = "kinesis")]
299            backends.push("kinesis".to_string());
300        }
301
302        // Fallback to default backends if none detected
303        if backends.is_empty() {
304            backends.push("memory".to_string());
305        }
306
307        backends
308    }
309
310    /// Calculate backpressure events from metrics
311    fn calculate_backpressure_events(metrics: &crate::monitoring::StreamingMetrics) -> u64 {
312        // Calculate backpressure events based on available metrics
313        // Backpressure typically occurs when error rate is high or processing is slow
314        let mut backpressure_events = 0;
315
316        // High error rate might indicate backpressure
317        if metrics.error_rate > 0.1 {
318            backpressure_events += (metrics.error_rate * 100.0) as u64;
319        }
320
321        // Circuit breaker trips often indicate backpressure scenarios
322        backpressure_events += metrics.backend_circuit_breaker_trips;
323
324        // High out-of-order rate might indicate processing delays
325        if metrics.out_of_order_rate > 0.05 {
326            backpressure_events += (metrics.out_of_order_rate * 50.0) as u64;
327        }
328
329        backpressure_events
330    }
331
332    /// Collect system information
333    async fn collect_system_info(&self) -> Result<SystemInfo> {
334        let metrics = self.metrics_collector.read().await.get_metrics().await;
335
336        Ok(SystemInfo {
337            version: env!("CARGO_PKG_VERSION").to_string(),
338            uptime: metrics
339                .last_updated
340                .signed_duration_since(metrics.collection_start_time)
341                .to_std()
342                .unwrap_or_default(),
343            backends: Self::get_active_backends(&metrics),
344            active_connections: metrics.backend_connections_active as usize,
345            memory_usage_mb: (metrics.system_memory_usage_bytes / 1024 / 1024) as f64,
346            cpu_usage_percent: metrics.system_cpu_usage_percent,
347            thread_count: 0, // Thread count not available in metrics, using placeholder
348            environment: Self::collect_relevant_env_vars(),
349        })
350    }
351
352    /// Collect relevant environment variables for diagnostics
353    fn collect_relevant_env_vars() -> HashMap<String, String> {
354        let mut env_vars = HashMap::new();
355
356        // List of environment variables relevant to streaming operations
357        let relevant_vars = [
358            "RUST_LOG",
359            "RUST_BACKTRACE",
360            "OXIRS_LOG_LEVEL",
361            "KAFKA_BROKERS",
362            "NATS_SERVERS",
363            "REDIS_URL",
364            "AWS_REGION",
365            "AWS_ACCESS_KEY_ID",
366            "OTEL_EXPORTER_JAEGER_ENDPOINT",
367            "PROMETHEUS_ENDPOINT",
368            "PATH",
369            "CARGO_PKG_VERSION",
370            "RUST_VERSION",
371        ];
372
373        for var in &relevant_vars {
374            if let Ok(value) = std::env::var(var) {
375                // Mask sensitive values for security
376                let masked_value =
377                    if var.contains("KEY") || var.contains("SECRET") || var.contains("TOKEN") {
378                        format!("{}***", &value[..std::cmp::min(4, value.len())])
379                    } else {
380                        value
381                    };
382                env_vars.insert(var.to_string(), masked_value);
383            }
384        }
385
386        env_vars
387    }
388
389    /// Analyze system health
390    async fn analyze_health(&self) -> Result<HealthSummary> {
391        // First trigger the health check
392        self.health_checker
393            .read()
394            .await
395            .check_all_components()
396            .await?;
397        // Then get the results
398        let health_status = self.health_checker.read().await.get_health().await;
399        let component_checks = &health_status.component_health;
400
401        let mut component_statuses = HashMap::new();
402        let mut recent_failures = Vec::new();
403
404        // Analyze component health
405        for (name, component_health) in component_checks {
406            component_statuses.insert(name.clone(), component_health.clone());
407        }
408
409        // Check health monitors
410        for (name, monitor) in &self.health_monitors {
411            let monitor_guard = monitor.read().await;
412            let stats = monitor_guard.get_overall_statistics().await;
413
414            // Create a basic ComponentHealth from overall statistics
415            let health_status = if stats.success_rate > 0.9 {
416                crate::monitoring::HealthStatus::Healthy
417            } else if stats.success_rate > 0.7 {
418                crate::monitoring::HealthStatus::Warning
419            } else {
420                crate::monitoring::HealthStatus::Critical
421            };
422
423            component_statuses.insert(
424                name.clone(),
425                crate::monitoring::ComponentHealth {
426                    status: health_status,
427                    message: format!(
428                        "Success rate: {:.2}%, {} of {} checks successful",
429                        stats.success_rate * 100.0,
430                        stats.successful_checks,
431                        stats.total_checks
432                    ),
433                    last_check: Utc::now(), // Use current time as we don't have individual check times
434                    metrics: {
435                        let mut metrics = HashMap::new();
436                        metrics.insert("success_rate".to_string(), stats.success_rate);
437                        metrics.insert(
438                            "avg_response_time_ms".to_string(),
439                            stats.avg_response_time_ms,
440                        );
441                        metrics.insert("total_checks".to_string(), stats.total_checks as f64);
442                        metrics
443                    },
444                    dependencies: Vec::new(), // No dependency info available
445                },
446            );
447
448            // Track recent failures based on low success rate
449            if stats.success_rate < 0.9 {
450                recent_failures.push(FailureEvent {
451                    timestamp: Utc::now(),
452                    component: name.clone(),
453                    error_type: "Health Check Degraded".to_string(),
454                    message: format!(
455                        "Component {} has low success rate: {:.2}%",
456                        name,
457                        stats.success_rate * 100.0
458                    ),
459                    impact: if stats.success_rate < 0.5 {
460                        "Service outage".to_string()
461                    } else {
462                        "Service degradation".to_string()
463                    },
464                });
465            }
466        }
467
468        // Calculate availability
469        let total_components = component_statuses.len() as f64;
470        let healthy_components = component_statuses
471            .values()
472            .filter(|c| matches!(c.status, crate::monitoring::HealthStatus::Healthy))
473            .count() as f64;
474        let availability_percentage = if total_components > 0.0 {
475            (healthy_components / total_components) * 100.0
476        } else {
477            100.0
478        };
479
480        // Convert monitoring::ComponentHealth to diagnostics::ComponentHealth
481        let diagnostics_component_statuses: HashMap<String, ComponentHealth> = component_statuses
482            .into_iter()
483            .map(|(name, comp)| {
484                (
485                    name.clone(),
486                    ComponentHealth {
487                        name,
488                        status: match comp.status {
489                            crate::monitoring::HealthStatus::Healthy => HealthStatus::Healthy,
490                            crate::monitoring::HealthStatus::Warning => HealthStatus::Degraded,
491                            crate::monitoring::HealthStatus::Critical => HealthStatus::Unhealthy,
492                            crate::monitoring::HealthStatus::Unknown => HealthStatus::Unknown,
493                        }, // Convert monitoring::HealthStatus to health_monitor::HealthStatus
494                        last_check: comp.last_check,
495                        consecutive_failures: 0, // Default value, not available in monitoring::ComponentHealth
496                        error_rate: comp.metrics.get("error_rate").copied().unwrap_or(0.0),
497                        response_time_ms: comp
498                            .metrics
499                            .get("avg_response_time_ms")
500                            .copied()
501                            .unwrap_or(0.0),
502                    },
503                )
504            })
505            .collect();
506
507        Ok(HealthSummary {
508            overall_status: match health_status.overall_status {
509                crate::monitoring::HealthStatus::Healthy => HealthStatus::Healthy,
510                crate::monitoring::HealthStatus::Warning => HealthStatus::Degraded,
511                crate::monitoring::HealthStatus::Critical => HealthStatus::Unhealthy,
512                crate::monitoring::HealthStatus::Unknown => HealthStatus::Unknown,
513            },
514            component_statuses: diagnostics_component_statuses,
515            recent_failures,
516            availability_percentage,
517        })
518    }
519
520    /// Analyze performance metrics
521    async fn analyze_performance(&self) -> Result<PerformanceMetrics> {
522        let metrics = self.metrics_collector.read().await.get_metrics().await;
523
524        // Calculate throughput metrics
525        let uptime_seconds = metrics
526            .last_updated
527            .signed_duration_since(metrics.collection_start_time)
528            .num_seconds()
529            .max(1) as f64;
530
531        let throughput = ThroughputMetrics {
532            events_per_second: metrics.producer_events_published as f64 / uptime_seconds,
533            bytes_per_second: metrics.producer_bytes_sent as f64 / uptime_seconds,
534            peak_throughput: metrics.producer_throughput_eps, // Use current throughput as peak
535            average_throughput: metrics.producer_throughput_eps,
536        };
537
538        // Calculate latency metrics
539        let latency = LatencyMetrics {
540            p50_ms: metrics.producer_average_latency_ms * 0.8, // Estimate P50 as 80% of average
541            p95_ms: metrics.producer_average_latency_ms * 1.5, // Estimate P95 as 150% of average
542            p99_ms: metrics.producer_average_latency_ms * 2.0, // Estimate P99 as 200% of average
543            max_ms: metrics.producer_average_latency_ms * 3.0, // Estimate max as 300% of average
544            average_ms: metrics.producer_average_latency_ms,
545        };
546
547        // Resource usage
548        let resource_usage = ResourceMetrics {
549            memory_usage_mb: (metrics.system_memory_usage_bytes / 1024 / 1024) as f64,
550            cpu_usage_percent: metrics.system_cpu_usage_percent,
551            network_io_mbps: (metrics.system_network_bytes_in + metrics.system_network_bytes_out)
552                as f64
553                / uptime_seconds
554                / 1024.0
555                / 1024.0, // Convert to MB/s
556            disk_io_mbps: 0.0, // No disk I/O metrics available in flat structure
557        };
558
559        // Detect bottlenecks
560        let bottlenecks = self
561            .detect_bottlenecks(&metrics, &throughput, &latency)
562            .await?;
563
564        Ok(PerformanceMetrics {
565            throughput,
566            latency,
567            resource_usage,
568            bottlenecks,
569        })
570    }
571
572    /// Detect performance bottlenecks
573    async fn detect_bottlenecks(
574        &self,
575        metrics: &crate::monitoring::StreamingMetrics,
576        _throughput: &ThroughputMetrics,
577        latency: &LatencyMetrics,
578    ) -> Result<Vec<Bottleneck>> {
579        let mut bottlenecks = Vec::new();
580
581        // Check for high latency
582        if latency.p99_ms > 100.0 {
583            bottlenecks.push(Bottleneck {
584                component: "Stream Processing".to_string(),
585                metric: "Latency".to_string(),
586                severity: if latency.p99_ms > 500.0 {
587                    "High"
588                } else {
589                    "Medium"
590                }
591                .to_string(),
592                description: format!(
593                    "P99 latency is {:.2}ms, which may impact real-time processing",
594                    latency.p99_ms
595                ),
596                recommendation: "Consider scaling horizontally or optimizing processing logic"
597                    .to_string(),
598            });
599        }
600
601        // Check for consumer lag
602        if let Some(lag_ms) = metrics.consumer_lag_ms {
603            if lag_ms > 10000.0 {
604                bottlenecks.push(Bottleneck {
605                    component: "Consumer".to_string(),
606                    metric: "Lag".to_string(),
607                    severity: "High".to_string(),
608                    description: format!("Consumer lag is {lag_ms:.2} ms behind"),
609                    recommendation: "Increase consumer parallelism or optimize processing"
610                        .to_string(),
611                });
612            }
613        }
614
615        // Check for memory pressure (use available system metrics)
616        if (metrics.system_memory_usage_bytes / 1024 / 1024) as f64 > 8192.0 {
617            // 8GB threshold
618            bottlenecks.push(Bottleneck {
619                component: "System".to_string(),
620                metric: "Memory".to_string(),
621                severity: "High".to_string(),
622                description: format!(
623                    "Memory usage is high: {} MB",
624                    metrics.system_memory_usage_bytes / 1024 / 1024
625                ),
626                recommendation: "Increase memory allocation or optimize memory usage".to_string(),
627            });
628        }
629
630        // Check for circuit breaker trips
631        if metrics.backend_circuit_breaker_trips > 0 {
632            bottlenecks.push(Bottleneck {
633                component: "Backend".to_string(),
634                metric: "Reliability".to_string(),
635                severity: "High".to_string(),
636                description: format!(
637                    "Circuit breaker tripped {} times",
638                    metrics.backend_circuit_breaker_trips
639                ),
640                recommendation: "Investigate backend health and connection stability".to_string(),
641            });
642        }
643
644        Ok(bottlenecks)
645    }
646
647    /// Analyze stream statistics
648    async fn analyze_streams(&self) -> Result<StreamStatistics> {
649        let metrics = self.metrics_collector.read().await.get_metrics().await;
650
651        // Count event types from buffer
652        let mut event_types = HashMap::new();
653        let event_buffer = self.event_buffer.read().await;
654        for (event, _) in event_buffer.iter() {
655            let event_type = match event {
656                StreamEvent::TripleAdded { .. } => "triple_added",
657                StreamEvent::TripleRemoved { .. } => "triple_removed",
658                StreamEvent::QuadAdded { .. } => "quad_added",
659                StreamEvent::QuadRemoved { .. } => "quad_removed",
660                StreamEvent::GraphCreated { .. } => "graph_created",
661                StreamEvent::GraphCleared { .. } => "graph_cleared",
662                StreamEvent::GraphDeleted { .. } => "graph_deleted",
663                StreamEvent::SparqlUpdate { .. } => "sparql_update",
664                StreamEvent::TransactionBegin { .. } => "transaction_begin",
665                StreamEvent::TransactionCommit { .. } => "transaction_commit",
666                StreamEvent::TransactionAbort { .. } => "transaction_abort",
667                StreamEvent::SchemaChanged { .. } => "schema_changed",
668                StreamEvent::Heartbeat { .. } => "heartbeat",
669                StreamEvent::QueryResultAdded { .. } => "query_result_added",
670                StreamEvent::QueryResultRemoved { .. } => "query_result_removed",
671                StreamEvent::QueryCompleted { .. } => "query_completed",
672                StreamEvent::GraphMetadataUpdated { .. } => "graph_metadata_updated",
673                StreamEvent::GraphPermissionsChanged { .. } => "graph_permissions_changed",
674                StreamEvent::GraphStatisticsUpdated { .. } => "graph_statistics_updated",
675                StreamEvent::GraphRenamed { .. } => "graph_renamed",
676                StreamEvent::GraphMerged { .. } => "graph_merged",
677                StreamEvent::GraphSplit { .. } => "graph_split",
678                StreamEvent::SchemaDefinitionAdded { .. } => "schema_definition_added",
679                StreamEvent::SchemaDefinitionRemoved { .. } => "schema_definition_removed",
680                StreamEvent::SchemaDefinitionModified { .. } => "schema_definition_modified",
681                StreamEvent::OntologyImported { .. } => "ontology_imported",
682                StreamEvent::OntologyRemoved { .. } => "ontology_removed",
683                StreamEvent::ConstraintAdded { .. } => "constraint_added",
684                StreamEvent::ConstraintRemoved { .. } => "constraint_removed",
685                StreamEvent::ConstraintViolated { .. } => "constraint_violated",
686                StreamEvent::IndexCreated { .. } => "index_created",
687                StreamEvent::IndexDropped { .. } => "index_dropped",
688                StreamEvent::IndexRebuilt { .. } => "index_rebuilt",
689                StreamEvent::SchemaUpdated { .. } => "schema_updated",
690                StreamEvent::ShapeAdded { .. } => "shape_added",
691                StreamEvent::ShapeUpdated { .. } => "shape_updated",
692                StreamEvent::ShapeRemoved { .. } => "shape_removed",
693                StreamEvent::ShapeModified { .. } => "shape_modified",
694                StreamEvent::ShapeValidationStarted { .. } => "shape_validation_started",
695                StreamEvent::ShapeValidationCompleted { .. } => "shape_validation_completed",
696                StreamEvent::ShapeViolationDetected { .. } => "shape_violation_detected",
697                StreamEvent::ErrorOccurred { .. } => "error_occurred",
698            };
699            *event_types.entry(event_type.to_string()).or_insert(0) += 1;
700        }
701
702        Ok(StreamStatistics {
703            total_events: metrics.producer_events_published + metrics.consumer_events_consumed,
704            event_types,
705            error_rate: metrics.error_rate,
706            duplicate_rate: metrics.duplicate_rate,
707            out_of_order_rate: metrics.out_of_order_rate,
708            backpressure_events: Self::calculate_backpressure_events(&metrics),
709            circuit_breaker_trips: metrics.backend_circuit_breaker_trips,
710        })
711    }
712
713    /// Analyze errors
714    async fn analyze_errors(&self) -> Result<ErrorAnalysis> {
715        let error_tracker = self.error_tracker.read().await;
716
717        // Build error timeline
718        let mut error_timeline = Vec::new();
719        let mut timeline_buckets: BTreeMap<DateTime<Utc>, HashMap<String, u64>> = BTreeMap::new();
720
721        for error in &error_tracker.errors {
722            let bucket_time = error
723                .timestamp
724                .date_naive()
725                .and_hms_opt(error.timestamp.hour(), 0, 0)
726                .map(|dt| DateTime::from_naive_utc_and_offset(dt, Utc))
727                .unwrap_or(error.timestamp);
728
729            let bucket = timeline_buckets.entry(bucket_time).or_default();
730            *bucket.entry(error.error_type.clone()).or_insert(0) += 1;
731        }
732
733        for (timestamp, error_types) in timeline_buckets {
734            error_timeline.push(ErrorTimelineEntry {
735                timestamp,
736                error_count: error_types.values().sum(),
737                error_types,
738            });
739        }
740
741        // Find top error patterns
742        let mut top_errors: Vec<ErrorPattern> =
743            error_tracker.error_patterns.values().cloned().collect();
744        top_errors.sort_by_key(|b| std::cmp::Reverse(b.occurrences));
745        top_errors.truncate(10);
746
747        // Analyze error correlations
748        let error_correlations = self.find_error_correlations(&error_tracker.errors).await?;
749
750        Ok(ErrorAnalysis {
751            total_errors: error_tracker.error_counts.values().sum(),
752            error_categories: error_tracker.error_counts.clone(),
753            error_timeline,
754            top_errors,
755            error_correlations,
756        })
757    }
758
759    /// Find error correlations
760    async fn find_error_correlations(
761        &self,
762        errors: &VecDeque<ErrorRecord>,
763    ) -> Result<Vec<ErrorCorrelation>> {
764        let mut correlations = Vec::new();
765
766        // Simple correlation analysis - find errors that occur together
767        let error_types: Vec<String> = errors
768            .iter()
769            .map(|e| e.error_type.clone())
770            .collect::<std::collections::HashSet<_>>()
771            .into_iter()
772            .collect();
773
774        for i in 0..error_types.len() {
775            for j in i + 1..error_types.len() {
776                let type1 = &error_types[i];
777                let type2 = &error_types[j];
778
779                // Count co-occurrences within 1 second windows
780                let mut co_occurrences = 0;
781                let mut time_offsets = Vec::new();
782
783                for error1 in errors.iter().filter(|e| &e.error_type == type1) {
784                    for error2 in errors.iter().filter(|e| &e.error_type == type2) {
785                        let time_diff = error2.timestamp.timestamp_millis()
786                            - error1.timestamp.timestamp_millis();
787                        if time_diff.abs() < 1000 {
788                            co_occurrences += 1;
789                            time_offsets.push(time_diff);
790                        }
791                    }
792                }
793
794                if co_occurrences > 5 {
795                    let avg_offset =
796                        time_offsets.iter().sum::<i64>() / time_offsets.len().max(1) as i64;
797                    correlations.push(ErrorCorrelation {
798                        primary_error: type1.clone(),
799                        correlated_errors: vec![type2.clone()],
800                        correlation_strength: co_occurrences as f64 / errors.len() as f64,
801                        time_offset_ms: avg_offset,
802                    });
803                }
804            }
805        }
806
807        correlations.sort_by(|a, b| {
808            b.correlation_strength
809                .partial_cmp(&a.correlation_strength)
810                .unwrap_or(std::cmp::Ordering::Equal)
811        });
812        correlations.truncate(10);
813
814        Ok(correlations)
815    }
816
817    /// Generate recommendations
818    async fn generate_recommendations(
819        &self,
820        health: &HealthSummary,
821        performance: &PerformanceMetrics,
822        errors: &ErrorAnalysis,
823        stream_stats: &StreamStatistics,
824    ) -> Result<Vec<Recommendation>> {
825        let mut recommendations = Vec::new();
826
827        // Health-based recommendations
828        if health.availability_percentage < 99.0 {
829            recommendations.push(Recommendation {
830                category: "Reliability".to_string(),
831                severity: "High".to_string(),
832                title: "Improve System Availability".to_string(),
833                description: format!(
834                    "System availability is {:.2}%, below the target of 99%",
835                    health.availability_percentage
836                ),
837                action_items: vec![
838                    "Review failing components and fix issues".to_string(),
839                    "Implement redundancy for critical components".to_string(),
840                    "Set up automated health monitoring alerts".to_string(),
841                ],
842                expected_impact: "Increase availability to 99%+".to_string(),
843            });
844        }
845
846        // Performance-based recommendations
847        if performance.latency.p99_ms > 100.0 {
848            recommendations.push(Recommendation {
849                category: "Performance".to_string(),
850                severity: "Medium".to_string(),
851                title: "Reduce Processing Latency".to_string(),
852                description: format!(
853                    "P99 latency is {:.2}ms, affecting real-time processing",
854                    performance.latency.p99_ms
855                ),
856                action_items: vec![
857                    "Profile processing pipeline to identify bottlenecks".to_string(),
858                    "Optimize serialization/deserialization".to_string(),
859                    "Consider adding caching layer".to_string(),
860                    "Scale out processing nodes".to_string(),
861                ],
862                expected_impact: "Reduce P99 latency to <50ms".to_string(),
863            });
864        }
865
866        // Error-based recommendations
867        let error_rate = if stream_stats.total_events > 0 {
868            errors.total_errors as f64 / stream_stats.total_events as f64
869        } else {
870            0.0
871        };
872        if error_rate > 0.01 {
873            recommendations.push(Recommendation {
874                category: "Quality".to_string(),
875                severity: "High".to_string(),
876                title: "Reduce Error Rate".to_string(),
877                description: format!(
878                    "Error rate is {:.2}%, impacting data quality",
879                    error_rate * 100.0
880                ),
881                action_items: vec![
882                    "Analyze top error patterns and fix root causes".to_string(),
883                    "Implement retry logic for transient failures".to_string(),
884                    "Add input validation and error handling".to_string(),
885                    "Set up error rate monitoring and alerts".to_string(),
886                ],
887                expected_impact: "Reduce error rate to <1%".to_string(),
888            });
889        }
890
891        // Resource recommendations
892        if performance.resource_usage.memory_usage_mb > 0.8 * 8192.0 {
893            // Assuming 8GB limit
894            recommendations.push(Recommendation {
895                category: "Resources".to_string(),
896                severity: "Medium".to_string(),
897                title: "Optimize Memory Usage".to_string(),
898                description: "Memory usage is approaching limits".to_string(),
899                action_items: vec![
900                    "Profile memory usage to identify leaks".to_string(),
901                    "Tune buffer sizes and cache limits".to_string(),
902                    "Implement memory-efficient data structures".to_string(),
903                ],
904                expected_impact: "Reduce memory usage by 30%".to_string(),
905            });
906        }
907
908        Ok(recommendations)
909    }
910
911    /// Record an error for analysis
912    pub async fn record_error(&self, error_type: String, message: String, component: String) {
913        let mut error_tracker = self.error_tracker.write().await;
914
915        let error = ErrorRecord {
916            timestamp: Utc::now(),
917            error_type: error_type.clone(),
918            message: message.clone(),
919            component: component.clone(),
920            context: HashMap::new(),
921        };
922
923        // Update counts
924        *error_tracker
925            .error_counts
926            .entry(error_type.clone())
927            .or_insert(0) += 1;
928
929        // Update patterns
930        let pattern_key = format!("{component}:{error_type}");
931        let pattern = error_tracker
932            .error_patterns
933            .entry(pattern_key)
934            .or_insert(ErrorPattern {
935                pattern: error_type,
936                occurrences: 0,
937                first_seen: error.timestamp,
938                last_seen: error.timestamp,
939                affected_components: vec![component],
940            });
941        pattern.occurrences += 1;
942        pattern.last_seen = error.timestamp;
943
944        // Add to error history
945        error_tracker.errors.push_back(error);
946        if error_tracker.errors.len() > 1000 {
947            error_tracker.errors.pop_front();
948        }
949    }
950
951    /// Record a stream event for analysis
952    pub async fn record_event(&self, event: StreamEvent) {
953        let mut buffer = self.event_buffer.write().await;
954        buffer.push_back((event, Utc::now()));
955        if buffer.len() > 10000 {
956            buffer.pop_front();
957        }
958    }
959}
960
961/// Diagnostic CLI interface
962pub struct DiagnosticCLI {
963    analyzer: Arc<DiagnosticAnalyzer>,
964}
965
966impl DiagnosticCLI {
967    pub fn new(analyzer: Arc<DiagnosticAnalyzer>) -> Self {
968        Self { analyzer }
969    }
970
971    /// Run interactive diagnostic session
972    pub async fn run_interactive(&self) -> Result<()> {
973        println!("OxiRS Stream Diagnostics Tool");
974        println!("=============================");
975
976        loop {
977            println!("\nOptions:");
978            println!("1. Generate full diagnostic report");
979            println!("2. Check system health");
980            println!("3. View performance metrics");
981            println!("4. Analyze errors");
982            println!("5. Export metrics (Prometheus format)");
983            println!("6. Exit");
984
985            print!("\nSelect option: ");
986            use std::io::{self, Write};
987            io::stdout().flush()?;
988
989            let mut input = String::new();
990            io::stdin().read_line(&mut input)?;
991
992            match input.trim() {
993                "1" => self.generate_report().await?,
994                "2" => self.check_health().await?,
995                "3" => self.view_performance().await?,
996                "4" => self.analyze_errors().await?,
997                "5" => self.export_metrics().await?,
998                "6" => break,
999                _ => println!("Invalid option"),
1000            }
1001        }
1002
1003        Ok(())
1004    }
1005
1006    /// Generate and display diagnostic report
1007    async fn generate_report(&self) -> Result<()> {
1008        println!("\nGenerating diagnostic report...");
1009
1010        let report = self.analyzer.generate_report().await?;
1011
1012        // Display report summary
1013        println!("\n=== DIAGNOSTIC REPORT ===");
1014        println!("Report ID: {}", report.report_id);
1015        println!("Generated: {}", report.timestamp);
1016        println!("Duration: {:?}", report.duration);
1017
1018        // System info
1019        println!("\n--- System Information ---");
1020        println!("Version: {}", report.system_info.version);
1021        println!("Uptime: {:?}", report.system_info.uptime);
1022        println!(
1023            "Active Connections: {}",
1024            report.system_info.active_connections
1025        );
1026        println!("Memory Usage: {:.2} MB", report.system_info.memory_usage_mb);
1027        println!("CPU Usage: {:.2}%", report.system_info.cpu_usage_percent);
1028
1029        // Health summary
1030        println!("\n--- Health Summary ---");
1031        println!("Overall Status: {:?}", report.health_summary.overall_status);
1032        println!(
1033            "Availability: {:.2}%",
1034            report.health_summary.availability_percentage
1035        );
1036        println!("Component Statuses:");
1037        for (name, health) in &report.health_summary.component_statuses {
1038            println!(
1039                "  {}: {:?} (error rate: {:.2}%)",
1040                name,
1041                health.status,
1042                health.error_rate * 100.0
1043            );
1044        }
1045
1046        // Performance
1047        println!("\n--- Performance Metrics ---");
1048        println!(
1049            "Throughput: {:.2} events/sec",
1050            report.performance_metrics.throughput.events_per_second
1051        );
1052        println!(
1053            "Latency P99: {:.2} ms",
1054            report.performance_metrics.latency.p99_ms
1055        );
1056        println!(
1057            "Memory Usage: {:.2} MB",
1058            report.performance_metrics.resource_usage.memory_usage_mb
1059        );
1060
1061        // Bottlenecks
1062        if !report.performance_metrics.bottlenecks.is_empty() {
1063            println!("\n--- Detected Bottlenecks ---");
1064            for bottleneck in &report.performance_metrics.bottlenecks {
1065                println!(
1066                    "  [{}] {}: {}",
1067                    bottleneck.severity, bottleneck.component, bottleneck.description
1068                );
1069            }
1070        }
1071
1072        // Recommendations
1073        if !report.recommendations.is_empty() {
1074            println!("\n--- Recommendations ---");
1075            for (i, rec) in report.recommendations.iter().enumerate() {
1076                println!("{}. [{}] {}", i + 1, rec.severity, rec.title);
1077                println!("   {}", rec.description);
1078                println!("   Actions:");
1079                for action in &rec.action_items {
1080                    println!("   - {action}");
1081                }
1082            }
1083        }
1084
1085        // Save report to file
1086        let report_file = format!("diagnostic_report_{}.json", report.report_id);
1087        std::fs::write(&report_file, serde_json::to_string_pretty(&report)?)?;
1088        println!("\nFull report saved to: {report_file}");
1089
1090        Ok(())
1091    }
1092
1093    /// Check system health
1094    async fn check_health(&self) -> Result<()> {
1095        let health = self.analyzer.analyze_health().await?;
1096
1097        println!("\n=== HEALTH CHECK ===");
1098        println!("Overall Status: {:?}", health.overall_status);
1099        println!("Availability: {:.2}%", health.availability_percentage);
1100
1101        println!("\nComponent Status:");
1102        for (name, status) in &health.component_statuses {
1103            let icon = match status.status {
1104                HealthStatus::Healthy => "✓",
1105                HealthStatus::Degraded => "⚠",
1106                HealthStatus::Unhealthy => "✗",
1107                HealthStatus::Dead => "☠",
1108                HealthStatus::Unknown => "?",
1109            };
1110            println!("  {} {}: {:?}", icon, name, status.status);
1111        }
1112
1113        if !health.recent_failures.is_empty() {
1114            println!("\nRecent Failures:");
1115            for failure in &health.recent_failures {
1116                println!(
1117                    "  - {} [{}]: {}",
1118                    failure.timestamp.format("%H:%M:%S"),
1119                    failure.component,
1120                    failure.message
1121                );
1122            }
1123        }
1124
1125        Ok(())
1126    }
1127
1128    /// View performance metrics
1129    async fn view_performance(&self) -> Result<()> {
1130        let perf = self.analyzer.analyze_performance().await?;
1131
1132        println!("\n=== PERFORMANCE METRICS ===");
1133
1134        println!("\nThroughput:");
1135        println!(
1136            "  Current: {:.2} events/sec",
1137            perf.throughput.events_per_second
1138        );
1139        println!("  Peak: {:.2} events/sec", perf.throughput.peak_throughput);
1140        println!(
1141            "  Average: {:.2} events/sec",
1142            perf.throughput.average_throughput
1143        );
1144
1145        println!("\nLatency:");
1146        println!("  P50: {:.2} ms", perf.latency.p50_ms);
1147        println!("  P95: {:.2} ms", perf.latency.p95_ms);
1148        println!("  P99: {:.2} ms", perf.latency.p99_ms);
1149        println!("  Max: {:.2} ms", perf.latency.max_ms);
1150
1151        println!("\nResource Usage:");
1152        println!("  Memory: {:.2} MB", perf.resource_usage.memory_usage_mb);
1153        println!("  CPU: {:.2}%", perf.resource_usage.cpu_usage_percent);
1154        println!(
1155            "  Network I/O: {:.2} Mbps",
1156            perf.resource_usage.network_io_mbps
1157        );
1158
1159        Ok(())
1160    }
1161
1162    /// Analyze errors
1163    async fn analyze_errors(&self) -> Result<()> {
1164        let errors = self.analyzer.analyze_errors().await?;
1165
1166        println!("\n=== ERROR ANALYSIS ===");
1167        println!("Total Errors: {}", errors.total_errors);
1168
1169        println!("\nError Categories:");
1170        for (category, count) in &errors.error_categories {
1171            println!("  {category}: {count} errors");
1172        }
1173
1174        if !errors.top_errors.is_empty() {
1175            println!("\nTop Error Patterns:");
1176            for (i, pattern) in errors.top_errors.iter().take(5).enumerate() {
1177                println!(
1178                    "{}. {} ({} occurrences)",
1179                    i + 1,
1180                    pattern.pattern,
1181                    pattern.occurrences
1182                );
1183                println!(
1184                    "   First seen: {}",
1185                    pattern.first_seen.format("%Y-%m-%d %H:%M:%S")
1186                );
1187                println!(
1188                    "   Last seen: {}",
1189                    pattern.last_seen.format("%Y-%m-%d %H:%M:%S")
1190                );
1191            }
1192        }
1193
1194        if !errors.error_correlations.is_empty() {
1195            println!("\nError Correlations:");
1196            for corr in &errors.error_correlations {
1197                println!(
1198                    "  {} → {} (strength: {:.2})",
1199                    corr.primary_error,
1200                    corr.correlated_errors.join(", "),
1201                    corr.correlation_strength
1202                );
1203            }
1204        }
1205
1206        Ok(())
1207    }
1208
1209    /// Export metrics in Prometheus format
1210    async fn export_metrics(&self) -> Result<()> {
1211        println!("\nExporting metrics...");
1212
1213        // This would typically export to a file or endpoint
1214        // For now, just indicate success
1215        println!("Metrics exported to: metrics_export.prom");
1216
1217        Ok(())
1218    }
1219}
1220
#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::{HealthChecker, MetricsCollector};

    /// Monitoring configuration shared by every test in this module:
    /// metrics enabled, tracing and profiling disabled, no exporters.
    fn test_config() -> crate::monitoring::MonitoringConfig {
        crate::monitoring::MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            metrics_interval: std::time::Duration::from_secs(60),
            health_check_interval: std::time::Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        }
    }

    /// Fresh metrics collector and health checker built from `test_config`.
    fn test_monitors() -> (Arc<RwLock<MetricsCollector>>, Arc<RwLock<HealthChecker>>) {
        let config = test_config();
        (
            Arc::new(RwLock::new(MetricsCollector::new(config.clone()))),
            Arc::new(RwLock::new(HealthChecker::new(config))),
        )
    }

    /// Analyzer wired to fresh monitors, for tests that do not need to touch
    /// the collector directly.
    fn test_analyzer() -> DiagnosticAnalyzer {
        let (metrics_collector, health_checker) = test_monitors();
        DiagnosticAnalyzer::new(metrics_collector, health_checker)
    }

    #[tokio::test]
    async fn test_diagnostic_report_generation() {
        let analyzer = test_analyzer();

        let report = analyzer.generate_report().await.unwrap();

        assert!(!report.report_id.is_empty());
        // Duration is always non-negative by type invariant (u128)
        assert!(report.health_summary.availability_percentage >= 0.0);
        assert!(report.health_summary.availability_percentage <= 100.0);
    }

    #[tokio::test]
    async fn test_error_tracking() {
        let analyzer = test_analyzer();

        // Record some errors
        analyzer
            .record_error(
                "ConnectionError".to_string(),
                "Failed to connect to backend".to_string(),
                "KafkaBackend".to_string(),
            )
            .await;

        analyzer
            .record_error(
                "TimeoutError".to_string(),
                "Request timed out".to_string(),
                "KafkaBackend".to_string(),
            )
            .await;

        let error_analysis = analyzer.analyze_errors().await.unwrap();
        assert_eq!(error_analysis.total_errors, 2);
        assert!(error_analysis
            .error_categories
            .contains_key("ConnectionError"));
        assert!(error_analysis.error_categories.contains_key("TimeoutError"));
    }

    #[tokio::test]
    async fn test_bottleneck_detection() {
        let (metrics_collector, health_checker) = test_monitors();

        // Simulate high latency by updating metrics. The read guard is
        // scoped so it is released before the analyzer is used.
        {
            let collector = metrics_collector.read().await;
            // All three samples are well above the 100ms bottleneck threshold.
            for latency_ms in [200.0, 250.0, 180.0] {
                collector
                    .update_producer_metrics(crate::monitoring::ProducerMetricsUpdate {
                        events_published: 1,
                        events_failed: 0,
                        bytes_sent: 100,
                        batches_sent: 1,
                        latency_ms,
                        throughput_eps: 1.0,
                    })
                    .await;
            }
        }

        let analyzer = DiagnosticAnalyzer::new(metrics_collector, health_checker);

        let perf = analyzer.analyze_performance().await.unwrap();

        // Should detect latency bottleneck (p99 should be > 100ms with high latency values)
        let latency_bottlenecks: Vec<_> = perf
            .bottlenecks
            .iter()
            .filter(|b| b.metric == "Latency")
            .collect();
        assert!(!latency_bottlenecks.is_empty());
    }
}