oxirs_stream/
backend_optimizer.rs

1//! # Backend Optimization and Selection
2//!
3//! Advanced backend selection algorithms, cost modeling, and ML-driven optimization
4//! for choosing the optimal streaming backend based on workload patterns and performance metrics.
5
6use crate::backend::BackendType;
7use crate::event::StreamEvent;
8use crate::monitoring::StreamingMetrics;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17
18/// Backend optimization configuration
19#[derive(Debug, Clone, Serialize, Deserialize)]
20pub struct OptimizerConfig {
21    pub enable_cost_modeling: bool,
22    pub enable_ml_prediction: bool,
23    pub enable_pattern_analysis: bool,
24    pub optimization_interval: Duration,
25    pub min_samples_for_prediction: usize,
26    pub cost_weight_latency: f64,
27    pub cost_weight_throughput: f64,
28    pub cost_weight_reliability: f64,
29    pub cost_weight_resource_usage: f64,
30}
31
32impl Default for OptimizerConfig {
33    fn default() -> Self {
34        Self {
35            enable_cost_modeling: true,
36            enable_ml_prediction: true,
37            enable_pattern_analysis: true,
38            optimization_interval: Duration::from_secs(300), // 5 minutes
39            min_samples_for_prediction: 100,
40            cost_weight_latency: 0.3,
41            cost_weight_throughput: 0.3,
42            cost_weight_reliability: 0.3,
43            cost_weight_resource_usage: 0.1,
44        }
45    }
46}
47
48/// Workload pattern characteristics
49#[derive(Debug, Clone, Serialize, Deserialize)]
50pub struct WorkloadPattern {
51    pub pattern_type: PatternType,
52    pub event_rate: f64,
53    pub batch_size: u32,
54    pub event_size_bytes: u64,
55    pub temporal_distribution: TemporalDistribution,
56    pub data_characteristics: DataCharacteristics,
57    pub consistency_requirements: ConsistencyLevel,
58}
59
60/// Pattern types for workload classification
61#[derive(Debug, Clone, Serialize, Deserialize)]
62pub enum PatternType {
63    /// Steady, predictable load
64    Steady,
65    /// Variable load with spikes
66    Bursty,
67    /// Seasonal patterns
68    Seasonal,
69    /// Random/unpredictable patterns
70    Random,
71    /// Real-time processing requirements
72    RealTime,
73    /// Batch processing oriented
74    BatchOriented,
75}
76
77/// Temporal distribution patterns
78#[derive(Debug, Clone, Serialize, Deserialize)]
79pub enum TemporalDistribution {
80    Uniform,
81    Normal { mean: f64, std_dev: f64 },
82    Exponential { lambda: f64 },
83    Poisson { lambda: f64 },
84    Custom { distribution_name: String },
85}
86
87/// Data characteristics affecting backend choice
88#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct DataCharacteristics {
90    pub compression_ratio: f64,
91    pub serialization_overhead: f64,
92    pub has_complex_structures: bool,
93    pub requires_ordering: bool,
94    pub has_time_windows: bool,
95    pub requires_deduplication: bool,
96}
97
98/// Consistency level requirements
99#[derive(Debug, Clone, Serialize, Deserialize)]
100pub enum ConsistencyLevel {
101    /// At most once delivery
102    AtMostOnce,
103    /// At least once delivery
104    AtLeastOnce,
105    /// Exactly once delivery
106    ExactlyOnce,
107    /// Session consistency
108    Session,
109    /// Strong consistency
110    Strong,
111}
112
113/// Backend performance characteristics
114#[derive(Debug, Clone, Serialize, Deserialize)]
115pub struct BackendPerformance {
116    pub backend_type: BackendType,
117    pub measured_latency_p50: f64,
118    pub measured_latency_p95: f64,
119    pub measured_latency_p99: f64,
120    pub measured_throughput: f64,
121    pub reliability_score: f64,
122    pub resource_usage: ResourceUsage,
123    pub cost_per_hour: f64,
124    pub setup_complexity: u8, // 1-10 scale
125    pub scalability_factor: f64,
126    pub last_updated: DateTime<Utc>,
127    pub sample_count: u64,
128}
129
130/// Resource usage metrics
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub struct ResourceUsage {
133    pub cpu_usage_percent: f64,
134    pub memory_usage_mb: f64,
135    pub network_usage_mbps: f64,
136    pub disk_io_ops_per_sec: f64,
137    pub connection_count: u32,
138}
139
140/// Cost model for backend selection
141#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct CostModel {
143    pub total_cost: f64,
144    pub latency_cost: f64,
145    pub throughput_cost: f64,
146    pub reliability_cost: f64,
147    pub resource_cost: f64,
148    pub scaling_cost: f64,
149    pub maintenance_cost: f64,
150}
151
152/// ML prediction model for performance forecasting
153#[derive(Debug, Clone)]
154pub struct MLPredictor {
155    /// Historical performance data
156    performance_history: Vec<PerformanceDataPoint>,
157    /// Learned patterns
158    patterns: HashMap<String, PatternModel>,
159    /// Feature weights
160    _feature_weights: Vec<f64>,
161    /// Prediction confidence threshold
162    _confidence_threshold: f64,
163}
164
165/// Single performance data point for ML training
166#[derive(Debug, Clone, Serialize, Deserialize)]
167pub struct PerformanceDataPoint {
168    pub timestamp: DateTime<Utc>,
169    pub backend_type: BackendType,
170    pub workload_pattern: WorkloadPattern,
171    pub actual_latency: f64,
172    pub actual_throughput: f64,
173    pub actual_reliability: f64,
174    pub resource_usage: ResourceUsage,
175    pub external_factors: HashMap<String, f64>,
176}
177
178/// Learned pattern model for ML predictions
179#[derive(Debug, Clone)]
180pub struct PatternModel {
181    pub pattern_name: String,
182    pub coefficients: Vec<f64>,
183    pub intercept: f64,
184    pub confidence: f64,
185    pub last_trained: DateTime<Utc>,
186    pub sample_count: usize,
187}
188
189/// Backend optimizer for intelligent backend selection
190pub struct BackendOptimizer {
191    config: OptimizerConfig,
192    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
193    pattern_analyzer: PatternAnalyzer,
194    cost_calculator: CostCalculator,
195    ml_predictor: Option<MLPredictor>,
196    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
197}
198
199/// Pattern analyzer for workload classification
200pub struct PatternAnalyzer {
201    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
202    _pattern_cache: HashMap<String, WorkloadPattern>,
203    analysis_window: ChronoDuration,
204}
205
206/// Cost calculator for backend evaluation
207pub struct CostCalculator {
208    config: OptimizerConfig,
209    baseline_costs: HashMap<BackendType, f64>,
210}
211
212/// Optimization decision record
213#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct OptimizationDecision {
215    pub timestamp: DateTime<Utc>,
216    pub selected_backend: BackendType,
217    pub workload_pattern: WorkloadPattern,
218    pub predicted_performance: BackendPerformance,
219    pub cost_model: CostModel,
220    pub confidence: f64,
221    pub reason: String,
222}
223
224/// Backend recommendation with ranking
225#[derive(Debug, Clone)]
226pub struct BackendRecommendation {
227    pub backend_type: BackendType,
228    pub score: f64,
229    pub predicted_latency: f64,
230    pub predicted_throughput: f64,
231    pub predicted_cost: f64,
232    pub confidence: f64,
233    pub strengths: Vec<String>,
234    pub weaknesses: Vec<String>,
235}
236
237impl BackendOptimizer {
238    /// Create a new backend optimizer
239    pub fn new(config: OptimizerConfig) -> Self {
240        let ml_predictor = if config.enable_ml_prediction {
241            Some(MLPredictor::new())
242        } else {
243            None
244        };
245
246        Self {
247            pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
248            cost_calculator: CostCalculator::new(config.clone()),
249            backend_performance: Arc::new(RwLock::new(HashMap::new())),
250            optimization_history: Arc::new(RwLock::new(Vec::new())),
251            config,
252            ml_predictor,
253        }
254    }
255
256    /// Update backend performance metrics
257    pub async fn update_backend_performance(
258        &self,
259        backend_type: BackendType,
260        metrics: &StreamingMetrics,
261    ) -> Result<()> {
262        let mut performance_map = self.backend_performance.write().await;
263
264        let performance = performance_map
265            .entry(backend_type.clone())
266            .or_insert_with(|| BackendPerformance::new(backend_type.clone()));
267
268        // Update measured performance with exponential moving average
269        let alpha = 0.1; // Smoothing factor
270        performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
271            + (1.0 - alpha) * performance.measured_latency_p50;
272        performance.measured_throughput = alpha * metrics.producer_throughput_eps
273            + (1.0 - alpha) * performance.measured_throughput;
274        performance.reliability_score =
275            alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;
276
277        performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
278        performance.resource_usage.memory_usage_mb =
279            metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
280        performance.resource_usage.connection_count = metrics.backend_connections_active;
281
282        performance.last_updated = Utc::now();
283        performance.sample_count += 1;
284
285        debug!(
286            "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
287            backend_type,
288            performance.measured_latency_p50,
289            performance.measured_throughput,
290            performance.reliability_score
291        );
292
293        Ok(())
294    }
295
296    /// Analyze workload pattern from recent events
297    pub async fn analyze_workload_pattern(
298        &mut self,
299        events: &[StreamEvent],
300    ) -> Result<WorkloadPattern> {
301        self.pattern_analyzer.analyze_pattern(events).await
302    }
303
304    /// Get optimal backend recommendation for given workload
305    pub async fn recommend_backend(
306        &self,
307        pattern: &WorkloadPattern,
308    ) -> Result<Vec<BackendRecommendation>> {
309        let mut recommendations = Vec::new();
310        let performance_map = self.backend_performance.read().await;
311
312        for (backend_type, performance) in performance_map.iter() {
313            let cost = self
314                .cost_calculator
315                .calculate_cost(backend_type, pattern, performance)
316                .await?;
317
318            let predicted_performance = if let Some(predictor) = &self.ml_predictor {
319                predictor.predict_performance(backend_type, pattern).await?
320            } else {
321                performance.clone()
322            };
323
324            let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
325            let confidence = self.calculate_confidence(&predicted_performance, pattern);
326
327            let recommendation = BackendRecommendation {
328                backend_type: backend_type.clone(),
329                score,
330                predicted_latency: predicted_performance.measured_latency_p50,
331                predicted_throughput: predicted_performance.measured_throughput,
332                predicted_cost: cost.total_cost,
333                confidence,
334                strengths: self.analyze_strengths(backend_type, pattern),
335                weaknesses: self.analyze_weaknesses(backend_type, pattern),
336            };
337
338            recommendations.push(recommendation);
339        }
340
341        // Sort by score (higher is better)
342        recommendations.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
343
344        info!(
345            "Generated {} backend recommendations for workload pattern: {:?}",
346            recommendations.len(),
347            pattern.pattern_type
348        );
349
350        Ok(recommendations)
351    }
352
353    /// Train ML predictor with new performance data
354    pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
355        if let Some(predictor) = &mut self.ml_predictor {
356            predictor.add_training_data(data_point).await?;
357
358            if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
359                predictor.retrain_models().await?;
360            }
361        }
362        Ok(())
363    }
364
365    /// Record optimization decision
366    pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
367        let mut history = self.optimization_history.write().await;
368        history.push(decision);
369
370        // Keep only recent decisions (last 1000)
371        if history.len() > 1000 {
372            history.drain(0..100);
373        }
374        Ok(())
375    }
376
377    /// Get optimization statistics
378    pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
379        let history = self.optimization_history.read().await;
380        let performance_map = self.backend_performance.read().await;
381
382        let total_decisions = history.len();
383        let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
384            *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
385            acc
386        });
387
388        let average_confidence = if total_decisions > 0 {
389            history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
390        } else {
391            0.0
392        };
393
394        let performance_improvements = self.calculate_performance_improvements(&history).await?;
395
396        Ok(OptimizationStats {
397            total_decisions,
398            backend_usage,
399            average_confidence,
400            performance_improvements,
401            active_backends: performance_map.len(),
402            last_optimization: history.last().map(|d| d.timestamp),
403        })
404    }
405
406    /// Calculate backend score based on cost model and performance
407    fn calculate_backend_score(
408        &self,
409        cost: &CostModel,
410        performance: &BackendPerformance,
411        pattern: &WorkloadPattern,
412    ) -> f64 {
413        let latency_score = match pattern.pattern_type {
414            PatternType::RealTime => {
415                // For real-time, heavily penalize high latency
416                if performance.measured_latency_p99 < 10.0 {
417                    1.0
418                } else if performance.measured_latency_p99 < 50.0 {
419                    0.7
420                } else {
421                    0.3
422                }
423            }
424            _ => {
425                // For other patterns, moderate latency tolerance
426                (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0)
427            }
428        };
429
430        let throughput_score = match pattern.pattern_type {
431            PatternType::BatchOriented => {
432                // Batch processing values high throughput
433                (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
434            }
435            _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
436        };
437
438        let reliability_score = performance.reliability_score;
439        let cost_score = 1.0 / (cost.total_cost + 1.0);
440
441        // Weighted combination
442        (latency_score * self.config.cost_weight_latency
443            + throughput_score * self.config.cost_weight_throughput
444            + reliability_score * self.config.cost_weight_reliability
445            + cost_score * self.config.cost_weight_resource_usage)
446            / (self.config.cost_weight_latency
447                + self.config.cost_weight_throughput
448                + self.config.cost_weight_reliability
449                + self.config.cost_weight_resource_usage)
450    }
451
452    /// Calculate confidence in prediction
453    fn calculate_confidence(
454        &self,
455        performance: &BackendPerformance,
456        _pattern: &WorkloadPattern,
457    ) -> f64 {
458        let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
459        let recency_confidence = {
460            let age_hours = Utc::now()
461                .signed_duration_since(performance.last_updated)
462                .num_hours() as f64;
463            (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
464        };
465
466        (sample_confidence + recency_confidence) / 2.0
467    }
468
469    /// Analyze backend strengths for given pattern
470    fn analyze_strengths(
471        &self,
472        backend_type: &BackendType,
473        pattern: &WorkloadPattern,
474    ) -> Vec<String> {
475        let mut strengths = Vec::new();
476
477        match backend_type {
478            BackendType::Kafka => {
479                strengths.push("High throughput".to_string());
480                strengths.push("Strong durability".to_string());
481                strengths.push("Excellent ordering guarantees".to_string());
482                if matches!(
483                    pattern.consistency_requirements,
484                    ConsistencyLevel::ExactlyOnce
485                ) {
486                    strengths.push("Exactly-once semantics".to_string());
487                }
488            }
489            BackendType::Nats => {
490                strengths.push("Low latency".to_string());
491                strengths.push("Simple setup".to_string());
492                strengths.push("Built-in clustering".to_string());
493                if matches!(pattern.pattern_type, PatternType::RealTime) {
494                    strengths.push("Real-time performance".to_string());
495                }
496            }
497            BackendType::Redis => {
498                strengths.push("In-memory speed".to_string());
499                strengths.push("Low latency".to_string());
500                strengths.push("Rich data structures".to_string());
501            }
502            BackendType::Kinesis => {
503                strengths.push("AWS native integration".to_string());
504                strengths.push("Auto-scaling".to_string());
505                strengths.push("Pay-per-use model".to_string());
506            }
507            BackendType::Pulsar => {
508                strengths.push("Multi-tenancy".to_string());
509                strengths.push("Geo-replication".to_string());
510                strengths.push("Unified messaging".to_string());
511            }
512            BackendType::RabbitMQ => {
513                strengths.push("Mature and stable".to_string());
514                strengths.push("Rich routing capabilities".to_string());
515                strengths.push("Strong reliability guarantees".to_string());
516                if matches!(
517                    pattern.consistency_requirements,
518                    ConsistencyLevel::AtLeastOnce
519                ) {
520                    strengths.push("Persistent message delivery".to_string());
521                }
522            }
523            BackendType::Memory => {
524                strengths.push("Zero latency".to_string());
525                strengths.push("Perfect for testing".to_string());
526            }
527        }
528
529        strengths
530    }
531
532    /// Analyze backend weaknesses for given pattern
533    fn analyze_weaknesses(
534        &self,
535        backend_type: &BackendType,
536        pattern: &WorkloadPattern,
537    ) -> Vec<String> {
538        let mut weaknesses = Vec::new();
539
540        match backend_type {
541            BackendType::Kafka => {
542                weaknesses.push("Complex setup".to_string());
543                weaknesses.push("Higher resource usage".to_string());
544                if matches!(pattern.pattern_type, PatternType::RealTime) {
545                    weaknesses.push("Higher latency than NATS".to_string());
546                }
547            }
548            BackendType::Nats => {
549                if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
550                    weaknesses.push("Limited durability options".to_string());
551                }
552                if pattern.event_rate > 100000.0 {
553                    weaknesses.push("May not handle extreme throughput".to_string());
554                }
555            }
556            BackendType::Redis => {
557                weaknesses.push("Memory-bound".to_string());
558                weaknesses.push("Limited durability".to_string());
559                if pattern.event_size_bytes > 1000000 {
560                    weaknesses.push("Not suitable for large events".to_string());
561                }
562            }
563            BackendType::Kinesis => {
564                weaknesses.push("AWS vendor lock-in".to_string());
565                weaknesses.push("Cost can scale quickly".to_string());
566                weaknesses.push("Regional limitations".to_string());
567            }
568            BackendType::Pulsar => {
569                weaknesses.push("Newer ecosystem".to_string());
570                weaknesses.push("Complex architecture".to_string());
571            }
572            BackendType::RabbitMQ => {
573                weaknesses.push("Lower throughput than Kafka".to_string());
574                weaknesses.push("Memory-based by default".to_string());
575                if matches!(pattern.pattern_type, PatternType::BatchOriented) {
576                    weaknesses.push("Not optimized for batch processing".to_string());
577                }
578            }
579            BackendType::Memory => {
580                weaknesses.push("No persistence".to_string());
581                weaknesses.push("Single node only".to_string());
582                weaknesses.push("Memory limitations".to_string());
583            }
584        }
585
586        weaknesses
587    }
588
589    /// Calculate performance improvements over time
590    async fn calculate_performance_improvements(
591        &self,
592        history: &[OptimizationDecision],
593    ) -> Result<HashMap<String, f64>> {
594        let mut improvements = HashMap::new();
595
596        if history.len() < 10 {
597            return Ok(improvements);
598        }
599
600        let recent_decisions = &history[history.len() - 10..];
601        let older_decisions = &history[0..10.min(history.len() - 10)];
602
603        let recent_avg_latency = recent_decisions
604            .iter()
605            .map(|d| d.predicted_performance.measured_latency_p50)
606            .sum::<f64>()
607            / recent_decisions.len() as f64;
608
609        let older_avg_latency = older_decisions
610            .iter()
611            .map(|d| d.predicted_performance.measured_latency_p50)
612            .sum::<f64>()
613            / older_decisions.len() as f64;
614
615        let latency_improvement =
616            (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
617        improvements.insert(
618            "latency_improvement_percent".to_string(),
619            latency_improvement,
620        );
621
622        let recent_avg_throughput = recent_decisions
623            .iter()
624            .map(|d| d.predicted_performance.measured_throughput)
625            .sum::<f64>()
626            / recent_decisions.len() as f64;
627
628        let older_avg_throughput = older_decisions
629            .iter()
630            .map(|d| d.predicted_performance.measured_throughput)
631            .sum::<f64>()
632            / older_decisions.len() as f64;
633
634        let throughput_improvement =
635            (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
636        improvements.insert(
637            "throughput_improvement_percent".to_string(),
638            throughput_improvement,
639        );
640
641        Ok(improvements)
642    }
643}
644
645impl PatternAnalyzer {
646    pub fn new(analysis_window: ChronoDuration) -> Self {
647        Self {
648            event_history: Vec::new(),
649            _pattern_cache: HashMap::new(),
650            analysis_window,
651        }
652    }
653
654    pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
655        // Add events to history
656        let now = Utc::now();
657        for event in events {
658            self.event_history.push((now, event.clone()));
659        }
660
661        // Remove old events outside analysis window
662        let cutoff = now - self.analysis_window;
663        self.event_history
664            .retain(|(timestamp, _)| *timestamp >= cutoff);
665
666        if self.event_history.is_empty() {
667            return Ok(WorkloadPattern::default());
668        }
669
670        // Calculate event rate
671        let duration_seconds = self.analysis_window.num_seconds() as f64;
672        let event_rate = self.event_history.len() as f64 / duration_seconds;
673
674        // Analyze temporal distribution
675        let temporal_distribution = self.analyze_temporal_distribution().await?;
676
677        // Determine pattern type
678        let pattern_type = self
679            .classify_pattern_type(event_rate, &temporal_distribution)
680            .await?;
681
682        // Calculate average event size
683        let avg_event_size = self.calculate_average_event_size().await?;
684
685        // Analyze data characteristics
686        let data_characteristics = self.analyze_data_characteristics().await?;
687
688        // Determine consistency requirements based on event types
689        let consistency_requirements = self.determine_consistency_requirements().await?;
690
691        Ok(WorkloadPattern {
692            pattern_type,
693            event_rate,
694            batch_size: self.estimate_optimal_batch_size(event_rate),
695            event_size_bytes: avg_event_size,
696            temporal_distribution,
697            data_characteristics,
698            consistency_requirements,
699        })
700    }
701
702    async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
703        if self.event_history.len() < 10 {
704            return Ok(TemporalDistribution::Uniform);
705        }
706
707        // Calculate inter-arrival times
708        let mut intervals = Vec::new();
709        for i in 1..self.event_history.len() {
710            let interval = self.event_history[i]
711                .0
712                .signed_duration_since(self.event_history[i - 1].0)
713                .num_milliseconds() as f64;
714            intervals.push(interval);
715        }
716
717        // Calculate basic statistics
718        let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
719        let variance =
720            intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
721        let std_dev = variance.sqrt();
722
723        // Simple distribution classification
724        let cv = std_dev / mean; // Coefficient of variation
725
726        if cv < 0.1 {
727            Ok(TemporalDistribution::Uniform)
728        } else if cv < 0.5 {
729            Ok(TemporalDistribution::Normal { mean, std_dev })
730        } else {
731            Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
732        }
733    }
734
735    async fn classify_pattern_type(
736        &self,
737        event_rate: f64,
738        temporal_dist: &TemporalDistribution,
739    ) -> Result<PatternType> {
740        // Simple heuristic-based classification
741        match temporal_dist {
742            TemporalDistribution::Uniform => {
743                if event_rate > 10000.0 {
744                    Ok(PatternType::BatchOriented)
745                } else if event_rate > 100.0 {
746                    Ok(PatternType::Steady)
747                } else {
748                    Ok(PatternType::RealTime)
749                }
750            }
751            TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
752            TemporalDistribution::Normal { std_dev, mean } => {
753                if std_dev / mean > 1.0 {
754                    Ok(PatternType::Random)
755                } else {
756                    Ok(PatternType::Steady)
757                }
758            }
759            _ => Ok(PatternType::Steady),
760        }
761    }
762
763    async fn calculate_average_event_size(&self) -> Result<u64> {
764        if self.event_history.is_empty() {
765            return Ok(1024); // Default 1KB
766        }
767
768        // Estimate serialized size (simplified)
769        let avg_size = self
770            .event_history
771            .iter()
772            .map(|(_, event)| self.estimate_event_size(event))
773            .sum::<u64>()
774            / self.event_history.len() as u64;
775
776        Ok(avg_size)
777    }
778
779    fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
780        // Simplified size estimation based on event type
781        match event {
782            StreamEvent::TripleAdded {
783                subject,
784                predicate,
785                object,
786                ..
787            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
788            StreamEvent::TripleRemoved {
789                subject,
790                predicate,
791                object,
792                ..
793            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
794            StreamEvent::GraphCreated { .. } => 200,
795            StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
796            StreamEvent::TransactionBegin { .. } => 150,
797            StreamEvent::TransactionCommit { .. } => 100,
798            StreamEvent::Heartbeat { .. } => 50,
799            _ => 300, // Default for complex events
800        }
801    }
802
803    async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
804        let has_complex_structures = self
805            .event_history
806            .iter()
807            .any(|(_, event)| self.is_complex_event(event));
808
809        let requires_ordering = self
810            .event_history
811            .iter()
812            .any(|(_, event)| self.requires_ordering(event));
813
814        Ok(DataCharacteristics {
815            compression_ratio: 0.7,      // Assume 30% compression
816            serialization_overhead: 0.1, // 10% overhead
817            has_complex_structures,
818            requires_ordering,
819            has_time_windows: false,      // Simplified
820            requires_deduplication: true, // Conservative default
821        })
822    }
823
824    fn is_complex_event(&self, event: &StreamEvent) -> bool {
825        matches!(
826            event,
827            StreamEvent::SparqlUpdate { .. }
828                | StreamEvent::SchemaChanged { .. }
829                | StreamEvent::QueryCompleted { .. }
830        )
831    }
832
833    fn requires_ordering(&self, event: &StreamEvent) -> bool {
834        matches!(
835            event,
836            StreamEvent::TransactionBegin { .. }
837                | StreamEvent::TransactionCommit { .. }
838                | StreamEvent::TransactionAbort { .. }
839        )
840    }
841
842    async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
843        let has_transactions = self.event_history.iter().any(|(_, event)| {
844            matches!(
845                event,
846                StreamEvent::TransactionBegin { .. }
847                    | StreamEvent::TransactionCommit { .. }
848                    | StreamEvent::TransactionAbort { .. }
849            )
850        });
851
852        if has_transactions {
853            Ok(ConsistencyLevel::ExactlyOnce)
854        } else {
855            Ok(ConsistencyLevel::AtLeastOnce)
856        }
857    }
858
859    fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
860        if event_rate > 10000.0 {
861            1000
862        } else if event_rate > 1000.0 {
863            500
864        } else if event_rate > 100.0 {
865            100
866        } else {
867            10
868        }
869    }
870}
871
872impl CostCalculator {
873    pub fn new(config: OptimizerConfig) -> Self {
874        let mut baseline_costs = HashMap::new();
875
876        // Baseline hourly costs (normalized)
877        baseline_costs.insert(BackendType::Memory, 0.0);
878        baseline_costs.insert(BackendType::Redis, 0.1);
879        baseline_costs.insert(BackendType::Nats, 0.2);
880        baseline_costs.insert(BackendType::Kafka, 0.5);
881        baseline_costs.insert(BackendType::Pulsar, 0.4);
882        baseline_costs.insert(BackendType::Kinesis, 0.8);
883
884        Self {
885            config,
886            baseline_costs,
887        }
888    }
889
890    pub async fn calculate_cost(
891        &self,
892        backend_type: &BackendType,
893        pattern: &WorkloadPattern,
894        performance: &BackendPerformance,
895    ) -> Result<CostModel> {
896        let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);
897
898        // Calculate component costs
899        let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
900        let throughput_cost =
901            self.calculate_throughput_cost(performance.measured_throughput, pattern);
902        let reliability_cost =
903            self.calculate_reliability_cost(performance.reliability_score, pattern);
904        let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
905        let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
906        let maintenance_cost =
907            self.calculate_maintenance_cost(backend_type, performance.setup_complexity);
908
909        let total_cost = base_cost
910            + latency_cost * self.config.cost_weight_latency
911            + throughput_cost * self.config.cost_weight_throughput
912            + reliability_cost * self.config.cost_weight_reliability
913            + resource_cost * self.config.cost_weight_resource_usage
914            + scaling_cost * 0.1
915            + maintenance_cost * 0.1;
916
917        Ok(CostModel {
918            total_cost,
919            latency_cost,
920            throughput_cost,
921            reliability_cost,
922            resource_cost,
923            scaling_cost,
924            maintenance_cost,
925        })
926    }
927
928    fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
929        let latency_penalty = match pattern.pattern_type {
930            PatternType::RealTime => latency / 10.0, // Heavy penalty for real-time
931            PatternType::Bursty => latency / 50.0,
932            _ => latency / 100.0,
933        };
934        latency_penalty.min(2.0) // Cap at 2x cost
935    }
936
937    fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
938        let required_throughput = pattern.event_rate * 1.5; // 50% buffer
939        if throughput < required_throughput {
940            (required_throughput - throughput) / required_throughput
941        } else {
942            0.0
943        }
944    }
945
946    fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
947        let required_reliability = match pattern.consistency_requirements {
948            ConsistencyLevel::ExactlyOnce => 0.999,
949            ConsistencyLevel::AtLeastOnce => 0.995,
950            _ => 0.99,
951        };
952
953        if reliability < required_reliability {
954            (required_reliability - reliability) * 10.0
955        } else {
956            0.0
957        }
958    }
959
960    fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
961        // Normalize resource usage to cost
962        (usage.cpu_usage_percent / 100.0) * 0.1
963            + (usage.memory_usage_mb / 1000.0) * 0.05
964            + (usage.network_usage_mbps / 100.0) * 0.02
965    }
966
967    fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
968        let scaling_factor = match backend_type {
969            BackendType::Kinesis => 0.1, // Auto-scaling
970            BackendType::Kafka => 0.5,   // Manual scaling
971            BackendType::Memory => 1.0,  // No scaling
972            _ => 0.3,
973        };
974
975        match pattern.pattern_type {
976            PatternType::Bursty | PatternType::Random => scaling_factor,
977            _ => 0.0,
978        }
979    }
980
981    fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
982        setup_complexity as f64 / 10.0
983    }
984}
985
986impl Default for MLPredictor {
987    fn default() -> Self {
988        Self::new()
989    }
990}
991
992impl MLPredictor {
993    pub fn new() -> Self {
994        Self {
995            performance_history: Vec::new(),
996            patterns: HashMap::new(),
997            _feature_weights: vec![1.0; 10], // Start with equal weights
998            _confidence_threshold: 0.7,
999        }
1000    }
1001
1002    pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
1003        self.performance_history.push(data_point);
1004
1005        // Keep only recent data (last 10,000 points)
1006        if self.performance_history.len() > 10000 {
1007            self.performance_history.drain(0..1000);
1008        }
1009
1010        Ok(())
1011    }
1012
1013    pub async fn predict_performance(
1014        &self,
1015        backend_type: &BackendType,
1016        pattern: &WorkloadPattern,
1017    ) -> Result<BackendPerformance> {
1018        // Filter relevant historical data
1019        let relevant_data: Vec<&PerformanceDataPoint> = self
1020            .performance_history
1021            .iter()
1022            .filter(|dp| dp.backend_type == *backend_type)
1023            .collect();
1024
1025        if relevant_data.is_empty() {
1026            return Err(anyhow!(
1027                "No historical data for backend type: {:?}",
1028                backend_type
1029            ));
1030        }
1031
1032        // Simple prediction based on similar patterns
1033        let similar_data: Vec<&PerformanceDataPoint> = relevant_data
1034            .iter()
1035            .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
1036            .cloned()
1037            .collect();
1038
1039        let prediction_data = if similar_data.is_empty() {
1040            &relevant_data
1041        } else {
1042            &similar_data
1043        };
1044
1045        // Calculate weighted averages
1046        let predicted_latency = prediction_data
1047            .iter()
1048            .map(|dp| dp.actual_latency)
1049            .sum::<f64>()
1050            / prediction_data.len() as f64;
1051
1052        let predicted_throughput = prediction_data
1053            .iter()
1054            .map(|dp| dp.actual_throughput)
1055            .sum::<f64>()
1056            / prediction_data.len() as f64;
1057
1058        let predicted_reliability = prediction_data
1059            .iter()
1060            .map(|dp| dp.actual_reliability)
1061            .sum::<f64>()
1062            / prediction_data.len() as f64;
1063
1064        Ok(BackendPerformance {
1065            backend_type: backend_type.clone(),
1066            measured_latency_p50: predicted_latency,
1067            measured_latency_p95: predicted_latency * 1.5,
1068            measured_latency_p99: predicted_latency * 2.0,
1069            measured_throughput: predicted_throughput,
1070            reliability_score: predicted_reliability,
1071            resource_usage: prediction_data[0].resource_usage.clone(),
1072            cost_per_hour: 0.0,  // Will be calculated by cost model
1073            setup_complexity: 5, // Default
1074            scalability_factor: 1.0,
1075            last_updated: Utc::now(),
1076            sample_count: prediction_data.len() as u64,
1077        })
1078    }
1079
1080    pub async fn retrain_models(&mut self) -> Result<()> {
1081        // Simple retraining using linear regression
1082        for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
1083            let backend_data: Vec<&PerformanceDataPoint> = self
1084                .performance_history
1085                .iter()
1086                .filter(|dp| dp.backend_type == *backend_type)
1087                .collect();
1088
1089            if backend_data.len() < 10 {
1090                continue;
1091            }
1092
1093            // Create pattern model for this backend
1094            let pattern_name = format!("{backend_type:?}_model");
1095            let model = self.train_linear_model(&backend_data).await?;
1096            self.patterns.insert(pattern_name, model);
1097        }
1098
1099        info!("Retrained ML models for {} patterns", self.patterns.len());
1100        Ok(())
1101    }
1102
1103    async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
1104        // Simplified linear regression implementation
1105        let n = data.len() as f64;
1106
1107        // Extract features and target (latency)
1108        let features: Vec<Vec<f64>> = data
1109            .iter()
1110            .map(|dp| self.extract_features(&dp.workload_pattern))
1111            .collect();
1112
1113        let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();
1114
1115        // Simple linear regression: y = mx + b
1116        let feature_count = features[0].len();
1117        let mut coefficients = vec![0.0; feature_count];
1118        let intercept = targets.iter().sum::<f64>() / n;
1119
1120        // Calculate correlations (simplified)
1121        for i in 0..feature_count {
1122            let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
1123            let correlation = self.calculate_correlation(&feature_values, &targets);
1124            coefficients[i] = correlation * 0.1; // Simplified coefficient
1125        }
1126
1127        Ok(PatternModel {
1128            pattern_name: "latency_model".to_string(),
1129            coefficients,
1130            intercept,
1131            confidence: 0.8, // Default confidence
1132            last_trained: Utc::now(),
1133            sample_count: data.len(),
1134        })
1135    }
1136
1137    fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
1138        vec![
1139            pattern.event_rate,
1140            pattern.batch_size as f64,
1141            pattern.event_size_bytes as f64,
1142            pattern.data_characteristics.compression_ratio,
1143            pattern.data_characteristics.serialization_overhead,
1144            if pattern.data_characteristics.has_complex_structures {
1145                1.0
1146            } else {
1147                0.0
1148            },
1149            if pattern.data_characteristics.requires_ordering {
1150                1.0
1151            } else {
1152                0.0
1153            },
1154            match pattern.pattern_type {
1155                PatternType::RealTime => 1.0,
1156                PatternType::BatchOriented => 2.0,
1157                PatternType::Bursty => 3.0,
1158                _ => 0.0,
1159            },
1160            match pattern.consistency_requirements {
1161                ConsistencyLevel::ExactlyOnce => 3.0,
1162                ConsistencyLevel::AtLeastOnce => 2.0,
1163                _ => 1.0,
1164            },
1165            1.0, // Bias term
1166        ]
1167    }
1168
1169    fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
1170        let rate_similarity =
1171            1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
1172        let size_similarity = 1.0
1173            - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
1174                / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
1175        let type_similarity = if std::mem::discriminant(&p1.pattern_type)
1176            == std::mem::discriminant(&p2.pattern_type)
1177        {
1178            1.0
1179        } else {
1180            0.0
1181        };
1182
1183        (rate_similarity + size_similarity + type_similarity) / 3.0
1184    }
1185
1186    fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
1187        let n = x.len() as f64;
1188        let mean_x = x.iter().sum::<f64>() / n;
1189        let mean_y = y.iter().sum::<f64>() / n;
1190
1191        let numerator: f64 = x
1192            .iter()
1193            .zip(y.iter())
1194            .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
1195            .sum();
1196
1197        let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
1198        let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();
1199
1200        if denom_x * denom_y == 0.0 {
1201            0.0
1202        } else {
1203            numerator / (denom_x * denom_y).sqrt()
1204        }
1205    }
1206}
1207
1208impl BackendPerformance {
1209    pub fn new(backend_type: BackendType) -> Self {
1210        Self {
1211            backend_type,
1212            measured_latency_p50: 100.0, // Default 100ms
1213            measured_latency_p95: 200.0,
1214            measured_latency_p99: 500.0,
1215            measured_throughput: 1000.0, // Default 1000 eps
1216            reliability_score: 0.99,
1217            resource_usage: ResourceUsage::default(),
1218            cost_per_hour: 0.1,
1219            setup_complexity: 5,
1220            scalability_factor: 1.0,
1221            last_updated: Utc::now(),
1222            sample_count: 0,
1223        }
1224    }
1225}
1226
1227impl Default for ResourceUsage {
1228    fn default() -> Self {
1229        Self {
1230            cpu_usage_percent: 10.0,
1231            memory_usage_mb: 100.0,
1232            network_usage_mbps: 1.0,
1233            disk_io_ops_per_sec: 100.0,
1234            connection_count: 10,
1235        }
1236    }
1237}
1238
1239impl Default for WorkloadPattern {
1240    fn default() -> Self {
1241        Self {
1242            pattern_type: PatternType::Steady,
1243            event_rate: 100.0,
1244            batch_size: 100,
1245            event_size_bytes: 1024,
1246            temporal_distribution: TemporalDistribution::Uniform,
1247            data_characteristics: DataCharacteristics {
1248                compression_ratio: 0.7,
1249                serialization_overhead: 0.1,
1250                has_complex_structures: false,
1251                requires_ordering: false,
1252                has_time_windows: false,
1253                requires_deduplication: true,
1254            },
1255            consistency_requirements: ConsistencyLevel::AtLeastOnce,
1256        }
1257    }
1258}
1259
1260/// Optimization statistics
1261#[derive(Debug, Clone, Serialize, Deserialize)]
1262pub struct OptimizationStats {
1263    pub total_decisions: usize,
1264    pub backend_usage: HashMap<BackendType, usize>,
1265    pub average_confidence: f64,
1266    pub performance_improvements: HashMap<String, f64>,
1267    pub active_backends: usize,
1268    pub last_optimization: Option<DateTime<Utc>>,
1269}
1270
1271#[cfg(test)]
1272mod tests {
1273    use super::*;
1274    use crate::{EventMetadata, StreamEvent};
1275
1276    fn create_test_event() -> StreamEvent {
1277        StreamEvent::TripleAdded {
1278            subject: "http://example.org/subject".to_string(),
1279            predicate: "http://example.org/predicate".to_string(),
1280            object: "http://example.org/object".to_string(),
1281            graph: None,
1282            metadata: EventMetadata {
1283                event_id: uuid::Uuid::new_v4().to_string(),
1284                timestamp: Utc::now(),
1285                source: "test".to_string(),
1286                user: None,
1287                context: None,
1288                caused_by: None,
1289                version: "1.0".to_string(),
1290                properties: std::collections::HashMap::new(),
1291                checksum: None,
1292            },
1293        }
1294    }
1295
1296    #[tokio::test]
1297    async fn test_backend_optimizer_creation() {
1298        let config = OptimizerConfig::default();
1299        let optimizer = BackendOptimizer::new(config);
1300
1301        assert!(optimizer.ml_predictor.is_some());
1302        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
1303    }
1304
1305    #[tokio::test]
1306    async fn test_pattern_analysis() {
1307        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
1308        let events = vec![create_test_event(); 100];
1309
1310        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
1311
1312        assert!(pattern.event_rate > 0.0);
1313        assert!(pattern.batch_size > 0);
1314        assert!(pattern.event_size_bytes > 0);
1315    }
1316
1317    #[tokio::test]
1318    async fn test_cost_calculation() {
1319        let config = OptimizerConfig::default();
1320        let calculator = CostCalculator::new(config);
1321        let pattern = WorkloadPattern::default();
1322        let performance = BackendPerformance::new(BackendType::Kafka);
1323
1324        let cost = calculator
1325            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
1326            .await
1327            .unwrap();
1328
1329        assert!(cost.total_cost > 0.0);
1330        assert!(cost.latency_cost >= 0.0);
1331        assert!(cost.throughput_cost >= 0.0);
1332    }
1333
1334    #[tokio::test]
1335    async fn test_backend_recommendation() {
1336        let config = OptimizerConfig {
1337            enable_ml_prediction: false, // Disable ML prediction for test
1338            ..Default::default()
1339        };
1340        let optimizer = BackendOptimizer::new(config);
1341
1342        // Add some backend performance data
1343        let metrics = StreamingMetrics::default();
1344        optimizer
1345            .update_backend_performance(BackendType::Kafka, &metrics)
1346            .await
1347            .unwrap();
1348        optimizer
1349            .update_backend_performance(BackendType::Nats, &metrics)
1350            .await
1351            .unwrap();
1352
1353        let pattern = WorkloadPattern::default();
1354        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();
1355
1356        assert!(recommendations.len() >= 2);
1357        assert!(recommendations[0].score >= recommendations[1].score);
1358    }
1359
1360    #[tokio::test]
1361    async fn test_ml_predictor() {
1362        let mut predictor = MLPredictor::new();
1363
1364        let data_point = PerformanceDataPoint {
1365            timestamp: Utc::now(),
1366            backend_type: BackendType::Kafka,
1367            workload_pattern: WorkloadPattern::default(),
1368            actual_latency: 50.0,
1369            actual_throughput: 1000.0,
1370            actual_reliability: 0.99,
1371            resource_usage: ResourceUsage::default(),
1372            external_factors: HashMap::new(),
1373        };
1374
1375        predictor.add_training_data(data_point).await.unwrap();
1376        assert_eq!(predictor.performance_history.len(), 1);
1377    }
1378
1379    #[test]
1380    fn test_pattern_similarity() {
1381        let predictor = MLPredictor::new();
1382        let pattern1 = WorkloadPattern {
1383            event_rate: 100.0,
1384            pattern_type: PatternType::Steady,
1385            ..Default::default()
1386        };
1387        let pattern2 = WorkloadPattern {
1388            event_rate: 110.0,
1389            pattern_type: PatternType::Steady,
1390            ..Default::default()
1391        };
1392
1393        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
1394        assert!(similarity > 0.8);
1395    }
1396
1397    #[tokio::test]
1398    async fn test_workload_pattern_classification() {
1399        // Use shorter analysis window for testing
1400        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));
1401
1402        // Test real-time pattern (low rate) - create events with different timestamps
1403        let mut events = Vec::new();
1404        let base_time = Utc::now();
1405        for i in 0..10 {
1406            let mut event = create_test_event();
1407            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
1408                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
1409            }
1410            events.push(event);
1411        }
1412        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
1413        // With 10 events in 30 seconds = 0.33 events/sec, should be RealTime
1414        assert!(matches!(
1415            pattern.pattern_type,
1416            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
1417        ));
1418
1419        // Test batch pattern (high rate) - create many events with varied timestamps
1420        let mut events = Vec::new();
1421        let base_time = Utc::now();
1422        // Create 3000+ events to ensure high rate (3000/30 = 100+ events/sec)
1423        for i in 0..3500 {
1424            let mut event = create_test_event();
1425            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
1426                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
1427            }
1428            events.push(event);
1429        }
1430        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
1431        // With 3500 events in 30 seconds = 116.67 events/sec, should be > 100
1432        assert!(pattern.event_rate > 100.0);
1433    }
1434
1435    #[test]
1436    fn test_backend_strengths_analysis() {
1437        let config = OptimizerConfig::default();
1438        let optimizer = BackendOptimizer::new(config);
1439        let pattern = WorkloadPattern {
1440            pattern_type: PatternType::RealTime,
1441            consistency_requirements: ConsistencyLevel::ExactlyOnce,
1442            ..Default::default()
1443        };
1444
1445        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
1446        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));
1447
1448        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
1449        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
1450    }
1451
1452    #[test]
1453    fn test_config_serialization() {
1454        let config = OptimizerConfig::default();
1455        let serialized = serde_json::to_string(&config).unwrap();
1456        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();
1457
1458        assert_eq!(
1459            config.enable_cost_modeling,
1460            deserialized.enable_cost_modeling
1461        );
1462        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
1463    }
1464}