use crate::{
    health_monitor::{HealthMonitor, HealthStatus},
    monitoring::{HealthChecker, MetricsCollector},
    StreamEvent,
};
use anyhow::Result;
use chrono::{DateTime, Timelike, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::RwLock;
use uuid::Uuid;

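/// A complete diagnostic snapshot assembled by [`DiagnosticAnalyzer::generate_report`].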
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiagnosticReport {
    pub report_id: String,
    pub timestamp: DateTime<Utc>,
    pub duration: std::time::Duration,
    pub system_info: SystemInfo,
    pub health_summary: HealthSummary,
    pub performance_metrics: PerformanceMetrics,
    pub stream_statistics: StreamStatistics,
    pub error_analysis: ErrorAnalysis,
    pub recommendations: Vec<Recommendation>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemInfo {
    pub version: String,
    pub uptime: std::time::Duration,
    pub backends: Vec<String>,
    pub active_connections: usize,
    pub memory_usage_mb: f64,
    pub cpu_usage_percent: f64,
    pub thread_count: usize,
    pub environment: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthSummary {
    pub overall_status: HealthStatus,
    pub component_statuses: HashMap<String, ComponentHealth>,
    pub recent_failures: Vec<FailureEvent>,
    pub availability_percentage: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ComponentHealth {
    pub name: String,
    pub status: HealthStatus,
    pub last_check: DateTime<Utc>,
    pub consecutive_failures: u32,
    pub error_rate: f64,
    pub response_time_ms: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FailureEvent {
    pub timestamp: DateTime<Utc>,
    pub component: String,
    pub error_type: String,
    pub message: String,
    pub impact: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    pub throughput: ThroughputMetrics,
    pub latency: LatencyMetrics,
    pub resource_usage: ResourceMetrics,
    pub bottlenecks: Vec<Bottleneck>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ThroughputMetrics {
    pub events_per_second: f64,
    pub bytes_per_second: f64,
    pub peak_throughput: f64,
    pub average_throughput: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LatencyMetrics {
    pub p50_ms: f64,
    pub p95_ms: f64,
    pub p99_ms: f64,
    pub max_ms: f64,
    pub average_ms: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceMetrics {
    pub memory_usage_mb: f64,
    pub cpu_usage_percent: f64,
    pub network_io_mbps: f64,
    pub disk_io_mbps: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Bottleneck {
    pub component: String,
    pub metric: String,
    pub severity: String,
    pub description: String,
    pub recommendation: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StreamStatistics {
    pub total_events: u64,
    pub event_types: HashMap<String, u64>,
    pub error_rate: f64,
    pub duplicate_rate: f64,
    pub out_of_order_rate: f64,
    pub backpressure_events: u64,
    pub circuit_breaker_trips: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorAnalysis {
    pub total_errors: u64,
    pub error_categories: HashMap<String, u64>,
    pub error_timeline: Vec<ErrorTimelineEntry>,
    pub top_errors: Vec<ErrorPattern>,
    pub error_correlations: Vec<ErrorCorrelation>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorTimelineEntry {
    pub timestamp: DateTime<Utc>,
    pub error_count: u64,
    pub error_types: HashMap<String, u64>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorPattern {
    pub pattern: String,
    pub occurrences: u64,
    pub first_seen: DateTime<Utc>,
    pub last_seen: DateTime<Utc>,
    pub affected_components: Vec<String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ErrorCorrelation {
    pub primary_error: String,
    pub correlated_errors: Vec<String>,
    pub correlation_strength: f64,
    pub time_offset_ms: i64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Recommendation {
    pub category: String,
    pub severity: String,
    pub title: String,
    pub description: String,
    pub action_items: Vec<String>,
    pub expected_impact: String,
}

type HealthMonitorMap =
    HashMap<String, Arc<RwLock<HealthMonitor<Box<dyn crate::connection_pool::PooledConnection>>>>>;
type EventBuffer = Arc<RwLock<VecDeque<(StreamEvent, DateTime<Utc>)>>>;

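/// Aggregates collected metrics, health checks, registered health monitors,
/// and recorded errors into a single [`DiagnosticReport`].
///
/// Minimal usage sketch (doctest ignored; assumes a `MonitoringConfig` value
/// is in scope, constructed as in the tests at the bottom of this module):
///
/// ```ignore
/// let metrics = Arc::new(RwLock::new(MetricsCollector::new(config.clone())));
/// let health = Arc::new(RwLock::new(HealthChecker::new(config)));
/// let analyzer = DiagnosticAnalyzer::new(metrics, health);
/// let report = analyzer.generate_report().await?;
/// println!("availability: {:.2}%", report.health_summary.availability_percentage);
/// ```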
pub struct DiagnosticAnalyzer {
    metrics_collector: Arc<RwLock<MetricsCollector>>,
    health_checker: Arc<RwLock<HealthChecker>>,
    health_monitors: HealthMonitorMap,
    event_buffer: EventBuffer,
    error_tracker: Arc<RwLock<ErrorTracker>>,
}

struct ErrorTracker {
    errors: VecDeque<ErrorRecord>,
    error_counts: HashMap<String, u64>,
    error_patterns: HashMap<String, ErrorPattern>,
}

#[derive(Debug, Clone)]
struct ErrorRecord {
    timestamp: DateTime<Utc>,
    error_type: String,
    message: String,
    component: String,
    context: HashMap<String, String>,
}

impl DiagnosticAnalyzer {
    pub fn new(
        metrics_collector: Arc<RwLock<MetricsCollector>>,
        health_checker: Arc<RwLock<HealthChecker>>,
    ) -> Self {
        Self {
            metrics_collector,
            health_checker,
            health_monitors: HashMap::new(),
            event_buffer: Arc::new(RwLock::new(VecDeque::with_capacity(10000))),
            error_tracker: Arc::new(RwLock::new(ErrorTracker {
                errors: VecDeque::with_capacity(1000),
                error_counts: HashMap::new(),
                error_patterns: HashMap::new(),
            })),
        }
    }

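    /// Registers a named per-backend health monitor whose statistics are
    /// folded into the health summary.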
    pub fn register_health_monitor(
        &mut self,
        name: String,
        monitor: Arc<RwLock<HealthMonitor<Box<dyn crate::connection_pool::PooledConnection>>>>,
    ) {
        self.health_monitors.insert(name, monitor);
    }

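    /// Runs every analysis pass and assembles the results into a timestamped
    /// report, recording how long generation took.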
    pub async fn generate_report(&self) -> Result<DiagnosticReport> {
        let start_time = std::time::Instant::now();
        let report_id = Uuid::new_v4().to_string();

        let system_info = self.collect_system_info().await?;
        let health_summary = self.analyze_health().await?;
        let performance_metrics = self.analyze_performance().await?;
        let stream_statistics = self.analyze_streams().await?;
        let error_analysis = self.analyze_errors().await?;
        let recommendations = self
            .generate_recommendations(
                &health_summary,
                &performance_metrics,
                &error_analysis,
                &stream_statistics,
            )
            .await?;

        Ok(DiagnosticReport {
            report_id,
            timestamp: Utc::now(),
            duration: start_time.elapsed(),
            system_info,
            health_summary,
            performance_metrics,
            stream_statistics,
            error_analysis,
            recommendations,
        })
    }

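    /// Infers the set of active backends from connection metrics and the
    /// backend features this crate was compiled with.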
    fn get_active_backends(metrics: &crate::monitoring::StreamingMetrics) -> Vec<String> {
        let mut backends = Vec::new();

        if metrics.backend_connections_active > 0 {
            if metrics.producer_events_published > 0 || metrics.consumer_events_consumed > 0 {
                backends.push("memory".to_string());
            }

            #[cfg(feature = "kafka")]
            backends.push("kafka".to_string());

            #[cfg(feature = "nats")]
            backends.push("nats".to_string());

            #[cfg(feature = "redis")]
            backends.push("redis".to_string());

            #[cfg(feature = "pulsar")]
            backends.push("pulsar".to_string());

            #[cfg(feature = "kinesis")]
            backends.push("kinesis".to_string());
        }

        if backends.is_empty() {
            backends.push("memory".to_string());
        }

        backends
    }

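    /// Heuristic estimate of backpressure events derived from error rates,
    /// circuit-breaker trips, and out-of-order rates; no direct counter exists.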
    fn calculate_backpressure_events(metrics: &crate::monitoring::StreamingMetrics) -> u64 {
        let mut backpressure_events = 0;

        // Elevated error rates are treated as a backpressure signal.
        if metrics.error_rate > 0.1 {
            backpressure_events += (metrics.error_rate * 100.0) as u64;
        }

        // Every circuit breaker trip counts as a backpressure event.
        backpressure_events += metrics.backend_circuit_breaker_trips;

        // High out-of-order rates also contribute.
        if metrics.out_of_order_rate > 0.05 {
            backpressure_events += (metrics.out_of_order_rate * 50.0) as u64;
        }

        backpressure_events
    }

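    /// Collects version, uptime, resource usage, and environment details.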
    async fn collect_system_info(&self) -> Result<SystemInfo> {
        let metrics = self.metrics_collector.read().await.get_metrics().await;

        Ok(SystemInfo {
            version: env!("CARGO_PKG_VERSION").to_string(),
            uptime: metrics
                .last_updated
                .signed_duration_since(metrics.collection_start_time)
                .to_std()
                .unwrap_or_default(),
            backends: Self::get_active_backends(&metrics),
            active_connections: metrics.backend_connections_active as usize,
            memory_usage_mb: (metrics.system_memory_usage_bytes / 1024 / 1024) as f64,
            cpu_usage_percent: metrics.system_cpu_usage_percent,
            // The thread count is not currently tracked by the metrics collector.
            thread_count: 0,
            environment: Self::collect_relevant_env_vars(),
        })
    }

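    /// Captures a fixed allowlist of environment variables, masking values
    /// whose names suggest credentials.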
    fn collect_relevant_env_vars() -> HashMap<String, String> {
        let mut env_vars = HashMap::new();

        let relevant_vars = [
            "RUST_LOG",
            "RUST_BACKTRACE",
            "OXIRS_LOG_LEVEL",
            "KAFKA_BROKERS",
            "NATS_SERVERS",
            "REDIS_URL",
            "AWS_REGION",
            "AWS_ACCESS_KEY_ID",
            "OTEL_EXPORTER_JAEGER_ENDPOINT",
            "PROMETHEUS_ENDPOINT",
            "PATH",
            "CARGO_PKG_VERSION",
            "RUST_VERSION",
        ];

        for var in &relevant_vars {
            if let Ok(value) = std::env::var(var) {
                // Mask credential-like values, keeping only a short prefix.
                // The prefix is taken by characters rather than bytes so a
                // multi-byte value cannot cause a slice panic.
                let masked_value =
                    if var.contains("KEY") || var.contains("SECRET") || var.contains("TOKEN") {
                        let prefix: String = value.chars().take(4).collect();
                        format!("{prefix}***")
                    } else {
                        value
                    };
                env_vars.insert(var.to_string(), masked_value);
            }
        }

        env_vars
    }

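    /// Runs all component health checks, folds in registered health monitors,
    /// and computes overall availability.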
    async fn analyze_health(&self) -> Result<HealthSummary> {
        // Refresh all component health checks before reading the results.
        self.health_checker
            .read()
            .await
            .check_all_components()
            .await?;
        let health_status = self.health_checker.read().await.get_health().await;
        let component_checks = &health_status.component_health;

        let mut component_statuses = HashMap::new();
        let mut recent_failures = Vec::new();

        for (name, component_health) in component_checks {
            component_statuses.insert(name.clone(), component_health.clone());
        }

        // Fold registered per-backend health monitors into the component set.
        for (name, monitor) in &self.health_monitors {
            let monitor_guard = monitor.read().await;
            let stats = monitor_guard.get_overall_statistics().await;

            let health_status = if stats.success_rate > 0.9 {
                crate::monitoring::HealthStatus::Healthy
            } else if stats.success_rate > 0.7 {
                crate::monitoring::HealthStatus::Warning
            } else {
                crate::monitoring::HealthStatus::Critical
            };

            component_statuses.insert(
                name.clone(),
                crate::monitoring::ComponentHealth {
                    status: health_status,
                    message: format!(
                        "Success rate: {:.2}%, {} of {} checks successful",
                        stats.success_rate * 100.0,
                        stats.successful_checks,
                        stats.total_checks
                    ),
                    last_check: Utc::now(),
                    metrics: {
                        let mut metrics = HashMap::new();
                        metrics.insert("success_rate".to_string(), stats.success_rate);
                        metrics.insert(
                            "avg_response_time_ms".to_string(),
                            stats.avg_response_time_ms,
                        );
                        metrics.insert("total_checks".to_string(), stats.total_checks as f64);
                        metrics
                    },
                    dependencies: Vec::new(),
                },
            );

            if stats.success_rate < 0.9 {
                recent_failures.push(FailureEvent {
                    timestamp: Utc::now(),
                    component: name.clone(),
                    error_type: "Health Check Degraded".to_string(),
                    message: format!(
                        "Component {} has low success rate: {:.2}%",
                        name,
                        stats.success_rate * 100.0
                    ),
                    impact: if stats.success_rate < 0.5 {
                        "Service outage".to_string()
                    } else {
                        "Service degradation".to_string()
                    },
                });
            }
        }

        let total_components = component_statuses.len() as f64;
        let healthy_components = component_statuses
            .values()
            .filter(|c| matches!(c.status, crate::monitoring::HealthStatus::Healthy))
            .count() as f64;
        let availability_percentage = if total_components > 0.0 {
            (healthy_components / total_components) * 100.0
        } else {
            100.0
        };

        // Convert the monitoring representation into the diagnostics one.
        let diagnostics_component_statuses: HashMap<String, ComponentHealth> = component_statuses
            .into_iter()
            .map(|(name, comp)| {
                (
                    name.clone(),
                    ComponentHealth {
                        name,
                        status: match comp.status {
                            crate::monitoring::HealthStatus::Healthy => HealthStatus::Healthy,
                            crate::monitoring::HealthStatus::Warning => HealthStatus::Degraded,
                            crate::monitoring::HealthStatus::Critical => HealthStatus::Unhealthy,
                            crate::monitoring::HealthStatus::Unknown => HealthStatus::Unknown,
                        },
                        last_check: comp.last_check,
                        // Consecutive failures are not tracked per component here.
                        consecutive_failures: 0,
                        error_rate: comp.metrics.get("error_rate").copied().unwrap_or(0.0),
                        response_time_ms: comp
                            .metrics
                            .get("avg_response_time_ms")
                            .copied()
                            .unwrap_or(0.0),
                    },
                )
            })
            .collect();

        Ok(HealthSummary {
            overall_status: match health_status.overall_status {
                crate::monitoring::HealthStatus::Healthy => HealthStatus::Healthy,
                crate::monitoring::HealthStatus::Warning => HealthStatus::Degraded,
                crate::monitoring::HealthStatus::Critical => HealthStatus::Unhealthy,
                crate::monitoring::HealthStatus::Unknown => HealthStatus::Unknown,
            },
            component_statuses: diagnostics_component_statuses,
            recent_failures,
            availability_percentage,
        })
    }

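    /// Derives throughput, latency, and resource metrics from the collector
    /// and scans them for bottlenecks.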
    async fn analyze_performance(&self) -> Result<PerformanceMetrics> {
        let metrics = self.metrics_collector.read().await.get_metrics().await;

        let uptime_seconds = metrics
            .last_updated
            .signed_duration_since(metrics.collection_start_time)
            .num_seconds()
            .max(1) as f64;

        let throughput = ThroughputMetrics {
            events_per_second: metrics.producer_events_published as f64 / uptime_seconds,
            bytes_per_second: metrics.producer_bytes_sent as f64 / uptime_seconds,
            // Peak and average are not tracked separately; the current
            // producer throughput stands in for both.
            peak_throughput: metrics.producer_throughput_eps,
            average_throughput: metrics.producer_throughput_eps,
        };

        // Percentiles are rough estimates scaled from the average latency;
        // the collector does not record a full latency histogram.
        let latency = LatencyMetrics {
            p50_ms: metrics.producer_average_latency_ms * 0.8,
            p95_ms: metrics.producer_average_latency_ms * 1.5,
            p99_ms: metrics.producer_average_latency_ms * 2.0,
            max_ms: metrics.producer_average_latency_ms * 3.0,
            average_ms: metrics.producer_average_latency_ms,
        };

        let resource_usage = ResourceMetrics {
            memory_usage_mb: (metrics.system_memory_usage_bytes / 1024 / 1024) as f64,
            cpu_usage_percent: metrics.system_cpu_usage_percent,
            network_io_mbps: (metrics.system_network_bytes_in + metrics.system_network_bytes_out)
                as f64
                / uptime_seconds
                / 1024.0
                / 1024.0,
            // Disk I/O is not currently measured.
            disk_io_mbps: 0.0,
        };

        let bottlenecks = self
            .detect_bottlenecks(&metrics, &throughput, &latency)
            .await?;

        Ok(PerformanceMetrics {
            throughput,
            latency,
            resource_usage,
            bottlenecks,
        })
    }

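    /// Flags latency, consumer lag, memory, and reliability bottlenecks using
    /// fixed thresholds.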
    async fn detect_bottlenecks(
        &self,
        metrics: &crate::monitoring::StreamingMetrics,
        _throughput: &ThroughputMetrics,
        latency: &LatencyMetrics,
    ) -> Result<Vec<Bottleneck>> {
        let mut bottlenecks = Vec::new();

        // High tail latency.
        if latency.p99_ms > 100.0 {
            bottlenecks.push(Bottleneck {
                component: "Stream Processing".to_string(),
                metric: "Latency".to_string(),
                severity: if latency.p99_ms > 500.0 {
                    "High"
                } else {
                    "Medium"
                }
                .to_string(),
                description: format!(
                    "P99 latency is {:.2}ms, which may impact real-time processing",
                    latency.p99_ms
                ),
                recommendation: "Consider scaling horizontally or optimizing processing logic"
                    .to_string(),
            });
        }

        // Consumer falling behind the stream.
        if let Some(lag_ms) = metrics.consumer_lag_ms {
            if lag_ms > 10000.0 {
                bottlenecks.push(Bottleneck {
                    component: "Consumer".to_string(),
                    metric: "Lag".to_string(),
                    severity: "High".to_string(),
                    description: format!("Consumer lag is {lag_ms:.2} ms behind"),
                    recommendation: "Increase consumer parallelism or optimize processing"
                        .to_string(),
                });
            }
        }

        // Memory usage above 8 GB.
        if (metrics.system_memory_usage_bytes / 1024 / 1024) as f64 > 8192.0 {
            bottlenecks.push(Bottleneck {
                component: "System".to_string(),
                metric: "Memory".to_string(),
                severity: "High".to_string(),
                description: format!(
                    "Memory usage is high: {} MB",
                    metrics.system_memory_usage_bytes / 1024 / 1024
                ),
                recommendation: "Increase memory allocation or optimize memory usage".to_string(),
            });
        }

        // Any circuit breaker trips indicate backend reliability problems.
        if metrics.backend_circuit_breaker_trips > 0 {
            bottlenecks.push(Bottleneck {
                component: "Backend".to_string(),
                metric: "Reliability".to_string(),
                severity: "High".to_string(),
                description: format!(
                    "Circuit breaker tripped {} times",
                    metrics.backend_circuit_breaker_trips
                ),
                recommendation: "Investigate backend health and connection stability".to_string(),
            });
        }

        Ok(bottlenecks)
    }

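    /// Summarizes stream activity, tallying event types from the rolling
    /// event buffer.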
    async fn analyze_streams(&self) -> Result<StreamStatistics> {
        let metrics = self.metrics_collector.read().await.get_metrics().await;

        // Tally event types currently held in the rolling event buffer.
        let mut event_types = HashMap::new();
        let event_buffer = self.event_buffer.read().await;
        for (event, _) in event_buffer.iter() {
            let event_type = match event {
                StreamEvent::TripleAdded { .. } => "triple_added",
                StreamEvent::TripleRemoved { .. } => "triple_removed",
                StreamEvent::QuadAdded { .. } => "quad_added",
                StreamEvent::QuadRemoved { .. } => "quad_removed",
                StreamEvent::GraphCreated { .. } => "graph_created",
                StreamEvent::GraphCleared { .. } => "graph_cleared",
                StreamEvent::GraphDeleted { .. } => "graph_deleted",
                StreamEvent::SparqlUpdate { .. } => "sparql_update",
                StreamEvent::TransactionBegin { .. } => "transaction_begin",
                StreamEvent::TransactionCommit { .. } => "transaction_commit",
                StreamEvent::TransactionAbort { .. } => "transaction_abort",
                StreamEvent::SchemaChanged { .. } => "schema_changed",
                StreamEvent::Heartbeat { .. } => "heartbeat",
                StreamEvent::QueryResultAdded { .. } => "query_result_added",
                StreamEvent::QueryResultRemoved { .. } => "query_result_removed",
                StreamEvent::QueryCompleted { .. } => "query_completed",
                StreamEvent::GraphMetadataUpdated { .. } => "graph_metadata_updated",
                StreamEvent::GraphPermissionsChanged { .. } => "graph_permissions_changed",
                StreamEvent::GraphStatisticsUpdated { .. } => "graph_statistics_updated",
                StreamEvent::GraphRenamed { .. } => "graph_renamed",
                StreamEvent::GraphMerged { .. } => "graph_merged",
                StreamEvent::GraphSplit { .. } => "graph_split",
                StreamEvent::SchemaDefinitionAdded { .. } => "schema_definition_added",
                StreamEvent::SchemaDefinitionRemoved { .. } => "schema_definition_removed",
                StreamEvent::SchemaDefinitionModified { .. } => "schema_definition_modified",
                StreamEvent::OntologyImported { .. } => "ontology_imported",
                StreamEvent::OntologyRemoved { .. } => "ontology_removed",
                StreamEvent::ConstraintAdded { .. } => "constraint_added",
                StreamEvent::ConstraintRemoved { .. } => "constraint_removed",
                StreamEvent::ConstraintViolated { .. } => "constraint_violated",
                StreamEvent::IndexCreated { .. } => "index_created",
                StreamEvent::IndexDropped { .. } => "index_dropped",
                StreamEvent::IndexRebuilt { .. } => "index_rebuilt",
                StreamEvent::SchemaUpdated { .. } => "schema_updated",
                StreamEvent::ShapeAdded { .. } => "shape_added",
                StreamEvent::ShapeUpdated { .. } => "shape_updated",
                StreamEvent::ShapeRemoved { .. } => "shape_removed",
                StreamEvent::ShapeModified { .. } => "shape_modified",
                StreamEvent::ShapeValidationStarted { .. } => "shape_validation_started",
                StreamEvent::ShapeValidationCompleted { .. } => "shape_validation_completed",
                StreamEvent::ShapeViolationDetected { .. } => "shape_violation_detected",
                StreamEvent::ErrorOccurred { .. } => "error_occurred",
            };
            *event_types.entry(event_type.to_string()).or_insert(0) += 1;
        }

        Ok(StreamStatistics {
            total_events: metrics.producer_events_published + metrics.consumer_events_consumed,
            event_types,
            error_rate: metrics.error_rate,
            duplicate_rate: metrics.duplicate_rate,
            out_of_order_rate: metrics.out_of_order_rate,
            backpressure_events: Self::calculate_backpressure_events(&metrics),
            circuit_breaker_trips: metrics.backend_circuit_breaker_trips,
        })
    }

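    /// Builds an hourly error timeline, ranks the most frequent error
    /// patterns, and looks for correlated error types.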
    async fn analyze_errors(&self) -> Result<ErrorAnalysis> {
        let error_tracker = self.error_tracker.read().await;

        // Bucket errors by hour to build the timeline.
        let mut error_timeline = Vec::new();
        let mut timeline_buckets: BTreeMap<DateTime<Utc>, HashMap<String, u64>> = BTreeMap::new();

        for error in &error_tracker.errors {
            let bucket_time = error
                .timestamp
                .date_naive()
                .and_hms_opt(error.timestamp.hour(), 0, 0)
                .map(|dt| DateTime::from_naive_utc_and_offset(dt, Utc))
                .unwrap_or(error.timestamp);

            let bucket = timeline_buckets.entry(bucket_time).or_default();
            *bucket.entry(error.error_type.clone()).or_insert(0) += 1;
        }

        for (timestamp, error_types) in timeline_buckets {
            error_timeline.push(ErrorTimelineEntry {
                timestamp,
                error_count: error_types.values().sum(),
                error_types,
            });
        }

        // Rank patterns by occurrence count and keep the top ten.
        let mut top_errors: Vec<ErrorPattern> =
            error_tracker.error_patterns.values().cloned().collect();
        top_errors.sort_by_key(|b| std::cmp::Reverse(b.occurrences));
        top_errors.truncate(10);

        let error_correlations = self.find_error_correlations(&error_tracker.errors).await?;

        Ok(ErrorAnalysis {
            total_errors: error_tracker.error_counts.values().sum(),
            error_categories: error_tracker.error_counts.clone(),
            error_timeline,
            top_errors,
            error_correlations,
        })
    }

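    /// Finds pairs of error types that repeatedly occur within one second of
    /// each other (a simple co-occurrence count, not a statistical test).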
    async fn find_error_correlations(
        &self,
        errors: &VecDeque<ErrorRecord>,
    ) -> Result<Vec<ErrorCorrelation>> {
        let mut correlations = Vec::new();

        // Collect the distinct error types seen in the buffer.
        let error_types: Vec<String> = errors
            .iter()
            .map(|e| e.error_type.clone())
            .collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect();

        for i in 0..error_types.len() {
            for j in i + 1..error_types.len() {
                let type1 = &error_types[i];
                let type2 = &error_types[j];

                // Count pairs of errors that occur within one second of each other.
                let mut co_occurrences = 0;
                let mut time_offsets = Vec::new();

                for error1 in errors.iter().filter(|e| &e.error_type == type1) {
                    for error2 in errors.iter().filter(|e| &e.error_type == type2) {
                        let time_diff = error2.timestamp.timestamp_millis()
                            - error1.timestamp.timestamp_millis();
                        if time_diff.abs() < 1000 {
                            co_occurrences += 1;
                            time_offsets.push(time_diff);
                        }
                    }
                }

                if co_occurrences > 5 {
                    let avg_offset =
                        time_offsets.iter().sum::<i64>() / time_offsets.len().max(1) as i64;
                    correlations.push(ErrorCorrelation {
                        primary_error: type1.clone(),
                        correlated_errors: vec![type2.clone()],
                        correlation_strength: co_occurrences as f64 / errors.len() as f64,
                        time_offset_ms: avg_offset,
                    });
                }
            }
        }

        correlations.sort_by(|a, b| {
            b.correlation_strength
                .partial_cmp(&a.correlation_strength)
                .unwrap_or(std::cmp::Ordering::Equal)
        });
        correlations.truncate(10);

        Ok(correlations)
    }

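    /// Turns the analysis results into prioritized, actionable recommendations.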
    async fn generate_recommendations(
        &self,
        health: &HealthSummary,
        performance: &PerformanceMetrics,
        errors: &ErrorAnalysis,
        stream_stats: &StreamStatistics,
    ) -> Result<Vec<Recommendation>> {
        let mut recommendations = Vec::new();

        // Availability below the 99% target.
        if health.availability_percentage < 99.0 {
            recommendations.push(Recommendation {
                category: "Reliability".to_string(),
                severity: "High".to_string(),
                title: "Improve System Availability".to_string(),
                description: format!(
                    "System availability is {:.2}%, below the target of 99%",
                    health.availability_percentage
                ),
                action_items: vec![
                    "Review failing components and fix issues".to_string(),
                    "Implement redundancy for critical components".to_string(),
                    "Set up automated health monitoring alerts".to_string(),
                ],
                expected_impact: "Increase availability to 99%+".to_string(),
            });
        }

        // Elevated tail latency.
        if performance.latency.p99_ms > 100.0 {
            recommendations.push(Recommendation {
                category: "Performance".to_string(),
                severity: "Medium".to_string(),
                title: "Reduce Processing Latency".to_string(),
                description: format!(
                    "P99 latency is {:.2}ms, affecting real-time processing",
                    performance.latency.p99_ms
                ),
                action_items: vec![
                    "Profile processing pipeline to identify bottlenecks".to_string(),
                    "Optimize serialization/deserialization".to_string(),
                    "Consider adding caching layer".to_string(),
                    "Scale out processing nodes".to_string(),
                ],
                expected_impact: "Reduce P99 latency to <50ms".to_string(),
            });
        }

        // Error rate relative to total event volume.
        let error_rate = if stream_stats.total_events > 0 {
            errors.total_errors as f64 / stream_stats.total_events as f64
        } else {
            0.0
        };
        if error_rate > 0.01 {
            recommendations.push(Recommendation {
                category: "Quality".to_string(),
                severity: "High".to_string(),
                title: "Reduce Error Rate".to_string(),
                description: format!(
                    "Error rate is {:.2}%, impacting data quality",
                    error_rate * 100.0
                ),
                action_items: vec![
                    "Analyze top error patterns and fix root causes".to_string(),
                    "Implement retry logic for transient failures".to_string(),
                    "Add input validation and error handling".to_string(),
                    "Set up error rate monitoring and alerts".to_string(),
                ],
                expected_impact: "Reduce error rate to <1%".to_string(),
            });
        }

        // Memory usage above 80% of the assumed 8 GB budget.
        if performance.resource_usage.memory_usage_mb > 0.8 * 8192.0 {
            recommendations.push(Recommendation {
                category: "Resources".to_string(),
                severity: "Medium".to_string(),
                title: "Optimize Memory Usage".to_string(),
                description: "Memory usage is approaching limits".to_string(),
                action_items: vec![
                    "Profile memory usage to identify leaks".to_string(),
                    "Tune buffer sizes and cache limits".to_string(),
                    "Implement memory-efficient data structures".to_string(),
                ],
                expected_impact: "Reduce memory usage by 30%".to_string(),
            });
        }

        Ok(recommendations)
    }

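    /// Records an error occurrence, updating counts and per-component patterns.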
    pub async fn record_error(&self, error_type: String, message: String, component: String) {
        let mut error_tracker = self.error_tracker.write().await;

        let error = ErrorRecord {
            timestamp: Utc::now(),
            error_type: error_type.clone(),
            message: message.clone(),
            component: component.clone(),
            context: HashMap::new(),
        };

        // Update aggregate counts per error type.
        *error_tracker
            .error_counts
            .entry(error_type.clone())
            .or_insert(0) += 1;

        // Update the per-component error pattern.
        let pattern_key = format!("{component}:{error_type}");
        let pattern = error_tracker
            .error_patterns
            .entry(pattern_key)
            .or_insert(ErrorPattern {
                pattern: error_type,
                occurrences: 0,
                first_seen: error.timestamp,
                last_seen: error.timestamp,
                affected_components: vec![component],
            });
        pattern.occurrences += 1;
        pattern.last_seen = error.timestamp;

        // Keep the error buffer bounded at 1000 entries.
        error_tracker.errors.push_back(error);
        if error_tracker.errors.len() > 1000 {
            error_tracker.errors.pop_front();
        }
    }

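    /// Appends an event to the bounded in-memory buffer used for stream statistics.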
    pub async fn record_event(&self, event: StreamEvent) {
        let mut buffer = self.event_buffer.write().await;
        buffer.push_back((event, Utc::now()));
        // Keep the event buffer bounded at 10000 entries.
        if buffer.len() > 10000 {
            buffer.pop_front();
        }
    }
}

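/// Interactive command-line front end over a [`DiagnosticAnalyzer`].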
pub struct DiagnosticCLI {
    analyzer: Arc<DiagnosticAnalyzer>,
}

impl DiagnosticCLI {
    pub fn new(analyzer: Arc<DiagnosticAnalyzer>) -> Self {
        Self { analyzer }
    }

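    /// Runs the interactive menu loop until the user chooses to exit.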
    pub async fn run_interactive(&self) -> Result<()> {
        println!("OxiRS Stream Diagnostics Tool");
        println!("=============================");

        loop {
            println!("\nOptions:");
            println!("1. Generate full diagnostic report");
            println!("2. Check system health");
            println!("3. View performance metrics");
            println!("4. Analyze errors");
            println!("5. Export metrics (Prometheus format)");
            println!("6. Exit");

            print!("\nSelect option: ");
            use std::io::{self, Write};
            io::stdout().flush()?;

            let mut input = String::new();
            io::stdin().read_line(&mut input)?;

            match input.trim() {
                "1" => self.generate_report().await?,
                "2" => self.check_health().await?,
                "3" => self.view_performance().await?,
                "4" => self.analyze_errors().await?,
                "5" => self.export_metrics().await?,
                "6" => break,
                _ => println!("Invalid option"),
            }
        }

        Ok(())
    }

    async fn generate_report(&self) -> Result<()> {
        println!("\nGenerating diagnostic report...");

        let report = self.analyzer.generate_report().await?;

        println!("\n=== DIAGNOSTIC REPORT ===");
        println!("Report ID: {}", report.report_id);
        println!("Generated: {}", report.timestamp);
        println!("Duration: {:?}", report.duration);

        println!("\n--- System Information ---");
        println!("Version: {}", report.system_info.version);
        println!("Uptime: {:?}", report.system_info.uptime);
        println!(
            "Active Connections: {}",
            report.system_info.active_connections
        );
        println!("Memory Usage: {:.2} MB", report.system_info.memory_usage_mb);
        println!("CPU Usage: {:.2}%", report.system_info.cpu_usage_percent);

        println!("\n--- Health Summary ---");
        println!("Overall Status: {:?}", report.health_summary.overall_status);
        println!(
            "Availability: {:.2}%",
            report.health_summary.availability_percentage
        );
        println!("Component Statuses:");
        for (name, health) in &report.health_summary.component_statuses {
            println!(
                " {}: {:?} (error rate: {:.2}%)",
                name,
                health.status,
                health.error_rate * 100.0
            );
        }

        println!("\n--- Performance Metrics ---");
        println!(
            "Throughput: {:.2} events/sec",
            report.performance_metrics.throughput.events_per_second
        );
        println!(
            "Latency P99: {:.2} ms",
            report.performance_metrics.latency.p99_ms
        );
        println!(
            "Memory Usage: {:.2} MB",
            report.performance_metrics.resource_usage.memory_usage_mb
        );

        if !report.performance_metrics.bottlenecks.is_empty() {
            println!("\n--- Detected Bottlenecks ---");
            for bottleneck in &report.performance_metrics.bottlenecks {
                println!(
                    " [{}] {}: {}",
                    bottleneck.severity, bottleneck.component, bottleneck.description
                );
            }
        }

        if !report.recommendations.is_empty() {
            println!("\n--- Recommendations ---");
            for (i, rec) in report.recommendations.iter().enumerate() {
                println!("{}. [{}] {}", i + 1, rec.severity, rec.title);
                println!(" {}", rec.description);
                println!(" Actions:");
                for action in &rec.action_items {
                    println!(" - {action}");
                }
            }
        }

        // Persist the full report as pretty-printed JSON.
        let report_file = format!("diagnostic_report_{}.json", report.report_id);
        std::fs::write(&report_file, serde_json::to_string_pretty(&report)?)?;
        println!("\nFull report saved to: {report_file}");

        Ok(())
    }

    async fn check_health(&self) -> Result<()> {
        let health = self.analyzer.analyze_health().await?;

        println!("\n=== HEALTH CHECK ===");
        println!("Overall Status: {:?}", health.overall_status);
        println!("Availability: {:.2}%", health.availability_percentage);

        println!("\nComponent Status:");
        for (name, status) in &health.component_statuses {
            let icon = match status.status {
                HealthStatus::Healthy => "✓",
                HealthStatus::Degraded => "⚠",
                HealthStatus::Unhealthy => "✗",
                HealthStatus::Dead => "☠",
                HealthStatus::Unknown => "?",
            };
            println!(" {} {}: {:?}", icon, name, status.status);
        }

        if !health.recent_failures.is_empty() {
            println!("\nRecent Failures:");
            for failure in &health.recent_failures {
                println!(
                    " - {} [{}]: {}",
                    failure.timestamp.format("%H:%M:%S"),
                    failure.component,
                    failure.message
                );
            }
        }

        Ok(())
    }

    async fn view_performance(&self) -> Result<()> {
        let perf = self.analyzer.analyze_performance().await?;

        println!("\n=== PERFORMANCE METRICS ===");

        println!("\nThroughput:");
        println!(
            " Current: {:.2} events/sec",
            perf.throughput.events_per_second
        );
        println!(" Peak: {:.2} events/sec", perf.throughput.peak_throughput);
        println!(
            " Average: {:.2} events/sec",
            perf.throughput.average_throughput
        );

        println!("\nLatency:");
        println!(" P50: {:.2} ms", perf.latency.p50_ms);
        println!(" P95: {:.2} ms", perf.latency.p95_ms);
        println!(" P99: {:.2} ms", perf.latency.p99_ms);
        println!(" Max: {:.2} ms", perf.latency.max_ms);

        println!("\nResource Usage:");
        println!(" Memory: {:.2} MB", perf.resource_usage.memory_usage_mb);
        println!(" CPU: {:.2}%", perf.resource_usage.cpu_usage_percent);
        println!(
            " Network I/O: {:.2} Mbps",
            perf.resource_usage.network_io_mbps
        );

        Ok(())
    }

    async fn analyze_errors(&self) -> Result<()> {
        let errors = self.analyzer.analyze_errors().await?;

        println!("\n=== ERROR ANALYSIS ===");
        println!("Total Errors: {}", errors.total_errors);

        println!("\nError Categories:");
        for (category, count) in &errors.error_categories {
            println!(" {category}: {count} errors");
        }

        if !errors.top_errors.is_empty() {
            println!("\nTop Error Patterns:");
            for (i, pattern) in errors.top_errors.iter().take(5).enumerate() {
                println!(
                    "{}. {} ({} occurrences)",
                    i + 1,
                    pattern.pattern,
                    pattern.occurrences
                );
                println!(
                    " First seen: {}",
                    pattern.first_seen.format("%Y-%m-%d %H:%M:%S")
                );
                println!(
                    " Last seen: {}",
                    pattern.last_seen.format("%Y-%m-%d %H:%M:%S")
                );
            }
        }

        if !errors.error_correlations.is_empty() {
            println!("\nError Correlations:");
            for corr in &errors.error_correlations {
                println!(
                    " {} → {} (strength: {:.2})",
                    corr.primary_error,
                    corr.correlated_errors.join(", "),
                    corr.correlation_strength
                );
            }
        }

        Ok(())
    }

    async fn export_metrics(&self) -> Result<()> {
        println!("\nExporting metrics...");

        // NOTE: the Prometheus export itself is not implemented here; this
        // placeholder only reports the intended output path.
        println!("Metrics exported to: metrics_export.prom");

        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::monitoring::{HealthChecker, MetricsCollector};

    #[tokio::test]
    async fn test_diagnostic_report_generation() {
        let config = crate::monitoring::MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            metrics_interval: std::time::Duration::from_secs(60),
            health_check_interval: std::time::Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        };
        let metrics_collector = Arc::new(RwLock::new(MetricsCollector::new(config.clone())));
        let health_checker = Arc::new(RwLock::new(HealthChecker::new(config)));

        let analyzer = DiagnosticAnalyzer::new(metrics_collector, health_checker);

        let report = analyzer.generate_report().await.unwrap();

        assert!(!report.report_id.is_empty());
        assert!(report.health_summary.availability_percentage >= 0.0);
        assert!(report.health_summary.availability_percentage <= 100.0);
    }

    #[tokio::test]
    async fn test_error_tracking() {
        let config = crate::monitoring::MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            metrics_interval: std::time::Duration::from_secs(60),
            health_check_interval: std::time::Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        };
        let metrics_collector = Arc::new(RwLock::new(MetricsCollector::new(config.clone())));
        let health_checker = Arc::new(RwLock::new(HealthChecker::new(config)));

        let analyzer = DiagnosticAnalyzer::new(metrics_collector, health_checker);

        analyzer
            .record_error(
                "ConnectionError".to_string(),
                "Failed to connect to backend".to_string(),
                "KafkaBackend".to_string(),
            )
            .await;

        analyzer
            .record_error(
                "TimeoutError".to_string(),
                "Request timed out".to_string(),
                "KafkaBackend".to_string(),
            )
            .await;

        let error_analysis = analyzer.analyze_errors().await.unwrap();
        assert_eq!(error_analysis.total_errors, 2);
        assert!(error_analysis
            .error_categories
            .contains_key("ConnectionError"));
        assert!(error_analysis.error_categories.contains_key("TimeoutError"));
    }

    #[tokio::test]
    async fn test_bottleneck_detection() {
        let config = crate::monitoring::MonitoringConfig {
            enable_metrics: true,
            enable_tracing: false,
            metrics_interval: std::time::Duration::from_secs(60),
            health_check_interval: std::time::Duration::from_secs(30),
            enable_profiling: false,
            prometheus_endpoint: None,
            jaeger_endpoint: None,
            log_level: "info".to_string(),
        };
        let metrics_collector = Arc::new(RwLock::new(MetricsCollector::new(config.clone())));
        let health_checker = Arc::new(RwLock::new(HealthChecker::new(config)));

        // Record several high-latency samples so the estimated P99 crosses
        // the 100 ms bottleneck threshold.
        {
            let collector = metrics_collector.read().await;
            collector
                .update_producer_metrics(crate::monitoring::ProducerMetricsUpdate {
                    events_published: 1,
                    events_failed: 0,
                    bytes_sent: 100,
                    batches_sent: 1,
                    latency_ms: 200.0,
                    throughput_eps: 1.0,
                })
                .await;
            collector
                .update_producer_metrics(crate::monitoring::ProducerMetricsUpdate {
                    events_published: 1,
                    events_failed: 0,
                    bytes_sent: 100,
                    batches_sent: 1,
                    latency_ms: 250.0,
                    throughput_eps: 1.0,
                })
                .await;
            collector
                .update_producer_metrics(crate::monitoring::ProducerMetricsUpdate {
                    events_published: 1,
                    events_failed: 0,
                    bytes_sent: 100,
                    batches_sent: 1,
                    latency_ms: 180.0,
                    throughput_eps: 1.0,
                })
                .await;
        }

        let analyzer = DiagnosticAnalyzer::new(metrics_collector, health_checker);

        let perf = analyzer.analyze_performance().await.unwrap();

        let latency_bottlenecks: Vec<_> = perf
            .bottlenecks
            .iter()
            .filter(|b| b.metric == "Latency")
            .collect();
        assert!(!latency_bottlenecks.is_empty());
    }
}