1use crate::error::{Result, VecStoreError};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, VecDeque};
9use std::time::{Duration, SystemTime};
10
/// Configuration knobs for the [`Monitor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Maximum number of points retained per metric, and the cap on the
    /// stored alert history.
    pub max_history_size: usize,

    /// Intended interval between metric collections.
    // NOTE(review): not read anywhere in this file — presumably consumed by
    // an external collection loop; confirm before removing.
    pub collection_interval: Duration,

    /// When false, `Monitor::record` skips alert-rule evaluation entirely.
    pub enable_alerts: bool,

    /// Minimum time between two firings of the same rule.
    pub alert_cooldown: Duration,

    /// Whether aggregation is enabled.
    // NOTE(review): not read anywhere in this file — verify against callers.
    pub enable_aggregation: bool,
}
29
30impl Default for MonitoringConfig {
31 fn default() -> Self {
32 Self {
33 max_history_size: 1000,
34 collection_interval: Duration::from_secs(60),
35 enable_alerts: true,
36 alert_cooldown: Duration::from_secs(300),
37 enable_aggregation: true,
38 }
39 }
40}
41
/// Severity level of an alert, ordered from least to most severe
/// (the derived `Ord` follows declaration order).
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)]
pub enum AlertSeverity {
    Info,
    Warning,
    Error,
    Critical,
}
50
/// Broad functional area an alert belongs to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum AlertCategory {
    Performance,
    DataQuality,
    Storage,
    IndexHealth,
    Security,
}
60
/// A single fired alert, produced when a metric sample violates a rule.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Alert {
    /// Unique-ish id: `"<rule_name>-<unix_seconds>"`.
    pub id: String,
    /// When the alert fired.
    pub timestamp: SystemTime,
    /// Severity copied from the triggering rule.
    pub severity: AlertSeverity,
    /// Category copied from the triggering rule.
    pub category: AlertCategory,
    /// Human-readable description including value, unit and threshold.
    pub message: String,
    /// Canonical metric name (see `MetricType::name`).
    pub metric_name: String,
    /// The sample value that triggered the alert.
    pub metric_value: f64,
    /// The rule's threshold at the time of firing.
    pub threshold: f64,
}
73
/// A rule that fires an [`Alert`] when its condition holds for a sample
/// of the named metric.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertRule {
    /// Rule name; also used as the cooldown key and alert-id prefix.
    pub name: String,
    /// Canonical metric name this rule watches (see `MetricType::name`).
    pub metric_name: String,
    pub category: AlertCategory,
    pub severity: AlertSeverity,
    pub condition: AlertCondition,
    pub threshold: f64,
    /// Disabled rules are skipped during evaluation.
    pub enabled: bool,
}
85
/// Comparison applied between a metric sample and a rule's threshold.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AlertCondition {
    GreaterThan,
    LessThan,
    /// Equality within a 1e-6 absolute tolerance.
    Equal,
    /// Inequality beyond a 1e-6 absolute tolerance.
    NotEqual,
    /// Relative deviation from the threshold greater than 10%.
    PercentageChange,
}
95
96impl AlertCondition {
97 fn evaluate(&self, value: f64, threshold: f64) -> bool {
98 match self {
99 AlertCondition::GreaterThan => value > threshold,
100 AlertCondition::LessThan => value < threshold,
101 AlertCondition::Equal => (value - threshold).abs() < 1e-6,
102 AlertCondition::NotEqual => (value - threshold).abs() >= 1e-6,
103 AlertCondition::PercentageChange => {
104 (value / threshold - 1.0).abs() > 0.1 }
106 }
107 }
108}
109
/// The fixed set of metrics the monitor can track.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum MetricType {
    // Performance
    QueryLatency,
    InsertLatency,
    ThroughputQPS,

    // Data quality
    VectorQuality,
    DuplicateRate,
    OutlierRate,

    // Storage
    StorageUsed,
    IndexSize,
    MemoryUsage,

    // Index health
    IndexFragmentation,
    CacheHitRate,
    ErrorRate,
}
133
impl MetricType {
    /// Canonical snake_case metric name. These strings are the join key
    /// between `AlertRule::metric_name`, report maps, and the Prometheus
    /// export — keep them stable.
    pub fn name(&self) -> &'static str {
        match self {
            MetricType::QueryLatency => "query_latency_ms",
            MetricType::InsertLatency => "insert_latency_ms",
            MetricType::ThroughputQPS => "throughput_qps",
            MetricType::VectorQuality => "vector_quality",
            MetricType::DuplicateRate => "duplicate_rate",
            MetricType::OutlierRate => "outlier_rate",
            MetricType::StorageUsed => "storage_used_mb",
            MetricType::IndexSize => "index_size_mb",
            MetricType::MemoryUsage => "memory_usage_mb",
            MetricType::IndexFragmentation => "index_fragmentation",
            MetricType::CacheHitRate => "cache_hit_rate",
            MetricType::ErrorRate => "error_rate",
        }
    }

    /// Display unit for the metric (used in alert messages).
    pub fn unit(&self) -> &'static str {
        match self {
            MetricType::QueryLatency | MetricType::InsertLatency => "ms",
            MetricType::ThroughputQPS => "qps",
            MetricType::VectorQuality
            | MetricType::DuplicateRate
            | MetricType::OutlierRate
            | MetricType::CacheHitRate
            | MetricType::ErrorRate => "ratio",
            MetricType::StorageUsed | MetricType::IndexSize | MetricType::MemoryUsage => "MB",
            MetricType::IndexFragmentation => "percent",
        }
    }
}
166
/// One timestamped metric sample.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricPoint {
    pub timestamp: SystemTime,
    pub value: f64,
}
173
/// Bounded ring of recent samples for a single metric (oldest evicted first).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricHistory {
    pub metric_type: MetricType,
    /// Samples in insertion (chronological) order.
    pub points: VecDeque<MetricPoint>,
    /// Retention cap; enforced by `add`.
    pub max_size: usize,
}
181
182impl MetricHistory {
183 pub fn new(metric_type: MetricType, max_size: usize) -> Self {
184 Self {
185 metric_type,
186 points: VecDeque::with_capacity(max_size),
187 max_size,
188 }
189 }
190
191 pub fn add(&mut self, value: f64) {
192 if self.points.len() >= self.max_size {
193 self.points.pop_front();
194 }
195 self.points.push_back(MetricPoint {
196 timestamp: SystemTime::now(),
197 value,
198 });
199 }
200
201 pub fn latest(&self) -> Option<f64> {
202 self.points.back().map(|p| p.value)
203 }
204
205 pub fn average(&self) -> Option<f64> {
206 if self.points.is_empty() {
207 return None;
208 }
209 let sum: f64 = self.points.iter().map(|p| p.value).sum();
210 Some(sum / self.points.len() as f64)
211 }
212
213 pub fn percentile(&self, p: f64) -> Option<f64> {
214 if self.points.is_empty() {
215 return None;
216 }
217 let mut values: Vec<f64> = self.points.iter().map(|p| p.value).collect();
218 values.sort_by(|a, b| a.partial_cmp(b).unwrap());
219 let idx = ((values.len() - 1) as f64 * p / 100.0) as usize;
220 Some(values[idx])
221 }
222}
223
/// Aggregate counters describing the monitor's current state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringStats {
    /// Number of alerts currently retained in history.
    pub total_alerts: usize,
    pub alerts_by_severity: HashMap<AlertSeverity, usize>,
    pub alerts_by_category: HashMap<AlertCategory, usize>,
    /// Count of rules with `enabled == true`.
    pub active_rules: usize,
    /// Number of distinct metric types that have received samples.
    pub metrics_tracked: usize,
    /// Wall-clock time since the monitor was constructed.
    pub uptime: Duration,
}
234
/// Point-in-time snapshot produced by `Monitor::generate_report`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringReport {
    pub timestamp: SystemTime,
    /// Latest value per metric, keyed by canonical metric name.
    pub metrics: HashMap<String, f64>,
    /// The most recent alerts (newest first, at most 10).
    pub recent_alerts: Vec<Alert>,
    pub stats: MonitoringStats,
}
243
/// Central metrics recorder and alert engine.
///
/// Not internally synchronized; wrap in a lock for shared use.
pub struct Monitor {
    config: MonitoringConfig,
    /// Per-metric bounded sample histories.
    metrics: HashMap<MetricType, MetricHistory>,
    alert_rules: Vec<AlertRule>,
    /// Fired alerts, oldest first, capped at `config.max_history_size`.
    alerts: VecDeque<Alert>,
    /// Last firing time per rule name, used for cooldown suppression.
    last_alert_time: HashMap<String, SystemTime>,
    start_time: SystemTime,
}
253
254impl Monitor {
255 pub fn new(config: MonitoringConfig) -> Self {
257 Self {
258 config,
259 metrics: HashMap::new(),
260 alert_rules: Vec::new(),
261 alerts: VecDeque::new(),
262 last_alert_time: HashMap::new(),
263 start_time: SystemTime::now(),
264 }
265 }
266
267 pub fn default() -> Self {
269 Self::new(MonitoringConfig::default())
270 }
271
272 pub fn add_rule(&mut self, rule: AlertRule) {
274 self.alert_rules.push(rule);
275 }
276
277 pub fn remove_rule(&mut self, name: &str) -> bool {
279 if let Some(pos) = self.alert_rules.iter().position(|r| r.name == name) {
280 self.alert_rules.remove(pos);
281 true
282 } else {
283 false
284 }
285 }
286
287 pub fn record(&mut self, metric_type: MetricType, value: f64) {
289 let history = self
291 .metrics
292 .entry(metric_type)
293 .or_insert_with(|| MetricHistory::new(metric_type, self.config.max_history_size));
294
295 history.add(value);
296
297 if self.config.enable_alerts {
299 self.check_alerts(metric_type, value);
300 }
301 }
302
303 fn check_alerts(&mut self, metric_type: MetricType, value: f64) {
305 let metric_name = metric_type.name();
306
307 for rule in &self.alert_rules {
308 if !rule.enabled || rule.metric_name != metric_name {
309 continue;
310 }
311
312 if let Some(last_time) = self.last_alert_time.get(&rule.name) {
314 if let Ok(elapsed) = SystemTime::now().duration_since(*last_time) {
315 if elapsed < self.config.alert_cooldown {
316 continue;
317 }
318 }
319 }
320
321 if rule.condition.evaluate(value, rule.threshold) {
323 let alert = Alert {
324 id: format!(
325 "{}-{}",
326 rule.name,
327 SystemTime::now()
328 .duration_since(SystemTime::UNIX_EPOCH)
329 .unwrap()
330 .as_secs()
331 ),
332 timestamp: SystemTime::now(),
333 severity: rule.severity,
334 category: rule.category,
335 message: format!(
336 "{}: {} = {:.3} {} (threshold: {:.3})",
337 rule.name,
338 metric_name,
339 value,
340 metric_type.unit(),
341 rule.threshold
342 ),
343 metric_name: metric_name.to_string(),
344 metric_value: value,
345 threshold: rule.threshold,
346 };
347
348 self.alerts.push_back(alert);
349 self.last_alert_time
350 .insert(rule.name.clone(), SystemTime::now());
351
352 while self.alerts.len() > self.config.max_history_size {
354 self.alerts.pop_front();
355 }
356 }
357 }
358 }
359
360 pub fn get_alerts(&self, count: usize) -> Vec<Alert> {
362 self.alerts.iter().rev().take(count).cloned().collect()
363 }
364
365 pub fn get_alerts_by_severity(&self, severity: AlertSeverity) -> Vec<Alert> {
367 self.alerts
368 .iter()
369 .filter(|a| a.severity == severity)
370 .cloned()
371 .collect()
372 }
373
374 pub fn get_alerts_by_category(&self, category: AlertCategory) -> Vec<Alert> {
376 self.alerts
377 .iter()
378 .filter(|a| a.category == category)
379 .cloned()
380 .collect()
381 }
382
383 pub fn clear_alerts(&mut self) {
385 self.alerts.clear();
386 self.last_alert_time.clear();
387 }
388
389 pub fn get_metric(&self, metric_type: MetricType) -> Option<&MetricHistory> {
391 self.metrics.get(&metric_type)
392 }
393
394 pub fn get_all_metrics(&self) -> &HashMap<MetricType, MetricHistory> {
396 &self.metrics
397 }
398
399 pub fn generate_report(&self) -> MonitoringReport {
401 let mut metrics = HashMap::new();
403 for (metric_type, history) in &self.metrics {
404 if let Some(value) = history.latest() {
405 metrics.insert(metric_type.name().to_string(), value);
406 }
407 }
408
409 let recent_alerts = self.get_alerts(10);
411
412 let mut alerts_by_severity = HashMap::new();
414 let mut alerts_by_category = HashMap::new();
415
416 for alert in &self.alerts {
417 *alerts_by_severity.entry(alert.severity).or_insert(0) += 1;
418 *alerts_by_category.entry(alert.category).or_insert(0) += 1;
419 }
420
421 let uptime = SystemTime::now()
422 .duration_since(self.start_time)
423 .unwrap_or(Duration::from_secs(0));
424
425 let stats = MonitoringStats {
426 total_alerts: self.alerts.len(),
427 alerts_by_severity,
428 alerts_by_category,
429 active_rules: self.alert_rules.iter().filter(|r| r.enabled).count(),
430 metrics_tracked: self.metrics.len(),
431 uptime,
432 };
433
434 MonitoringReport {
435 timestamp: SystemTime::now(),
436 metrics,
437 recent_alerts,
438 stats,
439 }
440 }
441
442 pub fn get_stats(&self) -> MonitoringStats {
444 let mut alerts_by_severity = HashMap::new();
445 let mut alerts_by_category = HashMap::new();
446
447 for alert in &self.alerts {
448 *alerts_by_severity.entry(alert.severity).or_insert(0) += 1;
449 *alerts_by_category.entry(alert.category).or_insert(0) += 1;
450 }
451
452 let uptime = SystemTime::now()
453 .duration_since(self.start_time)
454 .unwrap_or(Duration::from_secs(0));
455
456 MonitoringStats {
457 total_alerts: self.alerts.len(),
458 alerts_by_severity,
459 alerts_by_category,
460 active_rules: self.alert_rules.iter().filter(|r| r.enabled).count(),
461 metrics_tracked: self.metrics.len(),
462 uptime,
463 }
464 }
465
466 pub fn export_prometheus(&self) -> String {
468 let mut output = String::new();
469
470 for (metric_type, history) in &self.metrics {
471 if let Some(value) = history.latest() {
472 output.push_str(&format!("vecstore_{} {}\n", metric_type.name(), value));
473 }
474 }
475
476 output
477 }
478}
479
480pub struct AlertPresets;
482
483impl AlertPresets {
484 pub fn high_query_latency(threshold_ms: f64) -> AlertRule {
486 AlertRule {
487 name: "high_query_latency".to_string(),
488 metric_name: "query_latency_ms".to_string(),
489 category: AlertCategory::Performance,
490 severity: AlertSeverity::Warning,
491 condition: AlertCondition::GreaterThan,
492 threshold: threshold_ms,
493 enabled: true,
494 }
495 }
496
497 pub fn low_cache_hit_rate(threshold: f64) -> AlertRule {
499 AlertRule {
500 name: "low_cache_hit_rate".to_string(),
501 metric_name: "cache_hit_rate".to_string(),
502 category: AlertCategory::Performance,
503 severity: AlertSeverity::Warning,
504 condition: AlertCondition::LessThan,
505 threshold,
506 enabled: true,
507 }
508 }
509
510 pub fn high_error_rate(threshold: f64) -> AlertRule {
512 AlertRule {
513 name: "high_error_rate".to_string(),
514 metric_name: "error_rate".to_string(),
515 category: AlertCategory::Performance,
516 severity: AlertSeverity::Error,
517 condition: AlertCondition::GreaterThan,
518 threshold,
519 enabled: true,
520 }
521 }
522
523 pub fn low_vector_quality(threshold: f64) -> AlertRule {
525 AlertRule {
526 name: "low_vector_quality".to_string(),
527 metric_name: "vector_quality".to_string(),
528 category: AlertCategory::DataQuality,
529 severity: AlertSeverity::Warning,
530 condition: AlertCondition::LessThan,
531 threshold,
532 enabled: true,
533 }
534 }
535
536 pub fn high_storage_usage(threshold_mb: f64) -> AlertRule {
538 AlertRule {
539 name: "high_storage_usage".to_string(),
540 metric_name: "storage_used_mb".to_string(),
541 category: AlertCategory::Storage,
542 severity: AlertSeverity::Warning,
543 condition: AlertCondition::GreaterThan,
544 threshold: threshold_mb,
545 enabled: true,
546 }
547 }
548
549 pub fn high_memory_usage(threshold_mb: f64) -> AlertRule {
551 AlertRule {
552 name: "high_memory_usage".to_string(),
553 metric_name: "memory_usage_mb".to_string(),
554 category: AlertCategory::Storage,
555 severity: AlertSeverity::Error,
556 condition: AlertCondition::GreaterThan,
557 threshold: threshold_mb,
558 enabled: true,
559 }
560 }
561
562 pub fn high_index_fragmentation(threshold_pct: f64) -> AlertRule {
564 AlertRule {
565 name: "high_index_fragmentation".to_string(),
566 metric_name: "index_fragmentation".to_string(),
567 category: AlertCategory::IndexHealth,
568 severity: AlertSeverity::Warning,
569 condition: AlertCondition::GreaterThan,
570 threshold: threshold_pct,
571 enabled: true,
572 }
573 }
574
575 pub fn default_rules() -> Vec<AlertRule> {
577 vec![
578 Self::high_query_latency(100.0),
579 Self::low_cache_hit_rate(0.7),
580 Self::high_error_rate(0.05),
581 Self::low_vector_quality(0.6),
582 Self::high_storage_usage(1000.0),
583 Self::high_memory_usage(2000.0),
584 Self::high_index_fragmentation(30.0),
585 ]
586 }
587}
588
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;

    // A fresh monitor starts with no metrics and no rules.
    #[test]
    fn test_monitor_creation() {
        let monitor = Monitor::default();
        assert_eq!(monitor.metrics.len(), 0);
        assert_eq!(monitor.alert_rules.len(), 0);
    }

    // Samples accumulate per metric and `latest` reflects the newest one.
    #[test]
    fn test_record_metrics() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::QueryLatency, 75.0);
        monitor.record(MetricType::QueryLatency, 100.0);

        let history = monitor.get_metric(MetricType::QueryLatency).unwrap();
        assert_eq!(history.points.len(), 3);
        assert_eq!(history.latest(), Some(100.0));
    }

    // latest / average / percentile over a small, known sample set.
    #[test]
    fn test_metric_statistics() {
        let mut history = MetricHistory::new(MetricType::QueryLatency, 100);

        history.add(50.0);
        history.add(100.0);
        history.add(150.0);

        assert_eq!(history.latest(), Some(150.0));
        assert_eq!(history.average(), Some(100.0));

        // p50 of [50, 100, 150] is the middle value.
        let p50 = history.percentile(50.0);
        assert!(p50.is_some());
        assert!((p50.unwrap() - 100.0).abs() < 1.0);
    }

    // A rule fires only when its threshold is actually exceeded.
    #[test]
    fn test_alert_rules() {
        let mut monitor = Monitor::default();

        monitor.add_rule(AlertPresets::high_query_latency(100.0));

        // Below threshold: no alerts.
        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::QueryLatency, 75.0);
        assert_eq!(monitor.alerts.len(), 0);

        // Above threshold: exactly one alert.
        monitor.record(MetricType::QueryLatency, 150.0);
        assert_eq!(monitor.alerts.len(), 1);

        let alert = &monitor.alerts[0];
        assert_eq!(alert.severity, AlertSeverity::Warning);
        assert_eq!(alert.category, AlertCategory::Performance);
    }

    // A rule is suppressed during its cooldown window and fires again after.
    #[test]
    fn test_alert_cooldown() {
        let config = MonitoringConfig {
            alert_cooldown: Duration::from_millis(100),
            ..Default::default()
        };

        let mut monitor = Monitor::new(config);
        monitor.add_rule(AlertPresets::high_query_latency(100.0));

        monitor.record(MetricType::QueryLatency, 150.0);
        assert_eq!(monitor.alerts.len(), 1);

        // Still within cooldown: suppressed despite exceeding the threshold.
        monitor.record(MetricType::QueryLatency, 200.0);
        assert_eq!(monitor.alerts.len(), 1);

        // Wait past the cooldown, then the rule may fire again.
        thread::sleep(Duration::from_millis(150));

        monitor.record(MetricType::QueryLatency, 250.0);
        assert_eq!(monitor.alerts.len(), 2);
    }

    // Alerts can be sliced by severity and by category.
    #[test]
    fn test_alert_filtering() {
        let mut monitor = Monitor::default();

        monitor.add_rule(AlertPresets::high_query_latency(100.0));
        monitor.add_rule(AlertPresets::high_error_rate(0.05));

        monitor.record(MetricType::QueryLatency, 150.0);
        monitor.record(MetricType::ErrorRate, 0.1);

        assert_eq!(monitor.alerts.len(), 2);

        let warnings = monitor.get_alerts_by_severity(AlertSeverity::Warning);
        assert_eq!(warnings.len(), 1);

        let errors = monitor.get_alerts_by_severity(AlertSeverity::Error);
        assert_eq!(errors.len(), 1);

        // Both presets are categorized as Performance.
        let perf_alerts = monitor.get_alerts_by_category(AlertCategory::Performance);
        assert_eq!(perf_alerts.len(), 2);
    }

    // Reports carry latest metric values keyed by canonical name plus stats.
    #[test]
    fn test_monitoring_report() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::MemoryUsage, 512.0);

        monitor.add_rule(AlertPresets::high_memory_usage(1000.0));

        let report = monitor.generate_report();

        assert_eq!(report.metrics.len(), 2);
        assert!(report.metrics.contains_key("query_latency_ms"));
        assert!(report.metrics.contains_key("memory_usage_mb"));

        assert_eq!(report.stats.metrics_tracked, 2);
        assert_eq!(report.stats.active_rules, 1);
    }

    // Prometheus export emits one `vecstore_<name> <value>` line per metric.
    #[test]
    fn test_prometheus_export() {
        let mut monitor = Monitor::default();

        monitor.record(MetricType::QueryLatency, 50.0);
        monitor.record(MetricType::ThroughputQPS, 1000.0);

        let output = monitor.export_prometheus();

        assert!(output.contains("vecstore_query_latency_ms"));
        assert!(output.contains("vecstore_throughput_qps"));
        assert!(output.contains("50"));
        assert!(output.contains("1000"));
    }

    // Rules can be added and removed by name; unknown names are a no-op.
    #[test]
    fn test_rule_management() {
        let mut monitor = Monitor::default();

        let rule = AlertPresets::high_query_latency(100.0);
        monitor.add_rule(rule);

        assert_eq!(monitor.alert_rules.len(), 1);

        let removed = monitor.remove_rule("high_query_latency");
        assert!(removed);
        assert_eq!(monitor.alert_rules.len(), 0);

        let removed_again = monitor.remove_rule("nonexistent");
        assert!(!removed_again);
    }

    // The preset bundle contains all seven rules, all enabled.
    #[test]
    fn test_default_presets() {
        let rules = AlertPresets::default_rules();
        assert_eq!(rules.len(), 7);

        for rule in &rules {
            assert!(rule.enabled);
        }
    }
}