rexis_rag/observability/
mod.rs

1//! # RRAG Observability System
2//!
3//! Enterprise-grade observability and monitoring for production RAG deployments.
4//!
5//! This module provides comprehensive monitoring, metrics collection, alerting,
6//! and visualization capabilities to ensure your RAG system operates reliably
7//! at scale. It includes real-time dashboards, intelligent alerting, performance
8//! profiling, and data export capabilities.
9//!
10//! ## Features
11//!
12//! - **Metrics Collection**: Prometheus-compatible metrics with custom dashboards
13//! - **Real-time Monitoring**: Live system health and performance tracking
14//! - **Intelligent Alerting**: Smart alerts with multiple notification channels
15//! - **Performance Profiling**: Bottleneck detection and optimization insights
16//! - **Health Monitoring**: Component-level health checks and diagnostics
17//! - **Log Aggregation**: Structured logging with search and analysis
18//! - **Data Export**: Export metrics and logs for external analysis
19//! - **Data Retention**: Configurable retention policies for long-term storage
20//!
21//! ## Quick Start
22//!
23//! ### Basic Observability Setup
24//! ```rust
25//! use rrag::observability::{ObservabilitySystem, ObservabilityConfig};
26//!
27//! # async fn example() -> rrag::RragResult<()> {
28//! let observability = ObservabilitySystem::new(
29//!     ObservabilityConfig::default()
30//!         .with_metrics(true)
31//!         .with_monitoring(true)
32//!         .with_alerting(true)
33//!         .with_dashboard(true)
34//! ).await?;
35//!
36//! // Start the observability system
37//! observability.start().await?;
38//!
39//! // Access components
40//! let metrics = observability.metrics();
41//! let monitoring = observability.monitoring();
42//! let alerting = observability.alerting();
43//! # Ok(())
44//! # }
45//! ```
46//!
47//! ### Custom Metrics Collection
48//! ```rust
49//! use rrag::observability::{MetricsCollector, MetricType};
50//!
51//! # async fn example() -> rrag::RragResult<()> {
52//! let metrics = MetricsCollector::new();
53//!
54//! // Counter metrics
55//! metrics.inc_counter("requests_total").await?;
56//! metrics.inc_counter_by("documents_processed", 10).await?;
57//!
58//! // Gauge metrics
59//! metrics.set_gauge("active_users", 150.0).await?;
60//! metrics.set_gauge("memory_usage_mb", 512.0).await?;
61//!
62//! // Histogram metrics for latency
63//! metrics.observe_histogram("request_duration_ms", 45.2).await?;
64//!
65//! // Timer metrics
66//! let timer = metrics.start_timer("query_processing_time");
67//! // ... do work ...
68//! timer.stop().await?;
69//! # Ok(())
70//! # }
71//! ```
72//!
73//! ### Alert Configuration
74//! ```rust
75//! use rrag::observability::{AlertManager, AlertRule, AlertSeverity};
76//!
77//! # async fn example() -> rrag::RragResult<()> {
78//! let alert_manager = AlertManager::new();
79//!
80//! // High latency alert
81//! let latency_alert = AlertRule::new("high_latency")
82//!     .condition("avg(request_duration_ms) > 1000")
83//!     .severity(AlertSeverity::High)
84//!     .description("Query latency is too high")
85//!     .notification_channels(vec!["slack", "email"])
86//!     .cooldown_minutes(5);
87//!
88//! alert_manager.add_rule(latency_alert).await?;
89//!
90//! // Error rate alert
91//! let error_alert = AlertRule::new("high_error_rate")
92//!     .condition("rate(error_count) > 0.05")
93//!     .severity(AlertSeverity::Critical)
94//!     .description("Error rate exceeded 5%")
95//!     .notification_channels(vec!["pagerduty", "slack"]);
96//!
97//! alert_manager.add_rule(error_alert).await?;
98//! # Ok(())
99//! # }
100//! ```
101//!
102//! ### Health Monitoring
103//! ```rust
//! use rrag::observability::{HealthConfig, HealthMonitor};
//!
//! # async fn example() -> rrag::RragResult<()> {
//! let health_monitor = HealthMonitor::new(HealthConfig::default()).await?;
108//!
109//! // Add custom health checks
110//! health_monitor.add_check(
111//!     "database",
112//!     Box::new(|_| async {
113//!         // Check database connectivity
114//!         Ok(true)
115//!     })
116//! ).await?;
117//!
118//! health_monitor.add_check(
119//!     "embedding_service",
120//!     Box::new(|_| async {
121//!         // Check embedding service
122//!         Ok(true)
123//!     })
124//! ).await?;
125//!
126//! // Get overall health status
127//! let status = health_monitor.check_all().await?;
128//! tracing::debug!("System health: {:?}", status.overall_status);
129//! # Ok(())
130//! # }
131//! ```
132//!
133//! ### Performance Profiling
134//! ```rust
//! use rrag::observability::{PerformanceProfiler, ProfilingConfig};
//!
//! # async fn example() -> rrag::RragResult<()> {
//! let profiler = PerformanceProfiler::new(ProfilingConfig::default()).await?;
139//!
140//! // Start profiling a specific operation
141//! let profile_id = profiler.start_profile("document_processing").await?;
142//!
143//! // ... perform work ...
144//!
145//! let profile = profiler.stop_profile(profile_id).await?;
146//!
147//! // Analyze bottlenecks
148//! let bottlenecks = profiler.analyze_bottlenecks(5).await?;
149//! for bottleneck in bottlenecks.bottlenecks {
150//!     tracing::debug!("Bottleneck: {} took {:.2}ms",
151//!              bottleneck.operation,
152//!              bottleneck.average_duration_ms);
153//! }
154//! # Ok(())
155//! # }
156//! ```
157//!
158//! ### Dashboard and Visualization
159//! ```rust
160//! use rrag::observability::{DashboardServer, DashboardConfig};
161//!
162//! # async fn example() -> rrag::RragResult<()> {
163//! let dashboard = DashboardServer::new(
164//!     DashboardConfig::default()
165//!         .with_port(3000)
166//!         .with_realtime_updates(true)
167//!         .with_custom_charts(vec![
168//!             "query_latency_histogram",
169//!             "documents_processed_rate",
170//!             "error_rate_by_component"
171//!         ])
172//! );
173//!
174//! // Start dashboard server
175//! dashboard.start().await?;
176//! tracing::debug!("Dashboard available at: http://localhost:3000");
177//! # Ok(())
178//! # }
179//! ```
180//!
181//! ## Integration Examples
182//!
183//! ### With RAG System
184//! ```rust
185//! use rrag::{RragSystemBuilder, observability::ObservabilityConfig};
186//!
187//! # async fn example() -> rrag::RragResult<()> {
188//! let rag = RragSystemBuilder::new()
189//!     .with_observability(
190//!         ObservabilityConfig::production()
191//!             .with_prometheus_endpoint(true)
192//!             .with_health_checks(true)
193//!             .with_performance_profiling(true)
194//!     )
195//!     .build()
196//!     .await?;
197//!
198//! // System automatically reports metrics
199//! let results = rag.search("query", Some(10)).await?;
200//! // Metrics like query_count, search_latency, results_returned are automatic
201//! # Ok(())
202//! # }
203//! ```
204
205pub mod alerting;
206pub mod dashboard;
207pub mod export;
208pub mod health;
209pub mod logging;
210pub mod metrics;
211pub mod monitoring;
212pub mod profiling;
213pub mod retention;
214
215// Core observability system
216pub use alerting::{
217    AlertCondition, AlertConfig, AlertManager, AlertNotification, AlertRule, AlertSeverity,
218    NotificationChannel,
219};
220pub use dashboard::{
221    ChartData, DashboardConfig, DashboardHandler, DashboardMetrics, DashboardServer,
222    RealtimeMetrics, WebSocketManager,
223};
224pub use export::{
225    ExportConfig, ExportFormat, ExportManager, MetricsExporter, ReportConfig, ReportGenerator,
226};
227pub use health::{
228    ComponentStatus, HealthChecker, HealthConfig, HealthMonitor, HealthReport, ServiceHealth,
229};
230pub use logging::{
231    LogAggregator, LogConfig, LogEntry, LogFilter, LogLevel, LogQuery, StructuredLogger,
232};
233pub use metrics::{
234    CounterMetric, GaugeMetric, HistogramMetric, Metric, MetricType, MetricValue, MetricsCollector,
235    MetricsRegistry, TimerMetric,
236};
237pub use monitoring::{
238    MonitoringConfig, MonitoringService, PerformanceMonitor, SearchAnalyzer, SystemMonitor,
239    UserActivityTracker,
240};
241pub use profiling::{
242    BottleneckAnalysis, PerformanceProfiler, PerformanceReport, ProfileData, Profiler,
243    ProfilingConfig,
244};
245pub use retention::{
246    ArchiveManager, DataRetention, HistoricalAnalyzer, RetentionConfig, RetentionPolicy,
247};
248
249use crate::{RragError, RragResult};
250use chrono::{DateTime, Utc};
251use serde::{Deserialize, Serialize};
252use std::collections::HashMap;
253use std::sync::Arc;
254use tokio::sync::RwLock;
255
256/// Main observability system configuration
257#[derive(Debug, Clone, Serialize, Deserialize)]
258pub struct ObservabilityConfig {
259    /// System identification
260    pub system_id: String,
261    pub environment: String,
262
263    /// Component configurations
264    pub metrics: metrics::MetricsConfig,
265    pub monitoring: monitoring::MonitoringConfig,
266    pub alerting: alerting::AlertConfig,
267    pub dashboard: dashboard::DashboardConfig,
268    pub logging: logging::LogConfig,
269    pub health: health::HealthConfig,
270    pub profiling: profiling::ProfilingConfig,
271    pub export: export::ExportConfig,
272    pub retention: retention::RetentionConfig,
273
274    /// Global settings
275    pub enabled: bool,
276    pub sample_rate: f64,
277    pub batch_size: usize,
278    pub flush_interval_seconds: u64,
279}
280
281impl Default for ObservabilityConfig {
282    fn default() -> Self {
283        Self {
284            system_id: "rrag-system".to_string(),
285            environment: "production".to_string(),
286            metrics: metrics::MetricsConfig::default(),
287            monitoring: monitoring::MonitoringConfig::default(),
288            alerting: alerting::AlertConfig::default(),
289            dashboard: dashboard::DashboardConfig::default(),
290            logging: logging::LogConfig::default(),
291            health: health::HealthConfig::default(),
292            profiling: profiling::ProfilingConfig::default(),
293            export: export::ExportConfig::default(),
294            retention: retention::RetentionConfig::default(),
295            enabled: true,
296            sample_rate: 1.0,
297            batch_size: 100,
298            flush_interval_seconds: 30,
299        }
300    }
301}
302
303/// Main observability system
304pub struct ObservabilitySystem {
305    config: ObservabilityConfig,
306    metrics: Arc<MetricsCollector>,
307    monitoring: Arc<SystemMonitor>,
308    alerting: Arc<AlertManager>,
309    dashboard: Arc<DashboardServer>,
310    logging: Arc<LogAggregator>,
311    health: Arc<HealthMonitor>,
312    profiling: Arc<PerformanceProfiler>,
313    export: Arc<ExportManager>,
314    retention: Arc<DataRetention>,
315
316    // Internal state
317    start_time: DateTime<Utc>,
318    is_running: Arc<RwLock<bool>>,
319}
320
321impl ObservabilitySystem {
322    /// Create new observability system
323    pub async fn new(config: ObservabilityConfig) -> RragResult<Self> {
324        if !config.enabled {
325            return Err(RragError::config("observability.enabled", "true", "false"));
326        }
327
328        let metrics = Arc::new(MetricsCollector::new(config.metrics.clone()).await?);
329        let monitoring =
330            Arc::new(SystemMonitor::new(config.monitoring.clone(), metrics.clone()).await?);
331
332        let alerting = Arc::new(AlertManager::new(config.alerting.clone(), metrics.clone()).await?);
333
334        let dashboard = Arc::new(
335            DashboardServer::new(
336                config.dashboard.clone(),
337                metrics.clone(),
338                monitoring.clone(),
339            )
340            .await?,
341        );
342
343        let logging = Arc::new(LogAggregator::new(config.logging.clone()).await?);
344        let health = Arc::new(HealthMonitor::new(config.health.clone()).await?);
345        let profiling = Arc::new(PerformanceProfiler::new(config.profiling.clone()).await?);
346        let export = Arc::new(ExportManager::new(config.export.clone()).await?);
347        let retention = Arc::new(DataRetention::new(config.retention.clone()).await?);
348
349        Ok(Self {
350            config,
351            metrics,
352            monitoring,
353            alerting,
354            dashboard,
355            logging,
356            health,
357            profiling,
358            export,
359            retention,
360            start_time: Utc::now(),
361            is_running: Arc::new(RwLock::new(false)),
362        })
363    }
364
365    /// Start the observability system
366    pub async fn start(&self) -> RragResult<()> {
367        let mut running = self.is_running.write().await;
368        if *running {
369            return Err(RragError::config(
370                "observability",
371                "stopped",
372                "already running",
373            ));
374        }
375
376        // Start all components
377        self.metrics.start().await?;
378        self.monitoring.start().await?;
379        self.alerting.start().await?;
380        self.dashboard.start().await?;
381        self.logging.start().await?;
382        self.health.start().await?;
383        self.profiling.start().await?;
384        self.export.start().await?;
385        self.retention.start().await?;
386
387        *running = true;
388        tracing::info!("Observability system started successfully");
389
390        Ok(())
391    }
392
393    /// Stop the observability system
394    pub async fn stop(&self) -> RragResult<()> {
395        let mut running = self.is_running.write().await;
396        if !*running {
397            return Ok(());
398        }
399
400        // Stop all components in reverse order
401        self.retention.stop().await?;
402        self.export.stop().await?;
403        self.profiling.stop().await?;
404        self.health.stop().await?;
405        self.logging.stop().await?;
406        self.dashboard.stop().await?;
407        self.alerting.stop().await?;
408        self.monitoring.stop().await?;
409        self.metrics.stop().await?;
410
411        *running = false;
412        tracing::info!("Observability system stopped successfully");
413
414        Ok(())
415    }
416
417    /// Check if system is running
418    pub async fn is_running(&self) -> bool {
419        *self.is_running.read().await
420    }
421
422    /// Get metrics collector
423    pub fn metrics(&self) -> &Arc<MetricsCollector> {
424        &self.metrics
425    }
426
427    /// Get system monitor
428    pub fn monitoring(&self) -> &Arc<SystemMonitor> {
429        &self.monitoring
430    }
431
432    /// Get alert manager
433    pub fn alerting(&self) -> &Arc<AlertManager> {
434        &self.alerting
435    }
436
437    /// Get dashboard server
438    pub fn dashboard(&self) -> &Arc<DashboardServer> {
439        &self.dashboard
440    }
441
442    /// Get log aggregator
443    pub fn logging(&self) -> &Arc<LogAggregator> {
444        &self.logging
445    }
446
447    /// Get health monitor
448    pub fn health(&self) -> &Arc<HealthMonitor> {
449        &self.health
450    }
451
452    /// Get profiler
453    pub fn profiling(&self) -> &Arc<PerformanceProfiler> {
454        &self.profiling
455    }
456
457    /// Get export manager
458    pub fn export(&self) -> &Arc<ExportManager> {
459        &self.export
460    }
461
462    /// Get retention manager
463    pub fn retention(&self) -> &Arc<DataRetention> {
464        &self.retention
465    }
466
467    /// Get system configuration
468    pub fn config(&self) -> &ObservabilityConfig {
469        &self.config
470    }
471
472    /// Get system uptime
473    pub fn uptime(&self) -> chrono::Duration {
474        Utc::now() - self.start_time
475    }
476
477    /// Get comprehensive system status
478    pub async fn status(&self) -> ObservabilityStatus {
479        ObservabilityStatus {
480            running: self.is_running().await,
481            uptime_seconds: self.uptime().num_seconds(),
482            components: HashMap::from([
483                ("metrics".to_string(), self.metrics.is_healthy().await),
484                ("monitoring".to_string(), self.monitoring.is_healthy().await),
485                ("alerting".to_string(), self.alerting.is_healthy().await),
486                ("dashboard".to_string(), self.dashboard.is_healthy().await),
487                ("logging".to_string(), self.logging.is_healthy().await),
488                ("health".to_string(), self.health.is_healthy().await),
489                ("profiling".to_string(), self.profiling.is_healthy().await),
490                ("export".to_string(), self.export.is_healthy().await),
491                ("retention".to_string(), self.retention.is_healthy().await),
492            ]),
493            last_check: Utc::now(),
494        }
495    }
496}
497
498/// System status information
499#[derive(Debug, Clone, Serialize, Deserialize)]
500pub struct ObservabilityStatus {
501    pub running: bool,
502    pub uptime_seconds: i64,
503    pub components: HashMap<String, bool>,
504    pub last_check: DateTime<Utc>,
505}
506
507/// Builder for observability system
508pub struct ObservabilityBuilder {
509    config: ObservabilityConfig,
510}
511
512impl ObservabilityBuilder {
513    pub fn new() -> Self {
514        Self {
515            config: ObservabilityConfig::default(),
516        }
517    }
518
519    pub fn with_system_id(mut self, id: impl Into<String>) -> Self {
520        self.config.system_id = id.into();
521        self
522    }
523
524    pub fn with_environment(mut self, env: impl Into<String>) -> Self {
525        self.config.environment = env.into();
526        self
527    }
528
529    pub fn with_sample_rate(mut self, rate: f64) -> Self {
530        self.config.sample_rate = rate.clamp(0.0, 1.0);
531        self
532    }
533
534    pub fn with_batch_size(mut self, size: usize) -> Self {
535        self.config.batch_size = size;
536        self
537    }
538
539    pub fn with_flush_interval(mut self, seconds: u64) -> Self {
540        self.config.flush_interval_seconds = seconds;
541        self
542    }
543
544    pub fn enable_dashboard(mut self, enabled: bool) -> Self {
545        self.config.dashboard.enabled = enabled;
546        self
547    }
548
549    pub fn with_dashboard_port(mut self, port: u16) -> Self {
550        self.config.dashboard.port = port;
551        self
552    }
553
554    pub fn enable_alerts(mut self, enabled: bool) -> Self {
555        self.config.alerting.enabled = enabled;
556        self
557    }
558
559    pub fn enable_profiling(mut self, enabled: bool) -> Self {
560        self.config.profiling.enabled = enabled;
561        self
562    }
563
564    pub fn with_retention_days(mut self, days: u32) -> Self {
565        self.config.retention.retention_days = days;
566        self
567    }
568
569    pub async fn build(self) -> RragResult<ObservabilitySystem> {
570        ObservabilitySystem::new(self.config).await
571    }
572}
573
574impl Default for ObservabilityBuilder {
575    fn default() -> Self {
576        Self::new()
577    }
578}
579
#[cfg(test)]
mod tests {
    use super::*;

    /// A system built from the default configuration keeps the default id
    /// and starts out stopped.
    #[tokio::test]
    async fn test_observability_system_creation() {
        let system = ObservabilitySystem::new(ObservabilityConfig::default())
            .await
            .unwrap();

        assert!(!system.is_running().await);
        assert_eq!(system.config.system_id, "rrag-system");
    }

    /// Values supplied through the builder end up in the system's config.
    #[tokio::test]
    async fn test_observability_builder() {
        let builder = ObservabilityBuilder::new()
            .with_system_id("test-system")
            .with_environment("test")
            .with_sample_rate(0.5);
        let system = builder.build().await.unwrap();

        let cfg = &system.config;
        assert_eq!(cfg.system_id, "test-system");
        assert_eq!(cfg.environment, "test");
        assert_eq!(cfg.sample_rate, 0.5);
    }

    /// The running flag follows start and stop.
    #[tokio::test]
    async fn test_system_lifecycle() {
        let builder = ObservabilityBuilder::new().with_system_id("test-lifecycle");
        let system = builder.build().await.unwrap();

        // Freshly built: not running.
        assert!(!system.is_running().await);

        // After start: running.
        system.start().await.unwrap();
        assert!(system.is_running().await);

        // After stop: not running again.
        system.stop().await.unwrap();
        assert!(!system.is_running().await);
    }

    /// A status snapshot of an unstarted system reports all nine components.
    #[tokio::test]
    async fn test_system_status() {
        let system = ObservabilityBuilder::new().build().await.unwrap();

        let snapshot = system.status().await;

        assert!(!snapshot.running);
        assert!(snapshot.uptime_seconds >= 0);
        assert_eq!(snapshot.components.len(), 9); // All components
    }
}