rrag/observability/monitoring.rs

1//! # System Monitoring
2//!
3//! Real-time monitoring of RRAG system performance, health, and usage patterns.
4//! Provides insights into search analytics, performance bottlenecks, and user behavior.
5
6use super::metrics::MetricsCollector;
7use crate::{RragError, RragResult};
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use std::collections::HashMap;
11use std::sync::Arc;
12use tokio::sync::RwLock;
13
14/// Monitoring configuration
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct MonitoringConfig {
17    pub enabled: bool,
18    pub collection_interval_seconds: u64,
19    pub performance_window_minutes: u32,
20    pub search_analytics_enabled: bool,
21    pub user_tracking_enabled: bool,
22    pub resource_monitoring_enabled: bool,
23    pub alert_thresholds: AlertThresholds,
24}
25
26impl Default for MonitoringConfig {
27    fn default() -> Self {
28        Self {
29            enabled: true,
30            collection_interval_seconds: 30,
31            performance_window_minutes: 5,
32            search_analytics_enabled: true,
33            user_tracking_enabled: true,
34            resource_monitoring_enabled: true,
35            alert_thresholds: AlertThresholds::default(),
36        }
37    }
38}
39
40/// Alert threshold configurations
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct AlertThresholds {
43    pub cpu_usage_percent: f64,
44    pub memory_usage_percent: f64,
45    pub error_rate_percent: f64,
46    pub response_time_ms: f64,
47    pub disk_usage_percent: f64,
48    pub queue_size: usize,
49}
50
51impl Default for AlertThresholds {
52    fn default() -> Self {
53        Self {
54            cpu_usage_percent: 80.0,
55            memory_usage_percent: 85.0,
56            error_rate_percent: 5.0,
57            response_time_ms: 1000.0,
58            disk_usage_percent: 90.0,
59            queue_size: 1000,
60        }
61    }
62}
63
/// System performance metrics
///
/// One point-in-time sample produced by `PerformanceMonitor`, or an averaged
/// snapshot from `PerformanceMonitor::get_average_metrics`.
/// NOTE(review): the current collector (`collect_system_metrics`) fills every
/// field with random mock values, not real measurements.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceMetrics {
    /// When the sample was taken; for averaged snapshots, when the average was built.
    pub timestamp: DateTime<Utc>,
    pub cpu_usage_percent: f64,
    pub memory_usage_mb: f64,
    pub memory_usage_percent: f64,
    pub disk_usage_mb: f64,
    pub disk_usage_percent: f64,
    /// Reported as 0 in averaged snapshots.
    pub network_bytes_sent: u64,
    /// Reported as 0 in averaged snapshots.
    pub network_bytes_received: u64,
    /// Averaged snapshots truncate the mean toward zero.
    pub active_connections: u32,
    /// Averaged snapshots truncate the mean toward zero.
    pub thread_count: u32,
    /// Reported as 0 in averaged snapshots.
    pub gc_collections: u64,
    /// Reported as 0.0 in averaged snapshots.
    pub gc_pause_time_ms: f64,
}
80
/// Search analytics data
///
/// One record per search, supplied by the caller to `SearchAnalyzer::record_search`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchAnalytics {
    pub timestamp: DateTime<Utc>,
    /// Raw query text; also the key for the analyzer's per-query counts.
    pub query: String,
    /// Category assigned by the caller; the analyzer stores it as given.
    pub query_type: QueryType,
    /// Observed into the `search_results_count` histogram.
    pub results_count: usize,
    /// Recorded as the `search_processing_time_ms` timer.
    pub processing_time_ms: f64,
    /// Selects the `search_queries_successful` or `search_queries_failed` counter.
    pub success: bool,
    pub user_id: Option<String>,
    pub session_id: Option<String>,
    /// NOTE(review): not read anywhere in this module.
    pub similarity_scores: Vec<f64>,
    /// Counted toward `SearchStats::rerank_usage_rate`.
    pub rerank_applied: bool,
    /// Selects the `search_cache_hits` or `search_cache_misses` counter.
    pub cache_hit: bool,
}
96
/// Query classification
///
/// Coarse query category. Callers set it on `SearchAnalytics::query_type`;
/// `SearchAnalyzer` tallies records per category in `SearchStats::query_types`.
/// `SearchAnalyzer` also contains a private keyword heuristic that produces
/// these values, but nothing in this module currently calls it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub enum QueryType {
    Factual,
    Conceptual,
    Procedural,
    Conversational,
    Unknown,
}
106
/// User activity tracking
///
/// One user action, supplied to `UserActivityTracker::track_activity`.
/// NOTE(review): `query`, `success`, `ip_address` and `user_agent` are stored
/// but not read by any code in this module.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserActivity {
    /// Also recorded as the session's most recent activity time.
    pub timestamp: DateTime<Utc>,
    pub user_id: String,
    pub session_id: String,
    pub action: UserAction,
    pub query: Option<String>,
    /// Recorded as the `user_action_response_time_ms` timer and averaged in `UserStats`.
    pub response_time_ms: f64,
    pub success: bool,
    pub ip_address: Option<String>,
    pub user_agent: Option<String>,
}
120
/// Kind of action a user performed.
///
/// `Search`, `Chat` and `DocumentUpload` each increment a dedicated counter in
/// `UserActivityTracker::track_activity`. `Other` carries a free-form name that
/// is used as its key in `UserStats::action_breakdown`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum UserAction {
    Search,
    Chat,
    DocumentUpload,
    DocumentView,
    SystemHealth,
    Other(String),
}
130
/// Performance monitoring service
///
/// While running, a background tokio task samples system metrics on a fixed
/// interval, appends each sample to a bounded history and mirrors it into
/// `system_*` gauges on the shared `MetricsCollector`.
pub struct PerformanceMonitor {
    /// Supplies the sampling interval and the history window.
    config: MonitoringConfig,
    /// Destination for the `system_*` gauges written after each sample.
    metrics_collector: Arc<MetricsCollector>,
    /// Recent samples, oldest first; trimmed from the front after each push.
    performance_history: Arc<RwLock<Vec<PerformanceMetrics>>>,
    /// Handle of the spawned collection task while running; aborted by `stop`.
    collection_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Lifecycle flag; the collection loop also reads it before each iteration.
    is_running: Arc<RwLock<bool>>,
}
139
140impl PerformanceMonitor {
141    pub async fn new(
142        config: MonitoringConfig,
143        metrics_collector: Arc<MetricsCollector>,
144    ) -> RragResult<Self> {
145        Ok(Self {
146            config,
147            metrics_collector,
148            performance_history: Arc::new(RwLock::new(Vec::new())),
149            collection_handle: Arc::new(RwLock::new(None)),
150            is_running: Arc::new(RwLock::new(false)),
151        })
152    }
153
154    pub async fn start(&self) -> RragResult<()> {
155        let mut running = self.is_running.write().await;
156        if *running {
157            return Err(RragError::config(
158                "performance_monitor",
159                "stopped",
160                "already running",
161            ));
162        }
163
164        let config = self.config.clone();
165        let metrics_collector = self.metrics_collector.clone();
166        let performance_history = self.performance_history.clone();
167        let is_running_clone = self.is_running.clone();
168
169        let handle = tokio::spawn(async move {
170            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(
171                config.collection_interval_seconds,
172            ));
173
174            while *is_running_clone.read().await {
175                interval.tick().await;
176
177                if let Ok(metrics) = Self::collect_system_metrics().await {
178                    // Store in history
179                    let mut history = performance_history.write().await;
180                    history.push(metrics.clone());
181
182                    // Keep only recent data
183                    let retention_size = (config.performance_window_minutes * 60
184                        / config.collection_interval_seconds as u32)
185                        as usize;
186                    let current_len = history.len();
187                    if current_len > retention_size {
188                        history.drain(0..current_len - retention_size);
189                    }
190                    drop(history);
191
192                    // Update metrics
193                    let _ = metrics_collector
194                        .set_gauge("system_cpu_usage_percent", metrics.cpu_usage_percent)
195                        .await;
196                    let _ = metrics_collector
197                        .set_gauge("system_memory_usage_mb", metrics.memory_usage_mb)
198                        .await;
199                    let _ = metrics_collector
200                        .set_gauge("system_memory_usage_percent", metrics.memory_usage_percent)
201                        .await;
202                    let _ = metrics_collector
203                        .set_gauge("system_disk_usage_mb", metrics.disk_usage_mb)
204                        .await;
205                    let _ = metrics_collector
206                        .set_gauge(
207                            "system_active_connections",
208                            metrics.active_connections as f64,
209                        )
210                        .await;
211                    let _ = metrics_collector
212                        .set_gauge("system_thread_count", metrics.thread_count as f64)
213                        .await;
214                }
215            }
216        });
217
218        {
219            let mut handle_guard = self.collection_handle.write().await;
220            *handle_guard = Some(handle);
221        }
222        *running = true;
223        tracing::info!("Performance monitor started");
224        Ok(())
225    }
226
227    pub async fn stop(&self) -> RragResult<()> {
228        let mut running = self.is_running.write().await;
229        if !*running {
230            return Ok(());
231        }
232
233        *running = false;
234
235        {
236            let mut handle_guard = self.collection_handle.write().await;
237            if let Some(handle) = handle_guard.take() {
238                handle.abort();
239            }
240        }
241
242        tracing::info!("Performance monitor stopped");
243        Ok(())
244    }
245
246    pub async fn is_healthy(&self) -> bool {
247        *self.is_running.read().await
248    }
249
250    pub async fn get_current_metrics(&self) -> RragResult<PerformanceMetrics> {
251        Self::collect_system_metrics().await
252    }
253
254    pub async fn get_metrics_history(&self) -> Vec<PerformanceMetrics> {
255        self.performance_history.read().await.clone()
256    }
257
258    pub async fn get_average_metrics(&self, minutes: u32) -> RragResult<PerformanceMetrics> {
259        let history = self.performance_history.read().await;
260        if history.is_empty() {
261            return self.get_current_metrics().await;
262        }
263
264        let cutoff_time = Utc::now() - Duration::minutes(minutes as i64);
265        let recent_metrics: Vec<_> = history
266            .iter()
267            .filter(|m| m.timestamp >= cutoff_time)
268            .collect();
269
270        if recent_metrics.is_empty() {
271            return self.get_current_metrics().await;
272        }
273
274        let count = recent_metrics.len() as f64;
275        let avg_cpu = recent_metrics
276            .iter()
277            .map(|m| m.cpu_usage_percent)
278            .sum::<f64>()
279            / count;
280        let avg_memory = recent_metrics
281            .iter()
282            .map(|m| m.memory_usage_mb)
283            .sum::<f64>()
284            / count;
285        let avg_memory_percent = recent_metrics
286            .iter()
287            .map(|m| m.memory_usage_percent)
288            .sum::<f64>()
289            / count;
290        let avg_disk = recent_metrics.iter().map(|m| m.disk_usage_mb).sum::<f64>() / count;
291        let avg_disk_percent = recent_metrics
292            .iter()
293            .map(|m| m.disk_usage_percent)
294            .sum::<f64>()
295            / count;
296        let avg_connections = recent_metrics
297            .iter()
298            .map(|m| m.active_connections as f64)
299            .sum::<f64>()
300            / count;
301        let avg_threads = recent_metrics
302            .iter()
303            .map(|m| m.thread_count as f64)
304            .sum::<f64>()
305            / count;
306
307        Ok(PerformanceMetrics {
308            timestamp: Utc::now(),
309            cpu_usage_percent: avg_cpu,
310            memory_usage_mb: avg_memory,
311            memory_usage_percent: avg_memory_percent,
312            disk_usage_mb: avg_disk,
313            disk_usage_percent: avg_disk_percent,
314            network_bytes_sent: 0, // Averages don't make sense for cumulative metrics
315            network_bytes_received: 0,
316            active_connections: avg_connections as u32,
317            thread_count: avg_threads as u32,
318            gc_collections: 0,
319            gc_pause_time_ms: 0.0,
320        })
321    }
322
323    async fn collect_system_metrics() -> RragResult<PerformanceMetrics> {
324        // This would integrate with actual system monitoring libraries
325        // For now, providing mock data
326        Ok(PerformanceMetrics {
327            timestamp: Utc::now(),
328            cpu_usage_percent: 25.0 + (rand::random::<f64>() * 50.0), // Mock: 25-75%
329            memory_usage_mb: 512.0 + (rand::random::<f64>() * 1024.0), // Mock: 512-1536 MB
330            memory_usage_percent: 30.0 + (rand::random::<f64>() * 40.0), // Mock: 30-70%
331            disk_usage_mb: 2048.0 + (rand::random::<f64>() * 1024.0), // Mock: 2-3 GB
332            disk_usage_percent: 40.0 + (rand::random::<f64>() * 30.0), // Mock: 40-70%
333            network_bytes_sent: rand::random::<u64>() % 1_000_000,
334            network_bytes_received: rand::random::<u64>() % 1_000_000,
335            active_connections: (10 + rand::random::<u32>() % 100) as u32,
336            thread_count: (50 + rand::random::<u32>() % 50) as u32,
337            gc_collections: rand::random::<u64>() % 10,
338            gc_pause_time_ms: rand::random::<f64>() * 10.0,
339        })
340    }
341}
342
/// Search analytics service
///
/// While running, it records per-search analytics. It keeps a bounded history
/// plus a count per distinct query string, and forwards counters, timers and
/// histograms to the shared `MetricsCollector`.
pub struct SearchAnalyzer {
    /// NOTE(review): stored but not read by any method in this module.
    config: MonitoringConfig,
    /// Destination for the `search_*` counters, timer and histogram.
    metrics_collector: Arc<MetricsCollector>,
    /// Most recent search records (at most 1000), oldest first.
    search_history: Arc<RwLock<Vec<SearchAnalytics>>>,
    /// How many times each exact query string has been recorded; never pruned.
    query_patterns: Arc<RwLock<HashMap<String, u64>>>,
    /// Whether `record_search` accepts new records.
    is_running: Arc<RwLock<bool>>,
}
351
352impl SearchAnalyzer {
353    pub async fn new(config: MonitoringConfig, metrics_collector: Arc<MetricsCollector>) -> Self {
354        Self {
355            config,
356            metrics_collector,
357            search_history: Arc::new(RwLock::new(Vec::new())),
358            query_patterns: Arc::new(RwLock::new(HashMap::new())),
359            is_running: Arc::new(RwLock::new(false)),
360        }
361    }
362
363    pub async fn start(&self) -> RragResult<()> {
364        let mut running = self.is_running.write().await;
365        if *running {
366            return Err(RragError::config(
367                "search_analyzer",
368                "stopped",
369                "already running",
370            ));
371        }
372        *running = true;
373        tracing::info!("Search analyzer started");
374        Ok(())
375    }
376
377    pub async fn stop(&self) -> RragResult<()> {
378        let mut running = self.is_running.write().await;
379        if !*running {
380            return Ok(());
381        }
382        *running = false;
383        tracing::info!("Search analyzer stopped");
384        Ok(())
385    }
386
387    pub async fn is_healthy(&self) -> bool {
388        *self.is_running.read().await
389    }
390
391    pub async fn record_search(&self, analytics: SearchAnalytics) -> RragResult<()> {
392        if !*self.is_running.read().await {
393            return Err(RragError::config("search_analyzer", "running", "stopped"));
394        }
395
396        // Update query patterns
397        let mut patterns = self.query_patterns.write().await;
398        *patterns.entry(analytics.query.clone()).or_insert(0) += 1;
399        drop(patterns);
400
401        // Store in history
402        let mut history = self.search_history.write().await;
403        history.push(analytics.clone());
404
405        // Keep only recent data (last 1000 searches)
406        let current_len = history.len();
407        if current_len > 1000 {
408            history.drain(0..current_len - 1000);
409        }
410
411        // Update metrics
412        let _ = self
413            .metrics_collector
414            .inc_counter("search_queries_total")
415            .await;
416        let _ = self
417            .metrics_collector
418            .record_timer("search_processing_time_ms", analytics.processing_time_ms)
419            .await;
420        let _ = self
421            .metrics_collector
422            .observe_histogram("search_results_count", analytics.results_count as f64, None)
423            .await;
424
425        if analytics.success {
426            let _ = self
427                .metrics_collector
428                .inc_counter("search_queries_successful")
429                .await;
430        } else {
431            let _ = self
432                .metrics_collector
433                .inc_counter("search_queries_failed")
434                .await;
435        }
436
437        if analytics.cache_hit {
438            let _ = self
439                .metrics_collector
440                .inc_counter("search_cache_hits")
441                .await;
442        } else {
443            let _ = self
444                .metrics_collector
445                .inc_counter("search_cache_misses")
446                .await;
447        }
448
449        Ok(())
450    }
451
452    pub async fn get_popular_queries(&self, limit: usize) -> Vec<(String, u64)> {
453        let patterns = self.query_patterns.read().await;
454        let mut query_counts: Vec<_> = patterns
455            .iter()
456            .map(|(query, count)| (query.clone(), *count))
457            .collect();
458
459        query_counts.sort_by(|a, b| b.1.cmp(&a.1));
460        query_counts.into_iter().take(limit).collect()
461    }
462
463    pub async fn get_search_stats(&self) -> SearchStats {
464        let history = self.search_history.read().await;
465
466        if history.is_empty() {
467            return SearchStats::default();
468        }
469
470        let total_searches = history.len();
471        let successful_searches = history.iter().filter(|s| s.success).count();
472        let cache_hits = history.iter().filter(|s| s.cache_hit).count();
473        let rerank_applied = history.iter().filter(|s| s.rerank_applied).count();
474
475        let avg_processing_time =
476            history.iter().map(|s| s.processing_time_ms).sum::<f64>() / total_searches as f64;
477
478        let avg_results_count =
479            history.iter().map(|s| s.results_count as f64).sum::<f64>() / total_searches as f64;
480
481        SearchStats {
482            total_searches,
483            successful_searches,
484            success_rate: (successful_searches as f64 / total_searches as f64) * 100.0,
485            cache_hit_rate: (cache_hits as f64 / total_searches as f64) * 100.0,
486            rerank_usage_rate: (rerank_applied as f64 / total_searches as f64) * 100.0,
487            average_processing_time_ms: avg_processing_time,
488            average_results_count: avg_results_count,
489            query_types: self.analyze_query_types(&history),
490        }
491    }
492
493    fn analyze_query_types(&self, history: &[SearchAnalytics]) -> HashMap<QueryType, usize> {
494        let mut counts = HashMap::new();
495        for search in history {
496            *counts.entry(search.query_type.clone()).or_insert(0) += 1;
497        }
498        counts
499    }
500
501    fn classify_query(&self, query: &str) -> QueryType {
502        // Simple heuristic-based classification
503        let query_lower = query.to_lowercase();
504
505        if query_lower.contains("what")
506            || query_lower.contains("who")
507            || query_lower.contains("when")
508            || query_lower.contains("where")
509        {
510            QueryType::Factual
511        } else if query_lower.contains("how")
512            || query_lower.contains("explain")
513            || query_lower.contains("describe")
514        {
515            QueryType::Procedural
516        } else if query_lower.contains("why")
517            || query_lower.contains("concept")
518            || query_lower.contains("theory")
519        {
520            QueryType::Conceptual
521        } else if query_lower.contains("can you")
522            || query_lower.contains("please")
523            || query_lower.len() > 50
524        {
525            QueryType::Conversational
526        } else {
527            QueryType::Unknown
528        }
529    }
530}
531
/// Search statistics summary
///
/// Aggregates computed by `SearchAnalyzer::get_search_stats` over the retained
/// search history, not all-time totals. Rates are percentages in 0–100.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SearchStats {
    /// Number of records in the retained history.
    pub total_searches: usize,
    /// Retained records whose `success` flag is set.
    pub successful_searches: usize,
    /// `successful_searches / total_searches * 100`.
    pub success_rate: f64,
    /// Percentage of retained records with `cache_hit` set.
    pub cache_hit_rate: f64,
    /// Percentage of retained records with `rerank_applied` set.
    pub rerank_usage_rate: f64,
    /// Mean `processing_time_ms` of the retained records.
    pub average_processing_time_ms: f64,
    /// Mean `results_count` of the retained records.
    pub average_results_count: f64,
    /// Number of retained records per `QueryType`.
    pub query_types: HashMap<QueryType, usize>,
}
544
545impl Default for SearchStats {
546    fn default() -> Self {
547        Self {
548            total_searches: 0,
549            successful_searches: 0,
550            success_rate: 0.0,
551            cache_hit_rate: 0.0,
552            rerank_usage_rate: 0.0,
553            average_processing_time_ms: 0.0,
554            average_results_count: 0.0,
555            query_types: HashMap::new(),
556        }
557    }
558}
559
/// User activity tracking service
///
/// While running, it records user actions into a bounded history, tracks the
/// latest activity time per session, and forwards counters and timers to the
/// shared `MetricsCollector`.
pub struct UserActivityTracker {
    /// NOTE(review): stored but not read by any method in this module.
    config: MonitoringConfig,
    /// Destination for the `user_*` counters and timer.
    metrics_collector: Arc<MetricsCollector>,
    /// Most recent activity records (at most 10000), oldest first.
    activity_history: Arc<RwLock<Vec<UserActivity>>>,
    /// Session id -> timestamp of its latest recorded activity. Entries older than
    /// one hour are pruned only when the active-session count is requested.
    active_sessions: Arc<RwLock<HashMap<String, DateTime<Utc>>>>,
    /// Whether `track_activity` accepts new records.
    is_running: Arc<RwLock<bool>>,
}
568
569impl UserActivityTracker {
570    pub async fn new(config: MonitoringConfig, metrics_collector: Arc<MetricsCollector>) -> Self {
571        Self {
572            config,
573            metrics_collector,
574            activity_history: Arc::new(RwLock::new(Vec::new())),
575            active_sessions: Arc::new(RwLock::new(HashMap::new())),
576            is_running: Arc::new(RwLock::new(false)),
577        }
578    }
579
580    pub async fn start(&self) -> RragResult<()> {
581        let mut running = self.is_running.write().await;
582        if *running {
583            return Err(RragError::config(
584                "user_activity_tracker",
585                "stopped",
586                "already running",
587            ));
588        }
589        *running = true;
590        tracing::info!("User activity tracker started");
591        Ok(())
592    }
593
594    pub async fn stop(&self) -> RragResult<()> {
595        let mut running = self.is_running.write().await;
596        if !*running {
597            return Ok(());
598        }
599        *running = false;
600        tracing::info!("User activity tracker stopped");
601        Ok(())
602    }
603
604    pub async fn is_healthy(&self) -> bool {
605        *self.is_running.read().await
606    }
607
608    pub async fn track_activity(&self, activity: UserActivity) -> RragResult<()> {
609        if !*self.is_running.read().await {
610            return Err(RragError::config(
611                "user_activity_tracker",
612                "running",
613                "stopped",
614            ));
615        }
616
617        // Update active sessions
618        let mut sessions = self.active_sessions.write().await;
619        sessions.insert(activity.session_id.clone(), activity.timestamp);
620        drop(sessions);
621
622        // Store activity
623        let mut history = self.activity_history.write().await;
624        history.push(activity.clone());
625
626        // Keep only recent activity (last 10000 actions)
627        let current_len = history.len();
628        if current_len > 10000 {
629            history.drain(0..current_len - 10000);
630        }
631
632        // Update metrics
633        let _ = self
634            .metrics_collector
635            .inc_counter("user_actions_total")
636            .await;
637        let _ = self
638            .metrics_collector
639            .record_timer("user_action_response_time_ms", activity.response_time_ms)
640            .await;
641
642        match activity.action {
643            UserAction::Search => {
644                let _ = self
645                    .metrics_collector
646                    .inc_counter("user_searches_total")
647                    .await;
648            }
649            UserAction::Chat => {
650                let _ = self.metrics_collector.inc_counter("user_chats_total").await;
651            }
652            UserAction::DocumentUpload => {
653                let _ = self
654                    .metrics_collector
655                    .inc_counter("user_document_uploads_total")
656                    .await;
657            }
658            _ => {}
659        }
660
661        Ok(())
662    }
663
664    pub async fn get_active_sessions_count(&self) -> usize {
665        // Clean up old sessions (older than 1 hour)
666        let cutoff = Utc::now() - Duration::hours(1);
667        let mut sessions = self.active_sessions.write().await;
668        sessions.retain(|_, timestamp| *timestamp > cutoff);
669        sessions.len()
670    }
671
672    pub async fn get_user_stats(&self, time_window_hours: i64) -> UserStats {
673        let history = self.activity_history.read().await;
674        let cutoff_time = Utc::now() - Duration::hours(time_window_hours);
675
676        let recent_activity: Vec<_> = history
677            .iter()
678            .filter(|a| a.timestamp >= cutoff_time)
679            .collect();
680
681        if recent_activity.is_empty() {
682            return UserStats::default();
683        }
684
685        let unique_users: std::collections::HashSet<_> =
686            recent_activity.iter().map(|a| a.user_id.as_str()).collect();
687
688        let unique_sessions: std::collections::HashSet<_> = recent_activity
689            .iter()
690            .map(|a| a.session_id.as_str())
691            .collect();
692
693        let action_counts = self.count_actions(&recent_activity);
694        let avg_response_time = recent_activity
695            .iter()
696            .map(|a| a.response_time_ms)
697            .sum::<f64>()
698            / recent_activity.len() as f64;
699
700        UserStats {
701            total_actions: recent_activity.len(),
702            unique_users: unique_users.len(),
703            unique_sessions: unique_sessions.len(),
704            action_breakdown: action_counts,
705            average_response_time_ms: avg_response_time,
706            time_window_hours,
707        }
708    }
709
710    fn count_actions(&self, activities: &[&UserActivity]) -> HashMap<String, usize> {
711        let mut counts = HashMap::new();
712        for activity in activities {
713            let action_name = match &activity.action {
714                UserAction::Search => "search",
715                UserAction::Chat => "chat",
716                UserAction::DocumentUpload => "document_upload",
717                UserAction::DocumentView => "document_view",
718                UserAction::SystemHealth => "system_health",
719                UserAction::Other(name) => name,
720            };
721            *counts.entry(action_name.to_string()).or_insert(0) += 1;
722        }
723        counts
724    }
725}
726
/// User activity statistics
///
/// Aggregates computed by `UserActivityTracker::get_user_stats` over the
/// retained activity records whose timestamp falls inside the look-back window.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UserStats {
    /// Number of matching activity records.
    pub total_actions: usize,
    /// Distinct `user_id`s among the matching records.
    pub unique_users: usize,
    /// Distinct `session_id`s among the matching records.
    pub unique_sessions: usize,
    /// Count per action name: "search", "chat", "document_upload",
    /// "document_view", "system_health", or the name carried by `UserAction::Other`.
    pub action_breakdown: HashMap<String, usize>,
    /// Mean `response_time_ms` of the matching records.
    pub average_response_time_ms: f64,
    /// Look-back window, in hours, associated with these stats (`Default` uses 24).
    pub time_window_hours: i64,
}
737
738impl Default for UserStats {
739    fn default() -> Self {
740        Self {
741            total_actions: 0,
742            unique_users: 0,
743            unique_sessions: 0,
744            action_breakdown: HashMap::new(),
745            average_response_time_ms: 0.0,
746            time_window_hours: 24,
747        }
748    }
749}
750
/// Main system monitor orchestrating all monitoring services
///
/// Owns one instance of each service, all built from a shared configuration and
/// metrics collector. The `*_enabled` flags in `config` decide which services
/// `start` launches and which sections `get_system_overview` fills in.
pub struct SystemMonitor {
    config: MonitoringConfig,
    performance_monitor: Arc<PerformanceMonitor>,
    search_analyzer: Arc<SearchAnalyzer>,
    user_tracker: Arc<UserActivityTracker>,
    /// Lifecycle flag of the monitor itself, separate from each service's own flag.
    is_running: Arc<RwLock<bool>>,
}
759
760impl SystemMonitor {
761    pub async fn new(
762        config: MonitoringConfig,
763        metrics_collector: Arc<MetricsCollector>,
764    ) -> RragResult<Self> {
765        let performance_monitor =
766            Arc::new(PerformanceMonitor::new(config.clone(), metrics_collector.clone()).await?);
767        let search_analyzer =
768            Arc::new(SearchAnalyzer::new(config.clone(), metrics_collector.clone()).await);
769        let user_tracker =
770            Arc::new(UserActivityTracker::new(config.clone(), metrics_collector).await);
771
772        Ok(Self {
773            config,
774            performance_monitor,
775            search_analyzer,
776            user_tracker,
777            is_running: Arc::new(RwLock::new(false)),
778        })
779    }
780
781    pub async fn start(&self) -> RragResult<()> {
782        let mut running = self.is_running.write().await;
783        if *running {
784            return Err(RragError::config(
785                "system_monitor",
786                "stopped",
787                "already running",
788            ));
789        }
790
791        // Start all monitoring services
792        if self.config.resource_monitoring_enabled {
793            self.performance_monitor.start().await?;
794        }
795
796        if self.config.search_analytics_enabled {
797            self.search_analyzer.start().await?;
798        }
799
800        if self.config.user_tracking_enabled {
801            self.user_tracker.start().await?;
802        }
803
804        *running = true;
805        tracing::info!("System monitor started");
806        Ok(())
807    }
808
809    pub async fn stop(&self) -> RragResult<()> {
810        let mut running = self.is_running.write().await;
811        if !*running {
812            return Ok(());
813        }
814
815        // Stop all monitoring services
816        self.performance_monitor.stop().await?;
817        self.search_analyzer.stop().await?;
818        self.user_tracker.stop().await?;
819
820        *running = false;
821        tracing::info!("System monitor stopped");
822        Ok(())
823    }
824
825    pub async fn is_healthy(&self) -> bool {
826        *self.is_running.read().await
827            && self.performance_monitor.is_healthy().await
828            && self.search_analyzer.is_healthy().await
829            && self.user_tracker.is_healthy().await
830    }
831
832    pub fn performance(&self) -> &PerformanceMonitor {
833        &self.performance_monitor
834    }
835
836    pub fn search_analytics(&self) -> &SearchAnalyzer {
837        &self.search_analyzer
838    }
839
840    pub fn user_activity(&self) -> &UserActivityTracker {
841        &self.user_tracker
842    }
843
844    pub async fn get_system_overview(&self) -> SystemOverview {
845        let performance = if self.config.resource_monitoring_enabled {
846            self.performance_monitor.get_current_metrics().await.ok()
847        } else {
848            None
849        };
850
851        let search_stats = if self.config.search_analytics_enabled {
852            Some(self.search_analyzer.get_search_stats().await)
853        } else {
854            None
855        };
856
857        let user_stats = if self.config.user_tracking_enabled {
858            Some(self.user_tracker.get_user_stats(24).await)
859        } else {
860            None
861        };
862
863        SystemOverview {
864            timestamp: Utc::now(),
865            performance_metrics: performance,
866            search_stats,
867            user_stats,
868            active_sessions: if self.config.user_tracking_enabled {
869                Some(self.user_tracker.get_active_sessions_count().await)
870            } else {
871                None
872            },
873        }
874    }
875}
876
/// Complete system overview
///
/// Snapshot built by `SystemMonitor::get_system_overview`. Each optional section
/// is `None` when the corresponding service is disabled in the configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SystemOverview {
    /// When the snapshot was assembled.
    pub timestamp: DateTime<Utc>,
    /// A fresh sample; also `None` if taking the sample failed.
    pub performance_metrics: Option<PerformanceMetrics>,
    pub search_stats: Option<SearchStats>,
    /// Stats over the last 24 hours.
    pub user_stats: Option<UserStats>,
    /// Sessions active within the last hour.
    pub active_sessions: Option<usize>,
}
886
/// Monitoring service interface
///
/// Thin facade that delegates every call to a `SystemMonitor` behind an async
/// `RwLock`. NOTE(review): the lock is only ever read-locked in this module.
pub struct MonitoringService {
    system_monitor: Arc<RwLock<SystemMonitor>>,
}
891
892impl MonitoringService {
893    pub async fn new(
894        config: MonitoringConfig,
895        metrics_collector: Arc<MetricsCollector>,
896    ) -> RragResult<Self> {
897        let system_monitor = SystemMonitor::new(config, metrics_collector).await?;
898        Ok(Self {
899            system_monitor: Arc::new(RwLock::new(system_monitor)),
900        })
901    }
902
903    pub async fn start(&self) -> RragResult<()> {
904        let monitor = self.system_monitor.read().await;
905        monitor.start().await
906    }
907
908    pub async fn stop(&self) -> RragResult<()> {
909        let monitor = self.system_monitor.read().await;
910        monitor.stop().await
911    }
912
913    pub async fn is_healthy(&self) -> bool {
914        let monitor = self.system_monitor.read().await;
915        monitor.is_healthy().await
916    }
917
918    pub async fn get_overview(&self) -> SystemOverview {
919        let monitor = self.system_monitor.read().await;
920        monitor.get_system_overview().await
921    }
922
923    pub async fn record_search(&self, analytics: SearchAnalytics) -> RragResult<()> {
924        let monitor = self.system_monitor.read().await;
925        monitor.search_analytics().record_search(analytics).await
926    }
927
928    pub async fn track_user_activity(&self, activity: UserActivity) -> RragResult<()> {
929        let monitor = self.system_monitor.read().await;
930        monitor.user_activity().track_activity(activity).await
931    }
932}
933
#[cfg(test)]
mod tests {
    use super::*;
    use crate::observability::metrics::MetricsConfig;

    /// Builds a metrics collector with default configuration for the tests below.
    async fn create_test_metrics_collector() -> Arc<MetricsCollector> {
        Arc::new(
            MetricsCollector::new(MetricsConfig::default())
                .await
                .unwrap(),
        )
    }

    #[tokio::test]
    async fn test_performance_monitor() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let mut monitor = PerformanceMonitor::new(config, metrics_collector)
            .await
            .unwrap();

        assert!(!monitor.is_healthy().await);

        monitor.start().await.unwrap();
        assert!(monitor.is_healthy().await);

        let current_metrics = monitor.get_current_metrics().await.unwrap();
        assert!(current_metrics.cpu_usage_percent >= 0.0);
        assert!(current_metrics.memory_usage_mb >= 0.0);

        monitor.stop().await.unwrap();
        assert!(!monitor.is_healthy().await);
    }

    #[tokio::test]
    async fn test_search_analyzer() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let analyzer = SearchAnalyzer::new(config, metrics_collector).await;

        analyzer.start().await.unwrap();
        assert!(analyzer.is_healthy().await);

        let search_analytics = SearchAnalytics {
            timestamp: Utc::now(),
            query: "test query".to_string(),
            query_type: QueryType::Factual,
            results_count: 5,
            processing_time_ms: 150.0,
            success: true,
            user_id: Some("user123".to_string()),
            session_id: Some("session456".to_string()),
            similarity_scores: vec![0.9, 0.8, 0.7],
            rerank_applied: true,
            cache_hit: false,
        };

        analyzer.record_search(search_analytics).await.unwrap();

        let stats = analyzer.get_search_stats().await;
        assert_eq!(stats.total_searches, 1);
        assert_eq!(stats.successful_searches, 1);
        assert_eq!(stats.success_rate, 100.0);

        analyzer.stop().await.unwrap();
        assert!(!analyzer.is_healthy().await);
    }

    #[tokio::test]
    async fn test_user_activity_tracker() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let tracker = UserActivityTracker::new(config, metrics_collector).await;

        tracker.start().await.unwrap();
        assert!(tracker.is_healthy().await);

        let activity = UserActivity {
            timestamp: Utc::now(),
            user_id: "user123".to_string(),
            session_id: "session456".to_string(),
            action: UserAction::Search,
            query: Some("test query".to_string()),
            response_time_ms: 200.0,
            success: true,
            ip_address: Some("127.0.0.1".to_string()),
            user_agent: Some("test-agent".to_string()),
        };

        tracker.track_activity(activity).await.unwrap();

        let stats = tracker.get_user_stats(24).await;
        assert_eq!(stats.total_actions, 1);
        assert_eq!(stats.unique_users, 1);
        assert_eq!(stats.unique_sessions, 1);

        tracker.stop().await.unwrap();
        assert!(!tracker.is_healthy().await);
    }

    #[tokio::test]
    async fn test_system_monitor() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        // The lifecycle methods take `&self`, so the binding does not need `mut`.
        let monitor = SystemMonitor::new(config, metrics_collector).await.unwrap();

        assert!(!monitor.is_healthy().await);

        monitor.start().await.unwrap();
        assert!(monitor.is_healthy().await);

        let overview = monitor.get_system_overview().await;
        assert!(overview.performance_metrics.is_some());
        assert!(overview.search_stats.is_some());
        assert!(overview.user_stats.is_some());
        // User tracking is enabled by default, so the session count is populated too.
        assert!(overview.active_sessions.is_some());

        monitor.stop().await.unwrap();
        assert!(!monitor.is_healthy().await);
    }

    #[tokio::test]
    async fn test_monitoring_service_lifecycle() {
        let metrics_collector = create_test_metrics_collector().await;
        let service = MonitoringService::new(MonitoringConfig::default(), metrics_collector)
            .await
            .unwrap();

        assert!(!service.is_healthy().await);

        service.start().await.unwrap();
        assert!(service.is_healthy().await);

        let overview = service.get_overview().await;
        assert!(overview.performance_metrics.is_some());
        assert!(overview.search_stats.is_some());
        assert!(overview.user_stats.is_some());

        service.stop().await.unwrap();
        assert!(!service.is_healthy().await);
    }

    // Runs on the tokio runtime like the other tests. `futures::executor::block_on`
    // provides no tokio runtime, so construction would panic if it ever touched one
    // (task spawning, timers).
    #[tokio::test]
    async fn test_query_classification() {
        let metrics_collector = create_test_metrics_collector().await;
        let config = MonitoringConfig::default();
        let analyzer = SearchAnalyzer::new(config, metrics_collector).await;

        assert_eq!(
            analyzer.classify_query("What is machine learning?"),
            QueryType::Factual
        );
        assert_eq!(
            analyzer.classify_query("How do I implement a neural network?"),
            QueryType::Procedural
        );
        assert_eq!(
            analyzer.classify_query("Why does backpropagation work?"),
            QueryType::Conceptual
        );
        assert_eq!(
            analyzer.classify_query("Can you help me understand this concept please?"),
            QueryType::Conversational
        );
        assert_eq!(
            analyzer.classify_query("neural networks"),
            QueryType::Unknown
        );
    }

    #[test]
    fn test_alert_thresholds() {
        let thresholds = AlertThresholds::default();
        assert_eq!(thresholds.cpu_usage_percent, 80.0);
        assert_eq!(thresholds.memory_usage_percent, 85.0);
        assert_eq!(thresholds.error_rate_percent, 5.0);
        assert_eq!(thresholds.response_time_ms, 1000.0);
        assert_eq!(thresholds.disk_usage_percent, 90.0);
        assert_eq!(thresholds.queue_size, 1000);
    }
}