1use super::metrics::MetricsCollector;
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
/// Configuration shared by every monitoring component in this module.
///
/// `SystemMonitor::new` hands a clone of this value to the performance monitor, the
/// search analyzer and the user-activity tracker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MonitoringConfig {
    /// Master on/off switch.
    /// NOTE(review): not read anywhere in this module — confirm whether callers honour it.
    pub enabled: bool,
    /// Seconds between two resource samples taken by the performance monitor's task.
    pub collection_interval_seconds: u64,
    /// Length, in minutes, of the rolling window of performance samples kept in memory.
    pub performance_window_minutes: u32,
    /// Whether `SystemMonitor` starts the search analyzer and reports search stats.
    pub search_analytics_enabled: bool,
    /// Whether `SystemMonitor` starts the user-activity tracker and reports user stats.
    pub user_tracking_enabled: bool,
    /// Whether `SystemMonitor` starts the performance monitor and reports resource metrics.
    pub resource_monitoring_enabled: bool,
    /// Limits for alerting.
    /// NOTE(review): not consulted by any code in this module.
    pub alert_thresholds: AlertThresholds,
}
25
26impl Default for MonitoringConfig {
27 fn default() -> Self {
28 Self {
29 enabled: true,
30 collection_interval_seconds: 30,
31 performance_window_minutes: 5,
32 search_analytics_enabled: true,
33 user_tracking_enabled: true,
34 resource_monitoring_enabled: true,
35 alert_thresholds: AlertThresholds::default(),
36 }
37 }
38}
39
/// Limits above which a monitored value is considered alert-worthy.
///
/// NOTE(review): this module only stores these values; no code here compares metrics
/// against them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertThresholds {
    /// CPU usage limit, in percent.
    pub cpu_usage_percent: f64,
    /// Memory usage limit, in percent.
    pub memory_usage_percent: f64,
    /// Error-rate limit, in percent.
    pub error_rate_percent: f64,
    /// Response-time limit, in milliseconds.
    pub response_time_ms: f64,
    /// Disk usage limit, in percent.
    pub disk_usage_percent: f64,
    /// Maximum queue length.
    /// NOTE(review): which queue this refers to is not visible in this module.
    pub queue_size: usize,
}
50
51impl Default for AlertThresholds {
52 fn default() -> Self {
53 Self {
54 cpu_usage_percent: 80.0,
55 memory_usage_percent: 85.0,
56 error_rate_percent: 5.0,
57 response_time_ms: 1000.0,
58 disk_usage_percent: 90.0,
59 queue_size: 1000,
60 }
61 }
62}
63
/// One resource-usage sample, or a field-wise average of several samples.
///
/// NOTE(review): the samples produced by `PerformanceMonitor` in this module are
/// simulated with `rand::random`, not read from the operating system.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// When the sample (or the average) was produced.
    pub timestamp: DateTime<Utc>,
    /// CPU usage, in percent.
    pub cpu_usage_percent: f64,
    /// Memory usage, in megabytes.
    pub memory_usage_mb: f64,
    /// Memory usage, in percent.
    pub memory_usage_percent: f64,
    /// Disk usage, in megabytes.
    pub disk_usage_mb: f64,
    /// Disk usage, in percent.
    pub disk_usage_percent: f64,
    /// Bytes sent over the network.
    /// NOTE(review): unclear whether this is per-interval or cumulative — confirm.
    pub network_bytes_sent: u64,
    /// Bytes received over the network (same caveat as `network_bytes_sent`).
    pub network_bytes_received: u64,
    /// Number of open connections.
    pub active_connections: u32,
    /// Number of threads.
    pub thread_count: u32,
    /// Garbage-collection count.
    /// NOTE(review): Rust has no GC; presumably kept for parity with managed runtimes.
    pub gc_collections: u64,
    /// Garbage-collection pause time, in milliseconds (same caveat as `gc_collections`).
    pub gc_pause_time_ms: f64,
}
80
/// A record of one search request, as submitted to `SearchAnalyzer::record_search`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchAnalytics {
    /// When the search happened.
    pub timestamp: DateTime<Utc>,
    /// Raw query text; also the key used for the per-query popularity counts.
    pub query: String,
    /// Category of the query, supplied by the caller.
    pub query_type: QueryType,
    /// Number of results returned.
    pub results_count: usize,
    /// Time spent processing the query, in milliseconds.
    pub processing_time_ms: f64,
    /// Whether the search succeeded.
    pub success: bool,
    /// Identifier of the searching user, if known (not read by `SearchAnalyzer`).
    pub user_id: Option<String>,
    /// Identifier of the session, if known (not read by `SearchAnalyzer`).
    pub session_id: Option<String>,
    /// Similarity scores of the results (not read by `SearchAnalyzer`).
    pub similarity_scores: Vec<f64>,
    /// Whether re-ranking was applied to the results.
    pub rerank_applied: bool,
    /// Whether the results were served from a cache.
    pub cache_hit: bool,
}
96
/// Coarse category of a search query.
///
/// Callers set it on [`SearchAnalytics`]; `SearchAnalyzer` also contains a keyword-based
/// heuristic classifier that maps query text onto these categories.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum QueryType {
    /// Asks for a fact ("what", "who", "when", "where" questions).
    Factual,
    /// Asks about an idea or the reason behind something ("why", concepts, theory).
    Conceptual,
    /// Asks how to do something ("how", "explain", "describe").
    Procedural,
    /// Phrased as a conversational request (e.g. "can you ...", "please").
    Conversational,
    /// No category could be determined.
    Unknown,
}
106
/// A record of one user action, as submitted to `UserActivityTracker::track_activity`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserActivity {
    /// When the action happened; also used as the session's last-seen time.
    pub timestamp: DateTime<Utc>,
    /// Identifier of the acting user.
    pub user_id: String,
    /// Identifier of the session the action belongs to.
    pub session_id: String,
    /// What the user did.
    pub action: UserAction,
    /// Query text associated with the action, if any.
    pub query: Option<String>,
    /// Time taken to respond to the action, in milliseconds.
    pub response_time_ms: f64,
    /// Whether the action succeeded.
    pub success: bool,
    /// Client IP address, if captured.
    /// NOTE(review): likely personal data — check retention/privacy requirements.
    pub ip_address: Option<String>,
    /// Client user-agent string, if captured.
    pub user_agent: Option<String>,
}
120
/// Kind of action a user performed.
///
/// `Search`, `Chat` and `DocumentUpload` each have a dedicated counter in the tracker.
/// In user-stats breakdowns, `Other` is reported under its own name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserAction {
    /// A search query.
    Search,
    /// A chat interaction.
    Chat,
    /// A document upload.
    DocumentUpload,
    /// A document being viewed.
    DocumentView,
    /// A system-health request.
    SystemHealth,
    /// Any other action, identified by a free-form name.
    Other(String),
}
130
/// Periodically samples resource usage in a background task.
///
/// It keeps a rolling window of samples and publishes the latest values as gauges.
pub struct PerformanceMonitor {
    /// Configuration; supplies the sampling interval and the window length.
    config: MonitoringConfig,
    /// Destination for the published gauges.
    metrics_collector: Arc<MetricsCollector>,
    /// Recent samples, oldest first; trimmed to the configured window.
    performance_history: Arc<RwLock<Vec<PerformanceMetrics>>>,
    /// Handle of the spawned sampling task while running; aborted on `stop`.
    collection_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Running flag; the sampling loop exits once it reads `false`.
    is_running: Arc<RwLock<bool>>,
}
139
140impl PerformanceMonitor {
141 pub async fn new(
142 config: MonitoringConfig,
143 metrics_collector: Arc<MetricsCollector>,
144 ) -> RragResult<Self> {
145 Ok(Self {
146 config,
147 metrics_collector,
148 performance_history: Arc::new(RwLock::new(Vec::new())),
149 collection_handle: Arc::new(RwLock::new(None)),
150 is_running: Arc::new(RwLock::new(false)),
151 })
152 }
153
154 pub async fn start(&self) -> RragResult<()> {
155 let mut running = self.is_running.write().await;
156 if *running {
157 return Err(RragError::config(
158 "performance_monitor",
159 "stopped",
160 "already running",
161 ));
162 }
163
164 let config = self.config.clone();
165 let metrics_collector = self.metrics_collector.clone();
166 let performance_history = self.performance_history.clone();
167 let is_running_clone = self.is_running.clone();
168
169 let handle = tokio::spawn(async move {
170 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
171 config.collection_interval_seconds,
172 ));
173
174 while *is_running_clone.read().await {
175 interval.tick().await;
176
177 if let Ok(metrics) = Self::collect_system_metrics().await {
178 let mut history = performance_history.write().await;
180 history.push(metrics.clone());
181
182 let retention_size = (config.performance_window_minutes * 60
184 / config.collection_interval_seconds as u32)
185 as usize;
186 let current_len = history.len();
187 if current_len > retention_size {
188 history.drain(0..current_len - retention_size);
189 }
190 drop(history);
191
192 let _ = metrics_collector
194 .set_gauge("system_cpu_usage_percent", metrics.cpu_usage_percent)
195 .await;
196 let _ = metrics_collector
197 .set_gauge("system_memory_usage_mb", metrics.memory_usage_mb)
198 .await;
199 let _ = metrics_collector
200 .set_gauge("system_memory_usage_percent", metrics.memory_usage_percent)
201 .await;
202 let _ = metrics_collector
203 .set_gauge("system_disk_usage_mb", metrics.disk_usage_mb)
204 .await;
205 let _ = metrics_collector
206 .set_gauge(
207 "system_active_connections",
208 metrics.active_connections as f64,
209 )
210 .await;
211 let _ = metrics_collector
212 .set_gauge("system_thread_count", metrics.thread_count as f64)
213 .await;
214 }
215 }
216 });
217
218 {
219 let mut handle_guard = self.collection_handle.write().await;
220 *handle_guard = Some(handle);
221 }
222 *running = true;
223 tracing::info!("Performance monitor started");
224 Ok(())
225 }
226
227 pub async fn stop(&self) -> RragResult<()> {
228 let mut running = self.is_running.write().await;
229 if !*running {
230 return Ok(());
231 }
232
233 *running = false;
234
235 {
236 let mut handle_guard = self.collection_handle.write().await;
237 if let Some(handle) = handle_guard.take() {
238 handle.abort();
239 }
240 }
241
242 tracing::info!("Performance monitor stopped");
243 Ok(())
244 }
245
246 pub async fn is_healthy(&self) -> bool {
247 *self.is_running.read().await
248 }
249
250 pub async fn get_current_metrics(&self) -> RragResult<PerformanceMetrics> {
251 Self::collect_system_metrics().await
252 }
253
254 pub async fn get_metrics_history(&self) -> Vec<PerformanceMetrics> {
255 self.performance_history.read().await.clone()
256 }
257
258 pub async fn get_average_metrics(&self, minutes: u32) -> RragResult<PerformanceMetrics> {
259 let history = self.performance_history.read().await;
260 if history.is_empty() {
261 return self.get_current_metrics().await;
262 }
263
264 let cutoff_time = Utc::now() - Duration::minutes(minutes as i64);
265 let recent_metrics: Vec<_> = history
266 .iter()
267 .filter(|m| m.timestamp >= cutoff_time)
268 .collect();
269
270 if recent_metrics.is_empty() {
271 return self.get_current_metrics().await;
272 }
273
274 let count = recent_metrics.len() as f64;
275 let avg_cpu = recent_metrics
276 .iter()
277 .map(|m| m.cpu_usage_percent)
278 .sum::<f64>()
279 / count;
280 let avg_memory = recent_metrics
281 .iter()
282 .map(|m| m.memory_usage_mb)
283 .sum::<f64>()
284 / count;
285 let avg_memory_percent = recent_metrics
286 .iter()
287 .map(|m| m.memory_usage_percent)
288 .sum::<f64>()
289 / count;
290 let avg_disk = recent_metrics.iter().map(|m| m.disk_usage_mb).sum::<f64>() / count;
291 let avg_disk_percent = recent_metrics
292 .iter()
293 .map(|m| m.disk_usage_percent)
294 .sum::<f64>()
295 / count;
296 let avg_connections = recent_metrics
297 .iter()
298 .map(|m| m.active_connections as f64)
299 .sum::<f64>()
300 / count;
301 let avg_threads = recent_metrics
302 .iter()
303 .map(|m| m.thread_count as f64)
304 .sum::<f64>()
305 / count;
306
307 Ok(PerformanceMetrics {
308 timestamp: Utc::now(),
309 cpu_usage_percent: avg_cpu,
310 memory_usage_mb: avg_memory,
311 memory_usage_percent: avg_memory_percent,
312 disk_usage_mb: avg_disk,
313 disk_usage_percent: avg_disk_percent,
314 network_bytes_sent: 0, network_bytes_received: 0,
316 active_connections: avg_connections as u32,
317 thread_count: avg_threads as u32,
318 gc_collections: 0,
319 gc_pause_time_ms: 0.0,
320 })
321 }
322
323 async fn collect_system_metrics() -> RragResult<PerformanceMetrics> {
324 Ok(PerformanceMetrics {
327 timestamp: Utc::now(),
328 cpu_usage_percent: 25.0 + (rand::random::<f64>() * 50.0), memory_usage_mb: 512.0 + (rand::random::<f64>() * 1024.0), memory_usage_percent: 30.0 + (rand::random::<f64>() * 40.0), disk_usage_mb: 2048.0 + (rand::random::<f64>() * 1024.0), disk_usage_percent: 40.0 + (rand::random::<f64>() * 30.0), network_bytes_sent: rand::random::<u64>() % 1_000_000,
334 network_bytes_received: rand::random::<u64>() % 1_000_000,
335 active_connections: (10 + rand::random::<u32>() % 100) as u32,
336 thread_count: (50 + rand::random::<u32>() % 50) as u32,
337 gc_collections: rand::random::<u64>() % 10,
338 gc_pause_time_ms: rand::random::<f64>() * 10.0,
339 })
340 }
341}
342
/// Records search requests and derives aggregate search statistics from them.
pub struct SearchAnalyzer {
    /// Configuration.
    /// NOTE(review): stored but not read by any `SearchAnalyzer` method.
    config: MonitoringConfig,
    /// Destination for search counters, timers and histograms.
    metrics_collector: Arc<MetricsCollector>,
    /// Most recent search records, oldest first (capped at 1000 entries).
    search_history: Arc<RwLock<Vec<SearchAnalytics>>>,
    /// Number of times each exact query string has been recorded.
    /// NOTE(review): never pruned, so it grows with the number of distinct queries.
    query_patterns: Arc<RwLock<HashMap<String, u64>>>,
    /// Running flag; `record_search` is rejected while it is `false`.
    is_running: Arc<RwLock<bool>>,
}
351
352impl SearchAnalyzer {
353 pub async fn new(config: MonitoringConfig, metrics_collector: Arc<MetricsCollector>) -> Self {
354 Self {
355 config,
356 metrics_collector,
357 search_history: Arc::new(RwLock::new(Vec::new())),
358 query_patterns: Arc::new(RwLock::new(HashMap::new())),
359 is_running: Arc::new(RwLock::new(false)),
360 }
361 }
362
363 pub async fn start(&self) -> RragResult<()> {
364 let mut running = self.is_running.write().await;
365 if *running {
366 return Err(RragError::config(
367 "search_analyzer",
368 "stopped",
369 "already running",
370 ));
371 }
372 *running = true;
373 tracing::info!("Search analyzer started");
374 Ok(())
375 }
376
377 pub async fn stop(&self) -> RragResult<()> {
378 let mut running = self.is_running.write().await;
379 if !*running {
380 return Ok(());
381 }
382 *running = false;
383 tracing::info!("Search analyzer stopped");
384 Ok(())
385 }
386
387 pub async fn is_healthy(&self) -> bool {
388 *self.is_running.read().await
389 }
390
391 pub async fn record_search(&self, analytics: SearchAnalytics) -> RragResult<()> {
392 if !*self.is_running.read().await {
393 return Err(RragError::config("search_analyzer", "running", "stopped"));
394 }
395
396 let mut patterns = self.query_patterns.write().await;
398 *patterns.entry(analytics.query.clone()).or_insert(0) += 1;
399 drop(patterns);
400
401 let mut history = self.search_history.write().await;
403 history.push(analytics.clone());
404
405 let current_len = history.len();
407 if current_len > 1000 {
408 history.drain(0..current_len - 1000);
409 }
410
411 let _ = self
413 .metrics_collector
414 .inc_counter("search_queries_total")
415 .await;
416 let _ = self
417 .metrics_collector
418 .record_timer("search_processing_time_ms", analytics.processing_time_ms)
419 .await;
420 let _ = self
421 .metrics_collector
422 .observe_histogram("search_results_count", analytics.results_count as f64, None)
423 .await;
424
425 if analytics.success {
426 let _ = self
427 .metrics_collector
428 .inc_counter("search_queries_successful")
429 .await;
430 } else {
431 let _ = self
432 .metrics_collector
433 .inc_counter("search_queries_failed")
434 .await;
435 }
436
437 if analytics.cache_hit {
438 let _ = self
439 .metrics_collector
440 .inc_counter("search_cache_hits")
441 .await;
442 } else {
443 let _ = self
444 .metrics_collector
445 .inc_counter("search_cache_misses")
446 .await;
447 }
448
449 Ok(())
450 }
451
452 pub async fn get_popular_queries(&self, limit: usize) -> Vec<(String, u64)> {
453 let patterns = self.query_patterns.read().await;
454 let mut query_counts: Vec<_> = patterns
455 .iter()
456 .map(|(query, count)| (query.clone(), *count))
457 .collect();
458
459 query_counts.sort_by(|a, b| b.1.cmp(&a.1));
460 query_counts.into_iter().take(limit).collect()
461 }
462
463 pub async fn get_search_stats(&self) -> SearchStats {
464 let history = self.search_history.read().await;
465
466 if history.is_empty() {
467 return SearchStats::default();
468 }
469
470 let total_searches = history.len();
471 let successful_searches = history.iter().filter(|s| s.success).count();
472 let cache_hits = history.iter().filter(|s| s.cache_hit).count();
473 let rerank_applied = history.iter().filter(|s| s.rerank_applied).count();
474
475 let avg_processing_time =
476 history.iter().map(|s| s.processing_time_ms).sum::<f64>() / total_searches as f64;
477
478 let avg_results_count =
479 history.iter().map(|s| s.results_count as f64).sum::<f64>() / total_searches as f64;
480
481 SearchStats {
482 total_searches,
483 successful_searches,
484 success_rate: (successful_searches as f64 / total_searches as f64) * 100.0,
485 cache_hit_rate: (cache_hits as f64 / total_searches as f64) * 100.0,
486 rerank_usage_rate: (rerank_applied as f64 / total_searches as f64) * 100.0,
487 average_processing_time_ms: avg_processing_time,
488 average_results_count: avg_results_count,
489 query_types: self.analyze_query_types(&history),
490 }
491 }
492
493 fn analyze_query_types(&self, history: &[SearchAnalytics]) -> HashMap<QueryType, usize> {
494 let mut counts = HashMap::new();
495 for search in history {
496 *counts.entry(search.query_type.clone()).or_insert(0) += 1;
497 }
498 counts
499 }
500
501 fn classify_query(&self, query: &str) -> QueryType {
502 let query_lower = query.to_lowercase();
504
505 if query_lower.contains("what")
506 || query_lower.contains("who")
507 || query_lower.contains("when")
508 || query_lower.contains("where")
509 {
510 QueryType::Factual
511 } else if query_lower.contains("how")
512 || query_lower.contains("explain")
513 || query_lower.contains("describe")
514 {
515 QueryType::Procedural
516 } else if query_lower.contains("why")
517 || query_lower.contains("concept")
518 || query_lower.contains("theory")
519 {
520 QueryType::Conceptual
521 } else if query_lower.contains("can you")
522 || query_lower.contains("please")
523 || query_lower.len() > 50
524 {
525 QueryType::Conversational
526 } else {
527 QueryType::Unknown
528 }
529 }
530}
531
/// Aggregate statistics over the search history retained by `SearchAnalyzer`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchStats {
    /// Number of search records aggregated.
    pub total_searches: usize,
    /// How many of those searches succeeded.
    pub successful_searches: usize,
    /// Successful searches as a percentage (0-100) of `total_searches`.
    pub success_rate: f64,
    /// Cache hits as a percentage (0-100) of `total_searches`.
    pub cache_hit_rate: f64,
    /// Re-ranked searches as a percentage (0-100) of `total_searches`.
    pub rerank_usage_rate: f64,
    /// Mean processing time, in milliseconds.
    pub average_processing_time_ms: f64,
    /// Mean number of results per search.
    pub average_results_count: f64,
    /// Number of searches recorded for each [`QueryType`].
    pub query_types: HashMap<QueryType, usize>,
}
544
545impl Default for SearchStats {
546 fn default() -> Self {
547 Self {
548 total_searches: 0,
549 successful_searches: 0,
550 success_rate: 0.0,
551 cache_hit_rate: 0.0,
552 rerank_usage_rate: 0.0,
553 average_processing_time_ms: 0.0,
554 average_results_count: 0.0,
555 query_types: HashMap::new(),
556 }
557 }
558}
559
/// Records user actions and tracks which sessions are currently active.
pub struct UserActivityTracker {
    /// Configuration.
    /// NOTE(review): stored but not read by any `UserActivityTracker` method.
    config: MonitoringConfig,
    /// Destination for user-action counters and timers.
    metrics_collector: Arc<MetricsCollector>,
    /// Most recent activity records, oldest first (capped at 10 000 entries).
    activity_history: Arc<RwLock<Vec<UserActivity>>>,
    /// Last-seen timestamp per session id.
    /// Entries older than one hour are pruned only when the active-session count is queried.
    active_sessions: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// Running flag; `track_activity` is rejected while it is `false`.
    is_running: Arc<RwLock<bool>>,
}
568
569impl UserActivityTracker {
570 pub async fn new(config: MonitoringConfig, metrics_collector: Arc<MetricsCollector>) -> Self {
571 Self {
572 config,
573 metrics_collector,
574 activity_history: Arc::new(RwLock::new(Vec::new())),
575 active_sessions: Arc::new(RwLock::new(HashMap::new())),
576 is_running: Arc::new(RwLock::new(false)),
577 }
578 }
579
580 pub async fn start(&self) -> RragResult<()> {
581 let mut running = self.is_running.write().await;
582 if *running {
583 return Err(RragError::config(
584 "user_activity_tracker",
585 "stopped",
586 "already running",
587 ));
588 }
589 *running = true;
590 tracing::info!("User activity tracker started");
591 Ok(())
592 }
593
594 pub async fn stop(&self) -> RragResult<()> {
595 let mut running = self.is_running.write().await;
596 if !*running {
597 return Ok(());
598 }
599 *running = false;
600 tracing::info!("User activity tracker stopped");
601 Ok(())
602 }
603
604 pub async fn is_healthy(&self) -> bool {
605 *self.is_running.read().await
606 }
607
608 pub async fn track_activity(&self, activity: UserActivity) -> RragResult<()> {
609 if !*self.is_running.read().await {
610 return Err(RragError::config(
611 "user_activity_tracker",
612 "running",
613 "stopped",
614 ));
615 }
616
617 let mut sessions = self.active_sessions.write().await;
619 sessions.insert(activity.session_id.clone(), activity.timestamp);
620 drop(sessions);
621
622 let mut history = self.activity_history.write().await;
624 history.push(activity.clone());
625
626 let current_len = history.len();
628 if current_len > 10000 {
629 history.drain(0..current_len - 10000);
630 }
631
632 let _ = self
634 .metrics_collector
635 .inc_counter("user_actions_total")
636 .await;
637 let _ = self
638 .metrics_collector
639 .record_timer("user_action_response_time_ms", activity.response_time_ms)
640 .await;
641
642 match activity.action {
643 UserAction::Search => {
644 let _ = self
645 .metrics_collector
646 .inc_counter("user_searches_total")
647 .await;
648 }
649 UserAction::Chat => {
650 let _ = self.metrics_collector.inc_counter("user_chats_total").await;
651 }
652 UserAction::DocumentUpload => {
653 let _ = self
654 .metrics_collector
655 .inc_counter("user_document_uploads_total")
656 .await;
657 }
658 _ => {}
659 }
660
661 Ok(())
662 }
663
664 pub async fn get_active_sessions_count(&self) -> usize {
665 let cutoff = Utc::now() - Duration::hours(1);
667 let mut sessions = self.active_sessions.write().await;
668 sessions.retain(|_, timestamp| *timestamp > cutoff);
669 sessions.len()
670 }
671
672 pub async fn get_user_stats(&self, time_window_hours: i64) -> UserStats {
673 let history = self.activity_history.read().await;
674 let cutoff_time = Utc::now() - Duration::hours(time_window_hours);
675
676 let recent_activity: Vec<_> = history
677 .iter()
678 .filter(|a| a.timestamp >= cutoff_time)
679 .collect();
680
681 if recent_activity.is_empty() {
682 return UserStats::default();
683 }
684
685 let unique_users: std::collections::HashSet<_> =
686 recent_activity.iter().map(|a| a.user_id.as_str()).collect();
687
688 let unique_sessions: std::collections::HashSet<_> = recent_activity
689 .iter()
690 .map(|a| a.session_id.as_str())
691 .collect();
692
693 let action_counts = self.count_actions(&recent_activity);
694 let avg_response_time = recent_activity
695 .iter()
696 .map(|a| a.response_time_ms)
697 .sum::<f64>()
698 / recent_activity.len() as f64;
699
700 UserStats {
701 total_actions: recent_activity.len(),
702 unique_users: unique_users.len(),
703 unique_sessions: unique_sessions.len(),
704 action_breakdown: action_counts,
705 average_response_time_ms: avg_response_time,
706 time_window_hours,
707 }
708 }
709
710 fn count_actions(&self, activities: &[&UserActivity]) -> HashMap<String, usize> {
711 let mut counts = HashMap::new();
712 for activity in activities {
713 let action_name = match &activity.action {
714 UserAction::Search => "search",
715 UserAction::Chat => "chat",
716 UserAction::DocumentUpload => "document_upload",
717 UserAction::DocumentView => "document_view",
718 UserAction::SystemHealth => "system_health",
719 UserAction::Other(name) => name,
720 };
721 *counts.entry(action_name.to_string()).or_insert(0) += 1;
722 }
723 counts
724 }
725}
726
/// Aggregate statistics over the user activity inside a time window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserStats {
    /// Number of actions in the window.
    pub total_actions: usize,
    /// Number of distinct user ids in the window.
    pub unique_users: usize,
    /// Number of distinct session ids in the window.
    pub unique_sessions: usize,
    /// Action counts keyed by action name ("search", "chat", ..., or an `Other` name).
    pub action_breakdown: HashMap<String, usize>,
    /// Mean response time, in milliseconds.
    pub average_response_time_ms: f64,
    /// Length of the window the stats cover, in hours.
    pub time_window_hours: i64,
}
737
738impl Default for UserStats {
739 fn default() -> Self {
740 Self {
741 total_actions: 0,
742 unique_users: 0,
743 unique_sessions: 0,
744 action_breakdown: HashMap::new(),
745 average_response_time_ms: 0.0,
746 time_window_hours: 24,
747 }
748 }
749}
750
/// Owns one instance of each monitoring component.
///
/// It starts and stops the components together, according to the `*_enabled` flags in
/// its config.
pub struct SystemMonitor {
    /// Configuration; its `*_enabled` flags choose which components are used.
    config: MonitoringConfig,
    /// Resource sampler, used when `resource_monitoring_enabled` is set.
    performance_monitor: Arc<PerformanceMonitor>,
    /// Search recorder, used when `search_analytics_enabled` is set.
    search_analyzer: Arc<SearchAnalyzer>,
    /// User-activity recorder, used when `user_tracking_enabled` is set.
    user_tracker: Arc<UserActivityTracker>,
    /// Running flag of the system monitor itself.
    is_running: Arc<RwLock<bool>>,
}
759
760impl SystemMonitor {
761 pub async fn new(
762 config: MonitoringConfig,
763 metrics_collector: Arc<MetricsCollector>,
764 ) -> RragResult<Self> {
765 let performance_monitor =
766 Arc::new(PerformanceMonitor::new(config.clone(), metrics_collector.clone()).await?);
767 let search_analyzer =
768 Arc::new(SearchAnalyzer::new(config.clone(), metrics_collector.clone()).await);
769 let user_tracker =
770 Arc::new(UserActivityTracker::new(config.clone(), metrics_collector).await);
771
772 Ok(Self {
773 config,
774 performance_monitor,
775 search_analyzer,
776 user_tracker,
777 is_running: Arc::new(RwLock::new(false)),
778 })
779 }
780
781 pub async fn start(&self) -> RragResult<()> {
782 let mut running = self.is_running.write().await;
783 if *running {
784 return Err(RragError::config(
785 "system_monitor",
786 "stopped",
787 "already running",
788 ));
789 }
790
791 if self.config.resource_monitoring_enabled {
793 self.performance_monitor.start().await?;
794 }
795
796 if self.config.search_analytics_enabled {
797 self.search_analyzer.start().await?;
798 }
799
800 if self.config.user_tracking_enabled {
801 self.user_tracker.start().await?;
802 }
803
804 *running = true;
805 tracing::info!("System monitor started");
806 Ok(())
807 }
808
809 pub async fn stop(&self) -> RragResult<()> {
810 let mut running = self.is_running.write().await;
811 if !*running {
812 return Ok(());
813 }
814
815 self.performance_monitor.stop().await?;
817 self.search_analyzer.stop().await?;
818 self.user_tracker.stop().await?;
819
820 *running = false;
821 tracing::info!("System monitor stopped");
822 Ok(())
823 }
824
825 pub async fn is_healthy(&self) -> bool {
826 *self.is_running.read().await
827 && self.performance_monitor.is_healthy().await
828 && self.search_analyzer.is_healthy().await
829 && self.user_tracker.is_healthy().await
830 }
831
832 pub fn performance(&self) -> &PerformanceMonitor {
833 &self.performance_monitor
834 }
835
836 pub fn search_analytics(&self) -> &SearchAnalyzer {
837 &self.search_analyzer
838 }
839
840 pub fn user_activity(&self) -> &UserActivityTracker {
841 &self.user_tracker
842 }
843
844 pub async fn get_system_overview(&self) -> SystemOverview {
845 let performance = if self.config.resource_monitoring_enabled {
846 self.performance_monitor.get_current_metrics().await.ok()
847 } else {
848 None
849 };
850
851 let search_stats = if self.config.search_analytics_enabled {
852 Some(self.search_analyzer.get_search_stats().await)
853 } else {
854 None
855 };
856
857 let user_stats = if self.config.user_tracking_enabled {
858 Some(self.user_tracker.get_user_stats(24).await)
859 } else {
860 None
861 };
862
863 SystemOverview {
864 timestamp: Utc::now(),
865 performance_metrics: performance,
866 search_stats,
867 user_stats,
868 active_sessions: if self.config.user_tracking_enabled {
869 Some(self.user_tracker.get_active_sessions_count().await)
870 } else {
871 None
872 },
873 }
874 }
875}
876
/// Point-in-time snapshot produced by `SystemMonitor::get_system_overview`.
///
/// A field is `None` when the corresponding component is disabled in the config.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemOverview {
    /// When the snapshot was built.
    pub timestamp: DateTime<Utc>,
    /// A fresh resource sample (also `None` if sampling failed).
    pub performance_metrics: Option<PerformanceMetrics>,
    /// Aggregate search statistics.
    pub search_stats: Option<SearchStats>,
    /// User statistics for the last 24 hours.
    pub user_stats: Option<UserStats>,
    /// Number of sessions seen within the last hour.
    pub active_sessions: Option<usize>,
}
886
/// Thin façade that forwards lifecycle, health, overview and recording calls to a
/// shared [`SystemMonitor`].
pub struct MonitoringService {
    /// The wrapped monitor.
    /// NOTE(review): only ever read-locked in this module; a plain `Arc<SystemMonitor>`
    /// would suffice.
    system_monitor: Arc<RwLock<SystemMonitor>>,
}
891
892impl MonitoringService {
893 pub async fn new(
894 config: MonitoringConfig,
895 metrics_collector: Arc<MetricsCollector>,
896 ) -> RragResult<Self> {
897 let system_monitor = SystemMonitor::new(config, metrics_collector).await?;
898 Ok(Self {
899 system_monitor: Arc::new(RwLock::new(system_monitor)),
900 })
901 }
902
903 pub async fn start(&self) -> RragResult<()> {
904 let monitor = self.system_monitor.read().await;
905 monitor.start().await
906 }
907
908 pub async fn stop(&self) -> RragResult<()> {
909 let monitor = self.system_monitor.read().await;
910 monitor.stop().await
911 }
912
913 pub async fn is_healthy(&self) -> bool {
914 let monitor = self.system_monitor.read().await;
915 monitor.is_healthy().await
916 }
917
918 pub async fn get_overview(&self) -> SystemOverview {
919 let monitor = self.system_monitor.read().await;
920 monitor.get_system_overview().await
921 }
922
923 pub async fn record_search(&self, analytics: SearchAnalytics) -> RragResult<()> {
924 let monitor = self.system_monitor.read().await;
925 monitor.search_analytics().record_search(analytics).await
926 }
927
928 pub async fn track_user_activity(&self, activity: UserActivity) -> RragResult<()> {
929 let monitor = self.system_monitor.read().await;
930 monitor.user_activity().track_activity(activity).await
931 }
932}
933
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::metrics::MetricsConfig;

    /// Builds a metrics collector with default settings for use in tests.
    async fn create_test_metrics_collector() -> Arc<MetricsCollector> {
        Arc::new(
            MetricsCollector::new(MetricsConfig::default())
                .await
                .unwrap(),
        )
    }

    /// The performance monitor reports healthy only between `start` and `stop`.
    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let monitor = PerformanceMonitor::new(config, metrics_collector)
            .await
            .unwrap();

        assert!(!monitor.is_healthy().await);

        monitor.start().await.unwrap();
        assert!(monitor.is_healthy().await);

        let current_metrics = monitor.get_current_metrics().await.unwrap();
        assert!(current_metrics.cpu_usage_percent >= 0.0);
        assert!(current_metrics.memory_usage_mb >= 0.0);

        monitor.stop().await.unwrap();
        assert!(!monitor.is_healthy().await);
    }

    /// A recorded search shows up in the aggregate statistics.
    #[tokio::test]
    async fn test_search_analyzer() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let analyzer = SearchAnalyzer::new(config, metrics_collector).await;

        analyzer.start().await.unwrap();
        assert!(analyzer.is_healthy().await);

        let search_analytics = SearchAnalytics {
            timestamp: Utc::now(),
            query: "test query".to_string(),
            query_type: QueryType::Factual,
            results_count: 5,
            processing_time_ms: 150.0,
            success: true,
            user_id: Some("user123".to_string()),
            session_id: Some("session456".to_string()),
            similarity_scores: vec![0.9, 0.8, 0.7],
            rerank_applied: true,
            cache_hit: false,
        };

        analyzer.record_search(search_analytics).await.unwrap();

        let stats = analyzer.get_search_stats().await;
        assert_eq!(stats.total_searches, 1);
        assert_eq!(stats.successful_searches, 1);
        assert_eq!(stats.success_rate, 100.0);

        analyzer.stop().await.unwrap();
        assert!(!analyzer.is_healthy().await);
    }

    /// A tracked activity is counted once per action, user and session.
    #[tokio::test]
    async fn test_user_activity_tracker() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let tracker = UserActivityTracker::new(config, metrics_collector).await;

        tracker.start().await.unwrap();
        assert!(tracker.is_healthy().await);

        let activity = UserActivity {
            timestamp: Utc::now(),
            user_id: "user123".to_string(),
            session_id: "session456".to_string(),
            action: UserAction::Search,
            query: Some("test query".to_string()),
            response_time_ms: 200.0,
            success: true,
            ip_address: Some("127.0.0.1".to_string()),
            user_agent: Some("test-agent".to_string()),
        };

        tracker.track_activity(activity).await.unwrap();

        let stats = tracker.get_user_stats(24).await;
        assert_eq!(stats.total_actions, 1);
        assert_eq!(stats.unique_users, 1);
        assert_eq!(stats.unique_sessions, 1);

        tracker.stop().await.unwrap();
        assert!(!tracker.is_healthy().await);
    }

    /// With every component enabled, the overview contains every section.
    #[tokio::test]
    async fn test_system_monitor() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let monitor = SystemMonitor::new(config, metrics_collector).await.unwrap();

        assert!(!monitor.is_healthy().await);

        monitor.start().await.unwrap();
        assert!(monitor.is_healthy().await);

        let overview = monitor.get_system_overview().await;
        assert!(overview.performance_metrics.is_some());
        assert!(overview.search_stats.is_some());
        assert!(overview.user_stats.is_some());

        monitor.stop().await.unwrap();
        assert!(!monitor.is_healthy().await);
    }

    /// The keyword classifier maps representative queries to the expected categories.
    ///
    /// Runs on the tokio runtime like the other tests, rather than a separate executor,
    /// so collector construction always happens inside a runtime.
    #[tokio::test]
    async fn test_query_classification() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let analyzer = SearchAnalyzer::new(config, metrics_collector).await;

        assert_eq!(
            analyzer.classify_query("What is machine learning?"),
            QueryType::Factual
        );
        assert_eq!(
            analyzer.classify_query("How do I implement a neural network?"),
            QueryType::Procedural
        );
        assert_eq!(
            analyzer.classify_query("Why does backpropagation work?"),
            QueryType::Conceptual
        );
        assert_eq!(
            analyzer.classify_query("Can you help me understand this concept please?"),
            QueryType::Conversational
        );
        assert_eq!(
            analyzer.classify_query("neural networks"),
            QueryType::Unknown
        );
    }

    /// Default alert thresholds match the documented values.
    #[test]
    fn test_alert_thresholds() {
        let thresholds = AlertThresholds::default();
        assert_eq!(thresholds.cpu_usage_percent, 80.0);
        assert_eq!(thresholds.memory_usage_percent, 85.0);
        assert_eq!(thresholds.error_rate_percent, 5.0);
        assert_eq!(thresholds.response_time_ms, 1000.0);
    }
}