Skip to main content

oxirs_stream/
backend_optimizer.rs

1//! # Backend Optimization and Selection
2//!
3//! Advanced backend selection algorithms, cost modeling, and ML-driven optimization
4//! for choosing the optimal streaming backend based on workload patterns and performance metrics.
5
6use crate::backend::BackendType;
7use crate::event::StreamEvent;
8use crate::monitoring::StreamingMetrics;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17
/// Backend optimization configuration
///
/// Feature toggles and scoring weights. A copy is held by both
/// `BackendOptimizer` and `CostCalculator`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
    /// Cost-modeling toggle.
    /// NOTE(review): the consumer of this flag is not confirmed.
    pub enable_cost_modeling: bool,
    /// When `true`, `BackendOptimizer::new` creates an `MLPredictor`;
    /// when `false`, the optimizer is built without one.
    pub enable_ml_prediction: bool,
    /// Pattern-analysis toggle.
    /// NOTE(review): the consumer of this flag is not confirmed.
    pub enable_pattern_analysis: bool,
    /// Interval between optimization passes (the `Default` impl uses 5 minutes).
    /// NOTE(review): the scheduler that reads this is not confirmed.
    pub optimization_interval: Duration,
    /// Training-history length at which `BackendOptimizer::train_predictor`
    /// starts retraining the predictor's models.
    pub min_samples_for_prediction: usize,
    /// Weight of the latency component in the combined backend score.
    pub cost_weight_latency: f64,
    /// Weight of the throughput component in the combined backend score.
    pub cost_weight_throughput: f64,
    /// Weight of the reliability component in the combined backend score.
    pub cost_weight_reliability: f64,
    /// Weight of the cost-derived component (`1 / (total_cost + 1)`) in the
    /// combined backend score.
    pub cost_weight_resource_usage: f64,
}
31
32impl Default for OptimizerConfig {
33    fn default() -> Self {
34        Self {
35            enable_cost_modeling: true,
36            enable_ml_prediction: true,
37            enable_pattern_analysis: true,
38            optimization_interval: Duration::from_secs(300), // 5 minutes
39            min_samples_for_prediction: 100,
40            cost_weight_latency: 0.3,
41            cost_weight_throughput: 0.3,
42            cost_weight_reliability: 0.3,
43            cost_weight_resource_usage: 0.1,
44        }
45    }
46}
47
/// Workload pattern characteristics
///
/// Summary of a workload as produced by `PatternAnalyzer::analyze_pattern`
/// and consumed when scoring backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadPattern {
    /// Coarse classification of the workload.
    pub pattern_type: PatternType,
    /// Events per second; the analyzer computes it as the number of retained
    /// events divided by the analysis window length in seconds.
    pub event_rate: f64,
    /// Suggested batch size; the analyzer picks 10, 100, 500 or 1000
    /// depending on `event_rate`.
    pub batch_size: u32,
    /// Average estimated event size in bytes.
    pub event_size_bytes: u64,
    /// Classified distribution of event inter-arrival times.
    pub temporal_distribution: TemporalDistribution,
    /// Structural properties of the event data.
    pub data_characteristics: DataCharacteristics,
    /// Delivery/consistency guarantee the workload needs.
    pub consistency_requirements: ConsistencyLevel,
}
59
/// Pattern types for workload classification
///
/// `BackendOptimizer::calculate_backend_score` treats `RealTime` (latency
/// scored on p99 thresholds) and `BatchOriented` (throughput scored against
/// twice the event rate) specially; all other variants share the generic
/// scoring path.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternType {
    /// Steady, predictable load
    Steady,
    /// Variable load with spikes
    Bursty,
    /// Seasonal patterns
    Seasonal,
    /// Random/unpredictable patterns
    Random,
    /// Real-time processing requirements
    RealTime,
    /// Batch processing oriented
    BatchOriented,
}
76
/// Temporal distribution patterns
///
/// Describes how event inter-arrival times are distributed. When produced by
/// `PatternAnalyzer`, the statistics are computed over intervals measured in
/// milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalDistribution {
    /// Near-constant intervals; also the analyzer's result when fewer than
    /// 10 events are available.
    Uniform,
    /// Moderately variable intervals, with their mean and standard deviation.
    Normal { mean: f64, std_dev: f64 },
    /// Highly variable intervals; the analyzer sets `lambda` to `1 / mean`.
    Exponential { lambda: f64 },
    /// Poisson arrivals with rate `lambda`.
    /// NOTE(review): not produced by the analyzer code seen so far.
    Poisson { lambda: f64 },
    /// Any other distribution, identified by name only.
    Custom { distribution_name: String },
}
86
/// Data characteristics affecting backend choice
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCharacteristics {
    /// Compression ratio; the analyzer assumes 0.7 ("30% compression").
    pub compression_ratio: f64,
    /// Serialization overhead as a fraction; the analyzer assumes 0.1 (10%).
    pub serialization_overhead: f64,
    /// `true` when any analyzed event is a SPARQL update, schema change or
    /// completed query.
    pub has_complex_structures: bool,
    /// `true` when any analyzed event is a transaction begin/commit/abort.
    pub requires_ordering: bool,
    /// Whether time windows are involved; the analyzer currently always
    /// reports `false`.
    pub has_time_windows: bool,
    /// Whether deduplication is needed; the analyzer conservatively always
    /// reports `true`.
    pub requires_deduplication: bool,
}
97
/// Consistency level requirements
///
/// `PatternAnalyzer` selects `ExactlyOnce` when the analyzed events contain
/// transaction events and `AtLeastOnce` otherwise; the remaining variants are
/// available for callers that build a `WorkloadPattern` themselves.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// At most once delivery
    AtMostOnce,
    /// At least once delivery
    AtLeastOnce,
    /// Exactly once delivery
    ExactlyOnce,
    /// Session consistency
    Session,
    /// Strong consistency
    Strong,
}
112
/// Backend performance characteristics
///
/// One record per backend, created and refreshed by
/// `BackendOptimizer::update_backend_performance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendPerformance {
    /// Backend these measurements belong to.
    pub backend_type: BackendType,
    /// Latency in milliseconds, kept as an exponential moving average of the
    /// reported producer average latency.
    pub measured_latency_p50: f64,
    /// 95th-percentile latency.
    /// NOTE(review): presumably milliseconds; the code that updates it is not confirmed.
    pub measured_latency_p95: f64,
    /// 99th-percentile latency; real-time workloads are scored against
    /// thresholds of 10 and 50 on this value.
    /// NOTE(review): `update_backend_performance` does not refresh this field — confirm its source.
    pub measured_latency_p99: f64,
    /// Throughput (logged as "eps"), kept as an exponential moving average.
    pub measured_throughput: f64,
    /// Exponential moving average of the reported success rate.
    pub reliability_score: f64,
    /// Latest resource-usage snapshot for the backend.
    pub resource_usage: ResourceUsage,
    /// Hourly cost. NOTE(review): unit/currency not confirmed.
    pub cost_per_hour: f64,
    pub setup_complexity: u8, // 1-10 scale
    /// NOTE(review): meaning and range of this factor are not confirmed.
    pub scalability_factor: f64,
    /// Time of the most recent metrics update.
    pub last_updated: DateTime<Utc>,
    /// Number of metric updates folded into this record.
    pub sample_count: u64,
}
129
/// Resource usage metrics
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU usage in percent, copied from the reported system CPU usage.
    pub cpu_usage_percent: f64,
    /// Memory usage in MiB (reported bytes divided by 1024 * 1024).
    pub memory_usage_mb: f64,
    /// Network usage.
    /// NOTE(review): presumably megabits per second, going by the name — confirm.
    pub network_usage_mbps: f64,
    /// Disk I/O operations per second.
    pub disk_io_ops_per_sec: f64,
    /// Number of active backend connections.
    pub connection_count: u32,
}
139
/// Cost model for backend selection
///
/// Produced by `CostCalculator::calculate_cost`. Backend scoring only reads
/// `total_cost`, turning it into a score of `1 / (total_cost + 1)`.
/// NOTE(review): how the component costs combine into `total_cost` should be
/// confirmed against `calculate_cost`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    /// Aggregate cost; lower is better.
    pub total_cost: f64,
    /// Cost attributed to latency.
    pub latency_cost: f64,
    /// Cost attributed to throughput.
    pub throughput_cost: f64,
    /// Cost attributed to reliability.
    pub reliability_cost: f64,
    /// Cost attributed to resource consumption.
    pub resource_cost: f64,
    /// Cost attributed to scaling.
    pub scaling_cost: f64,
    /// Cost attributed to maintenance.
    pub maintenance_cost: f64,
}
151
/// ML prediction model for performance forecasting
///
/// Owned by `BackendOptimizer` when ML prediction is enabled.
#[derive(Debug, Clone)]
pub struct MLPredictor {
    /// Historical performance data. Its length is compared with
    /// `OptimizerConfig::min_samples_for_prediction` to decide when to retrain.
    performance_history: Vec<PerformanceDataPoint>,
    /// Learned patterns.
    /// NOTE(review): what the `String` key identifies is not confirmed.
    patterns: HashMap<String, PatternModel>,
    /// Feature weights (underscore-prefixed; presumably not read yet).
    _feature_weights: Vec<f64>,
    /// Prediction confidence threshold (underscore-prefixed; presumably not read yet).
    _confidence_threshold: f64,
}
164
/// Single performance data point for ML training
///
/// Passed to `BackendOptimizer::train_predictor`, which forwards it to the
/// predictor's training data.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    /// When the observation was taken.
    pub timestamp: DateTime<Utc>,
    /// Backend that was observed.
    pub backend_type: BackendType,
    /// Workload that was running during the observation.
    pub workload_pattern: WorkloadPattern,
    /// Observed latency. NOTE(review): presumably milliseconds — confirm.
    pub actual_latency: f64,
    /// Observed throughput. NOTE(review): presumably events per second — confirm.
    pub actual_throughput: f64,
    /// Observed reliability.
    pub actual_reliability: f64,
    /// Resource usage during the observation.
    pub resource_usage: ResourceUsage,
    /// Additional named numeric inputs.
    /// NOTE(review): the expected keys are not confirmed.
    pub external_factors: HashMap<String, f64>,
}
177
/// Learned pattern model for ML predictions
///
/// NOTE(review): the field names suggest a linear model (coefficients plus
/// intercept); confirm against the training code.
#[derive(Debug, Clone)]
pub struct PatternModel {
    /// Name identifying the pattern this model was fitted for.
    pub pattern_name: String,
    /// Model coefficients.
    pub coefficients: Vec<f64>,
    /// Model intercept term.
    pub intercept: f64,
    /// Confidence attached to this model's predictions.
    pub confidence: f64,
    /// When the model was last trained.
    pub last_trained: DateTime<Utc>,
    /// Number of samples the model was trained on.
    pub sample_count: usize,
}
188
/// Backend optimizer for intelligent backend selection
///
/// Collects per-backend performance measurements, analyzes workload patterns
/// and ranks backends for a given workload.
pub struct BackendOptimizer {
    /// Tunables and scoring weights.
    config: OptimizerConfig,
    /// Latest smoothed measurements per backend, behind an async read/write lock.
    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
    /// Classifies workloads from recent events.
    pattern_analyzer: PatternAnalyzer,
    /// Computes the cost model used in backend scoring.
    cost_calculator: CostCalculator,
    /// Present only when `config.enable_ml_prediction` was `true` at construction.
    ml_predictor: Option<MLPredictor>,
    /// Log of recorded decisions; trimmed by `record_decision`.
    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
}
198
/// Pattern analyzer for workload classification
pub struct PatternAnalyzer {
    /// Retained events paired with the time they were handed to
    /// `analyze_pattern`, not an event-internal timestamp.
    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
    /// Cache of patterns (underscore-prefixed; presumably not read yet).
    _pattern_cache: HashMap<String, WorkloadPattern>,
    /// Events older than this window are dropped on each analysis; its length
    /// is also the denominator of the event-rate calculation.
    analysis_window: ChronoDuration,
}
205
/// Cost calculator for backend evaluation
pub struct CostCalculator {
    /// Copy of the optimizer configuration.
    config: OptimizerConfig,
    /// Normalized baseline hourly cost per backend; `calculate_cost` falls
    /// back to 1.0 for a backend without an entry.
    baseline_costs: HashMap<BackendType, f64>,
}
211
/// Optimization decision record
///
/// Stored in the optimizer's history by `record_decision` and aggregated by
/// `get_optimization_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationDecision {
    /// When the decision was made; the latest one is reported as the time of
    /// the last optimization.
    pub timestamp: DateTime<Utc>,
    /// Backend chosen by this decision; counted per backend in the statistics.
    pub selected_backend: BackendType,
    /// Workload the decision was made for.
    pub workload_pattern: WorkloadPattern,
    /// Performance expected from the selected backend; its latency and
    /// throughput feed the improvement statistics.
    pub predicted_performance: BackendPerformance,
    /// Cost model computed for the selected backend.
    pub cost_model: CostModel,
    /// Confidence in the decision; averaged across the history in the statistics.
    pub confidence: f64,
    /// Human-readable justification.
    pub reason: String,
}
223
/// Backend recommendation with ranking
///
/// One entry of the list returned by `BackendOptimizer::recommend_backend`,
/// which is ordered by descending `score`.
#[derive(Debug, Clone)]
pub struct BackendRecommendation {
    /// Backend being recommended.
    pub backend_type: BackendType,
    /// Combined weighted score; higher is better.
    pub score: f64,
    /// Predicted latency, taken from the predicted p50 latency.
    pub predicted_latency: f64,
    /// Predicted throughput, taken from the predicted measured throughput.
    pub predicted_throughput: f64,
    /// Total cost from the cost model.
    pub predicted_cost: f64,
    /// Confidence derived from sample count and recency of the measurements.
    pub confidence: f64,
    /// Human-readable advantages of this backend for the workload.
    pub strengths: Vec<String>,
    /// Human-readable drawbacks of this backend for the workload.
    pub weaknesses: Vec<String>,
}
236
237impl BackendOptimizer {
238    /// Create a new backend optimizer
239    pub fn new(config: OptimizerConfig) -> Self {
240        let ml_predictor = if config.enable_ml_prediction {
241            Some(MLPredictor::new())
242        } else {
243            None
244        };
245
246        Self {
247            pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
248            cost_calculator: CostCalculator::new(config.clone()),
249            backend_performance: Arc::new(RwLock::new(HashMap::new())),
250            optimization_history: Arc::new(RwLock::new(Vec::new())),
251            config,
252            ml_predictor,
253        }
254    }
255
256    /// Update backend performance metrics
257    pub async fn update_backend_performance(
258        &self,
259        backend_type: BackendType,
260        metrics: &StreamingMetrics,
261    ) -> Result<()> {
262        let mut performance_map = self.backend_performance.write().await;
263
264        let performance = performance_map
265            .entry(backend_type.clone())
266            .or_insert_with(|| BackendPerformance::new(backend_type.clone()));
267
268        // Update measured performance with exponential moving average
269        let alpha = 0.1; // Smoothing factor
270        performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
271            + (1.0 - alpha) * performance.measured_latency_p50;
272        performance.measured_throughput = alpha * metrics.producer_throughput_eps
273            + (1.0 - alpha) * performance.measured_throughput;
274        performance.reliability_score =
275            alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;
276
277        performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
278        performance.resource_usage.memory_usage_mb =
279            metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
280        performance.resource_usage.connection_count = metrics.backend_connections_active;
281
282        performance.last_updated = Utc::now();
283        performance.sample_count += 1;
284
285        debug!(
286            "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
287            backend_type,
288            performance.measured_latency_p50,
289            performance.measured_throughput,
290            performance.reliability_score
291        );
292
293        Ok(())
294    }
295
296    /// Analyze workload pattern from recent events
297    pub async fn analyze_workload_pattern(
298        &mut self,
299        events: &[StreamEvent],
300    ) -> Result<WorkloadPattern> {
301        self.pattern_analyzer.analyze_pattern(events).await
302    }
303
304    /// Get optimal backend recommendation for given workload
305    pub async fn recommend_backend(
306        &self,
307        pattern: &WorkloadPattern,
308    ) -> Result<Vec<BackendRecommendation>> {
309        let mut recommendations = Vec::new();
310        let performance_map = self.backend_performance.read().await;
311
312        for (backend_type, performance) in performance_map.iter() {
313            let cost = self
314                .cost_calculator
315                .calculate_cost(backend_type, pattern, performance)
316                .await?;
317
318            let predicted_performance = if let Some(predictor) = &self.ml_predictor {
319                predictor.predict_performance(backend_type, pattern).await?
320            } else {
321                performance.clone()
322            };
323
324            let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
325            let confidence = self.calculate_confidence(&predicted_performance, pattern);
326
327            let recommendation = BackendRecommendation {
328                backend_type: backend_type.clone(),
329                score,
330                predicted_latency: predicted_performance.measured_latency_p50,
331                predicted_throughput: predicted_performance.measured_throughput,
332                predicted_cost: cost.total_cost,
333                confidence,
334                strengths: self.analyze_strengths(backend_type, pattern),
335                weaknesses: self.analyze_weaknesses(backend_type, pattern),
336            };
337
338            recommendations.push(recommendation);
339        }
340
341        // Sort by score (higher is better)
342        recommendations.sort_by(|a, b| {
343            b.score
344                .partial_cmp(&a.score)
345                .unwrap_or(std::cmp::Ordering::Equal)
346        });
347
348        info!(
349            "Generated {} backend recommendations for workload pattern: {:?}",
350            recommendations.len(),
351            pattern.pattern_type
352        );
353
354        Ok(recommendations)
355    }
356
357    /// Train ML predictor with new performance data
358    pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
359        if let Some(predictor) = &mut self.ml_predictor {
360            predictor.add_training_data(data_point).await?;
361
362            if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
363                predictor.retrain_models().await?;
364            }
365        }
366        Ok(())
367    }
368
369    /// Record optimization decision
370    pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
371        let mut history = self.optimization_history.write().await;
372        history.push(decision);
373
374        // Keep only recent decisions (last 1000)
375        if history.len() > 1000 {
376            history.drain(0..100);
377        }
378        Ok(())
379    }
380
381    /// Get optimization statistics
382    pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
383        let history = self.optimization_history.read().await;
384        let performance_map = self.backend_performance.read().await;
385
386        let total_decisions = history.len();
387        let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
388            *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
389            acc
390        });
391
392        let average_confidence = if total_decisions > 0 {
393            history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
394        } else {
395            0.0
396        };
397
398        let performance_improvements = self.calculate_performance_improvements(&history).await?;
399
400        Ok(OptimizationStats {
401            total_decisions,
402            backend_usage,
403            average_confidence,
404            performance_improvements,
405            active_backends: performance_map.len(),
406            last_optimization: history.last().map(|d| d.timestamp),
407        })
408    }
409
410    /// Calculate backend score based on cost model and performance
411    fn calculate_backend_score(
412        &self,
413        cost: &CostModel,
414        performance: &BackendPerformance,
415        pattern: &WorkloadPattern,
416    ) -> f64 {
417        let latency_score = match pattern.pattern_type {
418            PatternType::RealTime => {
419                // For real-time, heavily penalize high latency
420                if performance.measured_latency_p99 < 10.0 {
421                    1.0
422                } else if performance.measured_latency_p99 < 50.0 {
423                    0.7
424                } else {
425                    0.3
426                }
427            }
428            _ => {
429                // For other patterns, moderate latency tolerance
430                (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0)
431            }
432        };
433
434        let throughput_score = match pattern.pattern_type {
435            PatternType::BatchOriented => {
436                // Batch processing values high throughput
437                (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
438            }
439            _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
440        };
441
442        let reliability_score = performance.reliability_score;
443        let cost_score = 1.0 / (cost.total_cost + 1.0);
444
445        // Weighted combination
446        (latency_score * self.config.cost_weight_latency
447            + throughput_score * self.config.cost_weight_throughput
448            + reliability_score * self.config.cost_weight_reliability
449            + cost_score * self.config.cost_weight_resource_usage)
450            / (self.config.cost_weight_latency
451                + self.config.cost_weight_throughput
452                + self.config.cost_weight_reliability
453                + self.config.cost_weight_resource_usage)
454    }
455
456    /// Calculate confidence in prediction
457    fn calculate_confidence(
458        &self,
459        performance: &BackendPerformance,
460        _pattern: &WorkloadPattern,
461    ) -> f64 {
462        let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
463        let recency_confidence = {
464            let age_hours = Utc::now()
465                .signed_duration_since(performance.last_updated)
466                .num_hours() as f64;
467            (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
468        };
469
470        (sample_confidence + recency_confidence) / 2.0
471    }
472
473    /// Analyze backend strengths for given pattern
474    fn analyze_strengths(
475        &self,
476        backend_type: &BackendType,
477        pattern: &WorkloadPattern,
478    ) -> Vec<String> {
479        let mut strengths = Vec::new();
480
481        match backend_type {
482            BackendType::Kafka => {
483                strengths.push("High throughput".to_string());
484                strengths.push("Strong durability".to_string());
485                strengths.push("Excellent ordering guarantees".to_string());
486                if matches!(
487                    pattern.consistency_requirements,
488                    ConsistencyLevel::ExactlyOnce
489                ) {
490                    strengths.push("Exactly-once semantics".to_string());
491                }
492            }
493            BackendType::Nats => {
494                strengths.push("Low latency".to_string());
495                strengths.push("Simple setup".to_string());
496                strengths.push("Built-in clustering".to_string());
497                if matches!(pattern.pattern_type, PatternType::RealTime) {
498                    strengths.push("Real-time performance".to_string());
499                }
500            }
501            BackendType::Redis => {
502                strengths.push("In-memory speed".to_string());
503                strengths.push("Low latency".to_string());
504                strengths.push("Rich data structures".to_string());
505            }
506            BackendType::Kinesis => {
507                strengths.push("AWS native integration".to_string());
508                strengths.push("Auto-scaling".to_string());
509                strengths.push("Pay-per-use model".to_string());
510            }
511            BackendType::Pulsar => {
512                strengths.push("Multi-tenancy".to_string());
513                strengths.push("Geo-replication".to_string());
514                strengths.push("Unified messaging".to_string());
515            }
516            BackendType::RabbitMQ => {
517                strengths.push("Mature and stable".to_string());
518                strengths.push("Rich routing capabilities".to_string());
519                strengths.push("Strong reliability guarantees".to_string());
520                if matches!(
521                    pattern.consistency_requirements,
522                    ConsistencyLevel::AtLeastOnce
523                ) {
524                    strengths.push("Persistent message delivery".to_string());
525                }
526            }
527            BackendType::Memory => {
528                strengths.push("Zero latency".to_string());
529                strengths.push("Perfect for testing".to_string());
530            }
531        }
532
533        strengths
534    }
535
536    /// Analyze backend weaknesses for given pattern
537    fn analyze_weaknesses(
538        &self,
539        backend_type: &BackendType,
540        pattern: &WorkloadPattern,
541    ) -> Vec<String> {
542        let mut weaknesses = Vec::new();
543
544        match backend_type {
545            BackendType::Kafka => {
546                weaknesses.push("Complex setup".to_string());
547                weaknesses.push("Higher resource usage".to_string());
548                if matches!(pattern.pattern_type, PatternType::RealTime) {
549                    weaknesses.push("Higher latency than NATS".to_string());
550                }
551            }
552            BackendType::Nats => {
553                if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
554                    weaknesses.push("Limited durability options".to_string());
555                }
556                if pattern.event_rate > 100000.0 {
557                    weaknesses.push("May not handle extreme throughput".to_string());
558                }
559            }
560            BackendType::Redis => {
561                weaknesses.push("Memory-bound".to_string());
562                weaknesses.push("Limited durability".to_string());
563                if pattern.event_size_bytes > 1000000 {
564                    weaknesses.push("Not suitable for large events".to_string());
565                }
566            }
567            BackendType::Kinesis => {
568                weaknesses.push("AWS vendor lock-in".to_string());
569                weaknesses.push("Cost can scale quickly".to_string());
570                weaknesses.push("Regional limitations".to_string());
571            }
572            BackendType::Pulsar => {
573                weaknesses.push("Newer ecosystem".to_string());
574                weaknesses.push("Complex architecture".to_string());
575            }
576            BackendType::RabbitMQ => {
577                weaknesses.push("Lower throughput than Kafka".to_string());
578                weaknesses.push("Memory-based by default".to_string());
579                if matches!(pattern.pattern_type, PatternType::BatchOriented) {
580                    weaknesses.push("Not optimized for batch processing".to_string());
581                }
582            }
583            BackendType::Memory => {
584                weaknesses.push("No persistence".to_string());
585                weaknesses.push("Single node only".to_string());
586                weaknesses.push("Memory limitations".to_string());
587            }
588        }
589
590        weaknesses
591    }
592
593    /// Calculate performance improvements over time
594    async fn calculate_performance_improvements(
595        &self,
596        history: &[OptimizationDecision],
597    ) -> Result<HashMap<String, f64>> {
598        let mut improvements = HashMap::new();
599
600        if history.len() < 10 {
601            return Ok(improvements);
602        }
603
604        let recent_decisions = &history[history.len() - 10..];
605        let older_decisions = &history[0..10.min(history.len() - 10)];
606
607        let recent_avg_latency = recent_decisions
608            .iter()
609            .map(|d| d.predicted_performance.measured_latency_p50)
610            .sum::<f64>()
611            / recent_decisions.len() as f64;
612
613        let older_avg_latency = older_decisions
614            .iter()
615            .map(|d| d.predicted_performance.measured_latency_p50)
616            .sum::<f64>()
617            / older_decisions.len() as f64;
618
619        let latency_improvement =
620            (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
621        improvements.insert(
622            "latency_improvement_percent".to_string(),
623            latency_improvement,
624        );
625
626        let recent_avg_throughput = recent_decisions
627            .iter()
628            .map(|d| d.predicted_performance.measured_throughput)
629            .sum::<f64>()
630            / recent_decisions.len() as f64;
631
632        let older_avg_throughput = older_decisions
633            .iter()
634            .map(|d| d.predicted_performance.measured_throughput)
635            .sum::<f64>()
636            / older_decisions.len() as f64;
637
638        let throughput_improvement =
639            (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
640        improvements.insert(
641            "throughput_improvement_percent".to_string(),
642            throughput_improvement,
643        );
644
645        Ok(improvements)
646    }
647}
648
649impl PatternAnalyzer {
650    pub fn new(analysis_window: ChronoDuration) -> Self {
651        Self {
652            event_history: Vec::new(),
653            _pattern_cache: HashMap::new(),
654            analysis_window,
655        }
656    }
657
658    pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
659        // Add events to history
660        let now = Utc::now();
661        for event in events {
662            self.event_history.push((now, event.clone()));
663        }
664
665        // Remove old events outside analysis window
666        let cutoff = now - self.analysis_window;
667        self.event_history
668            .retain(|(timestamp, _)| *timestamp >= cutoff);
669
670        if self.event_history.is_empty() {
671            return Ok(WorkloadPattern::default());
672        }
673
674        // Calculate event rate
675        let duration_seconds = self.analysis_window.num_seconds() as f64;
676        let event_rate = self.event_history.len() as f64 / duration_seconds;
677
678        // Analyze temporal distribution
679        let temporal_distribution = self.analyze_temporal_distribution().await?;
680
681        // Determine pattern type
682        let pattern_type = self
683            .classify_pattern_type(event_rate, &temporal_distribution)
684            .await?;
685
686        // Calculate average event size
687        let avg_event_size = self.calculate_average_event_size().await?;
688
689        // Analyze data characteristics
690        let data_characteristics = self.analyze_data_characteristics().await?;
691
692        // Determine consistency requirements based on event types
693        let consistency_requirements = self.determine_consistency_requirements().await?;
694
695        Ok(WorkloadPattern {
696            pattern_type,
697            event_rate,
698            batch_size: self.estimate_optimal_batch_size(event_rate),
699            event_size_bytes: avg_event_size,
700            temporal_distribution,
701            data_characteristics,
702            consistency_requirements,
703        })
704    }
705
706    async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
707        if self.event_history.len() < 10 {
708            return Ok(TemporalDistribution::Uniform);
709        }
710
711        // Calculate inter-arrival times
712        let mut intervals = Vec::new();
713        for i in 1..self.event_history.len() {
714            let interval = self.event_history[i]
715                .0
716                .signed_duration_since(self.event_history[i - 1].0)
717                .num_milliseconds() as f64;
718            intervals.push(interval);
719        }
720
721        // Calculate basic statistics
722        let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
723        let variance =
724            intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
725        let std_dev = variance.sqrt();
726
727        // Simple distribution classification
728        let cv = std_dev / mean; // Coefficient of variation
729
730        if cv < 0.1 {
731            Ok(TemporalDistribution::Uniform)
732        } else if cv < 0.5 {
733            Ok(TemporalDistribution::Normal { mean, std_dev })
734        } else {
735            Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
736        }
737    }
738
739    async fn classify_pattern_type(
740        &self,
741        event_rate: f64,
742        temporal_dist: &TemporalDistribution,
743    ) -> Result<PatternType> {
744        // Simple heuristic-based classification
745        match temporal_dist {
746            TemporalDistribution::Uniform => {
747                if event_rate > 10000.0 {
748                    Ok(PatternType::BatchOriented)
749                } else if event_rate > 100.0 {
750                    Ok(PatternType::Steady)
751                } else {
752                    Ok(PatternType::RealTime)
753                }
754            }
755            TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
756            TemporalDistribution::Normal { std_dev, mean } => {
757                if std_dev / mean > 1.0 {
758                    Ok(PatternType::Random)
759                } else {
760                    Ok(PatternType::Steady)
761                }
762            }
763            _ => Ok(PatternType::Steady),
764        }
765    }
766
767    async fn calculate_average_event_size(&self) -> Result<u64> {
768        if self.event_history.is_empty() {
769            return Ok(1024); // Default 1KB
770        }
771
772        // Estimate serialized size (simplified)
773        let avg_size = self
774            .event_history
775            .iter()
776            .map(|(_, event)| self.estimate_event_size(event))
777            .sum::<u64>()
778            / self.event_history.len() as u64;
779
780        Ok(avg_size)
781    }
782
783    fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
784        // Simplified size estimation based on event type
785        match event {
786            StreamEvent::TripleAdded {
787                subject,
788                predicate,
789                object,
790                ..
791            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
792            StreamEvent::TripleRemoved {
793                subject,
794                predicate,
795                object,
796                ..
797            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
798            StreamEvent::GraphCreated { .. } => 200,
799            StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
800            StreamEvent::TransactionBegin { .. } => 150,
801            StreamEvent::TransactionCommit { .. } => 100,
802            StreamEvent::Heartbeat { .. } => 50,
803            _ => 300, // Default for complex events
804        }
805    }
806
807    async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
808        let has_complex_structures = self
809            .event_history
810            .iter()
811            .any(|(_, event)| self.is_complex_event(event));
812
813        let requires_ordering = self
814            .event_history
815            .iter()
816            .any(|(_, event)| self.requires_ordering(event));
817
818        Ok(DataCharacteristics {
819            compression_ratio: 0.7,      // Assume 30% compression
820            serialization_overhead: 0.1, // 10% overhead
821            has_complex_structures,
822            requires_ordering,
823            has_time_windows: false,      // Simplified
824            requires_deduplication: true, // Conservative default
825        })
826    }
827
    /// Returns `true` for events treated as structurally complex: SPARQL
    /// updates, schema changes and completed queries.
    fn is_complex_event(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::SparqlUpdate { .. }
                | StreamEvent::SchemaChanged { .. }
                | StreamEvent::QueryCompleted { .. }
        )
    }
836
    /// Returns `true` for transaction lifecycle events (begin, commit,
    /// abort), which are the events treated as order-sensitive.
    fn requires_ordering(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::TransactionBegin { .. }
                | StreamEvent::TransactionCommit { .. }
                | StreamEvent::TransactionAbort { .. }
        )
    }
845
846    async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
847        let has_transactions = self.event_history.iter().any(|(_, event)| {
848            matches!(
849                event,
850                StreamEvent::TransactionBegin { .. }
851                    | StreamEvent::TransactionCommit { .. }
852                    | StreamEvent::TransactionAbort { .. }
853            )
854        });
855
856        if has_transactions {
857            Ok(ConsistencyLevel::ExactlyOnce)
858        } else {
859            Ok(ConsistencyLevel::AtLeastOnce)
860        }
861    }
862
863    fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
864        if event_rate > 10000.0 {
865            1000
866        } else if event_rate > 1000.0 {
867            500
868        } else if event_rate > 100.0 {
869            100
870        } else {
871            10
872        }
873    }
874}
875
876impl CostCalculator {
877    pub fn new(config: OptimizerConfig) -> Self {
878        let mut baseline_costs = HashMap::new();
879
880        // Baseline hourly costs (normalized)
881        baseline_costs.insert(BackendType::Memory, 0.0);
882        baseline_costs.insert(BackendType::Redis, 0.1);
883        baseline_costs.insert(BackendType::Nats, 0.2);
884        baseline_costs.insert(BackendType::Kafka, 0.5);
885        baseline_costs.insert(BackendType::Pulsar, 0.4);
886        baseline_costs.insert(BackendType::Kinesis, 0.8);
887
888        Self {
889            config,
890            baseline_costs,
891        }
892    }
893
894    pub async fn calculate_cost(
895        &self,
896        backend_type: &BackendType,
897        pattern: &WorkloadPattern,
898        performance: &BackendPerformance,
899    ) -> Result<CostModel> {
900        let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);
901
902        // Calculate component costs
903        let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
904        let throughput_cost =
905            self.calculate_throughput_cost(performance.measured_throughput, pattern);
906        let reliability_cost =
907            self.calculate_reliability_cost(performance.reliability_score, pattern);
908        let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
909        let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
910        let maintenance_cost =
911            self.calculate_maintenance_cost(backend_type, performance.setup_complexity);
912
913        let total_cost = base_cost
914            + latency_cost * self.config.cost_weight_latency
915            + throughput_cost * self.config.cost_weight_throughput
916            + reliability_cost * self.config.cost_weight_reliability
917            + resource_cost * self.config.cost_weight_resource_usage
918            + scaling_cost * 0.1
919            + maintenance_cost * 0.1;
920
921        Ok(CostModel {
922            total_cost,
923            latency_cost,
924            throughput_cost,
925            reliability_cost,
926            resource_cost,
927            scaling_cost,
928            maintenance_cost,
929        })
930    }
931
932    fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
933        let latency_penalty = match pattern.pattern_type {
934            PatternType::RealTime => latency / 10.0, // Heavy penalty for real-time
935            PatternType::Bursty => latency / 50.0,
936            _ => latency / 100.0,
937        };
938        latency_penalty.min(2.0) // Cap at 2x cost
939    }
940
941    fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
942        let required_throughput = pattern.event_rate * 1.5; // 50% buffer
943        if throughput < required_throughput {
944            (required_throughput - throughput) / required_throughput
945        } else {
946            0.0
947        }
948    }
949
950    fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
951        let required_reliability = match pattern.consistency_requirements {
952            ConsistencyLevel::ExactlyOnce => 0.999,
953            ConsistencyLevel::AtLeastOnce => 0.995,
954            _ => 0.99,
955        };
956
957        if reliability < required_reliability {
958            (required_reliability - reliability) * 10.0
959        } else {
960            0.0
961        }
962    }
963
964    fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
965        // Normalize resource usage to cost
966        (usage.cpu_usage_percent / 100.0) * 0.1
967            + (usage.memory_usage_mb / 1000.0) * 0.05
968            + (usage.network_usage_mbps / 100.0) * 0.02
969    }
970
971    fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
972        let scaling_factor = match backend_type {
973            BackendType::Kinesis => 0.1, // Auto-scaling
974            BackendType::Kafka => 0.5,   // Manual scaling
975            BackendType::Memory => 1.0,  // No scaling
976            _ => 0.3,
977        };
978
979        match pattern.pattern_type {
980            PatternType::Bursty | PatternType::Random => scaling_factor,
981            _ => 0.0,
982        }
983    }
984
985    fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
986        setup_complexity as f64 / 10.0
987    }
988}
989
990impl Default for MLPredictor {
991    fn default() -> Self {
992        Self::new()
993    }
994}
995
996impl MLPredictor {
997    pub fn new() -> Self {
998        Self {
999            performance_history: Vec::new(),
1000            patterns: HashMap::new(),
1001            _feature_weights: vec![1.0; 10], // Start with equal weights
1002            _confidence_threshold: 0.7,
1003        }
1004    }
1005
1006    pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
1007        self.performance_history.push(data_point);
1008
1009        // Keep only recent data (last 10,000 points)
1010        if self.performance_history.len() > 10000 {
1011            self.performance_history.drain(0..1000);
1012        }
1013
1014        Ok(())
1015    }
1016
1017    pub async fn predict_performance(
1018        &self,
1019        backend_type: &BackendType,
1020        pattern: &WorkloadPattern,
1021    ) -> Result<BackendPerformance> {
1022        // Filter relevant historical data
1023        let relevant_data: Vec<&PerformanceDataPoint> = self
1024            .performance_history
1025            .iter()
1026            .filter(|dp| dp.backend_type == *backend_type)
1027            .collect();
1028
1029        if relevant_data.is_empty() {
1030            return Err(anyhow!(
1031                "No historical data for backend type: {:?}",
1032                backend_type
1033            ));
1034        }
1035
1036        // Simple prediction based on similar patterns
1037        let similar_data: Vec<&PerformanceDataPoint> = relevant_data
1038            .iter()
1039            .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
1040            .cloned()
1041            .collect();
1042
1043        let prediction_data = if similar_data.is_empty() {
1044            &relevant_data
1045        } else {
1046            &similar_data
1047        };
1048
1049        // Calculate weighted averages
1050        let predicted_latency = prediction_data
1051            .iter()
1052            .map(|dp| dp.actual_latency)
1053            .sum::<f64>()
1054            / prediction_data.len() as f64;
1055
1056        let predicted_throughput = prediction_data
1057            .iter()
1058            .map(|dp| dp.actual_throughput)
1059            .sum::<f64>()
1060            / prediction_data.len() as f64;
1061
1062        let predicted_reliability = prediction_data
1063            .iter()
1064            .map(|dp| dp.actual_reliability)
1065            .sum::<f64>()
1066            / prediction_data.len() as f64;
1067
1068        Ok(BackendPerformance {
1069            backend_type: backend_type.clone(),
1070            measured_latency_p50: predicted_latency,
1071            measured_latency_p95: predicted_latency * 1.5,
1072            measured_latency_p99: predicted_latency * 2.0,
1073            measured_throughput: predicted_throughput,
1074            reliability_score: predicted_reliability,
1075            resource_usage: prediction_data[0].resource_usage.clone(),
1076            cost_per_hour: 0.0,  // Will be calculated by cost model
1077            setup_complexity: 5, // Default
1078            scalability_factor: 1.0,
1079            last_updated: Utc::now(),
1080            sample_count: prediction_data.len() as u64,
1081        })
1082    }
1083
1084    pub async fn retrain_models(&mut self) -> Result<()> {
1085        // Simple retraining using linear regression
1086        for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
1087            let backend_data: Vec<&PerformanceDataPoint> = self
1088                .performance_history
1089                .iter()
1090                .filter(|dp| dp.backend_type == *backend_type)
1091                .collect();
1092
1093            if backend_data.len() < 10 {
1094                continue;
1095            }
1096
1097            // Create pattern model for this backend
1098            let pattern_name = format!("{backend_type:?}_model");
1099            let model = self.train_linear_model(&backend_data).await?;
1100            self.patterns.insert(pattern_name, model);
1101        }
1102
1103        info!("Retrained ML models for {} patterns", self.patterns.len());
1104        Ok(())
1105    }
1106
1107    async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
1108        // Simplified linear regression implementation
1109        let n = data.len() as f64;
1110
1111        // Extract features and target (latency)
1112        let features: Vec<Vec<f64>> = data
1113            .iter()
1114            .map(|dp| self.extract_features(&dp.workload_pattern))
1115            .collect();
1116
1117        let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();
1118
1119        // Simple linear regression: y = mx + b
1120        let feature_count = features[0].len();
1121        let mut coefficients = vec![0.0; feature_count];
1122        let intercept = targets.iter().sum::<f64>() / n;
1123
1124        // Calculate correlations (simplified)
1125        for i in 0..feature_count {
1126            let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
1127            let correlation = self.calculate_correlation(&feature_values, &targets);
1128            coefficients[i] = correlation * 0.1; // Simplified coefficient
1129        }
1130
1131        Ok(PatternModel {
1132            pattern_name: "latency_model".to_string(),
1133            coefficients,
1134            intercept,
1135            confidence: 0.8, // Default confidence
1136            last_trained: Utc::now(),
1137            sample_count: data.len(),
1138        })
1139    }
1140
1141    fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
1142        vec![
1143            pattern.event_rate,
1144            pattern.batch_size as f64,
1145            pattern.event_size_bytes as f64,
1146            pattern.data_characteristics.compression_ratio,
1147            pattern.data_characteristics.serialization_overhead,
1148            if pattern.data_characteristics.has_complex_structures {
1149                1.0
1150            } else {
1151                0.0
1152            },
1153            if pattern.data_characteristics.requires_ordering {
1154                1.0
1155            } else {
1156                0.0
1157            },
1158            match pattern.pattern_type {
1159                PatternType::RealTime => 1.0,
1160                PatternType::BatchOriented => 2.0,
1161                PatternType::Bursty => 3.0,
1162                _ => 0.0,
1163            },
1164            match pattern.consistency_requirements {
1165                ConsistencyLevel::ExactlyOnce => 3.0,
1166                ConsistencyLevel::AtLeastOnce => 2.0,
1167                _ => 1.0,
1168            },
1169            1.0, // Bias term
1170        ]
1171    }
1172
1173    fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
1174        let rate_similarity =
1175            1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
1176        let size_similarity = 1.0
1177            - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
1178                / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
1179        let type_similarity = if std::mem::discriminant(&p1.pattern_type)
1180            == std::mem::discriminant(&p2.pattern_type)
1181        {
1182            1.0
1183        } else {
1184            0.0
1185        };
1186
1187        (rate_similarity + size_similarity + type_similarity) / 3.0
1188    }
1189
1190    fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
1191        let n = x.len() as f64;
1192        let mean_x = x.iter().sum::<f64>() / n;
1193        let mean_y = y.iter().sum::<f64>() / n;
1194
1195        let numerator: f64 = x
1196            .iter()
1197            .zip(y.iter())
1198            .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
1199            .sum();
1200
1201        let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
1202        let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();
1203
1204        if denom_x * denom_y == 0.0 {
1205            0.0
1206        } else {
1207            numerator / (denom_x * denom_y).sqrt()
1208        }
1209    }
1210}
1211
1212impl BackendPerformance {
1213    pub fn new(backend_type: BackendType) -> Self {
1214        Self {
1215            backend_type,
1216            measured_latency_p50: 100.0, // Default 100ms
1217            measured_latency_p95: 200.0,
1218            measured_latency_p99: 500.0,
1219            measured_throughput: 1000.0, // Default 1000 eps
1220            reliability_score: 0.99,
1221            resource_usage: ResourceUsage::default(),
1222            cost_per_hour: 0.1,
1223            setup_complexity: 5,
1224            scalability_factor: 1.0,
1225            last_updated: Utc::now(),
1226            sample_count: 0,
1227        }
1228    }
1229}
1230
1231impl Default for ResourceUsage {
1232    fn default() -> Self {
1233        Self {
1234            cpu_usage_percent: 10.0,
1235            memory_usage_mb: 100.0,
1236            network_usage_mbps: 1.0,
1237            disk_io_ops_per_sec: 100.0,
1238            connection_count: 10,
1239        }
1240    }
1241}
1242
1243impl Default for WorkloadPattern {
1244    fn default() -> Self {
1245        Self {
1246            pattern_type: PatternType::Steady,
1247            event_rate: 100.0,
1248            batch_size: 100,
1249            event_size_bytes: 1024,
1250            temporal_distribution: TemporalDistribution::Uniform,
1251            data_characteristics: DataCharacteristics {
1252                compression_ratio: 0.7,
1253                serialization_overhead: 0.1,
1254                has_complex_structures: false,
1255                requires_ordering: false,
1256                has_time_windows: false,
1257                requires_deduplication: true,
1258            },
1259            consistency_requirements: ConsistencyLevel::AtLeastOnce,
1260        }
1261    }
1262}
1263
/// Optimization statistics
///
/// Plain, serializable data holder. The code that fills it in is not part of
/// this section of the file, so the field notes below marked NOTE(review) are
/// assumptions to confirm against that code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
    /// Number of decisions counted.
    /// NOTE(review): presumably backend-selection decisions — confirm.
    pub total_decisions: usize,
    /// Counter per backend type.
    /// NOTE(review): presumably how often each backend was chosen — confirm.
    pub backend_usage: HashMap<BackendType, usize>,
    /// Average confidence value.
    /// NOTE(review): presumably averaged over the counted decisions — confirm.
    pub average_confidence: f64,
    /// Named improvement figures keyed by a string label.
    /// NOTE(review): key naming and units are not visible here — confirm.
    pub performance_improvements: HashMap<String, f64>,
    /// Number of backends considered active.
    pub active_backends: usize,
    /// UTC time of the last optimization, or `None` if there is none recorded.
    pub last_optimization: Option<DateTime<Utc>>,
}
1274
// Unit tests for the optimizer, pattern analyzer, cost calculator and ML predictor.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventMetadata, StreamEvent};

    /// Builds a `TripleAdded` event with example.org IRIs, a fresh random
    /// event id and the current UTC time as its timestamp.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: uuid::Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: std::collections::HashMap::new(),
                checksum: None,
            },
        }
    }

    /// A default-configured optimizer has an ML predictor and no recorded
    /// backend performance.
    #[tokio::test]
    async fn test_backend_optimizer_creation() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);

        assert!(optimizer.ml_predictor.is_some());
        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
    }

    /// Analysing a batch of events yields positive rate, batch size and
    /// event size.
    #[tokio::test]
    async fn test_pattern_analysis() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
        // `vec![x; n]` clones a single event, so all 100 entries share the
        // same event id and timestamp.
        let events = vec![create_test_event(); 100];

        let pattern = analyzer.analyze_pattern(&events).await.unwrap();

        assert!(pattern.event_rate > 0.0);
        assert!(pattern.batch_size > 0);
        assert!(pattern.event_size_bytes > 0);
    }

    /// Cost calculation for Kafka with default pattern and performance gives
    /// a positive total and non-negative latency/throughput components.
    #[tokio::test]
    async fn test_cost_calculation() {
        let config = OptimizerConfig::default();
        let calculator = CostCalculator::new(config);
        let pattern = WorkloadPattern::default();
        let performance = BackendPerformance::new(BackendType::Kafka);

        let cost = calculator
            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
            .await
            .unwrap();

        assert!(cost.total_cost > 0.0);
        assert!(cost.latency_cost >= 0.0);
        assert!(cost.throughput_cost >= 0.0);
    }

    /// With two backends registered, recommendations come back ordered by
    /// descending score.
    #[tokio::test]
    async fn test_backend_recommendation() {
        let config = OptimizerConfig {
            enable_ml_prediction: false, // Disable ML prediction for test
            ..Default::default()
        };
        let optimizer = BackendOptimizer::new(config);

        // Add some backend performance data
        let metrics = StreamingMetrics::default();
        optimizer
            .update_backend_performance(BackendType::Kafka, &metrics)
            .await
            .unwrap();
        optimizer
            .update_backend_performance(BackendType::Nats, &metrics)
            .await
            .unwrap();

        let pattern = WorkloadPattern::default();
        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();

        assert!(recommendations.len() >= 2);
        assert!(recommendations[0].score >= recommendations[1].score);
    }

    /// Adding one training data point grows the predictor's history to one.
    #[tokio::test]
    async fn test_ml_predictor() {
        let mut predictor = MLPredictor::new();

        let data_point = PerformanceDataPoint {
            timestamp: Utc::now(),
            backend_type: BackendType::Kafka,
            workload_pattern: WorkloadPattern::default(),
            actual_latency: 50.0,
            actual_throughput: 1000.0,
            actual_reliability: 0.99,
            resource_usage: ResourceUsage::default(),
            external_factors: HashMap::new(),
        };

        predictor.add_training_data(data_point).await.unwrap();
        assert_eq!(predictor.performance_history.len(), 1);
    }

    /// Two steady patterns that differ only slightly in event rate
    /// (100 vs 110) score above 0.8.
    #[test]
    fn test_pattern_similarity() {
        let predictor = MLPredictor::new();
        let pattern1 = WorkloadPattern {
            event_rate: 100.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };
        let pattern2 = WorkloadPattern {
            event_rate: 110.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };

        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
        assert!(similarity > 0.8);
    }

    /// Exercises pattern analysis at a low and at a high event rate.
    #[tokio::test]
    async fn test_workload_pattern_classification() {
        // Use shorter analysis window for testing
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));

        // Low-rate case: 10 events whose timestamps are one second apart.
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..10 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        // 10 events over a 30 second window is about 0.33 events/sec.
        // NOTE(review): the assertion is loose — it accepts any of four
        // pattern types rather than pinning a single classification.
        assert!(matches!(
            pattern.pattern_type,
            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
        ));

        // High-rate case: 3500 events spaced 8 ms apart, i.e. timestamps
        // spanning about 28 seconds.
        let mut events = Vec::new();
        let base_time = Utc::now();
        // More than 3000 events are needed for a rate above 100 events/sec
        // over 30 seconds (3000 / 30 = 100).
        for i in 0..3500 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        // 3500 events / 30 seconds = 116.67 events/sec, so the rate should exceed 100.
        // NOTE(review): presumably the rate is computed over the analysis
        // window — confirm against `analyze_pattern`.
        assert!(pattern.event_rate > 100.0);
    }

    /// For a real-time, exactly-once workload, Kafka lists exactly-once
    /// semantics and NATS lists real-time performance among their strengths.
    #[test]
    fn test_backend_strengths_analysis() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);
        let pattern = WorkloadPattern {
            pattern_type: PatternType::RealTime,
            consistency_requirements: ConsistencyLevel::ExactlyOnce,
            ..Default::default()
        };

        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));

        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
    }

    /// The default configuration survives a JSON round trip (spot-checked on
    /// two fields).
    #[test]
    fn test_config_serialization() {
        let config = OptimizerConfig::default();
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            config.enable_cost_modeling,
            deserialized.enable_cost_modeling
        );
        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
    }
}