ant_quic/monitoring/
export.rs

1//! Export Management System
2//!
3//! This module implements centralized export management for monitoring data
4//! to various external systems with data transformation and delivery guarantees.
5
6use std::{
7    collections::HashMap,
8    sync::Arc,
9    time::{Duration, SystemTime},
10};
11
12use tokio::sync::{RwLock, Mutex};
13use tracing::{debug, info, warn};
14use serde::{Serialize, Deserialize};
15
16use crate::monitoring::MonitoringError;
17
18/// Export manager for coordinating data exports
19pub struct ExportManager {
20    /// Export configuration
21    config: ExportConfig,
22    /// Data transformers
23    transformers: Arc<DataTransformers>,
24    /// Export schedulers
25    schedulers: Arc<RwLock<HashMap<String, ExportScheduler>>>,
26    /// Delivery manager
27    delivery_manager: Arc<DeliveryManager>,
28    /// Export state
29    state: Arc<RwLock<ExportState>>,
30    /// Background tasks
31    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>
32}
33
34impl ExportManager {
35    /// Create new export manager
36    pub async fn new(config: ExportConfig) -> Result<Self, MonitoringError> {
37        let transformers = Arc::new(DataTransformers::new());
38        let schedulers = Arc::new(RwLock::new(HashMap::new()));
39        let delivery_manager = Arc::new(DeliveryManager::new(config.delivery.clone()));
40        let state = Arc::new(RwLock::new(ExportState::new()));
41
42        Ok(Self {
43            config,
44            transformers,
45            schedulers,
46            delivery_manager,
47            state,
48            tasks: Arc::new(Mutex::new(Vec::new())),
49        })
50    }
51
52    /// Start export manager
53    pub async fn start(&self) -> Result<(), MonitoringError> {
54        info!("Starting export manager");
55
56        // Initialize schedulers for each destination
57        self.initialize_schedulers().await?;
58
59        // Start background tasks
60        self.start_export_coordination_task().await?;
61        self.start_health_monitoring_task().await?;
62
63        // Update state
64        {
65            let mut state = self.state.write().await;
66            state.status = ExportStatus::Running;
67            state.start_time = Some(SystemTime::now());
68        }
69
70        info!("Export manager started");
71        Ok(())
72    }
73
74    /// Stop export manager
75    pub async fn stop(&self) -> Result<(), MonitoringError> {
76        info!("Stopping export manager");
77
78        // Update state
79        {
80            let mut state = self.state.write().await;
81            state.status = ExportStatus::Stopping;
82        }
83
84        // Stop background tasks
85        let mut tasks = self.tasks.lock().await;
86        for task in tasks.drain(..) {
87            task.abort();
88        }
89
90        // Flush remaining data
91        self.flush_all_exports().await?;
92
93        // Update state
94        {
95            let mut state = self.state.write().await;
96            state.status = ExportStatus::Stopped;
97            state.stop_time = Some(SystemTime::now());
98        }
99
100        info!("Export manager stopped");
101        Ok(())
102    }
103
104    /// Get export manager status
105    pub async fn get_status(&self) -> String {
106        let state = self.state.read().await;
107        format!("{:?}", state.status)
108    }
109
110    /// Export metrics data
111    pub async fn export_metrics(&self, data: ExportData) -> Result<(), MonitoringError> {
112        // Transform data for each configured destination
113        for destination in &self.config.destinations {
114            if let Ok(transformed_data) = self.transformers.transform_for_destination(&data, destination).await {
115                self.delivery_manager.schedule_delivery(destination.clone(), transformed_data).await?;
116            }
117        }
118
119        Ok(())
120    }
121
122    /// Initialize export schedulers
123    async fn initialize_schedulers(&self) -> Result<(), MonitoringError> {
124        let mut schedulers = self.schedulers.write().await;
125        
126        for destination in &self.config.destinations {
127            let scheduler = ExportScheduler::new(destination.clone(), self.config.scheduling.clone());
128            schedulers.insert(destination.id().to_string(), scheduler);
129        }
130
131        info!("Initialized {} export schedulers", schedulers.len());
132        Ok(())
133    }
134
135    /// Start export coordination task
136    async fn start_export_coordination_task(&self) -> Result<(), MonitoringError> {
137        let delivery_manager = self.delivery_manager.clone();
138        let config = self.config.clone();
139
140        let task = tokio::spawn(async move {
141            let mut interval = tokio::time::interval(config.coordination_interval);
142
143            loop {
144                interval.tick().await;
145
146                if let Err(e) = delivery_manager.coordinate_deliveries().await {
147                    warn!("Export coordination failed: {}", e);
148                }
149            }
150        });
151
152        self.tasks.lock().await.push(task);
153        Ok(())
154    }
155
156    /// Start health monitoring task
157    async fn start_health_monitoring_task(&self) -> Result<(), MonitoringError> {
158        let state = self.state.clone();
159        let delivery_manager = self.delivery_manager.clone();
160
161        let task = tokio::spawn(async move {
162            let mut interval = tokio::time::interval(Duration::from_secs(30));
163
164            loop {
165                interval.tick().await;
166
167                let health = delivery_manager.get_health_status().await;
168                
169                let mut export_state = state.write().await;
170                export_state.last_health_check = Some(SystemTime::now());
171                export_state.exports_completed += health.successful_exports;
172                export_state.export_errors += health.failed_exports;
173            }
174        });
175
176        self.tasks.lock().await.push(task);
177        Ok(())
178    }
179
180    /// Flush all pending exports
181    async fn flush_all_exports(&self) -> Result<(), MonitoringError> {
182        self.delivery_manager.flush_all().await
183    }
184}
185
186/// Export configuration
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct ExportConfig {
189    /// Export destinations
190    pub destinations: Vec<ExportDestination>,
191    /// Scheduling configuration
192    pub scheduling: SchedulingConfig,
193    /// Delivery configuration
194    pub delivery: DeliveryConfig,
195    /// Export coordination interval
196    pub coordination_interval: Duration,
197    /// Data retention settings
198    pub retention: RetentionConfig,
199}
200
201impl Default for ExportConfig {
202    fn default() -> Self {
203        Self {
204            destinations: vec![
205                ExportDestination::File {
206                    id: "local-file".to_string(),
207                    path: "/tmp/ant-quic-metrics.json".to_string(),
208                    format: FileFormat::JSON,
209                }
210            ],
211            scheduling: SchedulingConfig::default(),
212            delivery: DeliveryConfig::default(),
213            coordination_interval: Duration::from_secs(60),
214            retention: RetentionConfig::default(),
215        }
216    }
217}
218
219/// Export destinations
220#[derive(Debug, Clone, Serialize, Deserialize)]
221pub enum ExportDestination {
222    File {
223        id: String,
224        path: String,
225        format: FileFormat,
226    },
227    HTTP {
228        id: String,
229        endpoint: String,
230        headers: HashMap<String, String>,
231        auth: Option<AuthConfig>,
232    },
233    S3 {
234        id: String,
235        bucket: String,
236        region: String,
237        prefix: String,
238    },
239    Database {
240        id: String,
241        connection_string: String,
242        table: String,
243        schema: String,
244    },
245    Kafka {
246        id: String,
247        brokers: Vec<String>,
248        topic: String,
249        partition_key: Option<String>,
250    },
251}
252
253impl ExportDestination {
254    pub fn id(&self) -> &str {
255        match self {
256            ExportDestination::File { id, .. } => id,
257            ExportDestination::HTTP { id, .. } => id,
258            ExportDestination::S3 { id, .. } => id,
259            ExportDestination::Database { id, .. } => id,
260            ExportDestination::Kafka { id, .. } => id,
261        }
262    }
263}
264
265/// File formats for export
266#[derive(Debug, Clone, Serialize, Deserialize)]
267pub enum FileFormat {
268    JSON,
269    CSV,
270    Parquet,
271    Avro,
272}
273
274/// Authentication configuration
275#[derive(Debug, Clone, Serialize, Deserialize)]
276pub struct AuthConfig {
277    pub auth_type: AuthType,
278    pub credentials: HashMap<String, String>,
279}
280
281/// Authentication types
282#[derive(Debug, Clone, Serialize, Deserialize)]
283pub enum AuthType {
284    Bearer,
285    Basic,
286    ApiKey,
287    OAuth2,
288}
289
290/// Scheduling configuration
291#[derive(Debug, Clone, Serialize, Deserialize)]
292pub struct SchedulingConfig {
293    /// Export interval
294    pub interval: Duration,
295    /// Batch size for exports
296    pub batch_size: usize,
297    /// Maximum delay before forced export
298    pub max_delay: Duration,
299    /// Enable intelligent scheduling
300    pub intelligent_scheduling: bool,
301}
302
303impl Default for SchedulingConfig {
304    fn default() -> Self {
305        Self {
306            interval: Duration::from_secs(300), // 5 minutes
307            batch_size: 1000,
308            max_delay: Duration::from_secs(600), // 10 minutes
309            intelligent_scheduling: true,
310        }
311    }
312}
313
314/// Delivery configuration
315#[derive(Debug, Clone, Serialize, Deserialize)]
316pub struct DeliveryConfig {
317    /// Maximum retry attempts
318    pub max_retries: u32,
319    /// Initial retry delay
320    pub initial_retry_delay: Duration,
321    /// Maximum retry delay
322    pub max_retry_delay: Duration,
323    /// Delivery timeout
324    pub delivery_timeout: Duration,
325    /// Enable compression
326    pub compression: bool,
327}
328
329impl Default for DeliveryConfig {
330    fn default() -> Self {
331        Self {
332            max_retries: 3,
333            initial_retry_delay: Duration::from_secs(1),
334            max_retry_delay: Duration::from_secs(60),
335            delivery_timeout: Duration::from_secs(30),
336            compression: true,
337        }
338    }
339}
340
341/// Data retention configuration
342#[derive(Debug, Clone, Serialize, Deserialize)]
343pub struct RetentionConfig {
344    /// How long to keep export data locally
345    pub local_retention: Duration,
346    /// How long to keep delivery receipts
347    pub receipt_retention: Duration,
348    /// Enable automatic cleanup
349    pub auto_cleanup: bool,
350}
351
352impl Default for RetentionConfig {
353    fn default() -> Self {
354        Self {
355            local_retention: Duration::from_secs(3600 * 24), // 24 hours
356            receipt_retention: Duration::from_secs(3600 * 24 * 7), // 7 days
357            auto_cleanup: true,
358        }
359    }
360}
361
362/// Data to be exported
363#[derive(Debug, Clone, Serialize, Deserialize)]
364pub struct ExportData {
365    /// Data type identifier
366    pub data_type: String,
367    /// Timestamp of data
368    pub timestamp: SystemTime,
369    /// Actual data payload
370    pub payload: serde_json::Value,
371    /// Metadata
372    pub metadata: HashMap<String, String>,
373}
374
375/// Export scheduler for managing export timing
376struct ExportScheduler {
377    destination: ExportDestination,
378    config: SchedulingConfig,
379    last_export: Option<SystemTime>,
380    pending_data: Vec<ExportData>,
381}
382
383impl ExportScheduler {
384    fn new(destination: ExportDestination, config: SchedulingConfig) -> Self {
385        Self {
386            destination,
387            config,
388            last_export: None,
389            pending_data: Vec::new(),
390        }
391    }
392
393    fn should_export(&self) -> bool {
394        if self.pending_data.len() >= self.config.batch_size {
395            return true;
396        }
397
398        if let Some(last_export) = self.last_export {
399            let elapsed = last_export.elapsed().unwrap_or_default();
400            if elapsed >= self.config.interval {
401                return true;
402            }
403            if elapsed >= self.config.max_delay && !self.pending_data.is_empty() {
404                return true;
405            }
406        } else if !self.pending_data.is_empty() {
407            return true;
408        }
409
410        false
411    }
412
413    fn add_data(&mut self, data: ExportData) {
414        self.pending_data.push(data);
415    }
416
417    fn take_pending_data(&mut self) -> Vec<ExportData> {
418        let data = self.pending_data.clone();
419        self.pending_data.clear();
420        self.last_export = Some(SystemTime::now());
421        data
422    }
423}
424
425/// Data transformers for different export formats
426struct DataTransformers;
427
428impl DataTransformers {
429    fn new() -> Self {
430        Self
431    }
432
433    async fn transform_for_destination(
434        &self,
435        data: &ExportData,
436        destination: &ExportDestination,
437    ) -> Result<TransformedData, MonitoringError> {
438        match destination {
439            ExportDestination::File { format, .. } => {
440                self.transform_for_file_format(data, format).await
441            }
442            ExportDestination::HTTP { .. } => {
443                self.transform_for_http(data).await
444            }
445            ExportDestination::S3 { .. } => {
446                self.transform_for_s3(data).await
447            }
448            ExportDestination::Database { schema, .. } => {
449                self.transform_for_database(data, schema).await
450            }
451            ExportDestination::Kafka { .. } => {
452                self.transform_for_kafka(data).await
453            }
454        }
455    }
456
457    async fn transform_for_file_format(
458        &self,
459        data: &ExportData,
460        format: &FileFormat,
461    ) -> Result<TransformedData, MonitoringError> {
462        let content = match format {
463            FileFormat::JSON => serde_json::to_string(&data.payload)
464                .map_err(|e| MonitoringError::ExportError(format!("JSON serialization failed: {}", e)))?,
465            FileFormat::CSV => {
466                // Convert JSON to CSV format
467                format!("timestamp,data_type,payload\n{:?},{},{}", 
468                    data.timestamp, data.data_type, data.payload)
469            }
470            FileFormat::Parquet | FileFormat::Avro => {
471                // Would implement binary format serialization
472                return Err(MonitoringError::ExportError("Binary formats not yet implemented".to_string()));
473            }
474        };
475
476        Ok(TransformedData {
477            content: content.into_bytes(),
478            content_type: format.content_type().to_string(),
479            metadata: data.metadata.clone(),
480        })
481    }
482
483    async fn transform_for_http(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
484        let content = serde_json::to_string(&data.payload)
485            .map_err(|e| MonitoringError::ExportError(format!("HTTP JSON serialization failed: {}", e)))?;
486
487        Ok(TransformedData {
488            content: content.into_bytes(),
489            content_type: "application/json".to_string(),
490            metadata: data.metadata.clone(),
491        })
492    }
493
494    async fn transform_for_s3(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
495        // S3 typically uses JSON format
496        self.transform_for_http(data).await
497    }
498
499    async fn transform_for_database(
500        &self,
501        data: &ExportData,
502        _schema: &str,
503    ) -> Result<TransformedData, MonitoringError> {
504        // Would transform to SQL INSERT statements or prepared statement format
505        let content = format!(
506            "INSERT INTO monitoring_data (timestamp, data_type, payload, metadata) VALUES (?, ?, ?, ?)"
507        );
508
509        Ok(TransformedData {
510            content: content.into_bytes(),
511            content_type: "application/sql".to_string(),
512            metadata: data.metadata.clone(),
513        })
514    }
515
516    async fn transform_for_kafka(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
517        // Kafka typically uses JSON or Avro
518        self.transform_for_http(data).await
519    }
520}
521
522impl FileFormat {
523    fn content_type(&self) -> &str {
524        match self {
525            FileFormat::JSON => "application/json",
526            FileFormat::CSV => "text/csv",
527            FileFormat::Parquet => "application/octet-stream",
528            FileFormat::Avro => "application/octet-stream",
529        }
530    }
531}
532
533/// Transformed data ready for export
534#[derive(Debug)]
535struct TransformedData {
536    content: Vec<u8>,
537    content_type: String,
538    metadata: HashMap<String, String>,
539}
540
541/// Delivery manager for ensuring data delivery
542struct DeliveryManager {
543    config: DeliveryConfig,
544    pending_deliveries: Arc<Mutex<Vec<PendingDelivery>>>,
545    delivery_receipts: Arc<Mutex<Vec<DeliveryReceipt>>>,
546}
547
548impl DeliveryManager {
549    fn new(config: DeliveryConfig) -> Self {
550        Self {
551            config,
552            pending_deliveries: Arc::new(Mutex::new(Vec::new())),
553            delivery_receipts: Arc::new(Mutex::new(Vec::new())),
554        }
555    }
556
557    async fn schedule_delivery(
558        &self,
559        destination: ExportDestination,
560        data: TransformedData,
561    ) -> Result<(), MonitoringError> {
562        let delivery = PendingDelivery {
563            id: uuid::Uuid::new_v4().to_string(),
564            destination,
565            data,
566            scheduled_time: SystemTime::now(),
567            retry_count: 0,
568            last_attempt: None,
569        };
570
571        let mut pending = self.pending_deliveries.lock().await;
572        pending.push(delivery);
573
574        Ok(())
575    }
576
577    async fn coordinate_deliveries(&self) -> Result<(), MonitoringError> {
578        let mut pending = self.pending_deliveries.lock().await;
579        let mut completed = Vec::new();
580
581        for (index, delivery) in pending.iter_mut().enumerate() {
582            if self.should_attempt_delivery(delivery) {
583                match self.attempt_delivery(delivery).await {
584                    Ok(receipt) => {
585                        let mut receipts = self.delivery_receipts.lock().await;
586                        receipts.push(receipt);
587                        completed.push(index);
588                    }
589                    Err(e) => {
590                        delivery.retry_count += 1;
591                        delivery.last_attempt = Some(SystemTime::now());
592                        
593                        if delivery.retry_count >= self.config.max_retries {
594                            warn!("Delivery {} failed after {} retries: {}", delivery.id, delivery.retry_count, e);
595                            completed.push(index);
596                        }
597                    }
598                }
599            }
600        }
601
602        // Remove completed deliveries in reverse order to maintain indices
603        for &index in completed.iter().rev() {
604            pending.remove(index);
605        }
606
607        Ok(())
608    }
609
610    fn should_attempt_delivery(&self, delivery: &PendingDelivery) -> bool {
611        if delivery.last_attempt.is_none() {
612            return true;
613        }
614
615        if let Some(last_attempt) = delivery.last_attempt {
616            let retry_delay = self.calculate_retry_delay(delivery.retry_count);
617            last_attempt.elapsed().unwrap_or_default() >= retry_delay
618        } else {
619            true
620        }
621    }
622
623    async fn attempt_delivery(&self, delivery: &PendingDelivery) -> Result<DeliveryReceipt, MonitoringError> {
624        debug!("Attempting delivery {} to {:?}", delivery.id, delivery.destination.id());
625
626        // Simulate delivery attempt
627        // In real implementation, would handle each destination type
628        match &delivery.destination {
629            ExportDestination::File { path, .. } => {
630                std::fs::write(path, &delivery.data.content)
631                    .map_err(|e| MonitoringError::ExportError(format!("File write failed: {}", e)))?;
632            }
633            ExportDestination::HTTP { endpoint, .. } => {
634                // Would make HTTP request
635                debug!("Would send HTTP request to {}", endpoint);
636            }
637            _ => {
638                debug!("Delivery type not yet implemented");
639            }
640        }
641
642        Ok(DeliveryReceipt {
643            delivery_id: delivery.id.clone(),
644            destination_id: delivery.destination.id().to_string(),
645            timestamp: SystemTime::now(),
646            status: DeliveryStatus::Success,
647            bytes_sent: delivery.data.content.len(),
648            response_time: Duration::from_millis(100), // Mock response time
649        })
650    }
651
652    fn calculate_retry_delay(&self, retry_count: u32) -> Duration {
653        let base_delay = self.config.initial_retry_delay;
654        let exponential_delay = base_delay * 2_u32.pow(retry_count);
655        std::cmp::min(Duration::from_millis(exponential_delay.as_millis() as u64), self.config.max_retry_delay)
656    }
657
658    async fn get_health_status(&self) -> DeliveryHealth {
659        let receipts = self.delivery_receipts.lock().await;
660        let recent_receipts: Vec<_> = receipts.iter()
661            .filter(|r| r.timestamp.elapsed().unwrap_or_default() < Duration::from_secs(3600))
662            .collect();
663
664        let successful_exports = recent_receipts.iter()
665            .filter(|r| matches!(r.status, DeliveryStatus::Success))
666            .count() as u64;
667
668        let failed_exports = recent_receipts.len() as u64 - successful_exports;
669
670        DeliveryHealth {
671            successful_exports,
672            failed_exports,
673            pending_deliveries: self.pending_deliveries.lock().await.len() as u64,
674            avg_response_time: if !recent_receipts.is_empty() {
675                recent_receipts.iter()
676                    .map(|r| r.response_time.as_millis())
677                    .sum::<u128>() / recent_receipts.len() as u128
678            } else {
679                0
680            },
681        }
682    }
683
684    async fn flush_all(&self) -> Result<(), MonitoringError> {
685        // Force delivery of all pending items
686        self.coordinate_deliveries().await?;
687        
688        // Wait for any remaining deliveries
689        let pending_count = self.pending_deliveries.lock().await.len();
690        if pending_count > 0 {
691            warn!("Flushing with {} pending deliveries", pending_count);
692        }
693
694        Ok(())
695    }
696}
697
698/// Pending delivery information
699#[derive(Debug)]
700struct PendingDelivery {
701    id: String,
702    destination: ExportDestination,
703    data: TransformedData,
704    scheduled_time: SystemTime,
705    retry_count: u32,
706    last_attempt: Option<SystemTime>,
707}
708
709/// Delivery receipt
710#[derive(Debug)]
711struct DeliveryReceipt {
712    delivery_id: String,
713    destination_id: String,
714    timestamp: SystemTime,
715    status: DeliveryStatus,
716    bytes_sent: usize,
717    response_time: Duration,
718}
719
720/// Delivery status
721#[derive(Debug)]
722enum DeliveryStatus {
723    Success,
724    Failure,
725    Retry,
726}
727
728/// Delivery health metrics
729#[derive(Debug)]
730struct DeliveryHealth {
731    successful_exports: u64,
732    failed_exports: u64,
733    pending_deliveries: u64,
734    avg_response_time: u128,
735}
736
737/// Export manager state
738#[derive(Debug)]
739struct ExportState {
740    status: ExportStatus,
741    start_time: Option<SystemTime>,
742    stop_time: Option<SystemTime>,
743    exports_completed: u64,
744    export_errors: u64,
745    last_health_check: Option<SystemTime>,
746}
747
748impl ExportState {
749    fn new() -> Self {
750        Self {
751            status: ExportStatus::Stopped,
752            start_time: None,
753            stop_time: None,
754            exports_completed: 0,
755            export_errors: 0,
756            last_health_check: None,
757        }
758    }
759}
760
761/// Export manager status
762#[derive(Debug, Clone)]
763enum ExportStatus {
764    Stopped,
765    Starting,
766    Running,
767    Stopping,
768    Error,
769}
770
771#[cfg(test)]
772mod tests {
773    use super::*;
774
775    #[tokio::test]
776    async fn test_export_manager_creation() {
777        let config = ExportConfig::default();
778        let manager = ExportManager::new(config).await.unwrap();
779        
780        let status = manager.get_status().await;
781        assert!(status.contains("Stopped"));
782    }
783
784    #[tokio::test]
785    async fn test_data_transformation() {
786        let transformers = DataTransformers::new();
787        
788        let data = ExportData {
789            data_type: "test".to_string(),
790            timestamp: SystemTime::now(),
791            payload: serde_json::json!({"key": "value"}),
792            metadata: HashMap::new(),
793        };
794
795        let destination = ExportDestination::File {
796            id: "test".to_string(),
797            path: "/tmp/test".to_string(),
798            format: FileFormat::JSON,
799        };
800
801        let transformed = transformers.transform_for_destination(&data, &destination).await.unwrap();
802        assert_eq!(transformed.content_type, "application/json");
803    }
804
805    #[test]
806    fn test_export_scheduler() {
807        let destination = ExportDestination::File {
808            id: "test".to_string(),
809            path: "/tmp/test".to_string(),
810            format: FileFormat::JSON,
811        };
812        let config = SchedulingConfig::default();
813        let scheduler = ExportScheduler::new(destination, config);
814
815        // Empty scheduler should not export
816        assert!(!scheduler.should_export());
817    }
818}