oxirs_stream/backend_optimizer.rs

//! # Backend Optimization and Selection
//!
//! Advanced backend selection algorithms, cost modeling, and ML-driven optimization
//! for choosing the optimal streaming backend based on workload patterns and performance metrics.
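//!
//! A minimal usage sketch (hedged: assumes these items are reachable via the
//! crate's public re-exports; adjust paths as needed):
//!
//! ```ignore
//! use oxirs_stream::backend_optimizer::{BackendOptimizer, OptimizerConfig, WorkloadPattern};
//!
//! async fn pick_backend() -> anyhow::Result<()> {
//!     let optimizer = BackendOptimizer::new(OptimizerConfig::default());
//!     // Rank every backend with recorded performance for this workload.
//!     let ranked = optimizer.recommend_backend(&WorkloadPattern::default()).await?;
//!     if let Some(best) = ranked.first() {
//!         println!("best: {:?} (score {:.2})", best.backend_type, best.score);
//!     }
//!     Ok(())
//! }
//! ```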

use crate::backend::BackendType;
use crate::event::StreamEvent;
use crate::monitoring::StreamingMetrics;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info};

/// Backend optimization configuration
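///
/// The weights need not sum to 1: `calculate_backend_score` normalizes by
/// their sum. A sketch of a latency-biased configuration via struct-update
/// syntax:
///
/// ```ignore
/// let config = OptimizerConfig {
///     cost_weight_latency: 0.6,
///     cost_weight_throughput: 0.2,
///     ..OptimizerConfig::default()
/// };
/// ```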
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
    pub enable_cost_modeling: bool,
    pub enable_ml_prediction: bool,
    pub enable_pattern_analysis: bool,
    pub optimization_interval: Duration,
    pub min_samples_for_prediction: usize,
    pub cost_weight_latency: f64,
    pub cost_weight_throughput: f64,
    pub cost_weight_reliability: f64,
    pub cost_weight_resource_usage: f64,
}

impl Default for OptimizerConfig {
    fn default() -> Self {
        Self {
            enable_cost_modeling: true,
            enable_ml_prediction: true,
            enable_pattern_analysis: true,
            optimization_interval: Duration::from_secs(300), // 5 minutes
            min_samples_for_prediction: 100,
            cost_weight_latency: 0.3,
            cost_weight_throughput: 0.3,
            cost_weight_reliability: 0.3,
            cost_weight_resource_usage: 0.1,
        }
    }
}

/// Workload pattern characteristics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadPattern {
    pub pattern_type: PatternType,
    pub event_rate: f64,
    pub batch_size: u32,
    pub event_size_bytes: u64,
    pub temporal_distribution: TemporalDistribution,
    pub data_characteristics: DataCharacteristics,
    pub consistency_requirements: ConsistencyLevel,
}

/// Pattern types for workload classification
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternType {
    /// Steady, predictable load
    Steady,
    /// Variable load with spikes
    Bursty,
    /// Seasonal patterns
    Seasonal,
    /// Random/unpredictable patterns
    Random,
    /// Real-time processing requirements
    RealTime,
    /// Batch processing oriented
    BatchOriented,
}

/// Temporal distribution patterns
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalDistribution {
    Uniform,
    Normal { mean: f64, std_dev: f64 },
    Exponential { lambda: f64 },
    Poisson { lambda: f64 },
    Custom { distribution_name: String },
}

/// Data characteristics affecting backend choice
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCharacteristics {
    pub compression_ratio: f64,
    pub serialization_overhead: f64,
    pub has_complex_structures: bool,
    pub requires_ordering: bool,
    pub has_time_windows: bool,
    pub requires_deduplication: bool,
}

/// Consistency level requirements
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// At most once delivery
    AtMostOnce,
    /// At least once delivery
    AtLeastOnce,
    /// Exactly once delivery
    ExactlyOnce,
    /// Session consistency
    Session,
    /// Strong consistency
    Strong,
}

/// Backend performance characteristics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendPerformance {
    pub backend_type: BackendType,
    pub measured_latency_p50: f64,
    pub measured_latency_p95: f64,
    pub measured_latency_p99: f64,
    pub measured_throughput: f64,
    pub reliability_score: f64,
    pub resource_usage: ResourceUsage,
    pub cost_per_hour: f64,
    pub setup_complexity: u8, // 1-10 scale
    pub scalability_factor: f64,
    pub last_updated: DateTime<Utc>,
    pub sample_count: u64,
}

/// Resource usage metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub cpu_usage_percent: f64,
    pub memory_usage_mb: f64,
    pub network_usage_mbps: f64,
    pub disk_io_ops_per_sec: f64,
    pub connection_count: u32,
}

/// Cost model for backend selection
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    pub total_cost: f64,
    pub latency_cost: f64,
    pub throughput_cost: f64,
    pub reliability_cost: f64,
    pub resource_cost: f64,
    pub scaling_cost: f64,
    pub maintenance_cost: f64,
}

/// ML prediction model for performance forecasting
#[derive(Debug, Clone)]
pub struct MLPredictor {
    /// Historical performance data
    performance_history: Vec<PerformanceDataPoint>,
    /// Learned patterns
    patterns: HashMap<String, PatternModel>,
    /// Feature weights
    _feature_weights: Vec<f64>,
    /// Prediction confidence threshold
    _confidence_threshold: f64,
}

/// Single performance data point for ML training
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    pub timestamp: DateTime<Utc>,
    pub backend_type: BackendType,
    pub workload_pattern: WorkloadPattern,
    pub actual_latency: f64,
    pub actual_throughput: f64,
    pub actual_reliability: f64,
    pub resource_usage: ResourceUsage,
    pub external_factors: HashMap<String, f64>,
}

/// Learned pattern model for ML predictions
#[derive(Debug, Clone)]
pub struct PatternModel {
    pub pattern_name: String,
    pub coefficients: Vec<f64>,
    pub intercept: f64,
    pub confidence: f64,
    pub last_trained: DateTime<Utc>,
    pub sample_count: usize,
}

/// Backend optimizer for intelligent backend selection
pub struct BackendOptimizer {
    config: OptimizerConfig,
    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
    pattern_analyzer: PatternAnalyzer,
    cost_calculator: CostCalculator,
    ml_predictor: Option<MLPredictor>,
    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
}

/// Pattern analyzer for workload classification
pub struct PatternAnalyzer {
    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
    _pattern_cache: HashMap<String, WorkloadPattern>,
    analysis_window: ChronoDuration,
}

/// Cost calculator for backend evaluation
pub struct CostCalculator {
    config: OptimizerConfig,
    baseline_costs: HashMap<BackendType, f64>,
}

/// Optimization decision record
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationDecision {
    pub timestamp: DateTime<Utc>,
    pub selected_backend: BackendType,
    pub workload_pattern: WorkloadPattern,
    pub predicted_performance: BackendPerformance,
    pub cost_model: CostModel,
    pub confidence: f64,
    pub reason: String,
}

/// Backend recommendation with ranking
#[derive(Debug, Clone)]
pub struct BackendRecommendation {
    pub backend_type: BackendType,
    pub score: f64,
    pub predicted_latency: f64,
    pub predicted_throughput: f64,
    pub predicted_cost: f64,
    pub confidence: f64,
    pub strengths: Vec<String>,
    pub weaknesses: Vec<String>,
}

impl BackendOptimizer {
    /// Create a new backend optimizer
    pub fn new(config: OptimizerConfig) -> Self {
        let ml_predictor = if config.enable_ml_prediction {
            Some(MLPredictor::new())
        } else {
            None
        };

        Self {
            pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
            cost_calculator: CostCalculator::new(config.clone()),
            backend_performance: Arc::new(RwLock::new(HashMap::new())),
            optimization_history: Arc::new(RwLock::new(Vec::new())),
            config,
            ml_predictor,
        }
    }

    /// Update backend performance metrics
    pub async fn update_backend_performance(
        &self,
        backend_type: BackendType,
        metrics: &StreamingMetrics,
    ) -> Result<()> {
        let mut performance_map = self.backend_performance.write().await;

        let performance = performance_map
            .entry(backend_type.clone())
            .or_insert_with(|| BackendPerformance::new(backend_type.clone()));

        // Update measured performance with exponential moving average
        let alpha = 0.1; // Smoothing factor
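        // EMA update: new = alpha * sample + (1 - alpha) * previous, so the
        // most recent samples dominate while older history decays geometrically.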
        performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
            + (1.0 - alpha) * performance.measured_latency_p50;
        performance.measured_throughput = alpha * metrics.producer_throughput_eps
            + (1.0 - alpha) * performance.measured_throughput;
        performance.reliability_score =
            alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;

        performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
        performance.resource_usage.memory_usage_mb =
            metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
        performance.resource_usage.connection_count = metrics.backend_connections_active;

        performance.last_updated = Utc::now();
        performance.sample_count += 1;

        debug!(
            "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
            backend_type,
            performance.measured_latency_p50,
            performance.measured_throughput,
            performance.reliability_score
        );

        Ok(())
    }

    /// Analyze workload pattern from recent events
    pub async fn analyze_workload_pattern(
        &mut self,
        events: &[StreamEvent],
    ) -> Result<WorkloadPattern> {
        self.pattern_analyzer.analyze_pattern(events).await
    }

    /// Get optimal backend recommendation for given workload
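    ///
    /// Recommendations are returned sorted by score, best first. A brief
    /// sketch of consuming the ranking:
    ///
    /// ```ignore
    /// let ranked = optimizer.recommend_backend(&pattern).await?;
    /// for rec in &ranked {
    ///     println!("{:?}: score {:.2}, p50 {:.1} ms", rec.backend_type, rec.score, rec.predicted_latency);
    /// }
    /// ```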
    pub async fn recommend_backend(
        &self,
        pattern: &WorkloadPattern,
    ) -> Result<Vec<BackendRecommendation>> {
        let mut recommendations = Vec::new();
        let performance_map = self.backend_performance.read().await;

        for (backend_type, performance) in performance_map.iter() {
            let cost = self
                .cost_calculator
                .calculate_cost(backend_type, pattern, performance)
                .await?;

            let predicted_performance = if let Some(predictor) = &self.ml_predictor {
                predictor.predict_performance(backend_type, pattern).await?
            } else {
                performance.clone()
            };

            let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
            let confidence = self.calculate_confidence(&predicted_performance, pattern);

            let recommendation = BackendRecommendation {
                backend_type: backend_type.clone(),
                score,
                predicted_latency: predicted_performance.measured_latency_p50,
                predicted_throughput: predicted_performance.measured_throughput,
                predicted_cost: cost.total_cost,
                confidence,
                strengths: self.analyze_strengths(backend_type, pattern),
                weaknesses: self.analyze_weaknesses(backend_type, pattern),
            };

            recommendations.push(recommendation);
        }

        // Sort by score (higher is better); total_cmp avoids panicking on NaN
        recommendations.sort_by(|a, b| b.score.total_cmp(&a.score));

        info!(
            "Generated {} backend recommendations for workload pattern: {:?}",
            recommendations.len(),
            pattern.pattern_type
        );

        Ok(recommendations)
    }

    /// Train ML predictor with new performance data
    pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
        if let Some(predictor) = &mut self.ml_predictor {
            predictor.add_training_data(data_point).await?;

            if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
                predictor.retrain_models().await?;
            }
        }
        Ok(())
    }

    /// Record optimization decision
    pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
        let mut history = self.optimization_history.write().await;
        history.push(decision);

        // Keep only recent decisions (last 1000)
        if history.len() > 1000 {
            history.drain(0..100);
        }
        Ok(())
    }

    /// Get optimization statistics
    pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
        let history = self.optimization_history.read().await;
        let performance_map = self.backend_performance.read().await;

        let total_decisions = history.len();
        let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
            *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
            acc
        });

        let average_confidence = if total_decisions > 0 {
            history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
        } else {
            0.0
        };

        let performance_improvements = self.calculate_performance_improvements(&history).await?;

        Ok(OptimizationStats {
            total_decisions,
            backend_usage,
            average_confidence,
            performance_improvements,
            active_backends: performance_map.len(),
            last_optimization: history.last().map(|d| d.timestamp),
        })
    }

    /// Calculate backend score based on cost model and performance
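    ///
    /// Computes a normalized weighted average of per-dimension scores in [0, 1]:
    /// score = (w_l * s_latency + w_t * s_throughput + w_r * s_reliability
    ///          + w_c * s_cost) / (w_l + w_t + w_r + w_c),
    /// with the weights taken from `OptimizerConfig`.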
    fn calculate_backend_score(
        &self,
        cost: &CostModel,
        performance: &BackendPerformance,
        pattern: &WorkloadPattern,
    ) -> f64 {
        let latency_score = match pattern.pattern_type {
            PatternType::RealTime => {
                // For real-time, heavily penalize high latency
                if performance.measured_latency_p99 < 10.0 {
                    1.0
                } else if performance.measured_latency_p99 < 50.0 {
                    0.7
                } else {
                    0.3
                }
            }
            _ => {
                // For other patterns, moderate latency tolerance
                (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0)
            }
        };

        let throughput_score = match pattern.pattern_type {
            PatternType::BatchOriented => {
                // Batch processing values high throughput
                (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
            }
            _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
        };

        let reliability_score = performance.reliability_score;
        let cost_score = 1.0 / (cost.total_cost + 1.0);

        // Weighted combination
        (latency_score * self.config.cost_weight_latency
            + throughput_score * self.config.cost_weight_throughput
            + reliability_score * self.config.cost_weight_reliability
            + cost_score * self.config.cost_weight_resource_usage)
            / (self.config.cost_weight_latency
                + self.config.cost_weight_throughput
                + self.config.cost_weight_reliability
                + self.config.cost_weight_resource_usage)
    }

    /// Calculate confidence in prediction
    fn calculate_confidence(
        &self,
        performance: &BackendPerformance,
        _pattern: &WorkloadPattern,
    ) -> f64 {
        let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
        let recency_confidence = {
            let age_hours = Utc::now()
                .signed_duration_since(performance.last_updated)
                .num_hours() as f64;
            (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
        };

        (sample_confidence + recency_confidence) / 2.0
    }

    /// Analyze backend strengths for given pattern
    fn analyze_strengths(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
    ) -> Vec<String> {
        let mut strengths = Vec::new();

        match backend_type {
            BackendType::Kafka => {
                strengths.push("High throughput".to_string());
                strengths.push("Strong durability".to_string());
                strengths.push("Excellent ordering guarantees".to_string());
                if matches!(
                    pattern.consistency_requirements,
                    ConsistencyLevel::ExactlyOnce
                ) {
                    strengths.push("Exactly-once semantics".to_string());
                }
            }
            BackendType::Nats => {
                strengths.push("Low latency".to_string());
                strengths.push("Simple setup".to_string());
                strengths.push("Built-in clustering".to_string());
                if matches!(pattern.pattern_type, PatternType::RealTime) {
                    strengths.push("Real-time performance".to_string());
                }
            }
            BackendType::Redis => {
                strengths.push("In-memory speed".to_string());
                strengths.push("Low latency".to_string());
                strengths.push("Rich data structures".to_string());
            }
            BackendType::Kinesis => {
                strengths.push("AWS native integration".to_string());
                strengths.push("Auto-scaling".to_string());
                strengths.push("Pay-per-use model".to_string());
            }
            BackendType::Pulsar => {
                strengths.push("Multi-tenancy".to_string());
                strengths.push("Geo-replication".to_string());
                strengths.push("Unified messaging".to_string());
            }
            BackendType::Memory => {
513                strengths.push("Zero latency".to_string());
514                strengths.push("Perfect for testing".to_string());
515            }
516        }
517
518        strengths
519    }
520
521    /// Analyze backend weaknesses for given pattern
522    fn analyze_weaknesses(
523        &self,
524        backend_type: &BackendType,
525        pattern: &WorkloadPattern,
526    ) -> Vec<String> {
527        let mut weaknesses = Vec::new();
528
529        match backend_type {
530            BackendType::Kafka => {
531                weaknesses.push("Complex setup".to_string());
532                weaknesses.push("Higher resource usage".to_string());
533                if matches!(pattern.pattern_type, PatternType::RealTime) {
534                    weaknesses.push("Higher latency than NATS".to_string());
535                }
536            }
537            BackendType::Nats => {
538                if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
539                    weaknesses.push("Limited durability options".to_string());
540                }
541                if pattern.event_rate > 100000.0 {
542                    weaknesses.push("May not handle extreme throughput".to_string());
543                }
544            }
545            BackendType::Redis => {
546                weaknesses.push("Memory-bound".to_string());
547                weaknesses.push("Limited durability".to_string());
548                if pattern.event_size_bytes > 1000000 {
549                    weaknesses.push("Not suitable for large events".to_string());
550                }
551            }
552            BackendType::Kinesis => {
553                weaknesses.push("AWS vendor lock-in".to_string());
554                weaknesses.push("Cost can scale quickly".to_string());
555                weaknesses.push("Regional limitations".to_string());
556            }
557            BackendType::Pulsar => {
558                weaknesses.push("Newer ecosystem".to_string());
559                weaknesses.push("Complex architecture".to_string());
560            }
561            BackendType::Memory => {
562                weaknesses.push("No persistence".to_string());
563                weaknesses.push("Single node only".to_string());
564                weaknesses.push("Memory limitations".to_string());
565            }
566        }
567
568        weaknesses
569    }
570
571    /// Calculate performance improvements over time
572    async fn calculate_performance_improvements(
573        &self,
574        history: &[OptimizationDecision],
575    ) -> Result<HashMap<String, f64>> {
576        let mut improvements = HashMap::new();
577
        // Need more than 10 decisions so the "older" slice below is non-empty.
        if history.len() <= 10 {
            return Ok(improvements);
        }

        let recent_decisions = &history[history.len() - 10..];
        let older_decisions = &history[0..10.min(history.len() - 10)];

        let recent_avg_latency = recent_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_latency_p50)
            .sum::<f64>()
            / recent_decisions.len() as f64;

        let older_avg_latency = older_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_latency_p50)
            .sum::<f64>()
            / older_decisions.len() as f64;

        let latency_improvement =
            (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
        improvements.insert(
            "latency_improvement_percent".to_string(),
            latency_improvement,
        );

        let recent_avg_throughput = recent_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_throughput)
            .sum::<f64>()
            / recent_decisions.len() as f64;

        let older_avg_throughput = older_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_throughput)
            .sum::<f64>()
            / older_decisions.len() as f64;

        let throughput_improvement =
            (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
        improvements.insert(
            "throughput_improvement_percent".to_string(),
            throughput_improvement,
        );

        Ok(improvements)
    }
}

impl PatternAnalyzer {
    pub fn new(analysis_window: ChronoDuration) -> Self {
        Self {
            event_history: Vec::new(),
            _pattern_cache: HashMap::new(),
            analysis_window,
        }
    }

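    /// Derive a [`WorkloadPattern`] from a batch of events: the batch is
    /// appended to the sliding window, entries older than `analysis_window`
    /// are evicted, and event rate, temporal distribution, event size, data
    /// characteristics, and consistency needs are estimated from what remains.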
    pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
        // Add events to history
        let now = Utc::now();
        for event in events {
            self.event_history.push((now, event.clone()));
        }

        // Remove old events outside analysis window
        let cutoff = now - self.analysis_window;
        self.event_history
            .retain(|(timestamp, _)| *timestamp >= cutoff);

        if self.event_history.is_empty() {
            return Ok(WorkloadPattern::default());
        }

        // Calculate event rate
        let duration_seconds = self.analysis_window.num_seconds() as f64;
        let event_rate = self.event_history.len() as f64 / duration_seconds;

        // Analyze temporal distribution
        let temporal_distribution = self.analyze_temporal_distribution().await?;

        // Determine pattern type
        let pattern_type = self
            .classify_pattern_type(event_rate, &temporal_distribution)
            .await?;

        // Calculate average event size
        let avg_event_size = self.calculate_average_event_size().await?;

        // Analyze data characteristics
        let data_characteristics = self.analyze_data_characteristics().await?;

        // Determine consistency requirements based on event types
        let consistency_requirements = self.determine_consistency_requirements().await?;

        Ok(WorkloadPattern {
            pattern_type,
            event_rate,
            batch_size: self.estimate_optimal_batch_size(event_rate),
            event_size_bytes: avg_event_size,
            temporal_distribution,
            data_characteristics,
            consistency_requirements,
        })
    }

    async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
        if self.event_history.len() < 10 {
            return Ok(TemporalDistribution::Uniform);
        }

        // Calculate inter-arrival times
        let mut intervals = Vec::new();
        for i in 1..self.event_history.len() {
            let interval = self.event_history[i]
                .0
                .signed_duration_since(self.event_history[i - 1].0)
                .num_milliseconds() as f64;
            intervals.push(interval);
        }

        // Calculate basic statistics
        let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
        let variance =
            intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
        let std_dev = variance.sqrt();

        // Guard against a zero mean (e.g. identical timestamps), which would
        // make the coefficient of variation below NaN and misclassify the pattern.
        if mean <= f64::EPSILON {
            return Ok(TemporalDistribution::Uniform);
        }

        // Simple distribution classification
        let cv = std_dev / mean; // Coefficient of variation

        if cv < 0.1 {
            Ok(TemporalDistribution::Uniform)
        } else if cv < 0.5 {
            Ok(TemporalDistribution::Normal { mean, std_dev })
        } else {
            Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
        }
    }

    async fn classify_pattern_type(
        &self,
        event_rate: f64,
        temporal_dist: &TemporalDistribution,
    ) -> Result<PatternType> {
        // Simple heuristic-based classification
        match temporal_dist {
            TemporalDistribution::Uniform => {
                if event_rate > 10000.0 {
                    Ok(PatternType::BatchOriented)
                } else if event_rate > 100.0 {
                    Ok(PatternType::Steady)
                } else {
                    Ok(PatternType::RealTime)
                }
            }
            TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
            TemporalDistribution::Normal { std_dev, mean } => {
                if std_dev / mean > 1.0 {
                    Ok(PatternType::Random)
                } else {
                    Ok(PatternType::Steady)
                }
            }
            _ => Ok(PatternType::Steady),
        }
    }

    async fn calculate_average_event_size(&self) -> Result<u64> {
        if self.event_history.is_empty() {
            return Ok(1024); // Default 1KB
        }

        // Estimate serialized size (simplified)
        let avg_size = self
            .event_history
            .iter()
            .map(|(_, event)| self.estimate_event_size(event))
            .sum::<u64>()
            / self.event_history.len() as u64;

        Ok(avg_size)
    }

    fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
        // Simplified size estimation based on event type
        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                ..
            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
            StreamEvent::TripleRemoved {
                subject,
                predicate,
                object,
                ..
            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
            StreamEvent::GraphCreated { .. } => 200,
            StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
            StreamEvent::TransactionBegin { .. } => 150,
            StreamEvent::TransactionCommit { .. } => 100,
            StreamEvent::Heartbeat { .. } => 50,
            _ => 300, // Default for complex events
        }
    }

    async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
        let has_complex_structures = self
            .event_history
            .iter()
            .any(|(_, event)| self.is_complex_event(event));

        let requires_ordering = self
            .event_history
            .iter()
            .any(|(_, event)| self.requires_ordering(event));

        Ok(DataCharacteristics {
            compression_ratio: 0.7,      // Assume 30% compression
            serialization_overhead: 0.1, // 10% overhead
            has_complex_structures,
            requires_ordering,
            has_time_windows: false,      // Simplified
            requires_deduplication: true, // Conservative default
        })
    }

    fn is_complex_event(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::SparqlUpdate { .. }
                | StreamEvent::SchemaChanged { .. }
                | StreamEvent::QueryCompleted { .. }
        )
    }

    fn requires_ordering(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::TransactionBegin { .. }
                | StreamEvent::TransactionCommit { .. }
                | StreamEvent::TransactionAbort { .. }
        )
    }

    async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
        let has_transactions = self.event_history.iter().any(|(_, event)| {
            matches!(
                event,
                StreamEvent::TransactionBegin { .. }
                    | StreamEvent::TransactionCommit { .. }
                    | StreamEvent::TransactionAbort { .. }
            )
        });

        if has_transactions {
            Ok(ConsistencyLevel::ExactlyOnce)
        } else {
            Ok(ConsistencyLevel::AtLeastOnce)
        }
    }

    fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
        if event_rate > 10000.0 {
            1000
        } else if event_rate > 1000.0 {
            500
        } else if event_rate > 100.0 {
            100
        } else {
            10
        }
    }
}

impl CostCalculator {
    pub fn new(config: OptimizerConfig) -> Self {
        let mut baseline_costs = HashMap::new();

        // Baseline hourly costs (normalized)
        baseline_costs.insert(BackendType::Memory, 0.0);
        baseline_costs.insert(BackendType::Redis, 0.1);
        baseline_costs.insert(BackendType::Nats, 0.2);
        baseline_costs.insert(BackendType::Kafka, 0.5);
        baseline_costs.insert(BackendType::Pulsar, 0.4);
        baseline_costs.insert(BackendType::Kinesis, 0.8);

        Self {
            config,
            baseline_costs,
        }
    }

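    /// Combine a backend's baseline cost with weighted penalty terms:
    /// total = base + w_l * latency + w_t * throughput + w_r * reliability
    ///         + w_c * resources + 0.1 * scaling + 0.1 * maintenance.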
    pub async fn calculate_cost(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
        performance: &BackendPerformance,
    ) -> Result<CostModel> {
        let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);

        // Calculate component costs
        let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
        let throughput_cost =
            self.calculate_throughput_cost(performance.measured_throughput, pattern);
        let reliability_cost =
            self.calculate_reliability_cost(performance.reliability_score, pattern);
        let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
        let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
        let maintenance_cost =
            self.calculate_maintenance_cost(backend_type, performance.setup_complexity);

        let total_cost = base_cost
            + latency_cost * self.config.cost_weight_latency
            + throughput_cost * self.config.cost_weight_throughput
            + reliability_cost * self.config.cost_weight_reliability
            + resource_cost * self.config.cost_weight_resource_usage
            + scaling_cost * 0.1
            + maintenance_cost * 0.1;

        Ok(CostModel {
            total_cost,
            latency_cost,
            throughput_cost,
            reliability_cost,
            resource_cost,
            scaling_cost,
            maintenance_cost,
        })
    }

    fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
        let latency_penalty = match pattern.pattern_type {
            PatternType::RealTime => latency / 10.0, // Heavy penalty for real-time
            PatternType::Bursty => latency / 50.0,
            _ => latency / 100.0,
        };
        latency_penalty.min(2.0) // Cap at 2x cost
    }

    fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
        let required_throughput = pattern.event_rate * 1.5; // 50% buffer
        if throughput < required_throughput {
            (required_throughput - throughput) / required_throughput
        } else {
            0.0
        }
    }

    fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
        let required_reliability = match pattern.consistency_requirements {
            ConsistencyLevel::ExactlyOnce => 0.999,
            ConsistencyLevel::AtLeastOnce => 0.995,
            _ => 0.99,
        };

        if reliability < required_reliability {
            (required_reliability - reliability) * 10.0
        } else {
            0.0
        }
    }

    fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
        // Normalize resource usage to cost
        (usage.cpu_usage_percent / 100.0) * 0.1
            + (usage.memory_usage_mb / 1000.0) * 0.05
            + (usage.network_usage_mbps / 100.0) * 0.02
    }

    fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
        let scaling_factor = match backend_type {
            BackendType::Kinesis => 0.1, // Auto-scaling
            BackendType::Kafka => 0.5,   // Manual scaling
            BackendType::Memory => 1.0,  // No scaling
            _ => 0.3,
        };

        match pattern.pattern_type {
            PatternType::Bursty | PatternType::Random => scaling_factor,
            _ => 0.0,
        }
    }

    fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
        setup_complexity as f64 / 10.0
    }
}

impl Default for MLPredictor {
    fn default() -> Self {
        Self::new()
    }
}

impl MLPredictor {
    pub fn new() -> Self {
        Self {
            performance_history: Vec::new(),
            patterns: HashMap::new(),
            _feature_weights: vec![1.0; 10], // Start with equal weights
            _confidence_threshold: 0.7,
        }
    }

    pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
        self.performance_history.push(data_point);

        // Keep only recent data (last 10,000 points)
        if self.performance_history.len() > 10000 {
            self.performance_history.drain(0..1000);
        }

        Ok(())
    }

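    /// Predict performance for `backend_type` under `pattern` by averaging
    /// historical data points, preferring those whose pattern similarity
    /// exceeds 0.7 and falling back to all points for that backend. Errors
    /// when no history exists for the backend.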
    pub async fn predict_performance(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
    ) -> Result<BackendPerformance> {
        // Filter relevant historical data
        let relevant_data: Vec<&PerformanceDataPoint> = self
            .performance_history
            .iter()
            .filter(|dp| dp.backend_type == *backend_type)
            .collect();

        if relevant_data.is_empty() {
            return Err(anyhow!(
                "No historical data for backend type: {:?}",
                backend_type
            ));
        }

        // Simple prediction based on similar patterns
        let similar_data: Vec<&PerformanceDataPoint> = relevant_data
            .iter()
            .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
            .cloned()
            .collect();

        let prediction_data = if similar_data.is_empty() {
            &relevant_data
        } else {
            &similar_data
        };

        // Calculate weighted averages
        let predicted_latency = prediction_data
            .iter()
            .map(|dp| dp.actual_latency)
            .sum::<f64>()
            / prediction_data.len() as f64;

        let predicted_throughput = prediction_data
            .iter()
            .map(|dp| dp.actual_throughput)
            .sum::<f64>()
            / prediction_data.len() as f64;

        let predicted_reliability = prediction_data
            .iter()
            .map(|dp| dp.actual_reliability)
            .sum::<f64>()
            / prediction_data.len() as f64;

        Ok(BackendPerformance {
            backend_type: backend_type.clone(),
            measured_latency_p50: predicted_latency,
            measured_latency_p95: predicted_latency * 1.5,
            measured_latency_p99: predicted_latency * 2.0,
            measured_throughput: predicted_throughput,
            reliability_score: predicted_reliability,
            resource_usage: prediction_data[0].resource_usage.clone(),
            cost_per_hour: 0.0,  // Will be calculated by cost model
            setup_complexity: 5, // Default
            scalability_factor: 1.0,
            last_updated: Utc::now(),
            sample_count: prediction_data.len() as u64,
        })
    }

    pub async fn retrain_models(&mut self) -> Result<()> {
        // Simple retraining using linear regression
        for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
            let backend_data: Vec<&PerformanceDataPoint> = self
                .performance_history
                .iter()
                .filter(|dp| dp.backend_type == *backend_type)
                .collect();

            if backend_data.len() < 10 {
                continue;
            }

            // Create pattern model for this backend
            let pattern_name = format!("{backend_type:?}_model");
            let model = self.train_linear_model(&backend_data).await?;
            self.patterns.insert(pattern_name, model);
        }

        info!("Retrained ML models for {} patterns", self.patterns.len());
        Ok(())
    }

    async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
        // Simplified linear regression implementation
        let n = data.len() as f64;

        // Extract features and target (latency)
        let features: Vec<Vec<f64>> = data
            .iter()
            .map(|dp| self.extract_features(&dp.workload_pattern))
            .collect();

        let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();

        // Simple linear regression: y = mx + b
        let feature_count = features[0].len();
        let mut coefficients = vec![0.0; feature_count];
        let intercept = targets.iter().sum::<f64>() / n;

        // Calculate correlations (simplified)
        for i in 0..feature_count {
            let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
            let correlation = self.calculate_correlation(&feature_values, &targets);
            coefficients[i] = correlation * 0.1; // Simplified coefficient
        }

        Ok(PatternModel {
            pattern_name: "latency_model".to_string(),
            coefficients,
            intercept,
            confidence: 0.8, // Default confidence
            last_trained: Utc::now(),
            sample_count: data.len(),
        })
    }

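    /// Map a workload pattern onto the fixed 10-element feature vector used
    /// by the linear models (matching the length of `_feature_weights`).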
    fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
        vec![
            pattern.event_rate,
            pattern.batch_size as f64,
            pattern.event_size_bytes as f64,
            pattern.data_characteristics.compression_ratio,
            pattern.data_characteristics.serialization_overhead,
            if pattern.data_characteristics.has_complex_structures {
                1.0
            } else {
                0.0
            },
            if pattern.data_characteristics.requires_ordering {
                1.0
            } else {
                0.0
            },
            match pattern.pattern_type {
                PatternType::RealTime => 1.0,
                PatternType::BatchOriented => 2.0,
                PatternType::Bursty => 3.0,
                _ => 0.0,
            },
            match pattern.consistency_requirements {
                ConsistencyLevel::ExactlyOnce => 3.0,
                ConsistencyLevel::AtLeastOnce => 2.0,
                _ => 1.0,
            },
            1.0, // Bias term
        ]
    }
    fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
        let rate_similarity =
            1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
        let size_similarity = 1.0
            - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
                / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
        let type_similarity = if std::mem::discriminant(&p1.pattern_type)
            == std::mem::discriminant(&p2.pattern_type)
        {
            1.0
        } else {
            0.0
        };

        (rate_similarity + size_similarity + type_similarity) / 3.0
    }

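    /// Pearson correlation coefficient:
    /// r = sum((x_i - mean_x) * (y_i - mean_y))
    ///     / sqrt(sum((x_i - mean_x)^2) * sum((y_i - mean_y)^2)),
    /// returning 0 when either variance is zero.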
    fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
        let n = x.len() as f64;
        let mean_x = x.iter().sum::<f64>() / n;
        let mean_y = y.iter().sum::<f64>() / n;

        let numerator: f64 = x
            .iter()
            .zip(y.iter())
            .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
            .sum();

        let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
        let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();

        if denom_x * denom_y == 0.0 {
            0.0
        } else {
            numerator / (denom_x * denom_y).sqrt()
        }
    }
}

impl BackendPerformance {
    pub fn new(backend_type: BackendType) -> Self {
        Self {
            backend_type,
            measured_latency_p50: 100.0, // Default 100ms
            measured_latency_p95: 200.0,
            measured_latency_p99: 500.0,
            measured_throughput: 1000.0, // Default 1000 eps
            reliability_score: 0.99,
            resource_usage: ResourceUsage::default(),
            cost_per_hour: 0.1,
            setup_complexity: 5,
            scalability_factor: 1.0,
            last_updated: Utc::now(),
            sample_count: 0,
        }
    }
}

impl Default for ResourceUsage {
    fn default() -> Self {
        Self {
            cpu_usage_percent: 10.0,
            memory_usage_mb: 100.0,
            network_usage_mbps: 1.0,
            disk_io_ops_per_sec: 100.0,
            connection_count: 10,
        }
    }
}

impl Default for WorkloadPattern {
    fn default() -> Self {
        Self {
            pattern_type: PatternType::Steady,
            event_rate: 100.0,
            batch_size: 100,
            event_size_bytes: 1024,
            temporal_distribution: TemporalDistribution::Uniform,
            data_characteristics: DataCharacteristics {
                compression_ratio: 0.7,
                serialization_overhead: 0.1,
                has_complex_structures: false,
                requires_ordering: false,
                has_time_windows: false,
                requires_deduplication: true,
            },
            consistency_requirements: ConsistencyLevel::AtLeastOnce,
        }
    }
}

/// Optimization statistics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
    pub total_decisions: usize,
    pub backend_usage: HashMap<BackendType, usize>,
    pub average_confidence: f64,
    pub performance_improvements: HashMap<String, f64>,
    pub active_backends: usize,
    pub last_optimization: Option<DateTime<Utc>>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventMetadata, StreamEvent};

    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: uuid::Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: std::collections::HashMap::new(),
                checksum: None,
            },
        }
    }

    #[tokio::test]
    async fn test_backend_optimizer_creation() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);

        assert!(optimizer.ml_predictor.is_some());
        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
    }

    #[tokio::test]
    async fn test_pattern_analysis() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
        let events = vec![create_test_event(); 100];

        let pattern = analyzer.analyze_pattern(&events).await.unwrap();

        assert!(pattern.event_rate > 0.0);
        assert!(pattern.batch_size > 0);
        assert!(pattern.event_size_bytes > 0);
    }

    #[tokio::test]
    async fn test_cost_calculation() {
        let config = OptimizerConfig::default();
        let calculator = CostCalculator::new(config);
        let pattern = WorkloadPattern::default();
        let performance = BackendPerformance::new(BackendType::Kafka);

        let cost = calculator
            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
            .await
            .unwrap();

        assert!(cost.total_cost > 0.0);
        assert!(cost.latency_cost >= 0.0);
        assert!(cost.throughput_cost >= 0.0);
    }

    #[tokio::test]
    async fn test_backend_recommendation() {
        let config = OptimizerConfig {
            enable_ml_prediction: false, // Disable ML prediction for test
            ..Default::default()
        };
        let optimizer = BackendOptimizer::new(config);

        // Add some backend performance data
        let metrics = StreamingMetrics::default();
        optimizer
            .update_backend_performance(BackendType::Kafka, &metrics)
            .await
            .unwrap();
        optimizer
            .update_backend_performance(BackendType::Nats, &metrics)
            .await
            .unwrap();

        let pattern = WorkloadPattern::default();
        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();

        assert!(recommendations.len() >= 2);
        assert!(recommendations[0].score >= recommendations[1].score);
    }

    #[tokio::test]
    async fn test_ml_predictor() {
        let mut predictor = MLPredictor::new();

        let data_point = PerformanceDataPoint {
            timestamp: Utc::now(),
            backend_type: BackendType::Kafka,
            workload_pattern: WorkloadPattern::default(),
            actual_latency: 50.0,
            actual_throughput: 1000.0,
            actual_reliability: 0.99,
            resource_usage: ResourceUsage::default(),
            external_factors: HashMap::new(),
        };

        predictor.add_training_data(data_point).await.unwrap();
        assert_eq!(predictor.performance_history.len(), 1);
    }

    #[test]
    fn test_pattern_similarity() {
        let predictor = MLPredictor::new();
        let pattern1 = WorkloadPattern {
            event_rate: 100.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };
        let pattern2 = WorkloadPattern {
            event_rate: 110.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };

        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
        assert!(similarity > 0.8);
    }

    #[tokio::test]
    async fn test_workload_pattern_classification() {
        // Use shorter analysis window for testing
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));

        // Test real-time pattern (low rate) - create events with different timestamps
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..10 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        // With 10 events in 30 seconds = 0.33 events/sec, should be RealTime
        assert!(matches!(
            pattern.pattern_type,
            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
        ));

        // Test batch pattern (high rate) - create many events with varied timestamps
        let mut events = Vec::new();
        let base_time = Utc::now();
        // Create 3000+ events to ensure high rate (3000/30 = 100+ events/sec)
        for i in 0..3500 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        // With 3500 events in 30 seconds = 116.67 events/sec, should be > 100
        assert!(pattern.event_rate > 100.0);
    }

    #[test]
    fn test_backend_strengths_analysis() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);
        let pattern = WorkloadPattern {
            pattern_type: PatternType::RealTime,
            consistency_requirements: ConsistencyLevel::ExactlyOnce,
            ..Default::default()
        };

        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));

        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
    }

    #[test]
    fn test_config_serialization() {
        let config = OptimizerConfig::default();
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            config.enable_cost_modeling,
            deserialized.enable_cost_modeling
        );
        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
    }
}