Skip to main content

oxirs_vec/
rta_aggregators.rs

1//! Metrics collectors, monitors, alert managers, notification channels, and dashboard data types.
2
3use anyhow::{anyhow, Result};
4use parking_lot::RwLock;
5use serde::{Deserialize, Serialize};
6use std::collections::{HashMap, VecDeque};
7use std::sync::Arc;
8use std::time::{Duration, Instant, SystemTime};
9
10use crate::rta_engine::{Alert, AlertSeverity};
11
12/// Comprehensive metrics collection
13#[derive(Debug)]
14pub struct MetricsCollector {
15    pub query_metrics: Arc<RwLock<QueryMetrics>>,
16    pub system_metrics: Arc<RwLock<SystemMetrics>>,
17    pub quality_metrics: Arc<RwLock<QualityMetrics>>,
18    pub custom_metrics: Arc<RwLock<HashMap<String, CustomMetric>>>,
19}
20
21/// Query performance metrics
22#[derive(Debug, Clone, Default, Serialize, Deserialize)]
23pub struct QueryMetrics {
24    pub total_queries: u64,
25    pub successful_queries: u64,
26    pub failed_queries: u64,
27    pub average_latency: Duration,
28    pub p50_latency: Duration,
29    pub p95_latency: Duration,
30    pub p99_latency: Duration,
31    pub max_latency: Duration,
32    pub min_latency: Duration,
33    pub throughput_qps: f64,
34    pub latency_history: VecDeque<(SystemTime, Duration)>,
35    pub throughput_history: VecDeque<(SystemTime, f64)>,
36    pub error_rate: f64,
37    pub query_distribution: HashMap<String, u64>,
38}
39
40/// System resource metrics
41#[derive(Debug, Clone, Default, Serialize, Deserialize)]
42pub struct SystemMetrics {
43    pub cpu_usage: f64,
44    pub memory_usage: f64,
45    pub memory_total: u64,
46    pub memory_available: u64,
47    pub disk_usage: f64,
48    pub network_io: NetworkIO,
49    pub vector_count: u64,
50    pub index_size: u64,
51    pub cache_hit_ratio: f64,
52    pub gc_pressure: f64,
53    pub thread_count: u64,
54    pub system_load: f64,
55}
56
57/// Network I/O metrics
58#[derive(Debug, Clone, Default, Serialize, Deserialize)]
59pub struct NetworkIO {
60    pub bytes_sent: u64,
61    pub bytes_received: u64,
62    pub packets_sent: u64,
63    pub packets_received: u64,
64    pub connections_active: u64,
65}
66
67/// Vector search quality metrics
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct QualityMetrics {
70    pub recall_at_k: HashMap<usize, f64>,
71    pub precision_at_k: HashMap<usize, f64>,
72    pub ndcg_at_k: HashMap<usize, f64>,
73    pub mean_reciprocal_rank: f64,
74    pub average_similarity_score: f64,
75    pub similarity_distribution: Vec<f64>,
76    pub query_diversity: f64,
77    pub result_diversity: f64,
78    pub relevance_correlation: f64,
79}
80
81/// Custom metrics defined by users
82#[derive(Debug, Clone, Serialize, Deserialize)]
83pub struct CustomMetric {
84    pub name: String,
85    pub value: f64,
86    pub unit: String,
87    pub description: String,
88    pub timestamp: SystemTime,
89    pub tags: HashMap<String, String>,
90}
91
92/// Performance monitoring with alerting
93#[derive(Debug)]
94pub struct PerformanceMonitor {
95    pub thresholds: Arc<RwLock<PerformanceThresholds>>,
96    pub alert_history: Arc<RwLock<VecDeque<Alert>>>,
97    pub current_alerts: Arc<RwLock<HashMap<String, Alert>>>,
98}
99
100/// Performance thresholds for alerting
101#[derive(Debug, Clone, Serialize, Deserialize)]
102pub struct PerformanceThresholds {
103    pub max_latency_ms: u64,
104    pub min_throughput_qps: f64,
105    pub max_memory_usage_percent: f64,
106    pub max_cpu_usage_percent: f64,
107    pub min_cache_hit_ratio: f64,
108    pub max_error_rate_percent: f64,
109    pub min_recall_at_10: f64,
110    pub max_index_size_gb: f64,
111}
112
113impl Default for PerformanceThresholds {
114    fn default() -> Self {
115        Self {
116            max_latency_ms: 100,
117            min_throughput_qps: 100.0,
118            max_memory_usage_percent: 80.0,
119            max_cpu_usage_percent: 85.0,
120            min_cache_hit_ratio: 0.8,
121            max_error_rate_percent: 1.0,
122            min_recall_at_10: 0.9,
123            max_index_size_gb: 10.0,
124        }
125    }
126}
127
128/// Query pattern analysis
129#[derive(Debug)]
130pub struct QueryAnalyzer {
131    pub query_patterns: Arc<RwLock<HashMap<String, QueryPattern>>>,
132    pub popular_queries: Arc<RwLock<VecDeque<PopularQuery>>>,
133    pub usage_trends: Arc<RwLock<UsageTrends>>,
134}
135
136/// Query pattern information
137#[derive(Debug, Clone, Serialize, Deserialize)]
138pub struct QueryPattern {
139    pub pattern_id: String,
140    pub frequency: u64,
141    pub avg_latency: Duration,
142    pub success_rate: f64,
143    pub peak_hours: Vec<u8>, // Hours of day (0-23)
144    pub similarity_threshold_distribution: Vec<f64>,
145    pub result_size_distribution: Vec<usize>,
146    pub user_segments: HashMap<String, u64>,
147}
148
149/// Popular query tracking
150#[derive(Debug, Clone, Serialize, Deserialize)]
151pub struct PopularQuery {
152    pub query_text: String,
153    pub frequency: u64,
154    pub avg_similarity_score: f64,
155    pub timestamp: SystemTime,
156}
157
158/// Usage trends analysis
159#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct UsageTrends {
161    pub daily_query_counts: VecDeque<(SystemTime, u64)>,
162    pub hourly_patterns: [u64; 24],
163    pub weekly_patterns: [u64; 7],
164    pub growth_rate: f64,
165    pub seasonal_patterns: HashMap<String, f64>,
166    pub user_growth: f64,
167    pub feature_adoption: HashMap<String, f64>,
168}
169
170/// Alert management system
171pub struct AlertManager {
172    pub config: AlertConfig,
173    pub notification_channels: Vec<Box<dyn NotificationChannel>>,
174    pub alert_rules: Arc<RwLock<Vec<AlertRule>>>,
175}
176
177/// Alert configuration
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub struct AlertConfig {
180    pub enable_email: bool,
181    pub enable_slack: bool,
182    pub enable_webhook: bool,
183    pub email_recipients: Vec<String>,
184    pub slack_webhook: Option<String>,
185    pub webhook_url: Option<String>,
186    pub cooldown_period: Duration,
187    pub escalation_enabled: bool,
188}
189
190impl Default for AlertConfig {
191    fn default() -> Self {
192        Self {
193            enable_email: false,
194            enable_slack: false,
195            enable_webhook: false,
196            email_recipients: Vec::new(),
197            slack_webhook: None,
198            webhook_url: None,
199            cooldown_period: Duration::from_secs(300), // 5 minutes
200            escalation_enabled: false,
201        }
202    }
203}
204
205/// Alert rule definition
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct AlertRule {
208    pub name: String,
209    pub condition: String, // e.g., "latency > 100ms"
210    pub severity: AlertSeverity,
211    pub enabled: bool,
212    pub cooldown: Duration,
213    pub actions: Vec<String>,
214}
215
216/// Notification channel trait
217pub trait NotificationChannel: Send + Sync {
218    fn send_notification(&self, alert: &Alert) -> Result<()>;
219    fn get_channel_type(&self) -> String;
220}
221
222/// Dashboard data aggregation
223#[derive(Debug, Clone, Serialize, Deserialize)]
224pub struct DashboardData {
225    pub overview: OverviewData,
226    pub query_performance: QueryPerformanceData,
227    pub system_health: SystemHealthData,
228    pub quality_metrics: QualityMetricsData,
229    pub usage_analytics: UsageAnalyticsData,
230    pub alerts: Vec<Alert>,
231    pub last_updated: SystemTime,
232}
233
234/// Overview dashboard data
235#[derive(Debug, Clone, Serialize, Deserialize)]
236pub struct OverviewData {
237    pub total_queries_today: u64,
238    pub average_latency: Duration,
239    pub current_qps: f64,
240    pub system_health_score: f64,
241    pub active_alerts: u64,
242    pub index_size: u64,
243    pub vector_count: u64,
244    pub cache_hit_ratio: f64,
245}
246
247/// Query performance dashboard data
248#[derive(Debug, Clone, Serialize, Deserialize, Default)]
249pub struct QueryPerformanceData {
250    pub latency_trends: Vec<(SystemTime, Duration)>,
251    pub throughput_trends: Vec<(SystemTime, f64)>,
252    pub error_rate_trends: Vec<(SystemTime, f64)>,
253    pub top_slow_queries: Vec<(String, Duration)>,
254    pub query_distribution: HashMap<String, u64>,
255    pub performance_percentiles: HashMap<String, Duration>,
256}
257
258/// System health dashboard data
259#[derive(Debug, Clone, Serialize, Deserialize)]
260pub struct SystemHealthData {
261    pub cpu_usage: f64,
262    pub memory_usage: f64,
263    pub disk_usage: f64,
264    pub network_throughput: f64,
265    pub resource_trends: Vec<(SystemTime, f64)>,
266    pub capacity_forecast: Vec<(SystemTime, f64)>,
267    pub bottlenecks: Vec<String>,
268}
269
270/// Quality metrics dashboard data
271#[derive(Debug, Clone, Serialize, Deserialize)]
272pub struct QualityMetricsData {
273    pub recall_trends: Vec<(SystemTime, f64)>,
274    pub precision_trends: Vec<(SystemTime, f64)>,
275    pub similarity_distribution: Vec<f64>,
276    pub quality_score: f64,
277    pub quality_trends: Vec<(SystemTime, f64)>,
278    pub benchmark_comparisons: HashMap<String, f64>,
279}
280
281/// Usage analytics dashboard data
282#[derive(Debug, Clone, Serialize, Deserialize, Default)]
283pub struct UsageAnalyticsData {
284    pub user_activity: Vec<(SystemTime, u64)>,
285    pub popular_queries: Vec<PopularQuery>,
286    pub usage_patterns: HashMap<String, f64>,
287    pub growth_metrics: GrowthMetrics,
288    pub feature_usage: HashMap<String, u64>,
289}
290
291/// Growth metrics
292#[derive(Debug, Clone, Serialize, Deserialize)]
293pub struct GrowthMetrics {
294    pub daily_growth_rate: f64,
295    pub weekly_growth_rate: f64,
296    pub monthly_growth_rate: f64,
297    pub user_retention: f64,
298    pub query_volume_growth: f64,
299}
300
301/// Performance profiler for detailed analysis
302pub struct PerformanceProfiler {
303    pub profiles: Arc<RwLock<HashMap<String, ProfileData>>>,
304    pub active_profiles: Arc<RwLock<HashMap<String, Instant>>>,
305}
306
307#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct ProfileData {
309    pub function_name: String,
310    pub total_calls: u64,
311    pub total_time: Duration,
312    pub average_time: Duration,
313    pub min_time: Duration,
314    pub max_time: Duration,
315    pub call_history: VecDeque<(SystemTime, Duration)>,
316}
317
/// Output formats metrics can be exported in.
///
/// A plain fieldless enum, so it is `Copy` and can be compared (`==`), hashed and used as a
/// map/set key — e.g. to cache one rendered export per format.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ExportFormat {
    /// JSON document.
    Json,
    /// Comma-separated values.
    Csv,
    /// Prometheus text exposition format.
    Prometheus,
    /// InfluxDB line protocol.
    InfluxDb,
}
326
327// --- Default implementations ---
328
329impl Default for MetricsCollector {
330    fn default() -> Self {
331        Self::new()
332    }
333}
334
335impl MetricsCollector {
336    pub fn new() -> Self {
337        Self {
338            query_metrics: Arc::new(RwLock::new(QueryMetrics::default())),
339            system_metrics: Arc::new(RwLock::new(SystemMetrics::default())),
340            quality_metrics: Arc::new(RwLock::new(QualityMetrics::default())),
341            custom_metrics: Arc::new(RwLock::new(HashMap::new())),
342        }
343    }
344}
345
346impl Default for PerformanceMonitor {
347    fn default() -> Self {
348        Self::new()
349    }
350}
351
352impl PerformanceMonitor {
353    pub fn new() -> Self {
354        Self {
355            thresholds: Arc::new(RwLock::new(PerformanceThresholds::default())),
356            alert_history: Arc::new(RwLock::new(VecDeque::new())),
357            current_alerts: Arc::new(RwLock::new(HashMap::new())),
358        }
359    }
360}
361
362impl Default for QueryAnalyzer {
363    fn default() -> Self {
364        Self::new()
365    }
366}
367
368impl QueryAnalyzer {
369    pub fn new() -> Self {
370        Self {
371            query_patterns: Arc::new(RwLock::new(HashMap::new())),
372            popular_queries: Arc::new(RwLock::new(VecDeque::new())),
373            usage_trends: Arc::new(RwLock::new(UsageTrends::default())),
374        }
375    }
376}
377
378impl AlertManager {
379    pub fn new(config: AlertConfig) -> Self {
380        Self {
381            config,
382            notification_channels: Vec::new(),
383            alert_rules: Arc::new(RwLock::new(Vec::new())),
384        }
385    }
386
387    pub fn send_alert(&self, alert: &Alert) -> Result<()> {
388        for channel in &self.notification_channels {
389            if let Err(e) = channel.send_notification(alert) {
390                eprintln!(
391                    "Failed to send alert via {}: {}",
392                    channel.get_channel_type(),
393                    e
394                );
395            }
396        }
397        Ok(())
398    }
399
400    pub fn add_notification_channel(&mut self, channel: Box<dyn NotificationChannel>) {
401        self.notification_channels.push(channel);
402    }
403}
404
405impl Default for DashboardData {
406    fn default() -> Self {
407        Self {
408            overview: OverviewData::default(),
409            query_performance: QueryPerformanceData::default(),
410            system_health: SystemHealthData::default(),
411            quality_metrics: QualityMetricsData::default(),
412            usage_analytics: UsageAnalyticsData::default(),
413            alerts: Vec::new(),
414            last_updated: SystemTime::now(),
415        }
416    }
417}
418
419impl Default for OverviewData {
420    fn default() -> Self {
421        Self {
422            total_queries_today: 0,
423            average_latency: Duration::from_millis(0),
424            current_qps: 0.0,
425            system_health_score: 100.0,
426            active_alerts: 0,
427            index_size: 0,
428            vector_count: 0,
429            cache_hit_ratio: 0.0,
430        }
431    }
432}
433
434impl Default for SystemHealthData {
435    fn default() -> Self {
436        Self {
437            cpu_usage: 0.0,
438            memory_usage: 0.0,
439            disk_usage: 0.0,
440            network_throughput: 0.0,
441            resource_trends: Vec::new(),
442            capacity_forecast: Vec::new(),
443            bottlenecks: Vec::new(),
444        }
445    }
446}
447
448impl Default for QualityMetricsData {
449    fn default() -> Self {
450        Self {
451            recall_trends: Vec::new(),
452            precision_trends: Vec::new(),
453            similarity_distribution: Vec::new(),
454            quality_score: 0.0,
455            quality_trends: Vec::new(),
456            benchmark_comparisons: HashMap::new(),
457        }
458    }
459}
460
461impl Default for UsageTrends {
462    fn default() -> Self {
463        Self {
464            daily_query_counts: VecDeque::new(),
465            hourly_patterns: [0; 24],
466            weekly_patterns: [0; 7],
467            growth_rate: 0.0,
468            seasonal_patterns: HashMap::new(),
469            user_growth: 0.0,
470            feature_adoption: HashMap::new(),
471        }
472    }
473}
474
475impl Default for GrowthMetrics {
476    fn default() -> Self {
477        Self {
478            daily_growth_rate: 0.0,
479            weekly_growth_rate: 0.0,
480            monthly_growth_rate: 0.0,
481            user_retention: 0.0,
482            query_volume_growth: 0.0,
483        }
484    }
485}
486
487impl Default for PerformanceProfiler {
488    fn default() -> Self {
489        Self::new()
490    }
491}
492
493impl PerformanceProfiler {
494    pub fn new() -> Self {
495        Self {
496            profiles: Arc::new(RwLock::new(HashMap::new())),
497            active_profiles: Arc::new(RwLock::new(HashMap::new())),
498        }
499    }
500
501    pub fn start_profile(&self, function_name: &str) -> String {
502        let profile_id = format!(
503            "{}_{}",
504            function_name,
505            chrono::Utc::now().timestamp_nanos_opt().unwrap_or(0)
506        );
507        let mut active = self.active_profiles.write();
508        active.insert(profile_id.clone(), Instant::now());
509        profile_id
510    }
511
512    pub fn end_profile(&self, profile_id: &str) -> Result<Duration> {
513        let mut active = self.active_profiles.write();
514        let start_time = active
515            .remove(profile_id)
516            .ok_or_else(|| anyhow!("Profile ID not found: {}", profile_id))?;
517
518        let duration = start_time.elapsed();
519
520        // Extract function name from profile ID
521        let function_name = profile_id.split('_').next().unwrap_or("unknown");
522
523        // Update profile data
524        let mut profiles = self.profiles.write();
525        let profile = profiles
526            .entry(function_name.to_string())
527            .or_insert_with(|| ProfileData {
528                function_name: function_name.to_string(),
529                total_calls: 0,
530                total_time: Duration::from_nanos(0),
531                average_time: Duration::from_nanos(0),
532                min_time: Duration::from_secs(u64::MAX),
533                max_time: Duration::from_nanos(0),
534                call_history: VecDeque::new(),
535            });
536
537        profile.total_calls += 1;
538        profile.total_time += duration;
539        profile.average_time = profile.total_time / profile.total_calls as u32;
540        profile.min_time = profile.min_time.min(duration);
541        profile.max_time = profile.max_time.max(duration);
542        profile
543            .call_history
544            .push_back((SystemTime::now(), duration));
545
546        // Keep only recent history
547        while profile.call_history.len() > 1000 {
548            profile.call_history.pop_front();
549        }
550
551        Ok(duration)
552    }
553
554    pub fn get_profile_report(&self) -> HashMap<String, ProfileData> {
555        self.profiles.read().clone()
556    }
557}
558
559// --- Notification channel implementations ---
560
561/// Email notification channel
562pub struct EmailNotificationChannel {
563    smtp_config: SmtpConfig,
564}
565
/// SMTP connection settings for [`EmailNotificationChannel`].
///
/// `Debug` is implemented by hand rather than derived so that the password is never written
/// to logs or panic messages that format this struct with `{:?}`.
#[derive(Clone)]
pub struct SmtpConfig {
    /// SMTP server host name.
    pub server: String,
    /// SMTP server port.
    pub port: u16,
    /// Account used to authenticate.
    pub username: String,
    /// Secret for `username`; redacted in `Debug` output.
    pub password: String,
    /// Sender address for outgoing mail.
    pub from_address: String,
}

impl std::fmt::Debug for SmtpConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("SmtpConfig")
            .field("server", &self.server)
            .field("port", &self.port)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("from_address", &self.from_address)
            .finish()
    }
}
574
575impl EmailNotificationChannel {
576    pub fn new(smtp_config: SmtpConfig) -> Self {
577        Self { smtp_config }
578    }
579}
580
581impl NotificationChannel for EmailNotificationChannel {
582    fn send_notification(&self, alert: &Alert) -> Result<()> {
583        // Email implementation would require SMTP client
584        tracing::info!(
585            "Email notification sent for alert {}: {}",
586            alert.id,
587            alert.message
588        );
589        Ok(())
590    }
591
592    fn get_channel_type(&self) -> String {
593        "email".to_string()
594    }
595}
596
597/// Slack notification channel
598pub struct SlackNotificationChannel {
599    webhook_url: String,
600    client: reqwest::Client,
601}
602
603impl SlackNotificationChannel {
604    pub fn new(webhook_url: String) -> Self {
605        Self {
606            webhook_url,
607            client: reqwest::Client::new(),
608        }
609    }
610}
611
612impl NotificationChannel for SlackNotificationChannel {
613    fn send_notification(&self, alert: &Alert) -> Result<()> {
614        let _payload = serde_json::json!({
615            "text": format!("Alert: {}", alert.message),
616            "attachments": [{
617                "color": match alert.severity {
618                    AlertSeverity::Critical => "danger",
619                    AlertSeverity::Warning => "warning",
620                    AlertSeverity::Info => "good",
621                },
622                "fields": [
623                    {
624                        "title": "Alert Type",
625                        "value": format!("{:?}", alert.alert_type),
626                        "short": true
627                    },
628                    {
629                        "title": "Severity",
630                        "value": format!("{:?}", alert.severity),
631                        "short": true
632                    },
633                    {
634                        "title": "Timestamp",
635                        "value": chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%Y-%m-%d %H:%M:%S UTC").to_string(),
636                        "short": true
637                    }
638                ]
639            }]
640        });
641
642        // In a real implementation, would send HTTP POST
643        tracing::info!(
644            "Slack notification sent for alert {}: {}",
645            alert.id,
646            alert.message
647        );
648        Ok(())
649    }
650
651    fn get_channel_type(&self) -> String {
652        "slack".to_string()
653    }
654}
655
656/// Webhook notification channel
657pub struct WebhookNotificationChannel {
658    webhook_url: String,
659    client: reqwest::Client,
660    headers: HashMap<String, String>,
661}
662
663impl WebhookNotificationChannel {
664    pub fn new(webhook_url: String) -> Self {
665        Self {
666            webhook_url,
667            client: reqwest::Client::new(),
668            headers: HashMap::new(),
669        }
670    }
671
672    pub fn with_headers(mut self, headers: HashMap<String, String>) -> Self {
673        self.headers = headers;
674        self
675    }
676}
677
678impl NotificationChannel for WebhookNotificationChannel {
679    fn send_notification(&self, alert: &Alert) -> Result<()> {
680        let _payload = serde_json::to_value(alert)?;
681
682        // In a real implementation, would send HTTP POST
683        tracing::info!(
684            "Webhook notification sent for alert {}: {}",
685            alert.id,
686            alert.message
687        );
688        Ok(())
689    }
690
691    fn get_channel_type(&self) -> String {
692        "webhook".to_string()
693    }
694}
695
696/// Format alerts as HTML for dashboard display
697pub fn format_alerts(alerts: &[Alert]) -> String {
698    if alerts.is_empty() {
699        return "<p>No active alerts</p>".to_string();
700    }
701
702    alerts
703        .iter()
704        .map(|alert| {
705            let class = match alert.severity {
706                AlertSeverity::Critical => "alert-critical",
707                AlertSeverity::Warning => "alert-warning",
708                AlertSeverity::Info => "alert-info",
709            };
710
711            format!(
712                "<div class=\"alert {}\">
713                        <strong>{:?}</strong>: {}
714                        <br><small>{}</small>
715                    </div>",
716                class,
717                alert.alert_type,
718                alert.message,
719                chrono::DateTime::<chrono::Utc>::from(alert.timestamp).format("%H:%M:%S")
720            )
721        })
722        .collect::<Vec<_>>()
723        .join("\n")
724}