use std::{
    collections::HashMap,
    sync::Arc,
    time::{Duration, SystemTime},
};

use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, RwLock};
use tracing::{debug, info, warn};

use crate::monitoring::MonitoringError;

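/// Coordinates export of monitoring data: each [`ExportData`] item is transformed
/// for every configured destination and handed to the delivery layer, while
/// background tasks drive delivery retries and health bookkeeping.
///
/// A minimal usage sketch (marked `ignore`: the calling context, error handling,
/// and payload values below are illustrative assumptions, not part of this file):
///
/// ```ignore
/// let manager = ExportManager::new(ExportConfig::default()).await?;
/// manager.start().await?;
///
/// manager.export_metrics(ExportData {
///     data_type: "connection_stats".to_string(),
///     timestamp: std::time::SystemTime::now(),
///     payload: serde_json::json!({ "active_connections": 42 }),
///     metadata: std::collections::HashMap::new(),
/// }).await?;
///
/// manager.stop().await?;
/// ```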
pub struct ExportManager {
    /// Export configuration: destinations, scheduling, delivery, retention.
    config: ExportConfig,
    /// Per-destination data transformers.
    transformers: Arc<DataTransformers>,
    /// Export schedulers keyed by destination id.
    schedulers: Arc<RwLock<HashMap<String, ExportScheduler>>>,
    /// Queues deliveries, retries them, and records receipts.
    delivery_manager: Arc<DeliveryManager>,
    /// Current lifecycle state and counters.
    state: Arc<RwLock<ExportState>>,
    /// Handles of spawned background tasks, aborted on shutdown.
    tasks: Arc<Mutex<Vec<tokio::task::JoinHandle<()>>>>,
}

impl ExportManager {
    /// Creates a new export manager from the given configuration.
    pub async fn new(config: ExportConfig) -> Result<Self, MonitoringError> {
        let transformers = Arc::new(DataTransformers::new());
        let schedulers = Arc::new(RwLock::new(HashMap::new()));
        let delivery_manager = Arc::new(DeliveryManager::new(config.delivery.clone()));
        let state = Arc::new(RwLock::new(ExportState::new()));

        Ok(Self {
            config,
            transformers,
            schedulers,
            delivery_manager,
            state,
            tasks: Arc::new(Mutex::new(Vec::new())),
        })
    }

    /// Starts the export manager: initializes schedulers, spawns background
    /// tasks, and marks the manager as running.
    pub async fn start(&self) -> Result<(), MonitoringError> {
        info!("Starting export manager");

        self.initialize_schedulers().await?;

        self.start_export_coordination_task().await?;
        self.start_health_monitoring_task().await?;

        {
            let mut state = self.state.write().await;
            state.status = ExportStatus::Running;
            state.start_time = Some(SystemTime::now());
        }

        info!("Export manager started");
        Ok(())
    }

    /// Stops the export manager: aborts background tasks, flushes pending
    /// exports, and marks the manager as stopped.
    pub async fn stop(&self) -> Result<(), MonitoringError> {
        info!("Stopping export manager");

        {
            let mut state = self.state.write().await;
            state.status = ExportStatus::Stopping;
        }

        // Abort background tasks before flushing.
        let mut tasks = self.tasks.lock().await;
        for task in tasks.drain(..) {
            task.abort();
        }

        self.flush_all_exports().await?;

        {
            let mut state = self.state.write().await;
            state.status = ExportStatus::Stopped;
            state.stop_time = Some(SystemTime::now());
        }

        info!("Export manager stopped");
        Ok(())
    }

    /// Returns the current export status as a debug-formatted string (e.g. "Running").
    pub async fn get_status(&self) -> String {
        let state = self.state.read().await;
        format!("{:?}", state.status)
    }

    /// Schedules `data` for delivery to every configured destination.
    pub async fn export_metrics(&self, data: ExportData) -> Result<(), MonitoringError> {
        for destination in &self.config.destinations {
            match self.transformers.transform_for_destination(&data, destination).await {
                Ok(transformed_data) => {
                    self.delivery_manager
                        .schedule_delivery(destination.clone(), transformed_data)
                        .await?;
                }
                Err(e) => warn!("Failed to transform data for {}: {}", destination.id(), e),
            }
        }

        Ok(())
    }

    async fn initialize_schedulers(&self) -> Result<(), MonitoringError> {
        let mut schedulers = self.schedulers.write().await;

        for destination in &self.config.destinations {
            let scheduler = ExportScheduler::new(destination.clone(), self.config.scheduling.clone());
            schedulers.insert(destination.id().to_string(), scheduler);
        }

        info!("Initialized {} export schedulers", schedulers.len());
        Ok(())
    }

    async fn start_export_coordination_task(&self) -> Result<(), MonitoringError> {
        let delivery_manager = self.delivery_manager.clone();
        let config = self.config.clone();

        let task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(config.coordination_interval);

            loop {
                interval.tick().await;

                if let Err(e) = delivery_manager.coordinate_deliveries().await {
                    warn!("Export coordination failed: {}", e);
                }
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }

    async fn start_health_monitoring_task(&self) -> Result<(), MonitoringError> {
        let state = self.state.clone();
        let delivery_manager = self.delivery_manager.clone();

        let task = tokio::spawn(async move {
            let mut interval = tokio::time::interval(Duration::from_secs(30));

            loop {
                interval.tick().await;

                let health = delivery_manager.get_health_status().await;

                let mut export_state = state.write().await;
                export_state.last_health_check = Some(SystemTime::now());
                // `health` reports counts over the trailing hour, so store the latest
                // snapshot rather than accumulating it across ticks, which would
                // double-count the same receipts.
                export_state.exports_completed = health.successful_exports;
                export_state.export_errors = health.failed_exports;
            }
        });

        self.tasks.lock().await.push(task);
        Ok(())
    }


    async fn flush_all_exports(&self) -> Result<(), MonitoringError> {
        self.delivery_manager.flush_all().await
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportConfig {
    /// Destinations to export to.
    pub destinations: Vec<ExportDestination>,
    /// When and how much to export per destination.
    pub scheduling: SchedulingConfig,
    /// Retry, timeout, and compression settings for deliveries.
    pub delivery: DeliveryConfig,
    /// How often the coordination task processes pending deliveries.
    pub coordination_interval: Duration,
    /// How long exported data and delivery receipts are kept locally.
    pub retention: RetentionConfig,
}

impl Default for ExportConfig {
    fn default() -> Self {
        Self {
            destinations: vec![
                ExportDestination::File {
                    id: "local-file".to_string(),
                    path: "/tmp/ant-quic-metrics.json".to_string(),
                    format: FileFormat::JSON,
                },
            ],
            scheduling: SchedulingConfig::default(),
            delivery: DeliveryConfig::default(),
            coordination_interval: Duration::from_secs(60),
            retention: RetentionConfig::default(),
        }
    }
}

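/// Supported export sinks. Each variant carries destination-specific settings
/// plus a unique `id` used to key schedulers and delivery receipts.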
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ExportDestination {
    File {
        id: String,
        path: String,
        format: FileFormat,
    },
    HTTP {
        id: String,
        endpoint: String,
        headers: HashMap<String, String>,
        auth: Option<AuthConfig>,
    },
    S3 {
        id: String,
        bucket: String,
        region: String,
        prefix: String,
    },
    Database {
        id: String,
        connection_string: String,
        table: String,
        schema: String,
    },
    Kafka {
        id: String,
        brokers: Vec<String>,
        topic: String,
        partition_key: Option<String>,
    },
}

impl ExportDestination {
    pub fn id(&self) -> &str {
        match self {
            ExportDestination::File { id, .. } => id,
            ExportDestination::HTTP { id, .. } => id,
            ExportDestination::S3 { id, .. } => id,
            ExportDestination::Database { id, .. } => id,
            ExportDestination::Kafka { id, .. } => id,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum FileFormat {
    JSON,
    CSV,
    Parquet,
    Avro,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AuthConfig {
    pub auth_type: AuthType,
    pub credentials: HashMap<String, String>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AuthType {
    Bearer,
    Basic,
    ApiKey,
    OAuth2,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulingConfig {
    /// Regular export interval.
    pub interval: Duration,
    /// Number of pending items that forces an export.
    pub batch_size: usize,
    /// Maximum time pending data may wait before being exported.
    pub max_delay: Duration,
    /// Whether intelligent scheduling is enabled.
    pub intelligent_scheduling: bool,
}

impl Default for SchedulingConfig {
    fn default() -> Self {
        Self {
            interval: Duration::from_secs(300),
            batch_size: 1000,
            max_delay: Duration::from_secs(600),
            intelligent_scheduling: true,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeliveryConfig {
    /// Maximum number of retry attempts per delivery.
    pub max_retries: u32,
    /// Delay before the first retry; doubled on each subsequent attempt.
    pub initial_retry_delay: Duration,
    /// Upper bound on the retry delay.
    pub max_retry_delay: Duration,
    /// Timeout for a single delivery attempt.
    pub delivery_timeout: Duration,
    /// Whether payloads are compressed before delivery.
    pub compression: bool,
}

impl Default for DeliveryConfig {
    fn default() -> Self {
        Self {
            max_retries: 3,
            initial_retry_delay: Duration::from_secs(1),
            max_retry_delay: Duration::from_secs(60),
            delivery_timeout: Duration::from_secs(30),
            compression: true,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetentionConfig {
    /// How long exported data is kept locally.
    pub local_retention: Duration,
    /// How long delivery receipts are kept.
    pub receipt_retention: Duration,
    /// Whether expired data is cleaned up automatically.
    pub auto_cleanup: bool,
}

impl Default for RetentionConfig {
    fn default() -> Self {
        Self {
            local_retention: Duration::from_secs(3600 * 24),       // 24 hours
            receipt_retention: Duration::from_secs(3600 * 24 * 7), // 7 days
            auto_cleanup: true,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExportData {
    /// Logical type of the exported data (e.g. "metrics").
    pub data_type: String,
    /// When the data was captured.
    pub timestamp: SystemTime,
    /// Arbitrary JSON payload.
    pub payload: serde_json::Value,
    /// Free-form key/value metadata attached to the export.
    pub metadata: HashMap<String, String>,
}

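/// Tracks pending data for a single destination and decides when it is due for
/// export, based on batch size, the regular interval, and the maximum delay.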
struct ExportScheduler {
    destination: ExportDestination,
    config: SchedulingConfig,
    last_export: Option<SystemTime>,
    pending_data: Vec<ExportData>,
}

impl ExportScheduler {
    fn new(destination: ExportDestination, config: SchedulingConfig) -> Self {
        Self {
            destination,
            config,
            last_export: None,
            pending_data: Vec::new(),
        }
    }

    /// Returns true when the pending batch is full, the regular interval has
    /// elapsed, or the maximum delay has passed with data still waiting.
    fn should_export(&self) -> bool {
        if self.pending_data.len() >= self.config.batch_size {
            return true;
        }

        if let Some(last_export) = self.last_export {
            let elapsed = last_export.elapsed().unwrap_or_default();
            if elapsed >= self.config.interval {
                return true;
            }
            if elapsed >= self.config.max_delay && !self.pending_data.is_empty() {
                return true;
            }
        } else if !self.pending_data.is_empty() {
            // Never exported before and data is waiting.
            return true;
        }

        false
    }

    fn add_data(&mut self, data: ExportData) {
        self.pending_data.push(data);
    }

    /// Drains the pending batch and records the export time.
    fn take_pending_data(&mut self) -> Vec<ExportData> {
        self.last_export = Some(SystemTime::now());
        std::mem::take(&mut self.pending_data)
    }
}

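/// Converts [`ExportData`] into the representation expected by each destination
/// (JSON, CSV, a SQL statement, and so on).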
struct DataTransformers;

impl DataTransformers {
    fn new() -> Self {
        Self
    }

    async fn transform_for_destination(
        &self,
        data: &ExportData,
        destination: &ExportDestination,
    ) -> Result<TransformedData, MonitoringError> {
        match destination {
            ExportDestination::File { format, .. } => {
                self.transform_for_file_format(data, format).await
            }
            ExportDestination::HTTP { .. } => {
                self.transform_for_http(data).await
            }
            ExportDestination::S3 { .. } => {
                self.transform_for_s3(data).await
            }
            ExportDestination::Database { schema, .. } => {
                self.transform_for_database(data, schema).await
            }
            ExportDestination::Kafka { .. } => {
                self.transform_for_kafka(data).await
            }
        }
    }

    async fn transform_for_file_format(
        &self,
        data: &ExportData,
        format: &FileFormat,
    ) -> Result<TransformedData, MonitoringError> {
        let content = match format {
            FileFormat::JSON => serde_json::to_string(&data.payload)
                .map_err(|e| MonitoringError::ExportError(format!("JSON serialization failed: {}", e)))?,
            FileFormat::CSV => {
                // Naive single-row CSV; the payload is not escaped or flattened.
                format!("timestamp,data_type,payload\n{:?},{},{}",
                    data.timestamp, data.data_type, data.payload)
            }
            FileFormat::Parquet | FileFormat::Avro => {
                return Err(MonitoringError::ExportError("Binary formats not yet implemented".to_string()));
            }
        };

        Ok(TransformedData {
            content: content.into_bytes(),
            content_type: format.content_type().to_string(),
            metadata: data.metadata.clone(),
        })
    }

    async fn transform_for_http(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
        let content = serde_json::to_string(&data.payload)
            .map_err(|e| MonitoringError::ExportError(format!("HTTP JSON serialization failed: {}", e)))?;

        Ok(TransformedData {
            content: content.into_bytes(),
            content_type: "application/json".to_string(),
            metadata: data.metadata.clone(),
        })
    }

    async fn transform_for_s3(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
        // S3 objects currently use the same JSON representation as HTTP deliveries.
        self.transform_for_http(data).await
    }

    async fn transform_for_database(
        &self,
        data: &ExportData,
        _schema: &str,
    ) -> Result<TransformedData, MonitoringError> {
        // Placeholder parameterized statement; actual value binding is not implemented here.
        let content = "INSERT INTO monitoring_data (timestamp, data_type, payload, metadata) VALUES (?, ?, ?, ?)"
            .to_string();

        Ok(TransformedData {
            content: content.into_bytes(),
            content_type: "application/sql".to_string(),
            metadata: data.metadata.clone(),
        })
    }

    async fn transform_for_kafka(&self, data: &ExportData) -> Result<TransformedData, MonitoringError> {
        // Kafka messages currently use the same JSON representation as HTTP deliveries.
        self.transform_for_http(data).await
    }
}

impl FileFormat {
    fn content_type(&self) -> &str {
        match self {
            FileFormat::JSON => "application/json",
            FileFormat::CSV => "text/csv",
            FileFormat::Parquet => "application/octet-stream",
            FileFormat::Avro => "application/octet-stream",
        }
    }
}

#[derive(Debug)]
struct TransformedData {
    content: Vec<u8>,
    content_type: String,
    metadata: HashMap<String, String>,
}

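/// Queues transformed payloads, attempts delivery with exponential backoff, and
/// records receipts that feed the health status reported to the manager.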
struct DeliveryManager {
    config: DeliveryConfig,
    pending_deliveries: Arc<Mutex<Vec<PendingDelivery>>>,
    delivery_receipts: Arc<Mutex<Vec<DeliveryReceipt>>>,
}

impl DeliveryManager {
    fn new(config: DeliveryConfig) -> Self {
        Self {
            config,
            pending_deliveries: Arc::new(Mutex::new(Vec::new())),
            delivery_receipts: Arc::new(Mutex::new(Vec::new())),
        }
    }

    async fn schedule_delivery(
        &self,
        destination: ExportDestination,
        data: TransformedData,
    ) -> Result<(), MonitoringError> {
        let delivery = PendingDelivery {
            id: uuid::Uuid::new_v4().to_string(),
            destination,
            data,
            scheduled_time: SystemTime::now(),
            retry_count: 0,
            last_attempt: None,
        };

        let mut pending = self.pending_deliveries.lock().await;
        pending.push(delivery);

        Ok(())
    }

    async fn coordinate_deliveries(&self) -> Result<(), MonitoringError> {
        let mut pending = self.pending_deliveries.lock().await;
        let mut completed = Vec::new();

        for (index, delivery) in pending.iter_mut().enumerate() {
            if self.should_attempt_delivery(delivery) {
                match self.attempt_delivery(delivery).await {
                    Ok(receipt) => {
                        let mut receipts = self.delivery_receipts.lock().await;
                        receipts.push(receipt);
                        completed.push(index);
                    }
                    Err(e) => {
                        delivery.retry_count += 1;
                        delivery.last_attempt = Some(SystemTime::now());

                        // Give up once the retry budget is exhausted.
                        if delivery.retry_count >= self.config.max_retries {
                            warn!("Delivery {} failed after {} retries: {}", delivery.id, delivery.retry_count, e);
                            completed.push(index);
                        }
                    }
                }
            }
        }

        // Remove delivered (or permanently failed) entries, highest index first
        // so earlier indices remain valid.
        for &index in completed.iter().rev() {
            pending.remove(index);
        }

        Ok(())
    }

    fn should_attempt_delivery(&self, delivery: &PendingDelivery) -> bool {
        match delivery.last_attempt {
            // Never attempted: deliver immediately.
            None => true,
            // Otherwise retry only once the backoff delay has elapsed.
            Some(last_attempt) => {
                let retry_delay = self.calculate_retry_delay(delivery.retry_count);
                last_attempt.elapsed().unwrap_or_default() >= retry_delay
            }
        }
    }

    async fn attempt_delivery(&self, delivery: &PendingDelivery) -> Result<DeliveryReceipt, MonitoringError> {
        debug!("Attempting delivery {} to {}", delivery.id, delivery.destination.id());

        match &delivery.destination {
            ExportDestination::File { path, .. } => {
                // Synchronous write; fine for small payloads but it blocks the
                // async executor for the duration of the write.
                std::fs::write(path, &delivery.data.content)
                    .map_err(|e| MonitoringError::ExportError(format!("File write failed: {}", e)))?;
            }
            ExportDestination::HTTP { endpoint, .. } => {
                debug!("Would send HTTP request to {}", endpoint);
            }
            _ => {
                debug!("Delivery type not yet implemented");
            }
        }

        Ok(DeliveryReceipt {
            delivery_id: delivery.id.clone(),
            destination_id: delivery.destination.id().to_string(),
            timestamp: SystemTime::now(),
            status: DeliveryStatus::Success,
            bytes_sent: delivery.data.content.len(),
            // Placeholder until real timing is measured.
            response_time: Duration::from_millis(100),
        })
    }

    fn calculate_retry_delay(&self, retry_count: u32) -> Duration {
        // Exponential backoff: double the initial delay per retry, saturating on
        // overflow and capped at the configured maximum.
        let exponential_delay = self
            .config
            .initial_retry_delay
            .saturating_mul(2_u32.saturating_pow(retry_count));
        exponential_delay.min(self.config.max_retry_delay)
    }

    async fn get_health_status(&self) -> DeliveryHealth {
        let receipts = self.delivery_receipts.lock().await;
        let recent_receipts: Vec<_> = receipts.iter()
            .filter(|r| r.timestamp.elapsed().unwrap_or_default() < Duration::from_secs(3600))
            .collect();

        let successful_exports = recent_receipts.iter()
            .filter(|r| matches!(r.status, DeliveryStatus::Success))
            .count() as u64;

        let failed_exports = recent_receipts.len() as u64 - successful_exports;

        DeliveryHealth {
            successful_exports,
            failed_exports,
            pending_deliveries: self.pending_deliveries.lock().await.len() as u64,
            avg_response_time: if !recent_receipts.is_empty() {
                recent_receipts.iter()
                    .map(|r| r.response_time.as_millis())
                    .sum::<u128>() / recent_receipts.len() as u128
            } else {
                0
            },
        }
    }

    async fn flush_all(&self) -> Result<(), MonitoringError> {
        self.coordinate_deliveries().await?;

        let pending_count = self.pending_deliveries.lock().await.len();
        if pending_count > 0 {
            warn!("Flushing with {} pending deliveries", pending_count);
        }

        Ok(())
    }
}

#[derive(Debug)]
struct PendingDelivery {
    id: String,
    destination: ExportDestination,
    data: TransformedData,
    scheduled_time: SystemTime,
    retry_count: u32,
    last_attempt: Option<SystemTime>,
}

#[derive(Debug)]
struct DeliveryReceipt {
    delivery_id: String,
    destination_id: String,
    timestamp: SystemTime,
    status: DeliveryStatus,
    bytes_sent: usize,
    response_time: Duration,
}

#[derive(Debug)]
enum DeliveryStatus {
    Success,
    Failure,
    Retry,
}

#[derive(Debug)]
struct DeliveryHealth {
    successful_exports: u64,
    failed_exports: u64,
    pending_deliveries: u64,
    avg_response_time: u128,
}

#[derive(Debug)]
struct ExportState {
    status: ExportStatus,
    start_time: Option<SystemTime>,
    stop_time: Option<SystemTime>,
    exports_completed: u64,
    export_errors: u64,
    last_health_check: Option<SystemTime>,
}

impl ExportState {
    fn new() -> Self {
        Self {
            status: ExportStatus::Stopped,
            start_time: None,
            stop_time: None,
            exports_completed: 0,
            export_errors: 0,
            last_health_check: None,
        }
    }
}

#[derive(Debug, Clone)]
enum ExportStatus {
    Stopped,
    Starting,
    Running,
    Stopping,
    Error,
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_export_manager_creation() {
        let config = ExportConfig::default();
        let manager = ExportManager::new(config).await.unwrap();

        let status = manager.get_status().await;
        assert!(status.contains("Stopped"));
    }

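    // A start/stop lifecycle sketch against the default configuration; it only
    // checks the reported status transitions, not actual delivery.
    #[tokio::test]
    async fn test_export_manager_lifecycle() {
        let manager = ExportManager::new(ExportConfig::default()).await.unwrap();

        manager.start().await.unwrap();
        assert!(manager.get_status().await.contains("Running"));

        manager.stop().await.unwrap();
        assert!(manager.get_status().await.contains("Stopped"));
    }
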
    #[tokio::test]
    async fn test_data_transformation() {
        let transformers = DataTransformers::new();

        let data = ExportData {
            data_type: "test".to_string(),
            timestamp: SystemTime::now(),
            payload: serde_json::json!({"key": "value"}),
            metadata: HashMap::new(),
        };

        let destination = ExportDestination::File {
            id: "test".to_string(),
            path: "/tmp/test".to_string(),
            format: FileFormat::JSON,
        };

        let transformed = transformers.transform_for_destination(&data, &destination).await.unwrap();
        assert_eq!(transformed.content_type, "application/json");
    }

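    // A sketch of the retry backoff: delays double per attempt and are capped at
    // `max_retry_delay` (the values below assume `DeliveryConfig::default`).
    #[test]
    fn test_retry_delay_backoff() {
        let manager = DeliveryManager::new(DeliveryConfig::default());

        assert_eq!(manager.calculate_retry_delay(0), Duration::from_secs(1));
        assert_eq!(manager.calculate_retry_delay(1), Duration::from_secs(2));
        assert_eq!(manager.calculate_retry_delay(2), Duration::from_secs(4));
        // 2^6 = 64 seconds would exceed the 60-second cap, so the delay saturates.
        assert_eq!(manager.calculate_retry_delay(6), Duration::from_secs(60));
    }
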
    #[test]
    fn test_export_scheduler() {
        let destination = ExportDestination::File {
            id: "test".to_string(),
            path: "/tmp/test".to_string(),
            format: FileFormat::JSON,
        };
        let config = SchedulingConfig::default();
        let scheduler = ExportScheduler::new(destination, config);

        // No pending data and no schedule pressure yet, so nothing to export.
        assert!(!scheduler.should_export());
    }
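
    // A sketch of the batch-size trigger: once `batch_size` items are pending,
    // `should_export` fires regardless of the interval; draining the batch then
    // resets the pressure. Payload contents are illustrative only.
    #[test]
    fn test_export_scheduler_batch_trigger() {
        let destination = ExportDestination::File {
            id: "test".to_string(),
            path: "/tmp/test".to_string(),
            format: FileFormat::JSON,
        };
        let config = SchedulingConfig::default();
        let mut scheduler = ExportScheduler::new(destination, config.clone());

        for _ in 0..config.batch_size {
            scheduler.add_data(ExportData {
                data_type: "test".to_string(),
                timestamp: SystemTime::now(),
                payload: serde_json::json!({}),
                metadata: HashMap::new(),
            });
        }
        assert!(scheduler.should_export());

        let drained = scheduler.take_pending_data();
        assert_eq!(drained.len(), config.batch_size);
        assert!(scheduler.last_export.is_some());
        assert!(!scheduler.should_export());
    }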
}