vecstore/monitoring.rs

//! Vector Database Monitoring and Alerting
//!
//! Provides real-time monitoring of vector operations with configurable alerts
//! for data quality, performance, storage, and index health.
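//!
//! # Example
//!
//! A minimal usage sketch (the `vecstore::monitoring` path is assumed here):
//!
//! ```ignore
//! use vecstore::monitoring::{AlertPresets, MetricType, Monitor};
//!
//! let mut monitor = Monitor::default();
//! monitor.add_rule(AlertPresets::high_query_latency(100.0));
//! monitor.record(MetricType::QueryLatency, 150.0);
//! assert_eq!(monitor.get_alerts(10).len(), 1);
//! ```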

use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use std::time::{Duration, SystemTime};

/// Monitoring configuration
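///
/// Fields can be overridden selectively via struct update syntax; a sketch
/// (module path assumed):
///
/// ```ignore
/// use std::time::Duration;
/// use vecstore::monitoring::MonitoringConfig;
///
/// let config = MonitoringConfig {
///     alert_cooldown: Duration::from_secs(60),
///     ..Default::default()
/// };
/// ```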
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Maximum number of metric samples to keep in history
    pub max_history_size: usize,

    /// Interval for collecting metrics
    pub collection_interval: Duration,

    /// Enable alert notifications
    pub enable_alerts: bool,

    /// Alert cooldown period to prevent spam
    pub alert_cooldown: Duration,

    /// Enable metric aggregation
    pub enable_aggregation: bool,
}

impl Default for MonitoringConfig {
    fn default() -> Self {
        Self {
            max_history_size: 1000,
            collection_interval: Duration::from_secs(60),
            enable_alerts: true,
            alert_cooldown: Duration::from_secs(300),
            enable_aggregation: true,
        }
    }
}

/// Alert severity levels
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}

/// Alert categories
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AlertCategory {
    Performance,
    DataQuality,
    Storage,
    IndexHealth,
    Security,
}

/// Alert definition
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    pub id: String,
    pub timestamp: SystemTime,
    pub severity: AlertSeverity,
    pub category: AlertCategory,
    pub message: String,
    pub metric_name: String,
    pub metric_value: f64,
    pub threshold: f64,
}

/// Alert rule for triggering notifications
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    pub name: String,
    pub metric_name: String,
    pub category: AlertCategory,
    pub severity: AlertSeverity,
    pub condition: AlertCondition,
    pub threshold: f64,
    pub enabled: bool,
}
/// Alert condition types
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    GreaterThan,
    LessThan,
    Equal,
    NotEqual,
    /// Value deviates from the threshold (treated as a baseline) by more than 10%
    PercentageChange,
}

impl AlertCondition {
    fn evaluate(&self, value: f64, threshold: f64) -> bool {
        match self {
            AlertCondition::GreaterThan => value > threshold,
            AlertCondition::LessThan => value < threshold,
            // Floating-point equality is checked within a small epsilon
            AlertCondition::Equal => (value - threshold).abs() < 1e-6,
            AlertCondition::NotEqual => (value - threshold).abs() >= 1e-6,
            AlertCondition::PercentageChange => {
                // Relative change against the threshold as baseline; fires above 10%
                (value / threshold - 1.0).abs() > 0.1
            }
        }
    }
}

/// Metric types
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MetricType {
    // Performance metrics
    QueryLatency,
    InsertLatency,
    ThroughputQPS,

    // Data quality metrics
    VectorQuality,
    DuplicateRate,
    OutlierRate,

    // Storage metrics
    StorageUsed,
    IndexSize,
    MemoryUsage,

    // Index health metrics
    IndexFragmentation,
    CacheHitRate,
    ErrorRate,
}

impl MetricType {
    pub fn name(&self) -> &'static str {
        match self {
            MetricType::QueryLatency => "query_latency_ms",
            MetricType::InsertLatency => "insert_latency_ms",
            MetricType::ThroughputQPS => "throughput_qps",
            MetricType::VectorQuality => "vector_quality",
            MetricType::DuplicateRate => "duplicate_rate",
            MetricType::OutlierRate => "outlier_rate",
            MetricType::StorageUsed => "storage_used_mb",
            MetricType::IndexSize => "index_size_mb",
            MetricType::MemoryUsage => "memory_usage_mb",
            MetricType::IndexFragmentation => "index_fragmentation",
            MetricType::CacheHitRate => "cache_hit_rate",
            MetricType::ErrorRate => "error_rate",
        }
    }

    pub fn unit(&self) -> &'static str {
        match self {
            MetricType::QueryLatency | MetricType::InsertLatency => "ms",
            MetricType::ThroughputQPS => "qps",
            MetricType::VectorQuality
            | MetricType::DuplicateRate
            | MetricType::OutlierRate
            | MetricType::CacheHitRate
            | MetricType::ErrorRate => "ratio",
            MetricType::StorageUsed | MetricType::IndexSize | MetricType::MemoryUsage => "MB",
            MetricType::IndexFragmentation => "percent",
        }
    }
}

/// Metric data point
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricPoint {
    pub timestamp: SystemTime,
    pub value: f64,
}

/// Metric history
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricHistory {
    pub metric_type: MetricType,
    pub points: VecDeque<MetricPoint>,
    pub max_size: usize,
}

impl MetricHistory {
    pub fn new(metric_type: MetricType, max_size: usize) -> Self {
        Self {
            metric_type,
            points: VecDeque::with_capacity(max_size),
            max_size,
        }
    }

    /// Append a sample, evicting the oldest point once `max_size` is reached
    pub fn add(&mut self, value: f64) {
        if self.points.len() >= self.max_size {
            self.points.pop_front();
        }
        self.points.push_back(MetricPoint {
            timestamp: SystemTime::now(),
            value,
        });
    }

    /// Most recently recorded value, if any
    pub fn latest(&self) -> Option<f64> {
        self.points.back().map(|p| p.value)
    }

    /// Arithmetic mean over the retained history
    pub fn average(&self) -> Option<f64> {
        if self.points.is_empty() {
            return None;
        }
        let sum: f64 = self.points.iter().map(|p| p.value).sum();
        Some(sum / self.points.len() as f64)
    }

    /// Nearest-rank percentile (no interpolation); `p` is in the range 0..=100
    pub fn percentile(&self, p: f64) -> Option<f64> {
        if self.points.is_empty() {
            return None;
        }
        let mut values: Vec<f64> = self.points.iter().map(|p| p.value).collect();
        // total_cmp gives a total order over f64, so NaN samples cannot panic the sort
        values.sort_by(|a, b| a.total_cmp(b));
        let idx = ((values.len() - 1) as f64 * p / 100.0) as usize;
        Some(values[idx])
    }
}

/// Monitoring statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringStats {
    pub total_alerts: usize,
    pub alerts_by_severity: HashMap<AlertSeverity, usize>,
    pub alerts_by_category: HashMap<AlertCategory, usize>,
    pub active_rules: usize,
    pub metrics_tracked: usize,
    pub uptime: Duration,
}

/// Monitoring report
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringReport {
    pub timestamp: SystemTime,
    pub metrics: HashMap<String, f64>,
    pub recent_alerts: Vec<Alert>,
    pub stats: MonitoringStats,
}

/// Vector database monitor
pub struct Monitor {
    config: MonitoringConfig,
    metrics: HashMap<MetricType, MetricHistory>,
    alert_rules: Vec<AlertRule>,
    alerts: VecDeque<Alert>,
    last_alert_time: HashMap<String, SystemTime>,
    start_time: SystemTime,
}

impl Default for Monitor {
    /// Create a monitor with the default configuration
    fn default() -> Self {
        Self::new(MonitoringConfig::default())
    }
}

impl Monitor {
    /// Create a new monitor
    pub fn new(config: MonitoringConfig) -> Self {
        Self {
            config,
            metrics: HashMap::new(),
            alert_rules: Vec::new(),
            alerts: VecDeque::new(),
            last_alert_time: HashMap::new(),
            start_time: SystemTime::now(),
        }
    }

    /// Add an alert rule
    pub fn add_rule(&mut self, rule: AlertRule) {
        self.alert_rules.push(rule);
    }

    /// Remove an alert rule by name; returns `true` if a rule was removed
    pub fn remove_rule(&mut self, name: &str) -> bool {
        if let Some(pos) = self.alert_rules.iter().position(|r| r.name == name) {
            self.alert_rules.remove(pos);
            true
        } else {
            false
        }
    }

    /// Record a metric value
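    ///
    /// A minimal sketch of recording and reading back a sample (module path assumed):
    ///
    /// ```ignore
    /// use vecstore::monitoring::{MetricType, Monitor};
    ///
    /// let mut monitor = Monitor::default();
    /// monitor.record(MetricType::QueryLatency, 42.0);
    /// let history = monitor.get_metric(MetricType::QueryLatency).unwrap();
    /// assert_eq!(history.latest(), Some(42.0));
    /// ```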
    pub fn record(&mut self, metric_type: MetricType, value: f64) {
        // Get or create metric history
        let history = self
            .metrics
            .entry(metric_type)
            .or_insert_with(|| MetricHistory::new(metric_type, self.config.max_history_size));

        history.add(value);

        // Check alert rules if enabled
        if self.config.enable_alerts {
            self.check_alerts(metric_type, value);
        }
    }

    /// Check alert rules for a metric
    fn check_alerts(&mut self, metric_type: MetricType, value: f64) {
        let metric_name = metric_type.name();

        for rule in &self.alert_rules {
            if !rule.enabled || rule.metric_name != metric_name {
                continue;
            }

            // Check cooldown period
            if let Some(last_time) = self.last_alert_time.get(&rule.name) {
                if let Ok(elapsed) = SystemTime::now().duration_since(*last_time) {
                    if elapsed < self.config.alert_cooldown {
                        continue;
                    }
                }
            }

            // Evaluate condition
            if rule.condition.evaluate(value, rule.threshold) {
                let alert = Alert {
                    // Rule name plus seconds since epoch as a cheap unique-ish id
                    id: format!(
                        "{}-{}",
                        rule.name,
                        SystemTime::now()
                            .duration_since(SystemTime::UNIX_EPOCH)
                            .unwrap_or_default()
                            .as_secs()
                    ),
                    timestamp: SystemTime::now(),
                    severity: rule.severity,
                    category: rule.category,
                    message: format!(
                        "{}: {} = {:.3} {} (threshold: {:.3})",
                        rule.name,
                        metric_name,
                        value,
                        metric_type.unit(),
                        rule.threshold
                    ),
                    metric_name: metric_name.to_string(),
                    metric_value: value,
                    threshold: rule.threshold,
                };

                self.alerts.push_back(alert);
                self.last_alert_time
                    .insert(rule.name.clone(), SystemTime::now());

                // Limit alert history
                while self.alerts.len() > self.config.max_history_size {
                    self.alerts.pop_front();
                }
            }
        }
    }

    /// Get the most recent alerts, newest first
    pub fn get_alerts(&self, count: usize) -> Vec<Alert> {
        self.alerts.iter().rev().take(count).cloned().collect()
    }

    /// Get alerts by severity
    pub fn get_alerts_by_severity(&self, severity: AlertSeverity) -> Vec<Alert> {
        self.alerts
            .iter()
            .filter(|a| a.severity == severity)
            .cloned()
            .collect()
    }

    /// Get alerts by category
    pub fn get_alerts_by_category(&self, category: AlertCategory) -> Vec<Alert> {
        self.alerts
            .iter()
            .filter(|a| a.category == category)
            .cloned()
            .collect()
    }

    /// Clear all alerts and reset cooldown tracking
    pub fn clear_alerts(&mut self) {
        self.alerts.clear();
        self.last_alert_time.clear();
    }

    /// Get metric history
    pub fn get_metric(&self, metric_type: MetricType) -> Option<&MetricHistory> {
        self.metrics.get(&metric_type)
    }

    /// Get all metrics
    pub fn get_all_metrics(&self) -> &HashMap<MetricType, MetricHistory> {
        &self.metrics
    }

    /// Generate monitoring report
    pub fn generate_report(&self) -> MonitoringReport {
        // Collect the latest value for each tracked metric
        let mut metrics = HashMap::new();
        for (metric_type, history) in &self.metrics {
            if let Some(value) = history.latest() {
                metrics.insert(metric_type.name().to_string(), value);
            }
        }

        MonitoringReport {
            timestamp: SystemTime::now(),
            metrics,
            recent_alerts: self.get_alerts(10),
            stats: self.get_stats(),
        }
    }

    /// Get monitoring statistics
    pub fn get_stats(&self) -> MonitoringStats {
        let mut alerts_by_severity = HashMap::new();
        let mut alerts_by_category = HashMap::new();

        for alert in &self.alerts {
            *alerts_by_severity.entry(alert.severity).or_insert(0) += 1;
            *alerts_by_category.entry(alert.category).or_insert(0) += 1;
        }

        let uptime = SystemTime::now()
            .duration_since(self.start_time)
            .unwrap_or(Duration::from_secs(0));

        MonitoringStats {
            total_alerts: self.alerts.len(),
            alerts_by_severity,
            alerts_by_category,
            active_rules: self.alert_rules.iter().filter(|r| r.enabled).count(),
            metrics_tracked: self.metrics.len(),
            uptime,
        }
    }

    /// Export the latest metric values in Prometheus text exposition format
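    ///
    /// Each tracked metric becomes one `vecstore_<name> <value>` line,
    /// e.g. `vecstore_query_latency_ms 42`.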
    pub fn export_prometheus(&self) -> String {
        let mut output = String::new();

        for (metric_type, history) in &self.metrics {
            if let Some(value) = history.latest() {
                output.push_str(&format!("vecstore_{} {}\n", metric_type.name(), value));
            }
        }

        output
    }
}

/// Preset alert rules for common scenarios
pub struct AlertPresets;

impl AlertPresets {
    /// High query latency alert
    pub fn high_query_latency(threshold_ms: f64) -> AlertRule {
        AlertRule {
            name: "high_query_latency".to_string(),
            metric_name: "query_latency_ms".to_string(),
            category: AlertCategory::Performance,
            severity: AlertSeverity::Warning,
            condition: AlertCondition::GreaterThan,
            threshold: threshold_ms,
            enabled: true,
        }
    }

    /// Low cache hit rate alert
    pub fn low_cache_hit_rate(threshold: f64) -> AlertRule {
        AlertRule {
            name: "low_cache_hit_rate".to_string(),
            metric_name: "cache_hit_rate".to_string(),
            category: AlertCategory::Performance,
            severity: AlertSeverity::Warning,
            condition: AlertCondition::LessThan,
            threshold,
            enabled: true,
        }
    }

    /// High error rate alert
    pub fn high_error_rate(threshold: f64) -> AlertRule {
        AlertRule {
            name: "high_error_rate".to_string(),
            metric_name: "error_rate".to_string(),
            category: AlertCategory::Performance,
            severity: AlertSeverity::Error,
            condition: AlertCondition::GreaterThan,
            threshold,
            enabled: true,
        }
    }

    /// Low vector quality alert
    pub fn low_vector_quality(threshold: f64) -> AlertRule {
        AlertRule {
            name: "low_vector_quality".to_string(),
            metric_name: "vector_quality".to_string(),
            category: AlertCategory::DataQuality,
            severity: AlertSeverity::Warning,
            condition: AlertCondition::LessThan,
            threshold,
            enabled: true,
        }
    }

    /// High storage usage alert
    pub fn high_storage_usage(threshold_mb: f64) -> AlertRule {
        AlertRule {
            name: "high_storage_usage".to_string(),
            metric_name: "storage_used_mb".to_string(),
            category: AlertCategory::Storage,
            severity: AlertSeverity::Warning,
            condition: AlertCondition::GreaterThan,
            threshold: threshold_mb,
            enabled: true,
        }
    }

    /// High memory usage alert
    pub fn high_memory_usage(threshold_mb: f64) -> AlertRule {
        AlertRule {
            name: "high_memory_usage".to_string(),
            metric_name: "memory_usage_mb".to_string(),
            category: AlertCategory::Storage,
            severity: AlertSeverity::Error,
            condition: AlertCondition::GreaterThan,
            threshold: threshold_mb,
            enabled: true,
        }
    }

    /// High index fragmentation alert
    pub fn high_index_fragmentation(threshold_pct: f64) -> AlertRule {
        AlertRule {
            name: "high_index_fragmentation".to_string(),
            metric_name: "index_fragmentation".to_string(),
            category: AlertCategory::IndexHealth,
            severity: AlertSeverity::Warning,
            condition: AlertCondition::GreaterThan,
            threshold: threshold_pct,
            enabled: true,
        }
    }

    /// Get all default rules
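    ///
    /// A sketch of installing every preset at once (module path assumed):
    ///
    /// ```ignore
    /// use vecstore::monitoring::{AlertPresets, Monitor};
    ///
    /// let mut monitor = Monitor::default();
    /// for rule in AlertPresets::default_rules() {
    ///     monitor.add_rule(rule);
    /// }
    /// ```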
    pub fn default_rules() -> Vec<AlertRule> {
        vec![
            Self::high_query_latency(100.0),
            Self::low_cache_hit_rate(0.7),
            Self::high_error_rate(0.05),
            Self::low_vector_quality(0.6),
            Self::high_storage_usage(1000.0),
            Self::high_memory_usage(2000.0),
            Self::high_index_fragmentation(30.0),
        ]
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_monitor_creation() {
        let monitor = Monitor::default();
        assert_eq!(monitor.metrics.len(), 0);
        assert_eq!(monitor.alert_rules.len(), 0);
    }

    #[test]
    fn test_record_metrics() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::QueryLatency, 75.0);
        monitor.record(MetricType::QueryLatency, 100.0);

        let history = monitor.get_metric(MetricType::QueryLatency).unwrap();
        assert_eq!(history.points.len(), 3);
        assert_eq!(history.latest(), Some(100.0));
    }

    #[test]
    fn test_metric_statistics() {
        let mut history = MetricHistory::new(MetricType::QueryLatency, 100);

        history.add(50.0);
        history.add(100.0);
        history.add(150.0);

        assert_eq!(history.latest(), Some(150.0));
        assert_eq!(history.average(), Some(100.0));

        // Test percentile
        let p50 = history.percentile(50.0);
        assert!(p50.is_some());
        assert!((p50.unwrap() - 100.0).abs() < 1.0);
    }

    #[test]
    fn test_alert_rules() {
        let mut monitor = Monitor::default();

        // Add high latency alert rule
        monitor.add_rule(AlertPresets::high_query_latency(100.0));

        // Record values below threshold - no alerts
        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::QueryLatency, 75.0);
        assert_eq!(monitor.alerts.len(), 0);

        // Record value above threshold - should trigger alert
        monitor.record(MetricType::QueryLatency, 150.0);
        assert_eq!(monitor.alerts.len(), 1);

        let alert = &monitor.alerts[0];
        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.category, AlertCategory::Performance);
    }

    #[test]
    fn test_alert_cooldown() {
        let config = MonitoringConfig {
            alert_cooldown: Duration::from_millis(100),
            ..Default::default()
        };

        let mut monitor = Monitor::new(config);
        monitor.add_rule(AlertPresets::high_query_latency(100.0));

        // First trigger
        monitor.record(MetricType::QueryLatency, 150.0);
        assert_eq!(monitor.alerts.len(), 1);

        // Immediate second trigger - should be suppressed
        monitor.record(MetricType::QueryLatency, 200.0);
        assert_eq!(monitor.alerts.len(), 1);

        // Wait for cooldown
        thread::sleep(Duration::from_millis(150));

        // Third trigger after cooldown - should create new alert
        monitor.record(MetricType::QueryLatency, 250.0);
        assert_eq!(monitor.alerts.len(), 2);
    }

    #[test]
    fn test_alert_filtering() {
        let mut monitor = Monitor::default();

        monitor.add_rule(AlertPresets::high_query_latency(100.0));
        monitor.add_rule(AlertPresets::high_error_rate(0.05));

        monitor.record(MetricType::QueryLatency, 150.0);
        monitor.record(MetricType::ErrorRate, 0.1);

        assert_eq!(monitor.alerts.len(), 2);

        let warnings = monitor.get_alerts_by_severity(AlertSeverity::Warning);
        assert_eq!(warnings.len(), 1);

        let errors = monitor.get_alerts_by_severity(AlertSeverity::Error);
        assert_eq!(errors.len(), 1);

        let perf_alerts = monitor.get_alerts_by_category(AlertCategory::Performance);
        assert_eq!(perf_alerts.len(), 2);
    }

    #[test]
    fn test_monitoring_report() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::MemoryUsage, 512.0);

        monitor.add_rule(AlertPresets::high_memory_usage(1000.0));

        let report = monitor.generate_report();

        assert_eq!(report.metrics.len(), 2);
        assert!(report.metrics.contains_key("query_latency_ms"));
        assert!(report.metrics.contains_key("memory_usage_mb"));

        assert_eq!(report.stats.metrics_tracked, 2);
        assert_eq!(report.stats.active_rules, 1);
    }

    #[test]
    fn test_prometheus_export() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::ThroughputQPS, 1000.0);

        let output = monitor.export_prometheus();

        assert!(output.contains("vecstore_query_latency_ms"));
        assert!(output.contains("vecstore_throughput_qps"));
        assert!(output.contains("50"));
        assert!(output.contains("1000"));
    }

    #[test]
    fn test_rule_management() {
        let mut monitor = Monitor::default();

        let rule = AlertPresets::high_query_latency(100.0);
        monitor.add_rule(rule);

        assert_eq!(monitor.alert_rules.len(), 1);

        let removed = monitor.remove_rule("high_query_latency");
        assert!(removed);
        assert_eq!(monitor.alert_rules.len(), 0);

        let removed_again = monitor.remove_rule("nonexistent");
        assert!(!removed_again);
    }

    #[test]
    fn test_default_presets() {
        let rules = AlertPresets::default_rules();
        assert_eq!(rules.len(), 7);

        // Verify all rules are enabled
        for rule in &rules {
            assert!(rule.enabled);
        }
    }
}
761}