1use super::{
7 health::HealthReport,
8 metrics::{Metric, MetricsCollector},
9 monitoring::SystemOverview,
10 profiling::PerformanceReport,
11};
12use crate::{RragError, RragResult};
13use chrono::{DateTime, Utc};
14use serde::{Deserialize, Serialize};
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::{mpsc, RwLock};
18
19#[derive(Debug, Clone, Serialize, Deserialize)]
21pub struct ExportConfig {
22 pub enabled: bool,
23 pub default_format: ExportFormat,
24 pub output_directory: String,
25 pub max_file_size_mb: u64,
26 pub retention_days: u32,
27 pub compression_enabled: bool,
28 pub scheduled_exports: Vec<ScheduledExportConfig>,
29 pub destinations: Vec<ExportDestinationConfig>,
30}
31
32impl Default for ExportConfig {
33 fn default() -> Self {
34 Self {
35 enabled: true,
36 default_format: ExportFormat::Json,
37 output_directory: "./exports".to_string(),
38 max_file_size_mb: 100,
39 retention_days: 90,
40 compression_enabled: true,
41 scheduled_exports: Vec::new(),
42 destinations: Vec::new(),
43 }
44 }
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ScheduledExportConfig {
50 pub name: String,
51 pub schedule_cron: String,
52 pub export_type: ExportType,
53 pub format: ExportFormat,
54 pub destinations: Vec<String>,
55 pub filters: ExportFilters,
56}
57
58#[derive(Debug, Clone, Serialize, Deserialize)]
60pub struct ExportDestinationConfig {
61 pub name: String,
62 pub destination_type: DestinationType,
63 pub config: HashMap<String, String>,
64 pub enabled: bool,
65}
66
67#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
68pub enum DestinationType {
69 LocalFile,
70 S3,
71 Azure,
72 GCS,
73 SFTP,
74 HTTP,
75 Email,
76 Webhook,
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
81pub enum ExportType {
82 Metrics,
83 Logs,
84 HealthReport,
85 PerformanceReport,
86 SystemOverview,
87 AlertHistory,
88 UserActivity,
89 CustomReport,
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)]
94pub enum ExportFormat {
95 Json,
96 Csv,
97 Xml,
98 Yaml,
99 Parquet,
100 Avro,
101 Excel,
102 Pdf,
103}
104
105#[derive(Debug, Clone, Serialize, Deserialize)]
107pub struct ExportFilters {
108 pub time_range: Option<TimeRange>,
109 pub components: Vec<String>,
110 pub severity_levels: Vec<String>,
111 pub custom_fields: HashMap<String, String>,
112}
113
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct TimeRange {
116 pub start: DateTime<Utc>,
117 pub end: DateTime<Utc>,
118}
119
120impl Default for ExportFilters {
121 fn default() -> Self {
122 Self {
123 time_range: None,
124 components: Vec::new(),
125 severity_levels: Vec::new(),
126 custom_fields: HashMap::new(),
127 }
128 }
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize)]
133pub struct ExportResult {
134 pub export_id: String,
135 pub export_type: ExportType,
136 pub format: ExportFormat,
137 pub file_path: Option<String>,
138 pub file_size_bytes: u64,
139 pub record_count: usize,
140 pub started_at: DateTime<Utc>,
141 pub completed_at: Option<DateTime<Utc>>,
142 pub status: ExportStatus,
143 pub error_message: Option<String>,
144 pub destinations: Vec<DestinationResult>,
145}
146
147#[derive(Debug, Clone, Serialize, Deserialize)]
148pub struct DestinationResult {
149 pub destination_name: String,
150 pub status: ExportStatus,
151 pub delivered_at: Option<DateTime<Utc>>,
152 pub error_message: Option<String>,
153 pub delivery_info: HashMap<String, String>,
154}
155
156#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
157pub enum ExportStatus {
158 Pending,
159 InProgress,
160 Completed,
161 Failed,
162 PartiallyCompleted,
163}
164
165#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct ReportConfig {
168 pub name: String,
169 pub description: String,
170 pub report_type: ReportType,
171 pub template: Option<String>,
172 pub parameters: HashMap<String, serde_json::Value>,
173 pub output_format: ExportFormat,
174 pub include_charts: bool,
175 pub chart_config: ChartConfig,
176}
177
178#[derive(Debug, Clone, Serialize, Deserialize)]
179pub enum ReportType {
180 SystemHealth,
181 PerformanceSummary,
182 SecurityAudit,
183 UsageAnalytics,
184 ErrorAnalysis,
185 CapacityPlanning,
186 Custom,
187}
188
189#[derive(Debug, Clone, Serialize, Deserialize)]
190pub struct ChartConfig {
191 pub chart_types: Vec<ChartType>,
192 pub color_scheme: String,
193 pub dimensions: ChartDimensions,
194 pub include_legends: bool,
195}
196
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub enum ChartType {
199 Line,
200 Bar,
201 Pie,
202 Area,
203 Scatter,
204 Heatmap,
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct ChartDimensions {
209 pub width: u32,
210 pub height: u32,
211}
212
213impl Default for ChartConfig {
214 fn default() -> Self {
215 Self {
216 chart_types: vec![ChartType::Line, ChartType::Bar],
217 color_scheme: "default".to_string(),
218 dimensions: ChartDimensions {
219 width: 800,
220 height: 600,
221 },
222 include_legends: true,
223 }
224 }
225}
226
227#[async_trait::async_trait]
229pub trait DataFormatter: Send + Sync {
230 async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>>;
231 async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>>;
232 async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>>;
233 async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>>;
234 fn content_type(&self) -> &'static str;
235 fn file_extension(&self) -> &'static str;
236}
237
238pub struct JsonFormatter;
240
241#[async_trait::async_trait]
242impl DataFormatter for JsonFormatter {
243 async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>> {
244 serde_json::to_vec_pretty(metrics)
245 .map_err(|e| RragError::agent("json_formatter", e.to_string()))
246 }
247
248 async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>> {
249 serde_json::to_vec_pretty(report)
250 .map_err(|e| RragError::agent("json_formatter", e.to_string()))
251 }
252
253 async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>> {
254 serde_json::to_vec_pretty(report)
255 .map_err(|e| RragError::agent("json_formatter", e.to_string()))
256 }
257
258 async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>> {
259 serde_json::to_vec_pretty(overview)
260 .map_err(|e| RragError::agent("json_formatter", e.to_string()))
261 }
262
263 fn content_type(&self) -> &'static str {
264 "application/json"
265 }
266
267 fn file_extension(&self) -> &'static str {
268 "json"
269 }
270}
271
272pub struct CsvFormatter;
274
275#[async_trait::async_trait]
276impl DataFormatter for CsvFormatter {
277 async fn format_metrics(&self, metrics: &[Metric]) -> RragResult<Vec<u8>> {
278 let mut output = Vec::new();
279
280 output.extend_from_slice(b"timestamp,name,type,value,labels\n");
282
283 for metric in metrics {
284 let timestamp = metric.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
285 let name = &metric.name;
286 let metric_type = format!("{:?}", metric.metric_type);
287 let value = match &metric.value {
288 super::metrics::MetricValue::Counter(v) => v.to_string(),
289 super::metrics::MetricValue::Gauge(v) => v.to_string(),
290 super::metrics::MetricValue::Timer { duration_ms, .. } => duration_ms.to_string(),
291 super::metrics::MetricValue::Histogram { sum, count, .. } => {
292 if *count > 0 {
293 (sum / *count as f64).to_string()
294 } else {
295 "0".to_string()
296 }
297 }
298 super::metrics::MetricValue::Summary { sum, count, .. } => {
299 if *count > 0 {
300 (sum / *count as f64).to_string()
301 } else {
302 "0".to_string()
303 }
304 }
305 };
306 let labels = metric
307 .labels
308 .iter()
309 .map(|(k, v)| format!("{}={}", k, v))
310 .collect::<Vec<_>>()
311 .join(";");
312
313 let line = format!(
314 "{},{},{},{},\"{}\"\n",
315 timestamp, name, metric_type, value, labels
316 );
317 output.extend_from_slice(line.as_bytes());
318 }
319
320 Ok(output)
321 }
322
323 async fn format_health_report(&self, report: &HealthReport) -> RragResult<Vec<u8>> {
324 let mut output = Vec::new();
325
326 output.extend_from_slice(b"component,status,last_check,response_time_ms,error_message\n");
328
329 for (component, health) in &report.services {
330 let status = health.status.to_string();
331 let last_check = health.last_check.format("%Y-%m-%d %H:%M:%S").to_string();
332 let response_time = health
333 .response_time_ms
334 .map(|t| t.to_string())
335 .unwrap_or_default();
336 let error_message = health.error_message.as_deref().unwrap_or("");
337
338 let line = format!(
339 "{},{},{},{},\"{}\"\n",
340 component, status, last_check, response_time, error_message
341 );
342 output.extend_from_slice(line.as_bytes());
343 }
344
345 Ok(output)
346 }
347
348 async fn format_performance_report(&self, report: &PerformanceReport) -> RragResult<Vec<u8>> {
349 let mut output = Vec::new();
350
351 output.extend_from_slice(
353 b"component,operation_count,avg_duration_ms,max_duration_ms,std_deviation_ms\n",
354 );
355
356 for (component, metrics) in &report.component_performance {
357 let line = format!(
358 "{},{},{:.2},{:.2},{:.2}\n",
359 component,
360 metrics.operation_count,
361 metrics.average_duration_ms,
362 metrics.max_duration_ms,
363 metrics.standard_deviation_ms
364 );
365 output.extend_from_slice(line.as_bytes());
366 }
367
368 Ok(output)
369 }
370
371 async fn format_system_overview(&self, overview: &SystemOverview) -> RragResult<Vec<u8>> {
372 let mut output = Vec::new();
373
374 output.extend_from_slice(
376 b"timestamp,cpu_usage,memory_usage,active_sessions,total_searches\n",
377 );
378
379 let timestamp = overview.timestamp.format("%Y-%m-%d %H:%M:%S").to_string();
380 let cpu_usage = overview
381 .performance_metrics
382 .as_ref()
383 .map(|p| p.cpu_usage_percent.to_string())
384 .unwrap_or_default();
385 let memory_usage = overview
386 .performance_metrics
387 .as_ref()
388 .map(|p| p.memory_usage_percent.to_string())
389 .unwrap_or_default();
390 let active_sessions = overview
391 .active_sessions
392 .map(|s| s.to_string())
393 .unwrap_or_default();
394 let total_searches = overview
395 .search_stats
396 .as_ref()
397 .map(|s| s.total_searches.to_string())
398 .unwrap_or_default();
399
400 let line = format!(
401 "{},{},{},{},{}\n",
402 timestamp, cpu_usage, memory_usage, active_sessions, total_searches
403 );
404 output.extend_from_slice(line.as_bytes());
405
406 Ok(output)
407 }
408
409 fn content_type(&self) -> &'static str {
410 "text/csv"
411 }
412
413 fn file_extension(&self) -> &'static str {
414 "csv"
415 }
416}
417
418#[async_trait::async_trait]
420pub trait ExportDestination: Send + Sync {
421 async fn export_data(
422 &self,
423 data: &[u8],
424 filename: &str,
425 content_type: &str,
426 ) -> RragResult<DestinationResult>;
427 fn destination_name(&self) -> &str;
428 async fn test_connection(&self) -> RragResult<bool>;
429}
430
431pub struct LocalFileDestination {
433 name: String,
434 base_path: String,
435}
436
437impl LocalFileDestination {
438 pub fn new(name: impl Into<String>, base_path: impl Into<String>) -> Self {
439 Self {
440 name: name.into(),
441 base_path: base_path.into(),
442 }
443 }
444}
445
446#[async_trait::async_trait]
447impl ExportDestination for LocalFileDestination {
448 async fn export_data(
449 &self,
450 data: &[u8],
451 filename: &str,
452 _content_type: &str,
453 ) -> RragResult<DestinationResult> {
454 let full_path = format!("{}/{}", self.base_path, filename);
455
456 if let Some(parent) = std::path::Path::new(&full_path).parent() {
458 tokio::fs::create_dir_all(parent)
459 .await
460 .map_err(|e| RragError::storage("create_directory", e))?;
461 }
462
463 tokio::fs::write(&full_path, data)
465 .await
466 .map_err(|e| RragError::storage("write_file", e))?;
467
468 Ok(DestinationResult {
469 destination_name: self.name.clone(),
470 status: ExportStatus::Completed,
471 delivered_at: Some(Utc::now()),
472 error_message: None,
473 delivery_info: HashMap::from([
474 ("file_path".to_string(), full_path),
475 ("file_size".to_string(), data.len().to_string()),
476 ]),
477 })
478 }
479
480 fn destination_name(&self) -> &str {
481 &self.name
482 }
483
484 async fn test_connection(&self) -> RragResult<bool> {
485 match tokio::fs::metadata(&self.base_path).await {
487 Ok(metadata) => Ok(metadata.is_dir()),
488 Err(_) => {
489 match tokio::fs::create_dir_all(&self.base_path).await {
491 Ok(_) => Ok(true),
492 Err(_) => Ok(false),
493 }
494 }
495 }
496 }
497}
498
499pub struct WebhookDestination {
501 name: String,
502 url: String,
503 headers: HashMap<String, String>,
504 #[cfg(feature = "http")]
505 client: reqwest::Client,
506}
507
508impl WebhookDestination {
509 pub fn new(name: impl Into<String>, url: impl Into<String>) -> Self {
510 Self {
511 name: name.into(),
512 url: url.into(),
513 headers: HashMap::new(),
514 #[cfg(feature = "http")]
515 client: reqwest::Client::new(),
516 }
517 }
518
519 pub fn with_header(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
520 self.headers.insert(key.into(), value.into());
521 self
522 }
523}
524
525#[async_trait::async_trait]
526impl ExportDestination for WebhookDestination {
527 async fn export_data(
528 &self,
529 data: &[u8],
530 filename: &str,
531 content_type: &str,
532 ) -> RragResult<DestinationResult> {
533 #[cfg(feature = "http")]
534 {
535 let mut request = self
536 .client
537 .post(&self.url)
538 .header("Content-Type", content_type)
539 .header("X-Filename", filename)
540 .body(data.to_vec());
541
542 for (key, value) in &self.headers {
543 request = request.header(key, value);
544 }
545
546 match request.send().await {
547 Ok(response) => {
548 let status_code = response.status().as_u16();
549 if response.status().is_success() {
550 Ok(DestinationResult {
551 destination_name: self.name.clone(),
552 status: ExportStatus::Completed,
553 delivered_at: Some(Utc::now()),
554 error_message: None,
555 delivery_info: HashMap::from([
556 ("status_code".to_string(), status_code.to_string()),
557 ("url".to_string(), self.url.clone()),
558 ]),
559 })
560 } else {
561 Ok(DestinationResult {
562 destination_name: self.name.clone(),
563 status: ExportStatus::Failed,
564 delivered_at: None,
565 error_message: Some(format!(
566 "HTTP {}: {}",
567 status_code,
568 response.status()
569 )),
570 delivery_info: HashMap::from([(
571 "status_code".to_string(),
572 status_code.to_string(),
573 )]),
574 })
575 }
576 }
577 Err(e) => Ok(DestinationResult {
578 destination_name: self.name.clone(),
579 status: ExportStatus::Failed,
580 delivered_at: None,
581 error_message: Some(e.to_string()),
582 delivery_info: HashMap::new(),
583 }),
584 }
585 }
586 #[cfg(not(feature = "http"))]
587 {
588 Ok(DestinationResult {
590 destination_name: self.name.clone(),
591 status: ExportStatus::Failed,
592 delivered_at: None,
593 error_message: Some("HTTP feature not enabled".to_string()),
594 delivery_info: HashMap::from([
595 ("note".to_string(), "HTTP feature disabled".to_string()),
596 ("url".to_string(), self.url.clone()),
597 ]),
598 })
599 }
600 }
601
602 fn destination_name(&self) -> &str {
603 &self.name
604 }
605
606 async fn test_connection(&self) -> RragResult<bool> {
607 #[cfg(feature = "http")]
608 {
609 match self.client.head(&self.url).send().await {
610 Ok(response) => Ok(response.status().is_success()),
611 Err(_) => Ok(false),
612 }
613 }
614 #[cfg(not(feature = "http"))]
615 {
616 Ok(true)
618 }
619 }
620}
621
622pub struct ReportGenerator {
624 templates: Arc<RwLock<HashMap<String, String>>>,
625}
626
627impl ReportGenerator {
628 pub fn new() -> Self {
629 Self {
630 templates: Arc::new(RwLock::new(HashMap::new())),
631 }
632 }
633
634 pub async fn add_template(&self, name: impl Into<String>, template: impl Into<String>) {
635 let mut templates = self.templates.write().await;
636 templates.insert(name.into(), template.into());
637 }
638
639 pub async fn generate_report(
640 &self,
641 config: &ReportConfig,
642 data: &SystemOverview,
643 ) -> RragResult<Vec<u8>> {
644 match config.report_type {
645 ReportType::SystemHealth => self.generate_system_health_report(data).await,
646 ReportType::PerformanceSummary => self.generate_performance_summary_report(data).await,
647 ReportType::UsageAnalytics => self.generate_usage_analytics_report(data).await,
648 _ => self.generate_generic_report(config, data).await,
649 }
650 }
651
652 async fn generate_system_health_report(&self, data: &SystemOverview) -> RragResult<Vec<u8>> {
653 let report = serde_json::json!({
654 "title": "System Health Report",
655 "generated_at": Utc::now(),
656 "data": data,
657 "summary": {
658 "overall_status": "healthy",
659 "components_checked": data.performance_metrics.as_ref().map(|_| 1).unwrap_or(0),
660 "issues_detected": 0
661 }
662 });
663
664 serde_json::to_vec_pretty(&report)
665 .map_err(|e| RragError::agent("report_generator", e.to_string()))
666 }
667
668 async fn generate_performance_summary_report(
669 &self,
670 data: &SystemOverview,
671 ) -> RragResult<Vec<u8>> {
672 let report = serde_json::json!({
673 "title": "Performance Summary Report",
674 "generated_at": Utc::now(),
675 "performance_metrics": data.performance_metrics,
676 "search_stats": data.search_stats,
677 "user_stats": data.user_stats
678 });
679
680 serde_json::to_vec_pretty(&report)
681 .map_err(|e| RragError::agent("report_generator", e.to_string()))
682 }
683
684 async fn generate_usage_analytics_report(&self, data: &SystemOverview) -> RragResult<Vec<u8>> {
685 let report = serde_json::json!({
686 "title": "Usage Analytics Report",
687 "generated_at": Utc::now(),
688 "active_sessions": data.active_sessions,
689 "user_stats": data.user_stats,
690 "search_stats": data.search_stats
691 });
692
693 serde_json::to_vec_pretty(&report)
694 .map_err(|e| RragError::agent("report_generator", e.to_string()))
695 }
696
697 async fn generate_generic_report(
698 &self,
699 config: &ReportConfig,
700 data: &SystemOverview,
701 ) -> RragResult<Vec<u8>> {
702 let report = serde_json::json!({
703 "title": config.name,
704 "description": config.description,
705 "generated_at": Utc::now(),
706 "data": data
707 });
708
709 serde_json::to_vec_pretty(&report)
710 .map_err(|e| RragError::agent("report_generator", e.to_string()))
711 }
712}
713
714pub struct ExportManager {
716 config: ExportConfig,
717 formatters: Arc<RwLock<HashMap<ExportFormat, Box<dyn DataFormatter>>>>,
718 destinations: Arc<RwLock<HashMap<String, Box<dyn ExportDestination>>>>,
719 report_generator: Arc<ReportGenerator>,
720 export_history: Arc<RwLock<Vec<ExportResult>>>,
721 export_queue: mpsc::UnboundedSender<ExportRequest>,
722 _queue_receiver: Arc<RwLock<mpsc::UnboundedReceiver<ExportRequest>>>,
723 processing_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
724 is_running: Arc<RwLock<bool>>,
725}
726
727#[derive(Debug)]
728struct ExportRequest {
729 export_id: String,
730 export_type: ExportType,
731 format: ExportFormat,
732 data: ExportData,
733 destinations: Vec<String>,
734 filters: ExportFilters,
735}
736
737#[derive(Debug)]
738enum ExportData {
739 Metrics(Vec<Metric>),
740 HealthReport(HealthReport),
741 PerformanceReport(PerformanceReport),
742 SystemOverview(SystemOverview),
743}
744
745pub struct MetricsExporter {
747 export_manager: Arc<ExportManager>,
748 metrics_collector: Arc<MetricsCollector>,
749}
750
751impl MetricsExporter {
752 pub fn new(
753 export_manager: Arc<ExportManager>,
754 metrics_collector: Arc<MetricsCollector>,
755 ) -> Self {
756 Self {
757 export_manager,
758 metrics_collector,
759 }
760 }
761
762 pub async fn export_current_metrics(
763 &self,
764 format: ExportFormat,
765 destinations: Vec<String>,
766 ) -> RragResult<ExportResult> {
767 let metrics = self.metrics_collector.get_all_metrics().await;
768 self.export_manager
769 .export_metrics(metrics, format, destinations, ExportFilters::default())
770 .await
771 }
772
773 pub async fn schedule_periodic_export(
774 &self,
775 interval_minutes: u32,
776 format: ExportFormat,
777 destinations: Vec<String>,
778 ) -> RragResult<()> {
779 tracing::info!(
782 "Scheduled metrics export every {} minutes to {:?} destinations in {:?} format",
783 interval_minutes,
784 destinations,
785 format
786 );
787 Ok(())
788 }
789}
790
791impl ExportManager {
792 pub async fn new(config: ExportConfig) -> RragResult<Self> {
793 let formatters: Arc<RwLock<HashMap<ExportFormat, Box<dyn DataFormatter>>>> =
794 Arc::new(RwLock::new(HashMap::new()));
795 let destinations: Arc<RwLock<HashMap<String, Box<dyn ExportDestination>>>> =
796 Arc::new(RwLock::new(HashMap::new()));
797 let report_generator = Arc::new(ReportGenerator::new());
798 let export_history = Arc::new(RwLock::new(Vec::new()));
799
800 let (export_queue, queue_receiver) = mpsc::unbounded_channel();
801
802 {
804 let mut fmt = formatters.write().await;
805 fmt.insert(ExportFormat::Json, Box::new(JsonFormatter));
806 fmt.insert(ExportFormat::Csv, Box::new(CsvFormatter));
807 }
808
809 {
811 let mut dest = destinations.write().await;
812 for dest_config in &config.destinations {
813 if !dest_config.enabled {
814 continue;
815 }
816
817 match dest_config.destination_type {
818 DestinationType::LocalFile => {
819 let base_path = dest_config
820 .config
821 .get("path")
822 .unwrap_or(&config.output_directory);
823 dest.insert(
824 dest_config.name.clone(),
825 Box::new(LocalFileDestination::new(&dest_config.name, base_path)),
826 );
827 }
828 DestinationType::Webhook | DestinationType::HTTP => {
829 if let Some(url) = dest_config.config.get("url") {
830 let mut webhook = WebhookDestination::new(&dest_config.name, url);
831
832 for (key, value) in &dest_config.config {
834 if key.starts_with("header_") {
835 let header_name = key.strip_prefix("header_").unwrap();
836 webhook = webhook.with_header(header_name, value);
837 }
838 }
839
840 dest.insert(dest_config.name.clone(), Box::new(webhook));
841 }
842 }
843 _ => {
844 tracing::warn!(
845 "Destination type {:?} not yet implemented",
846 dest_config.destination_type
847 );
848 }
849 }
850 }
851 }
852
853 Ok(Self {
854 config,
855 formatters,
856 destinations,
857 report_generator,
858 export_history,
859 export_queue,
860 _queue_receiver: Arc::new(RwLock::new(queue_receiver)),
861 processing_handle: Arc::new(RwLock::new(None)),
862 is_running: Arc::new(RwLock::new(false)),
863 })
864 }
865
866 pub async fn start(&self) -> RragResult<()> {
867 let mut running = self.is_running.write().await;
868 if *running {
869 return Err(RragError::config(
870 "export_manager",
871 "stopped",
872 "already running",
873 ));
874 }
875
876 *running = true;
878 tracing::info!("Export manager started");
879 Ok(())
880 }
881
882 pub async fn stop(&self) -> RragResult<()> {
883 let mut running = self.is_running.write().await;
884 if !*running {
885 return Ok(());
886 }
887
888 {
889 let mut handle_guard = self.processing_handle.write().await;
890 if let Some(handle) = handle_guard.take() {
891 handle.abort();
892 }
893 }
894
895 *running = false;
896 tracing::info!("Export manager stopped");
897 Ok(())
898 }
899
900 pub async fn is_healthy(&self) -> bool {
901 *self.is_running.read().await
902 }
903
904 pub async fn export_metrics(
905 &self,
906 metrics: Vec<Metric>,
907 format: ExportFormat,
908 destinations: Vec<String>,
909 filters: ExportFilters,
910 ) -> RragResult<ExportResult> {
911 let export_id = uuid::Uuid::new_v4().to_string();
912 let started_at = Utc::now();
913
914 let filtered_metrics = self.apply_metric_filters(metrics, &filters);
916
917 let formatters = self.formatters.read().await;
919 let formatter = formatters.get(&format).ok_or_else(|| {
920 RragError::config("export_format", "supported", &format!("{:?}", format))
921 })?;
922
923 let formatted_data = formatter.format_metrics(&filtered_metrics).await?;
924 let filename = format!(
928 "metrics_{}.{}",
929 started_at.format("%Y%m%d_%H%M%S"),
930 match format {
931 ExportFormat::Json => "json",
932 ExportFormat::Csv => "csv",
933 _ => "data",
934 }
935 );
936
937 let destinations_map = self.destinations.read().await;
939 let mut destination_results = Vec::new();
940
941 for dest_name in destinations {
942 if let Some(destination) = destinations_map.get(&dest_name) {
943 let result = destination
944 .export_data(&formatted_data, &filename, formatter.content_type())
945 .await?;
946 destination_results.push(result);
947 }
948 }
949
950 let export_result = ExportResult {
951 export_id: export_id.clone(),
952 export_type: ExportType::Metrics,
953 format,
954 file_path: Some(filename),
955 file_size_bytes: formatted_data.len() as u64,
956 record_count: filtered_metrics.len(),
957 started_at,
958 completed_at: Some(Utc::now()),
959 status: if destination_results
960 .iter()
961 .all(|r| r.status == ExportStatus::Completed)
962 {
963 ExportStatus::Completed
964 } else {
965 ExportStatus::PartiallyCompleted
966 },
967 error_message: None,
968 destinations: destination_results,
969 };
970
971 let mut history = self.export_history.write().await;
973 history.push(export_result.clone());
974
975 let history_len = history.len();
977 if history_len > 1000 {
978 history.drain(0..history_len - 1000);
979 }
980
981 Ok(export_result)
982 }
983
984 fn apply_metric_filters(&self, metrics: Vec<Metric>, filters: &ExportFilters) -> Vec<Metric> {
985 metrics
986 .into_iter()
987 .filter(|metric| {
988 if let Some(ref time_range) = filters.time_range {
990 if metric.timestamp < time_range.start || metric.timestamp > time_range.end {
991 return false;
992 }
993 }
994
995 if !filters.components.is_empty() {
997 if let Some(component) = metric.labels.get("component") {
998 if !filters.components.contains(component) {
999 return false;
1000 }
1001 }
1002 }
1003
1004 true
1005 })
1006 .collect()
1007 }
1008
1009 pub async fn generate_and_export_report(
1010 &self,
1011 config: ReportConfig,
1012 data: SystemOverview,
1013 destinations: Vec<String>,
1014 ) -> RragResult<ExportResult> {
1015 let export_id = uuid::Uuid::new_v4().to_string();
1016 let started_at = Utc::now();
1017
1018 let report_data = self
1020 .report_generator
1021 .generate_report(&config, &data)
1022 .await?;
1023
1024 let filename = format!(
1026 "report_{}_{}.{}",
1027 config.name.replace(' ', "_").to_lowercase(),
1028 started_at.format("%Y%m%d_%H%M%S"),
1029 match config.output_format {
1030 ExportFormat::Json => "json",
1031 ExportFormat::Csv => "csv",
1032 ExportFormat::Pdf => "pdf",
1033 _ => "report",
1034 }
1035 );
1036
1037 let destinations_map = self.destinations.read().await;
1039 let mut destination_results = Vec::new();
1040
1041 for dest_name in destinations {
1042 if let Some(destination) = destinations_map.get(&dest_name) {
1043 let result = destination
1044 .export_data(
1045 &report_data,
1046 &filename,
1047 "application/json", )
1049 .await?;
1050 destination_results.push(result);
1051 }
1052 }
1053
1054 let export_result = ExportResult {
1055 export_id,
1056 export_type: ExportType::CustomReport,
1057 format: config.output_format,
1058 file_path: Some(filename),
1059 file_size_bytes: report_data.len() as u64,
1060 record_count: 1,
1061 started_at,
1062 completed_at: Some(Utc::now()),
1063 status: if destination_results
1064 .iter()
1065 .all(|r| r.status == ExportStatus::Completed)
1066 {
1067 ExportStatus::Completed
1068 } else {
1069 ExportStatus::PartiallyCompleted
1070 },
1071 error_message: None,
1072 destinations: destination_results,
1073 };
1074
1075 let mut history = self.export_history.write().await;
1077 history.push(export_result.clone());
1078
1079 Ok(export_result)
1080 }
1081
1082 pub async fn get_export_history(&self, limit: Option<usize>) -> Vec<ExportResult> {
1083 let history = self.export_history.read().await;
1084 let limit = limit.unwrap_or(history.len());
1085 let start_index = history.len().saturating_sub(limit);
1086 history[start_index..].to_vec()
1087 }
1088
1089 pub async fn get_export_status(&self, export_id: &str) -> Option<ExportResult> {
1090 let history = self.export_history.read().await;
1091 history.iter().find(|r| r.export_id == export_id).cloned()
1092 }
1093
1094 pub async fn test_destination(&self, destination_name: &str) -> RragResult<bool> {
1095 let destinations = self.destinations.read().await;
1096 if let Some(destination) = destinations.get(destination_name) {
1097 destination.test_connection().await
1098 } else {
1099 Err(RragError::config("destination", "exists", "not_found"))
1100 }
1101 }
1102
1103 pub async fn add_destination(&self, name: String, destination: Box<dyn ExportDestination>) {
1104 let mut destinations = self.destinations.write().await;
1105 destinations.insert(name, destination);
1106 }
1107
1108 pub async fn remove_destination(&self, name: &str) {
1109 let mut destinations = self.destinations.write().await;
1110 destinations.remove(name);
1111 }
1112
1113 pub async fn list_destinations(&self) -> Vec<String> {
1114 let destinations = self.destinations.read().await;
1115 destinations.keys().cloned().collect()
1116 }
1117
1118 pub async fn get_export_stats(&self) -> ExportStats {
1119 let history = self.export_history.read().await;
1120
1121 let total_exports = history.len();
1122 let successful_exports = history
1123 .iter()
1124 .filter(|r| r.status == ExportStatus::Completed)
1125 .count();
1126 let failed_exports = history
1127 .iter()
1128 .filter(|r| r.status == ExportStatus::Failed)
1129 .count();
1130
1131 let total_data_exported = history.iter().map(|r| r.file_size_bytes).sum::<u64>();
1132
1133 let exports_by_type = history.iter().fold(HashMap::new(), |mut acc, result| {
1134 *acc.entry(result.export_type.clone()).or_insert(0) += 1;
1135 acc
1136 });
1137
1138 let exports_by_format = history.iter().fold(HashMap::new(), |mut acc, result| {
1139 *acc.entry(result.format.clone()).or_insert(0) += 1;
1140 acc
1141 });
1142
1143 ExportStats {
1144 total_exports,
1145 successful_exports,
1146 failed_exports,
1147 total_data_exported_bytes: total_data_exported,
1148 exports_by_type,
1149 exports_by_format,
1150 last_export: history.last().map(|r| r.started_at),
1151 }
1152 }
1153}
1154
1155#[derive(Debug, Clone, Serialize, Deserialize)]
1156pub struct ExportStats {
1157 pub total_exports: usize,
1158 pub successful_exports: usize,
1159 pub failed_exports: usize,
1160 pub total_data_exported_bytes: u64,
1161 pub exports_by_type: HashMap<ExportType, usize>,
1162 pub exports_by_format: HashMap<ExportFormat, usize>,
1163 pub last_export: Option<DateTime<Utc>>,
1164}
1165
1166#[cfg(test)]
1167mod tests {
1168 use super::*;
1169
1170 #[tokio::test]
1171 async fn test_json_formatter() {
1172 let formatter = JsonFormatter;
1173 let metrics = vec![
1174 Metric::counter("test_counter", 42),
1175 Metric::gauge("test_gauge", 3.14),
1176 ];
1177
1178 let result = formatter.format_metrics(&metrics).await.unwrap();
1179 let json_str = String::from_utf8(result).unwrap();
1180
1181 assert!(json_str.contains("test_counter"));
1182 assert!(json_str.contains("test_gauge"));
1183 assert_eq!(formatter.content_type(), "application/json");
1184 assert_eq!(formatter.file_extension(), "json");
1185 }
1186
1187 #[tokio::test]
1188 async fn test_csv_formatter() {
1189 let formatter = CsvFormatter;
1190 let metrics = vec![Metric::counter("requests_total", 100).with_label("method", "GET")];
1191
1192 let result = formatter.format_metrics(&metrics).await.unwrap();
1193 let csv_str = String::from_utf8(result).unwrap();
1194
1195 assert!(csv_str.contains("timestamp,name,type,value,labels"));
1196 assert!(csv_str.contains("requests_total"));
1197 assert!(csv_str.contains("Counter"));
1198 assert_eq!(formatter.content_type(), "text/csv");
1199 assert_eq!(formatter.file_extension(), "csv");
1200 }
1201
1202 #[tokio::test]
1203 async fn test_local_file_destination() {
1204 let temp_dir = tempfile::tempdir().unwrap();
1205 let destination =
1206 LocalFileDestination::new("test_local", temp_dir.path().to_string_lossy().to_string());
1207
1208 assert!(destination.test_connection().await.unwrap());
1209
1210 let test_data = b"test export data";
1211 let result = destination
1212 .export_data(test_data, "test.json", "application/json")
1213 .await
1214 .unwrap();
1215
1216 assert_eq!(result.status, ExportStatus::Completed);
1217 assert_eq!(result.destination_name, "test_local");
1218 assert!(result.delivered_at.is_some());
1219 }
1220
1221 #[tokio::test]
1222 async fn test_export_manager() {
1223 let config = ExportConfig {
1224 output_directory: tempfile::tempdir()
1225 .unwrap()
1226 .path()
1227 .to_string_lossy()
1228 .to_string(),
1229 destinations: vec![ExportDestinationConfig {
1230 name: "local_test".to_string(),
1231 destination_type: DestinationType::LocalFile,
1232 config: HashMap::new(),
1233 enabled: true,
1234 }],
1235 ..Default::default()
1236 };
1237
1238 let mut manager = ExportManager::new(config).await.unwrap();
1239 manager.start().await.unwrap();
1240
1241 let metrics = vec![
1242 Metric::counter("test_metric", 123),
1243 Metric::gauge("test_gauge", 45.6),
1244 ];
1245
1246 let result = manager
1247 .export_metrics(
1248 metrics,
1249 ExportFormat::Json,
1250 vec!["local_test".to_string()],
1251 ExportFilters::default(),
1252 )
1253 .await
1254 .unwrap();
1255
1256 assert_eq!(result.export_type, ExportType::Metrics);
1257 assert_eq!(result.format, ExportFormat::Json);
1258 assert_eq!(result.status, ExportStatus::Completed);
1259 assert_eq!(result.record_count, 2);
1260 assert!(!result.destinations.is_empty());
1261
1262 let history = manager.get_export_history(Some(10)).await;
1263 assert_eq!(history.len(), 1);
1264
1265 let stats = manager.get_export_stats().await;
1266 assert_eq!(stats.total_exports, 1);
1267 assert_eq!(stats.successful_exports, 1);
1268
1269 manager.stop().await.unwrap();
1270 }
1271
1272 #[tokio::test]
1273 async fn test_report_generator() {
1274 let generator = ReportGenerator::new();
1275 let overview = SystemOverview {
1276 timestamp: Utc::now(),
1277 performance_metrics: None,
1278 search_stats: None,
1279 user_stats: None,
1280 active_sessions: Some(10),
1281 };
1282
1283 let config = ReportConfig {
1284 name: "Test Report".to_string(),
1285 description: "A test report".to_string(),
1286 report_type: ReportType::SystemHealth,
1287 template: None,
1288 parameters: HashMap::new(),
1289 output_format: ExportFormat::Json,
1290 include_charts: false,
1291 chart_config: ChartConfig::default(),
1292 };
1293
1294 let report_data = generator.generate_report(&config, &overview).await.unwrap();
1295 let report_str = String::from_utf8(report_data).unwrap();
1296
1297 assert!(report_str.contains("System Health Report"));
1298 assert!(report_str.contains("generated_at"));
1299 }
1300
1301 #[test]
1302 fn test_export_filters() {
1303 let filters = ExportFilters {
1304 time_range: Some(TimeRange {
1305 start: Utc::now() - Duration::hours(1),
1306 end: Utc::now(),
1307 }),
1308 components: vec!["search".to_string(), "storage".to_string()],
1309 ..Default::default()
1310 };
1311
1312 assert!(filters.time_range.is_some());
1313 assert_eq!(filters.components.len(), 2);
1314 assert!(filters.components.contains(&"search".to_string()));
1315 }
1316
1317 #[test]
1318 fn test_export_status() {
1319 assert_eq!(ExportStatus::Completed, ExportStatus::Completed);
1320 assert_ne!(ExportStatus::Completed, ExportStatus::Failed);
1321 }
1322}