rexis_rag/observability/
export.rs

1//! # Export and Reporting System
2//!
3//! Comprehensive data export capabilities with multiple formats,
4//! automated report generation, and scheduled exports for RRAG observability data.
5
6use super::{
7    health::HealthReport,
8    metrics::{Metric, MetricsCollector},
9    monitoring::SystemOverview,
10    profiling::PerformanceReport,
11};
12use crate::{RragError, RragResult};
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
19/// Export configuration
20#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ExportConfig {
22    pub enabled: bool,
23    pub default_format: ExportFormat,
24    pub output_directory: String,
25    pub max_file_size_mb: u64,
26    pub retention_days: u32,
27    pub compression_enabled: bool,
28    pub scheduled_exports: Vec<ScheduledExportConfig>,
29    pub destinations: Vec<ExportDestinationConfig>,
30}
31
32impl Default for ExportConfig {
33    fn default() -> Self {
34        Self {
35            enabled: true,
36            default_format: ExportFormat::Json,
37            output_directory: "./exports".to_string(),
38            max_file_size_mb: 100,
39            retention_days: 90,
40            compression_enabled: true,
41            scheduled_exports: Vec::new(),
42            destinations: Vec::new(),
43        }
44    }
45}
46
47/// Scheduled export configuration
48#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ScheduledExportConfig {
50    pub name: String,
51    pub schedule_cron: String,
52    pub export_type: ExportType,
53    pub format: ExportFormat,
54    pub destinations: Vec<String>,
55    pub filters: ExportFilters,
56}
57
58/// Export destination configuration
59#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ExportDestinationConfig {
61    pub name: String,
62    pub destination_type: DestinationType,
63    pub config: HashMap<String, String>,
64    pub enabled: bool,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68pub enum DestinationType {
69    LocalFile,
70    S3,
71    Azure,
72    GCS,
73    SFTP,
74    HTTP,
75    Email,
76    Webhook,
77}
78
79/// Export data types
80#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
81pub enum ExportType {
82    Metrics,
83    Logs,
84    HealthReport,
85    PerformanceReport,
86    SystemOverview,
87    AlertHistory,
88    UserActivity,
89    CustomReport,
90}
91
92/// Supported export formats
93#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
94pub enum ExportFormat {
95    Json,
96    Csv,
97    Xml,
98    Yaml,
99    Parquet,
100    Avro,
101    Excel,
102    Pdf,
103}
104
105/// Export filters
106#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ExportFilters {
108    pub time_range: Option<TimeRange>,
109    pub components: Vec<String>,
110    pub severity_levels: Vec<String>,
111    pub custom_fields: HashMap<String, String>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct TimeRange {
116    pub start: DateTime<Utc>,
117    pub end: DateTime<Utc>,
118}
119
120impl Default for ExportFilters {
121    fn default() -> Self {
122        Self {
123            time_range: None,
124            components: Vec::new(),
125            severity_levels: Vec::new(),
126            custom_fields: HashMap::new(),
127        }
128    }
129}
130
131/// Export result information
132#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct ExportResult {
134    pub export_id: String,
135    pub export_type: ExportType,
136    pub format: ExportFormat,
137    pub file_path: Option<String>,
138    pub file_size_bytes: u64,
139    pub record_count: usize,
140    pub started_at: DateTime<Utc>,
141    pub completed_at: Option<DateTime<Utc>>,
142    pub status: ExportStatus,
143    pub error_message: Option<String>,
144    pub destinations: Vec<DestinationResult>,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct DestinationResult {
149    pub destination_name: String,
150    pub status: ExportStatus,
151    pub delivered_at: Option<DateTime<Utc>>,
152    pub error_message: Option<String>,
153    pub delivery_info: HashMap<String, String>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub enum ExportStatus {
158    Pending,
159    InProgress,
160    Completed,
161    Failed,
162    PartiallyCompleted,
163}
164
165/// Report generation configuration
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct ReportConfig {
168    pub name: String,
169    pub description: String,
170    pub report_type: ReportType,
171    pub template: Option<String>,
172    pub parameters: HashMap<String, serde_json::Value>,
173    pub output_format: ExportFormat,
174    pub include_charts: bool,
175    pub chart_config: ChartConfig,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub enum ReportType {
180    SystemHealth,
181    PerformanceSummary,
182    SecurityAudit,
183    UsageAnalytics,
184    ErrorAnalysis,
185    CapacityPlanning,
186    Custom,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct ChartConfig {
191    pub chart_types: Vec<ChartType>,
192    pub color_scheme: String,
193    pub dimensions: ChartDimensions,
194    pub include_legends: bool,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub enum ChartType {
199    Line,
200    Bar,
201    Pie,
202    Area,
203    Scatter,
204    Heatmap,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct ChartDimensions {
209    pub width: u32,
210    pub height: u32,
211}
212
213impl Default for ChartConfig {
214    fn default() -> Self {
215        Self {
216            chart_types: vec![ChartType::Line, ChartType::Bar],
217            color_scheme: "default".to_string(),
218            dimensions: ChartDimensions {
219                width: 800,
220                height: 600,
221            },
222            include_legends: true,
223        }
224    }
225}
226
227/// Data formatter trait
228#[async_trait::async_trait]
229pub trait DataFormatter: Send + Sync {
230    async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>>;
231    async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>>;
232    async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>>;
233    async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>>;
234    fn content_type(&self) -> &'static str;
235    fn file_extension(&self) -> &'static str;
236}
237
238/// JSON formatter
239pub struct JsonFormatter;
240
241#[async_trait::async_trait]
242impl DataFormatter for JsonFormatter {
243    async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>> {
244        serde_json::to_vec_pretty(metrics)
245            .map_err(|e| RragError::agent("json_formatter", e.to_string()))
246    }
247
248    async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>> {
249        serde_json::to_vec_pretty(report)
250            .map_err(|e| RragError::agent("json_formatter", e.to_string()))
251    }
252
253    async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>> {
254        serde_json::to_vec_pretty(report)
255            .map_err(|e| RragError::agent("json_formatter", e.to_string()))
256    }
257
258    async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>> {
259        serde_json::to_vec_pretty(overview)
260            .map_err(|e| RragError::agent("json_formatter", e.to_string()))
261    }
262
263    fn content_type(&self) -> &'static str {
264        "application/json"
265    }
266
267    fn file_extension(&self) -> &'static str {
268        "json"
269    }
270}
271
272/// CSV formatter
273pub struct CsvFormatter;
274
275#[async_trait::async_trait]
276impl DataFormatter for CsvFormatter {
277    async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>> {
278        let mut output = Vec::new();
279
280        // CSV header
281        output.extend_from_slice(b"timestamp,name,type,value,labels\n");
282
283        for metric in metrics {
284            let timestamp = metric.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
285            let name = &metric.name;
286            let metric_type = format!("{:?}", metric.metric_type);
287            let value = match &metric.value {
288                super::metrics::MetricValue::Counter(v) => v.to_string(),
289                super::metrics::MetricValue::Gauge(v) => v.to_string(),
290                super::metrics::MetricValue::Timer { duration_ms, .. } => duration_ms.to_string(),
291                super::metrics::MetricValue::Histogram { sum, count, .. } => {
292                    if *count > 0 {
293                        (sum / *count as f64).to_string()
294                    } else {
295                        "0".to_string()
296                    }
297                }
298                super::metrics::MetricValue::Summary { sum, count, .. } => {
299                    if *count > 0 {
300                        (sum / *count as f64).to_string()
301                    } else {
302                        "0".to_string()
303                    }
304                }
305            };
306            let labels = metric
307                .labels
308                .iter()
309                .map(|(k, v)| format!("{}={}", k, v))
310                .collect::<Vec<_>>()
311                .join(";");
312
313            let line = format!(
314                "{},{},{},{},\"{}\"\n",
315                timestamp, name, metric_type, value, labels
316            );
317            output.extend_from_slice(line.as_bytes());
318        }
319
320        Ok(output)
321    }
322
323    async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>> {
324        let mut output = Vec::new();
325
326        // CSV header
327        output.extend_from_slice(b"component,status,last_check,response_time_ms,error_message\n");
328
329        for (component, health) in &report.services {
330            let status = health.status.to_string();
331            let last_check = health.last_check.format("%Y-%m-%d %H:%M:%S").to_string();
332            let response_time = health
333                .response_time_ms
334                .map(|t| t.to_string())
335                .unwrap_or_default();
336            let error_message = health.error_message.as_deref().unwrap_or("");
337
338            let line = format!(
339                "{},{},{},{},\"{}\"\n",
340                component, status, last_check, response_time, error_message
341            );
342            output.extend_from_slice(line.as_bytes());
343        }
344
345        Ok(output)
346    }
347
348    async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>> {
349        let mut output = Vec::new();
350
351        // CSV header for component performance
352        output.extend_from_slice(
353            b"component,operation_count,avg_duration_ms,max_duration_ms,std_deviation_ms\n",
354        );
355
356        for (component, metrics) in &report.component_performance {
357            let line = format!(
358                "{},{},{:.2},{:.2},{:.2}\n",
359                component,
360                metrics.operation_count,
361                metrics.average_duration_ms,
362                metrics.max_duration_ms,
363                metrics.standard_deviation_ms
364            );
365            output.extend_from_slice(line.as_bytes());
366        }
367
368        Ok(output)
369    }
370
371    async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>> {
372        let mut output = Vec::new();
373
374        // CSV header
375        output.extend_from_slice(
376            b"timestamp,cpu_usage,memory_usage,active_sessions,total_searches\n",
377        );
378
379        let timestamp = overview.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
380        let cpu_usage = overview
381            .performance_metrics
382            .as_ref()
383            .map(|p| p.cpu_usage_percent.to_string())
384            .unwrap_or_default();
385        let memory_usage = overview
386            .performance_metrics
387            .as_ref()
388            .map(|p| p.memory_usage_percent.to_string())
389            .unwrap_or_default();
390        let active_sessions = overview
391            .active_sessions
392            .map(|s| s.to_string())
393            .unwrap_or_default();
394        let total_searches = overview
395            .search_stats
396            .as_ref()
397            .map(|s| s.total_searches.to_string())
398            .unwrap_or_default();
399
400        let line = format!(
401            "{},{},{},{},{}\n",
402            timestamp, cpu_usage, memory_usage, active_sessions, total_searches
403        );
404        output.extend_from_slice(line.as_bytes());
405
406        Ok(output)
407    }
408
409    fn content_type(&self) -> &'static str {
410        "text/csv"
411    }
412
413    fn file_extension(&self) -> &'static str {
414        "csv"
415    }
416}
417
418/// Export destination trait
419#[async_trait::async_trait]
420pub trait ExportDestination: Send + Sync {
421    async fn export_data(
422        &self,
423        data: &[u8],
424        filename: &str,
425        content_type: &str,
426    ) -> RragResult<DestinationResult>;
427    fn destination_name(&self) -> &str;
428    async fn test_connection(&self) -> RragResult<bool>;
429}
430
/// Destination that writes payloads beneath a base directory on local disk.
pub struct LocalFileDestination {
    /// Name reported in delivery results.
    name: String,
    /// Directory that file names are resolved against.
    base_path: String,
}

impl LocalFileDestination {
    /// Creates a destination called `name` that writes under `base_path`.
    /// Nothing is touched on disk until data is exported or the connection
    /// is tested.
    pub fn new(name: impl Into<String>, base_path: impl Into<String>) -> Self {
        let name = name.into();
        let base_path = base_path.into();
        Self { name, base_path }
    }
}
445
446#[async_trait::async_trait]
447impl ExportDestination for LocalFileDestination {
448    async fn export_data(
449        &self,
450        data: &[u8],
451        filename: &str,
452        _content_type: &str,
453    ) -> RragResult<DestinationResult> {
454        let full_path = format!("{}/{}", self.base_path, filename);
455
456        // Create directory if it doesn't exist
457        if let Some(parent) = std::path::Path::new(&full_path).parent() {
458            tokio::fs::create_dir_all(parent)
459                .await
460                .map_err(|e| RragError::storage("create_directory", e))?;
461        }
462
463        // Write file
464        tokio::fs::write(&full_path, data)
465            .await
466            .map_err(|e| RragError::storage("write_file", e))?;
467
468        Ok(DestinationResult {
469            destination_name: self.name.clone(),
470            status: ExportStatus::Completed,
471            delivered_at: Some(Utc::now()),
472            error_message: None,
473            delivery_info: HashMap::from([
474                ("file_path".to_string(), full_path),
475                ("file_size".to_string(), data.len().to_string()),
476            ]),
477        })
478    }
479
480    fn destination_name(&self) -> &str {
481        &self.name
482    }
483
484    async fn test_connection(&self) -> RragResult<bool> {
485        // Test if we can write to the directory
486        match tokio::fs::metadata(&self.base_path).await {
487            Ok(metadata) => Ok(metadata.is_dir()),
488            Err(_) => {
489                // Try to create the directory
490                match tokio::fs::create_dir_all(&self.base_path).await {
491                    Ok(_) => Ok(true),
492                    Err(_) => Ok(false),
493                }
494            }
495        }
496    }
497}
498
/// Destination that POSTs payloads to an HTTP endpoint.
///
/// The HTTP client field only exists when the crate is built with the `http`
/// feature.
pub struct WebhookDestination {
    /// Name reported in delivery results.
    name: String,
    /// Endpoint that receives the payload.
    url: String,
    /// Extra headers added to every request.
    headers: HashMap<String, String>,
    #[cfg(feature = "http")]
    client: reqwest::Client,
}

impl WebhookDestination {
    /// Creates a webhook destination called `name` targeting `url`, with no
    /// extra headers.
    pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
        let name = name.into();
        let url = url.into();
        Self {
            name,
            url,
            headers: HashMap::new(),
            #[cfg(feature = "http")]
            client: reqwest::Client::new(),
        }
    }

    /// Builder-style setter: adds (or replaces) one request header and
    /// returns the destination.
    pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (header_name, header_value) = (key.into(), value.into());
        self.headers.insert(header_name, header_value);
        self
    }
}
524
525#[async_trait::async_trait]
526impl ExportDestination for WebhookDestination {
527    async fn export_data(
528        &self,
529        data: &[u8],
530        filename: &str,
531        content_type: &str,
532    ) -> RragResult<DestinationResult> {
533        #[cfg(feature = "http")]
534        {
535            let mut request = self
536                .client
537                .post(&self.url)
538                .header("Content-Type", content_type)
539                .header("X-Filename", filename)
540                .body(data.to_vec());
541
542            for (key, value) in &self.headers {
543                request = request.header(key, value);
544            }
545
546            match request.send().await {
547                Ok(response) => {
548                    let status_code = response.status().as_u16();
549                    if response.status().is_success() {
550                        Ok(DestinationResult {
551                            destination_name: self.name.clone(),
552                            status: ExportStatus::Completed,
553                            delivered_at: Some(Utc::now()),
554                            error_message: None,
555                            delivery_info: HashMap::from([
556                                ("status_code".to_string(), status_code.to_string()),
557                                ("url".to_string(), self.url.clone()),
558                            ]),
559                        })
560                    } else {
561                        Ok(DestinationResult {
562                            destination_name: self.name.clone(),
563                            status: ExportStatus::Failed,
564                            delivered_at: None,
565                            error_message: Some(format!(
566                                "HTTP {}: {}",
567                                status_code,
568                                response.status()
569                            )),
570                            delivery_info: HashMap::from([(
571                                "status_code".to_string(),
572                                status_code.to_string(),
573                            )]),
574                        })
575                    }
576                }
577                Err(e) => Ok(DestinationResult {
578                    destination_name: self.name.clone(),
579                    status: ExportStatus::Failed,
580                    delivered_at: None,
581                    error_message: Some(e.to_string()),
582                    delivery_info: HashMap::new(),
583                }),
584            }
585        }
586        #[cfg(not(feature = "http"))]
587        {
588            // Without HTTP feature, return skipped status
589            Ok(DestinationResult {
590                destination_name: self.name.clone(),
591                status: ExportStatus::Failed,
592                delivered_at: None,
593                error_message: Some("HTTP feature not enabled".to_string()),
594                delivery_info: HashMap::from([
595                    ("note".to_string(), "HTTP feature disabled".to_string()),
596                    ("url".to_string(), self.url.clone()),
597                ]),
598            })
599        }
600    }
601
602    fn destination_name(&self) -> &str {
603        &self.name
604    }
605
606    async fn test_connection(&self) -> RragResult<bool> {
607        #[cfg(feature = "http")]
608        {
609            match self.client.head(&self.url).send().await {
610                Ok(response) => Ok(response.status().is_success()),
611                Err(_) => Ok(false),
612            }
613        }
614        #[cfg(not(feature = "http"))]
615        {
616            // Without HTTP feature, assume connection is fine
617            Ok(true)
618        }
619    }
620}
621
622/// Report generator
623pub struct ReportGenerator {
624    templates: Arc<RwLock<HashMap<String, String>>>,
625}
626
627impl ReportGenerator {
628    pub fn new() -> Self {
629        Self {
630            templates: Arc::new(RwLock::new(HashMap::new())),
631        }
632    }
633
634    pub async fn add_template(&self, name: impl Into<String>, template: impl Into<String>) {
635        let mut templates = self.templates.write().await;
636        templates.insert(name.into(), template.into());
637    }
638
639    pub async fn generate_report(
640        &self,
641        config: &ReportConfig,
642        data: &SystemOverview,
643    ) -> RragResult<Vec<u8>> {
644        match config.report_type {
645            ReportType::SystemHealth => self.generate_system_health_report(data).await,
646            ReportType::PerformanceSummary => self.generate_performance_summary_report(data).await,
647            ReportType::UsageAnalytics => self.generate_usage_analytics_report(data).await,
648            _ => self.generate_generic_report(config, data).await,
649        }
650    }
651
652    async fn generate_system_health_report(&self, data: &SystemOverview) -> RragResult<Vec<u8>> {
653        let report = serde_json::json!({
654            "title": "System Health Report",
655            "generated_at": Utc::now(),
656            "data": data,
657            "summary": {
658                "overall_status": "healthy",
659                "components_checked": data.performance_metrics.as_ref().map(|_| 1).unwrap_or(0),
660                "issues_detected": 0
661            }
662        });
663
664        serde_json::to_vec_pretty(&report)
665            .map_err(|e| RragError::agent("report_generator", e.to_string()))
666    }
667
668    async fn generate_performance_summary_report(
669        &self,
670        data: &SystemOverview,
671    ) -> RragResult<Vec<u8>> {
672        let report = serde_json::json!({
673            "title": "Performance Summary Report",
674            "generated_at": Utc::now(),
675            "performance_metrics": data.performance_metrics,
676            "search_stats": data.search_stats,
677            "user_stats": data.user_stats
678        });
679
680        serde_json::to_vec_pretty(&report)
681            .map_err(|e| RragError::agent("report_generator", e.to_string()))
682    }
683
684    async fn generate_usage_analytics_report(&self, data: &SystemOverview) -> RragResult<Vec<u8>> {
685        let report = serde_json::json!({
686            "title": "Usage Analytics Report",
687            "generated_at": Utc::now(),
688            "active_sessions": data.active_sessions,
689            "user_stats": data.user_stats,
690            "search_stats": data.search_stats
691        });
692
693        serde_json::to_vec_pretty(&report)
694            .map_err(|e| RragError::agent("report_generator", e.to_string()))
695    }
696
697    async fn generate_generic_report(
698        &self,
699        config: &ReportConfig,
700        data: &SystemOverview,
701    ) -> RragResult<Vec<u8>> {
702        let report = serde_json::json!({
703            "title": config.name,
704            "description": config.description,
705            "generated_at": Utc::now(),
706            "data": data
707        });
708
709        serde_json::to_vec_pretty(&report)
710            .map_err(|e| RragError::agent("report_generator", e.to_string()))
711    }
712}
713
714/// Main export manager
715pub struct ExportManager {
716    config: ExportConfig,
717    formatters: Arc<RwLock<HashMap<ExportFormat, Box<dyn DataFormatter>>>>,
718    destinations: Arc<RwLock<HashMap<String, Box<dyn ExportDestination>>>>,
719    report_generator: Arc<ReportGenerator>,
720    export_history: Arc<RwLock<Vec<ExportResult>>>,
721    export_queue: mpsc::UnboundedSender<ExportRequest>,
722    _queue_receiver: Arc<RwLock<mpsc::UnboundedReceiver<ExportRequest>>>,
723    processing_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
724    is_running: Arc<RwLock<bool>>,
725}
726
727#[derive(Debug)]
728struct ExportRequest {
729    export_id: String,
730    export_type: ExportType,
731    format: ExportFormat,
732    data: ExportData,
733    destinations: Vec<String>,
734    filters: ExportFilters,
735}
736
737#[derive(Debug)]
738enum ExportData {
739    Metrics(Vec<Metric>),
740    HealthReport(HealthReport),
741    PerformanceReport(PerformanceReport),
742    SystemOverview(SystemOverview),
743}
744
745/// Metrics exporter for integration with external systems
746pub struct MetricsExporter {
747    export_manager: Arc<ExportManager>,
748    metrics_collector: Arc<MetricsCollector>,
749}
750
751impl MetricsExporter {
752    pub fn new(
753        export_manager: Arc<ExportManager>,
754        metrics_collector: Arc<MetricsCollector>,
755    ) -> Self {
756        Self {
757            export_manager,
758            metrics_collector,
759        }
760    }
761
762    pub async fn export_current_metrics(
763        &self,
764        format: ExportFormat,
765        destinations: Vec<String>,
766    ) -> RragResult<ExportResult> {
767        let metrics = self.metrics_collector.get_all_metrics().await;
768        self.export_manager
769            .export_metrics(metrics, format, destinations, ExportFilters::default())
770            .await
771    }
772
773    pub async fn schedule_periodic_export(
774        &self,
775        interval_minutes: u32,
776        format: ExportFormat,
777        destinations: Vec<String>,
778    ) -> RragResult<()> {
779        // This would set up a periodic export job
780        // For now, just log the setup
781        tracing::info!(
782            "Scheduled metrics export every {} minutes to {:?} destinations in {:?} format",
783            interval_minutes,
784            destinations,
785            format
786        );
787        Ok(())
788    }
789}
790
791impl ExportManager {
792    pub async fn new(config: ExportConfig) -> RragResult<Self> {
793        let formatters: Arc<RwLock<HashMap<ExportFormat, Box<dyn DataFormatter>>>> =
794            Arc::new(RwLock::new(HashMap::new()));
795        let destinations: Arc<RwLock<HashMap<String, Box<dyn ExportDestination>>>> =
796            Arc::new(RwLock::new(HashMap::new()));
797        let report_generator = Arc::new(ReportGenerator::new());
798        let export_history = Arc::new(RwLock::new(Vec::new()));
799
800        let (export_queue, queue_receiver) = mpsc::unbounded_channel();
801
802        // Initialize default formatters
803        {
804            let mut fmt = formatters.write().await;
805            fmt.insert(ExportFormat::Json, Box::new(JsonFormatter));
806            fmt.insert(ExportFormat::Csv, Box::new(CsvFormatter));
807        }
808
809        // Initialize destinations from config
810        {
811            let mut dest = destinations.write().await;
812            for dest_config in &config.destinations {
813                if !dest_config.enabled {
814                    continue;
815                }
816
817                match dest_config.destination_type {
818                    DestinationType::LocalFile => {
819                        let base_path = dest_config
820                            .config
821                            .get("path")
822                            .unwrap_or(&config.output_directory);
823                        dest.insert(
824                            dest_config.name.clone(),
825                            Box::new(LocalFileDestination::new(&dest_config.name, base_path)),
826                        );
827                    }
828                    DestinationType::Webhook | DestinationType::HTTP => {
829                        if let Some(url) = dest_config.config.get("url") {
830                            let mut webhook = WebhookDestination::new(&dest_config.name, url);
831
832                            // Add custom headers
833                            for (key, value) in &dest_config.config {
834                                if key.starts_with("header_") {
835                                    let header_name = key.strip_prefix("header_").unwrap();
836                                    webhook = webhook.with_header(header_name, value);
837                                }
838                            }
839
840                            dest.insert(dest_config.name.clone(), Box::new(webhook));
841                        }
842                    }
843                    _ => {
844                        tracing::warn!(
845                            "Destination type {:?} not yet implemented",
846                            dest_config.destination_type
847                        );
848                    }
849                }
850            }
851        }
852
853        Ok(Self {
854            config,
855            formatters,
856            destinations,
857            report_generator,
858            export_history,
859            export_queue,
860            _queue_receiver: Arc::new(RwLock::new(queue_receiver)),
861            processing_handle: Arc::new(RwLock::new(None)),
862            is_running: Arc::new(RwLock::new(false)),
863        })
864    }
865
866    pub async fn start(&self) -> RragResult<()> {
867        let mut running = self.is_running.write().await;
868        if *running {
869            return Err(RragError::config(
870                "export_manager",
871                "stopped",
872                "already running",
873            ));
874        }
875
876        // Start export processing loop would go here
877        *running = true;
878        tracing::info!("Export manager started");
879        Ok(())
880    }
881
882    pub async fn stop(&self) -> RragResult<()> {
883        let mut running = self.is_running.write().await;
884        if !*running {
885            return Ok(());
886        }
887
888        {
889            let mut handle_guard = self.processing_handle.write().await;
890            if let Some(handle) = handle_guard.take() {
891                handle.abort();
892            }
893        }
894
895        *running = false;
896        tracing::info!("Export manager stopped");
897        Ok(())
898    }
899
900    pub async fn is_healthy(&self) -> bool {
901        *self.is_running.read().await
902    }
903
904    pub async fn export_metrics(
905        &self,
906        metrics: Vec<Metric>,
907        format: ExportFormat,
908        destinations: Vec<String>,
909        filters: ExportFilters,
910    ) -> RragResult<ExportResult> {
911        let export_id = uuid::Uuid::new_v4().to_string();
912        let started_at = Utc::now();
913
914        // Apply filters
915        let filtered_metrics = self.apply_metric_filters(metrics, &filters);
916
917        // Format data
918        let formatters = self.formatters.read().await;
919        let formatter = formatters.get(&format).ok_or_else(|| {
920            RragError::config("export_format", "supported", &format!("{:?}", format))
921        })?;
922
923        let formatted_data = formatter.format_metrics(&filtered_metrics).await?;
924        // formatters will be automatically dropped when it goes out of scope
925
926        // Generate filename
927        let filename = format!(
928            "metrics_{}.{}",
929            started_at.format("%Y%m%d_%H%M%S"),
930            match format {
931                ExportFormat::Json => "json",
932                ExportFormat::Csv => "csv",
933                _ => "data",
934            }
935        );
936
937        // Export to destinations
938        let destinations_map = self.destinations.read().await;
939        let mut destination_results = Vec::new();
940
941        for dest_name in destinations {
942            if let Some(destination) = destinations_map.get(&dest_name) {
943                let result = destination
944                    .export_data(&formatted_data, &filename, formatter.content_type())
945                    .await?;
946                destination_results.push(result);
947            }
948        }
949
950        let export_result = ExportResult {
951            export_id: export_id.clone(),
952            export_type: ExportType::Metrics,
953            format,
954            file_path: Some(filename),
955            file_size_bytes: formatted_data.len() as u64,
956            record_count: filtered_metrics.len(),
957            started_at,
958            completed_at: Some(Utc::now()),
959            status: if destination_results
960                .iter()
961                .all(|r| r.status == ExportStatus::Completed)
962            {
963                ExportStatus::Completed
964            } else {
965                ExportStatus::PartiallyCompleted
966            },
967            error_message: None,
968            destinations: destination_results,
969        };
970
971        // Store in history
972        let mut history = self.export_history.write().await;
973        history.push(export_result.clone());
974
975        // Keep only recent exports
976        let history_len = history.len();
977        if history_len > 1000 {
978            history.drain(0..history_len - 1000);
979        }
980
981        Ok(export_result)
982    }
983
984    fn apply_metric_filters(&self, metrics: Vec<Metric>, filters: &ExportFilters) -> Vec<Metric> {
985        metrics
986            .into_iter()
987            .filter(|metric| {
988                // Time range filter
989                if let Some(ref time_range) = filters.time_range {
990                    if metric.timestamp < time_range.start || metric.timestamp > time_range.end {
991                        return false;
992                    }
993                }
994
995                // Component filter
996                if !filters.components.is_empty() {
997                    if let Some(component) = metric.labels.get("component") {
998                        if !filters.components.contains(component) {
999                            return false;
1000                        }
1001                    }
1002                }
1003
1004                true
1005            })
1006            .collect()
1007    }
1008
1009    pub async fn generate_and_export_report(
1010        &self,
1011        config: ReportConfig,
1012        data: SystemOverview,
1013        destinations: Vec<String>,
1014    ) -> RragResult<ExportResult> {
1015        let export_id = uuid::Uuid::new_v4().to_string();
1016        let started_at = Utc::now();
1017
1018        // Generate report
1019        let report_data = self
1020            .report_generator
1021            .generate_report(&config, &data)
1022            .await?;
1023
1024        // Generate filename
1025        let filename = format!(
1026            "report_{}_{}.{}",
1027            config.name.replace(' ', "_").to_lowercase(),
1028            started_at.format("%Y%m%d_%H%M%S"),
1029            match config.output_format {
1030                ExportFormat::Json => "json",
1031                ExportFormat::Csv => "csv",
1032                ExportFormat::Pdf => "pdf",
1033                _ => "report",
1034            }
1035        );
1036
1037        // Export to destinations
1038        let destinations_map = self.destinations.read().await;
1039        let mut destination_results = Vec::new();
1040
1041        for dest_name in destinations {
1042            if let Some(destination) = destinations_map.get(&dest_name) {
1043                let result = destination
1044                    .export_data(
1045                        &report_data,
1046                        &filename,
1047                        "application/json", // Default content type
1048                    )
1049                    .await?;
1050                destination_results.push(result);
1051            }
1052        }
1053
1054        let export_result = ExportResult {
1055            export_id,
1056            export_type: ExportType::CustomReport,
1057            format: config.output_format,
1058            file_path: Some(filename),
1059            file_size_bytes: report_data.len() as u64,
1060            record_count: 1,
1061            started_at,
1062            completed_at: Some(Utc::now()),
1063            status: if destination_results
1064                .iter()
1065                .all(|r| r.status == ExportStatus::Completed)
1066            {
1067                ExportStatus::Completed
1068            } else {
1069                ExportStatus::PartiallyCompleted
1070            },
1071            error_message: None,
1072            destinations: destination_results,
1073        };
1074
1075        // Store in history
1076        let mut history = self.export_history.write().await;
1077        history.push(export_result.clone());
1078
1079        Ok(export_result)
1080    }
1081
1082    pub async fn get_export_history(&self, limit: Option<usize>) -> Vec<ExportResult> {
1083        let history = self.export_history.read().await;
1084        let limit = limit.unwrap_or(history.len());
1085        let start_index = history.len().saturating_sub(limit);
1086        history[start_index..].to_vec()
1087    }
1088
1089    pub async fn get_export_status(&self, export_id: &str) -> Option<ExportResult> {
1090        let history = self.export_history.read().await;
1091        history.iter().find(|r| r.export_id == export_id).cloned()
1092    }
1093
1094    pub async fn test_destination(&self, destination_name: &str) -> RragResult<bool> {
1095        let destinations = self.destinations.read().await;
1096        if let Some(destination) = destinations.get(destination_name) {
1097            destination.test_connection().await
1098        } else {
1099            Err(RragError::config("destination", "exists", "not_found"))
1100        }
1101    }
1102
1103    pub async fn add_destination(&self, name: String, destination: Box<dyn ExportDestination>) {
1104        let mut destinations = self.destinations.write().await;
1105        destinations.insert(name, destination);
1106    }
1107
1108    pub async fn remove_destination(&self, name: &str) {
1109        let mut destinations = self.destinations.write().await;
1110        destinations.remove(name);
1111    }
1112
1113    pub async fn list_destinations(&self) -> Vec<String> {
1114        let destinations = self.destinations.read().await;
1115        destinations.keys().cloned().collect()
1116    }
1117
1118    pub async fn get_export_stats(&self) -> ExportStats {
1119        let history = self.export_history.read().await;
1120
1121        let total_exports = history.len();
1122        let successful_exports = history
1123            .iter()
1124            .filter(|r| r.status == ExportStatus::Completed)
1125            .count();
1126        let failed_exports = history
1127            .iter()
1128            .filter(|r| r.status == ExportStatus::Failed)
1129            .count();
1130
1131        let total_data_exported = history.iter().map(|r| r.file_size_bytes).sum::<u64>();
1132
1133        let exports_by_type = history.iter().fold(HashMap::new(), |mut acc, result| {
1134            *acc.entry(result.export_type.clone()).or_insert(0) += 1;
1135            acc
1136        });
1137
1138        let exports_by_format = history.iter().fold(HashMap::new(), |mut acc, result| {
1139            *acc.entry(result.format.clone()).or_insert(0) += 1;
1140            acc
1141        });
1142
1143        ExportStats {
1144            total_exports,
1145            successful_exports,
1146            failed_exports,
1147            total_data_exported_bytes: total_data_exported,
1148            exports_by_type,
1149            exports_by_format,
1150            last_export: history.last().map(|r| r.started_at),
1151        }
1152    }
1153}
1154
1155#[derive(Debug, Clone, Serialize, Deserialize)]
1156pub struct ExportStats {
1157    pub total_exports: usize,
1158    pub successful_exports: usize,
1159    pub failed_exports: usize,
1160    pub total_data_exported_bytes: u64,
1161    pub exports_by_type: HashMap<ExportType, usize>,
1162    pub exports_by_format: HashMap<ExportFormat, usize>,
1163    pub last_export: Option<DateTime<Utc>>,
1164}
1165
#[cfg(test)]
mod tests {
    use super::*;

    /// The JSON formatter emits every metric name and reports JSON metadata.
    #[tokio::test]
    async fn test_json_formatter() {
        let formatter = JsonFormatter;
        let metrics = vec![
            Metric::counter("test_counter", 42),
            Metric::gauge("test_gauge", 2.5),
        ];

        let result = formatter.format_metrics(&metrics).await.unwrap();
        let json_str = String::from_utf8(result).unwrap();

        assert!(json_str.contains("test_counter"));
        assert!(json_str.contains("test_gauge"));
        assert_eq!(formatter.content_type(), "application/json");
        assert_eq!(formatter.file_extension(), "json");
    }

    /// The CSV formatter emits a header row plus the metric name and type.
    #[tokio::test]
    async fn test_csv_formatter() {
        let formatter = CsvFormatter;
        let metrics = vec![Metric::counter("requests_total", 100).with_label("method", "GET")];

        let result = formatter.format_metrics(&metrics).await.unwrap();
        let csv_str = String::from_utf8(result).unwrap();

        assert!(csv_str.contains("timestamp,name,type,value,labels"));
        assert!(csv_str.contains("requests_total"));
        assert!(csv_str.contains("Counter"));
        assert_eq!(formatter.content_type(), "text/csv");
        assert_eq!(formatter.file_extension(), "csv");
    }

    /// A local file destination accepts data written into a temp directory.
    #[tokio::test]
    async fn test_local_file_destination() {
        let temp_dir = tempfile::tempdir().unwrap();
        let destination =
            LocalFileDestination::new("test_local", temp_dir.path().to_string_lossy().to_string());

        assert!(destination.test_connection().await.unwrap());

        let test_data = b"test export data";
        let result = destination
            .export_data(test_data, "test.json", "application/json")
            .await
            .unwrap();

        assert_eq!(result.status, ExportStatus::Completed);
        assert_eq!(result.destination_name, "test_local");
        assert!(result.delivered_at.is_some());
    }

    /// End-to-end metric export through the manager, including history and stats.
    #[tokio::test]
    async fn test_export_manager() {
        // Keep the guard bound for the whole test: the directory is deleted as
        // soon as the `TempDir` value is dropped, and a temporary would be
        // dropped at the end of the statement that creates it.
        let temp_dir = tempfile::tempdir().unwrap();

        let config = ExportConfig {
            output_directory: temp_dir.path().to_string_lossy().to_string(),
            destinations: vec![ExportDestinationConfig {
                name: "local_test".to_string(),
                destination_type: DestinationType::LocalFile,
                config: HashMap::new(),
                enabled: true,
            }],
            ..Default::default()
        };

        let mut manager = ExportManager::new(config).await.unwrap();
        manager.start().await.unwrap();

        let metrics = vec![
            Metric::counter("test_metric", 123),
            Metric::gauge("test_gauge", 45.6),
        ];

        let result = manager
            .export_metrics(
                metrics,
                ExportFormat::Json,
                vec!["local_test".to_string()],
                ExportFilters::default(),
            )
            .await
            .unwrap();

        assert_eq!(result.export_type, ExportType::Metrics);
        assert_eq!(result.format, ExportFormat::Json);
        assert_eq!(result.status, ExportStatus::Completed);
        assert_eq!(result.record_count, 2);
        assert!(!result.destinations.is_empty());

        let history = manager.get_export_history(Some(10)).await;
        assert_eq!(history.len(), 1);

        let stats = manager.get_export_stats().await;
        assert_eq!(stats.total_exports, 1);
        assert_eq!(stats.successful_exports, 1);

        manager.stop().await.unwrap();
    }

    /// A system-health report in JSON carries its title and generation time.
    #[tokio::test]
    async fn test_report_generator() {
        let generator = ReportGenerator::new();
        let overview = SystemOverview {
            timestamp: Utc::now(),
            performance_metrics: None,
            search_stats: None,
            user_stats: None,
            active_sessions: Some(10),
        };

        let config = ReportConfig {
            name: "Test Report".to_string(),
            description: "A test report".to_string(),
            report_type: ReportType::SystemHealth,
            template: None,
            parameters: HashMap::new(),
            output_format: ExportFormat::Json,
            include_charts: false,
            chart_config: ChartConfig::default(),
        };

        let report_data = generator.generate_report(&config, &overview).await.unwrap();
        let report_str = String::from_utf8(report_data).unwrap();

        assert!(report_str.contains("System Health Report"));
        assert!(report_str.contains("generated_at"));
    }

    /// `ExportFilters` can be built with a time range and a component list.
    #[test]
    fn test_export_filters() {
        let filters = ExportFilters {
            time_range: Some(TimeRange {
                // Fully qualified so the test does not rely on the parent
                // module importing `Duration`.
                start: Utc::now() - chrono::Duration::hours(1),
                end: Utc::now(),
            }),
            components: vec!["search".to_string(), "storage".to_string()],
            ..Default::default()
        };

        assert!(filters.time_range.is_some());
        assert_eq!(filters.components.len(), 2);
        assert!(filters.components.contains(&"search".to_string()));
    }

    /// `ExportStatus` variants compare by equality.
    #[test]
    fn test_export_status() {
        assert_eq!(ExportStatus::Completed, ExportStatus::Completed);
        assert_ne!(ExportStatus::Completed, ExportStatus::Failed);
    }
}