use crate::backend::BackendType;
use crate::event::StreamEvent;
use crate::monitoring::StreamingMetrics;
use anyhow::{anyhow, Result};
use chrono::{DateTime, Duration as ChronoDuration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;
use tracing::{debug, info};

/// Configuration for adaptive backend selection and optimization.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
    pub enable_cost_modeling: bool,
    pub enable_ml_prediction: bool,
    pub enable_pattern_analysis: bool,
    pub optimization_interval: Duration,
    pub min_samples_for_prediction: usize,
    pub cost_weight_latency: f64,
    pub cost_weight_throughput: f64,
    pub cost_weight_reliability: f64,
    pub cost_weight_resource_usage: f64,
}

impl Default for OptimizerConfig {
    fn default() -> Self {
        Self {
            enable_cost_modeling: true,
            enable_ml_prediction: true,
            enable_pattern_analysis: true,
            optimization_interval: Duration::from_secs(300), // 5 minutes
            min_samples_for_prediction: 100,
            cost_weight_latency: 0.3,
            cost_weight_throughput: 0.3,
            cost_weight_reliability: 0.3,
            cost_weight_resource_usage: 0.1,
        }
    }
}
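
// Example (sketch): a config biased toward latency for real-time workloads.
// The weight values below are illustrative, not recommended defaults.
//
//     let realtime_config = OptimizerConfig {
//         cost_weight_latency: 0.5,
//         cost_weight_throughput: 0.2,
//         cost_weight_reliability: 0.2,
//         cost_weight_resource_usage: 0.1,
//         ..Default::default()
//     };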

/// Observed workload characteristics used for backend selection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadPattern {
    pub pattern_type: PatternType,
    pub event_rate: f64,
    pub batch_size: u32,
    pub event_size_bytes: u64,
    pub temporal_distribution: TemporalDistribution,
    pub data_characteristics: DataCharacteristics,
    pub consistency_requirements: ConsistencyLevel,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternType {
    /// Roughly constant event rate.
    Steady,
    /// Intermittent bursts of activity.
    Bursty,
    /// Recurring cycles (e.g. daily peaks).
    Seasonal,
    /// No discernible structure.
    Random,
    /// Latency-sensitive traffic.
    RealTime,
    /// High-volume bulk processing.
    BatchOriented,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalDistribution {
    Uniform,
    Normal { mean: f64, std_dev: f64 },
    Exponential { lambda: f64 },
    Poisson { lambda: f64 },
    Custom { distribution_name: String },
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCharacteristics {
    pub compression_ratio: f64,
    pub serialization_overhead: f64,
    pub has_complex_structures: bool,
    pub requires_ordering: bool,
    pub has_time_windows: bool,
    pub requires_deduplication: bool,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// Events may be lost but are never duplicated.
    AtMostOnce,
    /// Events are never lost but may be duplicated.
    AtLeastOnce,
    /// Events are delivered exactly once.
    ExactlyOnce,
    /// Guarantees hold within a single session.
    Session,
    /// Strict global consistency.
    Strong,
}

/// Measured performance profile for a single backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendPerformance {
    pub backend_type: BackendType,
    pub measured_latency_p50: f64,
    pub measured_latency_p95: f64,
    pub measured_latency_p99: f64,
    pub measured_throughput: f64,
    pub reliability_score: f64,
    pub resource_usage: ResourceUsage,
    pub cost_per_hour: f64,
    pub setup_complexity: u8, // 0-10 scale (see `calculate_maintenance_cost`)
    pub scalability_factor: f64,
    pub last_updated: DateTime<Utc>,
    pub sample_count: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub cpu_usage_percent: f64,
    pub memory_usage_mb: f64,
    pub network_usage_mbps: f64,
    pub disk_io_ops_per_sec: f64,
    pub connection_count: u32,
}

/// Breakdown of the weighted cost assigned to a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    pub total_cost: f64,
    pub latency_cost: f64,
    pub throughput_cost: f64,
    pub reliability_cost: f64,
    pub resource_cost: f64,
    pub scaling_cost: f64,
    pub maintenance_cost: f64,
}

/// Lightweight predictor that learns from historical performance data.
#[derive(Debug, Clone)]
pub struct MLPredictor {
    performance_history: Vec<PerformanceDataPoint>,
    patterns: HashMap<String, PatternModel>,
    _feature_weights: Vec<f64>,
    _confidence_threshold: f64,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    pub timestamp: DateTime<Utc>,
    pub backend_type: BackendType,
    pub workload_pattern: WorkloadPattern,
    pub actual_latency: f64,
    pub actual_throughput: f64,
    pub actual_reliability: f64,
    pub resource_usage: ResourceUsage,
    pub external_factors: HashMap<String, f64>,
}

/// Simple linear model trained per backend.
#[derive(Debug, Clone)]
pub struct PatternModel {
    pub pattern_name: String,
    pub coefficients: Vec<f64>,
    pub intercept: f64,
    pub confidence: f64,
    pub last_trained: DateTime<Utc>,
    pub sample_count: usize,
}

/// Selects the best streaming backend for an observed workload.
pub struct BackendOptimizer {
    config: OptimizerConfig,
    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
    pattern_analyzer: PatternAnalyzer,
    cost_calculator: CostCalculator,
    ml_predictor: Option<MLPredictor>,
    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
}

/// Derives a `WorkloadPattern` from a sliding window of recent events.
pub struct PatternAnalyzer {
    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
    _pattern_cache: HashMap<String, WorkloadPattern>,
    analysis_window: ChronoDuration,
}

pub struct CostCalculator {
    config: OptimizerConfig,
    baseline_costs: HashMap<BackendType, f64>,
}

/// Record of a single optimization decision, kept for auditing and stats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationDecision {
    pub timestamp: DateTime<Utc>,
    pub selected_backend: BackendType,
    pub workload_pattern: WorkloadPattern,
    pub predicted_performance: BackendPerformance,
    pub cost_model: CostModel,
    pub confidence: f64,
    pub reason: String,
}

#[derive(Debug, Clone)]
pub struct BackendRecommendation {
    pub backend_type: BackendType,
    pub score: f64,
    pub predicted_latency: f64,
    pub predicted_throughput: f64,
    pub predicted_cost: f64,
    pub confidence: f64,
    pub strengths: Vec<String>,
    pub weaknesses: Vec<String>,
}

impl BackendOptimizer {
    pub fn new(config: OptimizerConfig) -> Self {
        let ml_predictor = if config.enable_ml_prediction {
            Some(MLPredictor::new())
        } else {
            None
        };

        Self {
            pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
            cost_calculator: CostCalculator::new(config.clone()),
            backend_performance: Arc::new(RwLock::new(HashMap::new())),
            optimization_history: Arc::new(RwLock::new(Vec::new())),
            config,
            ml_predictor,
        }
    }
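
    // Typical flow (sketch, inside an async fn returning `Result`):
    // `my_metrics` and `my_events` are placeholders for caller-supplied data.
    //
    //     let mut optimizer = BackendOptimizer::new(OptimizerConfig::default());
    //     optimizer
    //         .update_backend_performance(BackendType::Kafka, &my_metrics)
    //         .await?;
    //     let pattern = optimizer.analyze_workload_pattern(&my_events).await?;
    //     let ranked = optimizer.recommend_backend(&pattern).await?;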

    /// Folds fresh metrics into the running performance profile for a backend.
    pub async fn update_backend_performance(
        &self,
        backend_type: BackendType,
        metrics: &StreamingMetrics,
    ) -> Result<()> {
        let mut performance_map = self.backend_performance.write().await;

        let performance = performance_map
            .entry(backend_type.clone())
            .or_insert_with(|| BackendPerformance::new(backend_type.clone()));

        // Exponentially weighted moving average: each new sample gets weight
        // `alpha`, the running value keeps the rest.
        let alpha = 0.1;
        performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
            + (1.0 - alpha) * performance.measured_latency_p50;
        performance.measured_throughput = alpha * metrics.producer_throughput_eps
            + (1.0 - alpha) * performance.measured_throughput;
        performance.reliability_score =
            alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;

        performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
        performance.resource_usage.memory_usage_mb =
            metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
        performance.resource_usage.connection_count = metrics.backend_connections_active;

        performance.last_updated = Utc::now();
        performance.sample_count += 1;

        debug!(
            "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
            backend_type,
            performance.measured_latency_p50,
            performance.measured_throughput,
            performance.reliability_score
        );

        Ok(())
    }
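
    // Worked example of the smoothing above (values illustrative): with a
    // running p50 of 100 ms and a new sample of 50 ms,
    //     new_p50 = 0.1 * 50.0 + 0.9 * 100.0 = 95.0 ms
    // so a single outlier shifts the estimate by only 10% of its distance.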

    /// Analyzes recent events to derive the current workload pattern.
    pub async fn analyze_workload_pattern(
        &mut self,
        events: &[StreamEvent],
    ) -> Result<WorkloadPattern> {
        self.pattern_analyzer.analyze_pattern(events).await
    }

    /// Scores every known backend against the pattern and returns the
    /// recommendations sorted best-first.
    pub async fn recommend_backend(
        &self,
        pattern: &WorkloadPattern,
    ) -> Result<Vec<BackendRecommendation>> {
        let mut recommendations = Vec::new();
        let performance_map = self.backend_performance.read().await;

        for (backend_type, performance) in performance_map.iter() {
            let cost = self
                .cost_calculator
                .calculate_cost(backend_type, pattern, performance)
                .await?;

            // With ML enabled, prediction errors (e.g. no history for this
            // backend) propagate to the caller.
            let predicted_performance = if let Some(predictor) = &self.ml_predictor {
                predictor.predict_performance(backend_type, pattern).await?
            } else {
                performance.clone()
            };

            let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
            let confidence = self.calculate_confidence(&predicted_performance, pattern);

            let recommendation = BackendRecommendation {
                backend_type: backend_type.clone(),
                score,
                predicted_latency: predicted_performance.measured_latency_p50,
                predicted_throughput: predicted_performance.measured_throughput,
                predicted_cost: cost.total_cost,
                confidence,
                strengths: self.analyze_strengths(backend_type, pattern),
                weaknesses: self.analyze_weaknesses(backend_type, pattern),
            };

            recommendations.push(recommendation);
        }

        // Sort best-first; treat NaN scores as equal rather than panicking.
        recommendations.sort_by(|a, b| {
            b.score
                .partial_cmp(&a.score)
                .unwrap_or(std::cmp::Ordering::Equal)
        });

        info!(
            "Generated {} backend recommendations for workload pattern: {:?}",
            recommendations.len(),
            pattern.pattern_type
        );

        Ok(recommendations)
    }

    /// Feeds a new observation to the ML predictor and retrains once enough
    /// samples have accumulated.
    pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
        if let Some(predictor) = &mut self.ml_predictor {
            predictor.add_training_data(data_point).await?;

            if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
                predictor.retrain_models().await?;
            }
        }
        Ok(())
    }

    pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
        let mut history = self.optimization_history.write().await;
        history.push(decision);

        // Keep the history bounded: drop the oldest 100 once it exceeds 1000.
        if history.len() > 1000 {
            history.drain(0..100);
        }
        Ok(())
    }

    pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
        let history = self.optimization_history.read().await;
        let performance_map = self.backend_performance.read().await;

        let total_decisions = history.len();
        let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
            *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
            acc
        });

        let average_confidence = if total_decisions > 0 {
            history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
        } else {
            0.0
        };

        let performance_improvements = self.calculate_performance_improvements(&history).await?;

        Ok(OptimizationStats {
            total_decisions,
            backend_usage,
            average_confidence,
            performance_improvements,
            active_backends: performance_map.len(),
            last_optimization: history.last().map(|d| d.timestamp),
        })
    }

    /// Combines latency, throughput, reliability, and cost into a single
    /// weighted score in [0, 1].
    fn calculate_backend_score(
        &self,
        cost: &CostModel,
        performance: &BackendPerformance,
        pattern: &WorkloadPattern,
    ) -> f64 {
        let latency_score = match pattern.pattern_type {
            PatternType::RealTime => {
                // Real-time workloads are judged on the tail (p99), not the median.
                if performance.measured_latency_p99 < 10.0 {
                    1.0
                } else if performance.measured_latency_p99 < 50.0 {
                    0.7
                } else {
                    0.3
                }
            }
            _ => (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0),
        };

        let throughput_score = match pattern.pattern_type {
            PatternType::BatchOriented => {
                // Reward headroom up to 2x the required rate.
                (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
            }
            _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
        };

        let reliability_score = performance.reliability_score;
        let cost_score = 1.0 / (cost.total_cost + 1.0);

        // Weighted average, normalized by the sum of the weights.
        (latency_score * self.config.cost_weight_latency
            + throughput_score * self.config.cost_weight_throughput
            + reliability_score * self.config.cost_weight_reliability
            + cost_score * self.config.cost_weight_resource_usage)
            / (self.config.cost_weight_latency
                + self.config.cost_weight_throughput
                + self.config.cost_weight_reliability
                + self.config.cost_weight_resource_usage)
    }
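
    // Worked example with the default weights (0.3/0.3/0.3/0.1), assuming
    // latency_score = 1.0, throughput_score = 0.8, reliability_score = 0.99,
    // and cost_score = 0.5 (all illustrative):
    //     score = (0.3*1.0 + 0.3*0.8 + 0.3*0.99 + 0.1*0.5) / 1.0 = 0.887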

    /// Confidence grows with sample count and decays with data age.
    fn calculate_confidence(
        &self,
        performance: &BackendPerformance,
        _pattern: &WorkloadPattern,
    ) -> f64 {
        let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
        let recency_confidence = {
            let age_hours = Utc::now()
                .signed_duration_since(performance.last_updated)
                .num_hours() as f64;
            (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
        };

        (sample_confidence + recency_confidence) / 2.0
    }
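
    // Worked example (illustrative): 500 samples that are 24 hours old give
    //     sample_confidence  = min(500/1000, 1.0) = 0.5
    //     recency_confidence = max(1/(24/24 + 1), 0.1) = 0.5
    //     confidence         = (0.5 + 0.5) / 2 = 0.5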

    fn analyze_strengths(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
    ) -> Vec<String> {
        let mut strengths = Vec::new();

        match backend_type {
            BackendType::Kafka => {
                strengths.push("High throughput".to_string());
                strengths.push("Strong durability".to_string());
                strengths.push("Excellent ordering guarantees".to_string());
                if matches!(
                    pattern.consistency_requirements,
                    ConsistencyLevel::ExactlyOnce
                ) {
                    strengths.push("Exactly-once semantics".to_string());
                }
            }
            BackendType::Nats => {
                strengths.push("Low latency".to_string());
                strengths.push("Simple setup".to_string());
                strengths.push("Built-in clustering".to_string());
                if matches!(pattern.pattern_type, PatternType::RealTime) {
                    strengths.push("Real-time performance".to_string());
                }
            }
            BackendType::Redis => {
                strengths.push("In-memory speed".to_string());
                strengths.push("Low latency".to_string());
                strengths.push("Rich data structures".to_string());
            }
            BackendType::Kinesis => {
                strengths.push("AWS native integration".to_string());
                strengths.push("Auto-scaling".to_string());
                strengths.push("Pay-per-use model".to_string());
            }
            BackendType::Pulsar => {
                strengths.push("Multi-tenancy".to_string());
                strengths.push("Geo-replication".to_string());
                strengths.push("Unified messaging".to_string());
            }
            BackendType::Memory => {
                strengths.push("Near-zero latency".to_string());
                strengths.push("Perfect for testing".to_string());
            }
        }

        strengths
    }

    fn analyze_weaknesses(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
    ) -> Vec<String> {
        let mut weaknesses = Vec::new();

        match backend_type {
            BackendType::Kafka => {
                weaknesses.push("Complex setup".to_string());
                weaknesses.push("Higher resource usage".to_string());
                if matches!(pattern.pattern_type, PatternType::RealTime) {
                    weaknesses.push("Higher latency than NATS".to_string());
                }
            }
            BackendType::Nats => {
                if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
                    weaknesses.push("Limited durability options".to_string());
                }
                if pattern.event_rate > 100000.0 {
                    weaknesses.push("May not handle extreme throughput".to_string());
                }
            }
            BackendType::Redis => {
                weaknesses.push("Memory-bound".to_string());
                weaknesses.push("Limited durability".to_string());
                if pattern.event_size_bytes > 1000000 {
                    weaknesses.push("Not suitable for large events".to_string());
                }
            }
            BackendType::Kinesis => {
                weaknesses.push("AWS vendor lock-in".to_string());
                weaknesses.push("Cost can scale quickly".to_string());
                weaknesses.push("Regional limitations".to_string());
            }
            BackendType::Pulsar => {
                weaknesses.push("Newer ecosystem".to_string());
                weaknesses.push("Complex architecture".to_string());
            }
            BackendType::Memory => {
                weaknesses.push("No persistence".to_string());
                weaknesses.push("Single node only".to_string());
                weaknesses.push("Memory limitations".to_string());
            }
        }

        weaknesses
    }

    /// Compares the ten most recent decisions with the ten oldest to
    /// estimate latency and throughput trends, as percentages.
    async fn calculate_performance_improvements(
        &self,
        history: &[OptimizationDecision],
    ) -> Result<HashMap<String, f64>> {
        let mut improvements = HashMap::new();

        // Two disjoint ten-decision windows are needed; with fewer than 20
        // entries the old window would be empty or overlap the recent one.
        if history.len() < 20 {
            return Ok(improvements);
        }

        let recent_decisions = &history[history.len() - 10..];
        let older_decisions = &history[0..10];

        let recent_avg_latency = recent_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_latency_p50)
            .sum::<f64>()
            / recent_decisions.len() as f64;

        let older_avg_latency = older_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_latency_p50)
            .sum::<f64>()
            / older_decisions.len() as f64;

        let latency_improvement =
            (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
        improvements.insert(
            "latency_improvement_percent".to_string(),
            latency_improvement,
        );

        let recent_avg_throughput = recent_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_throughput)
            .sum::<f64>()
            / recent_decisions.len() as f64;

        let older_avg_throughput = older_decisions
            .iter()
            .map(|d| d.predicted_performance.measured_throughput)
            .sum::<f64>()
            / older_decisions.len() as f64;

        let throughput_improvement =
            (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
        improvements.insert(
            "throughput_improvement_percent".to_string(),
            throughput_improvement,
        );

        Ok(improvements)
    }
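
    // Worked example (illustrative): old average latency 80 ms, recent 60 ms:
    //     (80 - 60) / 80 * 100 = 25% improvement.
    // Negative values therefore indicate a regression.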
}

impl PatternAnalyzer {
    pub fn new(analysis_window: ChronoDuration) -> Self {
        Self {
            event_history: Vec::new(),
            _pattern_cache: HashMap::new(),
            analysis_window,
        }
    }

    /// Derives a `WorkloadPattern` from the events inside the analysis window.
    pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
        // All events in this call share one arrival timestamp; finer-grained
        // timing information comes from repeated calls over time.
        let now = Utc::now();
        for event in events {
            self.event_history.push((now, event.clone()));
        }

        // Drop everything older than the analysis window.
        let cutoff = now - self.analysis_window;
        self.event_history
            .retain(|(timestamp, _)| *timestamp >= cutoff);

        if self.event_history.is_empty() {
            return Ok(WorkloadPattern::default());
        }

        // Events per second over the analysis window.
        let duration_seconds = self.analysis_window.num_seconds() as f64;
        let event_rate = self.event_history.len() as f64 / duration_seconds;

        let temporal_distribution = self.analyze_temporal_distribution().await?;

        let pattern_type = self
            .classify_pattern_type(event_rate, &temporal_distribution)
            .await?;

        let avg_event_size = self.calculate_average_event_size().await?;

        let data_characteristics = self.analyze_data_characteristics().await?;

        let consistency_requirements = self.determine_consistency_requirements().await?;

        Ok(WorkloadPattern {
            pattern_type,
            event_rate,
            batch_size: self.estimate_optimal_batch_size(event_rate),
            event_size_bytes: avg_event_size,
            temporal_distribution,
            data_characteristics,
            consistency_requirements,
        })
    }

    /// Infers the inter-arrival distribution from event timestamps.
    async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
        if self.event_history.len() < 10 {
            return Ok(TemporalDistribution::Uniform);
        }

        // Inter-arrival intervals in milliseconds.
        let mut intervals = Vec::new();
        for i in 1..self.event_history.len() {
            let interval = self.event_history[i]
                .0
                .signed_duration_since(self.event_history[i - 1].0)
                .num_milliseconds() as f64;
            intervals.push(interval);
        }

        let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
        let variance =
            intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
        let std_dev = variance.sqrt();

        // The coefficient of variation (std dev / mean) picks the shape.
        let cv = std_dev / mean;

        if cv < 0.1 {
            Ok(TemporalDistribution::Uniform)
        } else if cv < 0.5 {
            Ok(TemporalDistribution::Normal { mean, std_dev })
        } else {
            Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
        }
    }
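
    // Rule of thumb behind the thresholds above: a perfectly regular stream
    // has cv = 0, while a Poisson process has exponential inter-arrival times
    // with cv = 1. For example, mean = 100 ms and std_dev = 5 ms gives
    // cv = 0.05, which classifies as Uniform.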

    async fn classify_pattern_type(
        &self,
        event_rate: f64,
        temporal_dist: &TemporalDistribution,
    ) -> Result<PatternType> {
        match temporal_dist {
            TemporalDistribution::Uniform => {
                if event_rate > 10000.0 {
                    Ok(PatternType::BatchOriented)
                } else if event_rate > 100.0 {
                    Ok(PatternType::Steady)
                } else {
                    Ok(PatternType::RealTime)
                }
            }
            TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
            TemporalDistribution::Normal { std_dev, mean } => {
                if std_dev / mean > 1.0 {
                    Ok(PatternType::Random)
                } else {
                    Ok(PatternType::Steady)
                }
            }
            _ => Ok(PatternType::Steady),
        }
    }

    async fn calculate_average_event_size(&self) -> Result<u64> {
        if self.event_history.is_empty() {
            return Ok(1024); // Default 1 KiB estimate
        }

        let avg_size = self
            .event_history
            .iter()
            .map(|(_, event)| self.estimate_event_size(event))
            .sum::<u64>()
            / self.event_history.len() as u64;

        Ok(avg_size)
    }

    /// Rough per-event size estimate: payload string lengths plus a fixed
    /// overhead for metadata.
    fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
        match event {
            StreamEvent::TripleAdded {
                subject,
                predicate,
                object,
                ..
            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
            StreamEvent::TripleRemoved {
                subject,
                predicate,
                object,
                ..
            } => (subject.len() + predicate.len() + object.len() + 100) as u64,
            StreamEvent::GraphCreated { .. } => 200,
            StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
            StreamEvent::TransactionBegin { .. } => 150,
            StreamEvent::TransactionCommit { .. } => 100,
            StreamEvent::Heartbeat { .. } => 50,
            _ => 300, // Default estimate for other event types
        }
    }

    async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
        let has_complex_structures = self
            .event_history
            .iter()
            .any(|(_, event)| self.is_complex_event(event));

        let requires_ordering = self
            .event_history
            .iter()
            .any(|(_, event)| self.requires_ordering(event));

        Ok(DataCharacteristics {
            compression_ratio: 0.7,       // Assumed typical for RDF data
            serialization_overhead: 0.1,  // Assumed fixed overhead
            has_complex_structures,
            requires_ordering,
            has_time_windows: false,      // Not detected yet
            requires_deduplication: true, // Conservative default
        })
    }

    fn is_complex_event(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::SparqlUpdate { .. }
                | StreamEvent::SchemaChanged { .. }
                | StreamEvent::QueryCompleted { .. }
        )
    }

    fn requires_ordering(&self, event: &StreamEvent) -> bool {
        matches!(
            event,
            StreamEvent::TransactionBegin { .. }
                | StreamEvent::TransactionCommit { .. }
                | StreamEvent::TransactionAbort { .. }
        )
    }

    async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
        let has_transactions = self.event_history.iter().any(|(_, event)| {
            matches!(
                event,
                StreamEvent::TransactionBegin { .. }
                    | StreamEvent::TransactionCommit { .. }
                    | StreamEvent::TransactionAbort { .. }
            )
        });

        if has_transactions {
            Ok(ConsistencyLevel::ExactlyOnce)
        } else {
            Ok(ConsistencyLevel::AtLeastOnce)
        }
    }

    /// Larger batches amortize per-request overhead at higher event rates.
    fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
        if event_rate > 10000.0 {
            1000
        } else if event_rate > 1000.0 {
            500
        } else if event_rate > 100.0 {
            100
        } else {
            10
        }
    }
}

impl CostCalculator {
    pub fn new(config: OptimizerConfig) -> Self {
        let mut baseline_costs = HashMap::new();

        // Relative baseline cost per backend (unitless; lower is cheaper).
        baseline_costs.insert(BackendType::Memory, 0.0);
        baseline_costs.insert(BackendType::Redis, 0.1);
        baseline_costs.insert(BackendType::Nats, 0.2);
        baseline_costs.insert(BackendType::Kafka, 0.5);
        baseline_costs.insert(BackendType::Pulsar, 0.4);
        baseline_costs.insert(BackendType::Kinesis, 0.8);

        Self {
            config,
            baseline_costs,
        }
    }

    /// Combines the individual cost components into a weighted total.
    pub async fn calculate_cost(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
        performance: &BackendPerformance,
    ) -> Result<CostModel> {
        let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);

        let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
        let throughput_cost =
            self.calculate_throughput_cost(performance.measured_throughput, pattern);
        let reliability_cost =
            self.calculate_reliability_cost(performance.reliability_score, pattern);
        let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
        let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
        let maintenance_cost =
            self.calculate_maintenance_cost(backend_type, performance.setup_complexity);

        let total_cost = base_cost
            + latency_cost * self.config.cost_weight_latency
            + throughput_cost * self.config.cost_weight_throughput
            + reliability_cost * self.config.cost_weight_reliability
            + resource_cost * self.config.cost_weight_resource_usage
            + scaling_cost * 0.1
            + maintenance_cost * 0.1;

        Ok(CostModel {
            total_cost,
            latency_cost,
            throughput_cost,
            reliability_cost,
            resource_cost,
            scaling_cost,
            maintenance_cost,
        })
    }

    fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
        let latency_penalty = match pattern.pattern_type {
            PatternType::RealTime => latency / 10.0, // Strictest latency budget
            PatternType::Bursty => latency / 50.0,
            _ => latency / 100.0,
        };
        latency_penalty.min(2.0) // Cap the penalty
    }

    fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
        // Require 50% headroom over the observed event rate.
        let required_throughput = pattern.event_rate * 1.5;
        if throughput < required_throughput {
            (required_throughput - throughput) / required_throughput
        } else {
            0.0
        }
    }

    fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
        let required_reliability = match pattern.consistency_requirements {
            ConsistencyLevel::ExactlyOnce => 0.999,
            ConsistencyLevel::AtLeastOnce => 0.995,
            _ => 0.99,
        };

        if reliability < required_reliability {
            (required_reliability - reliability) * 10.0
        } else {
            0.0
        }
    }

    fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
        (usage.cpu_usage_percent / 100.0) * 0.1
            + (usage.memory_usage_mb / 1000.0) * 0.05
            + (usage.network_usage_mbps / 100.0) * 0.02
    }
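
    // Worked example for the resource cost above (illustrative values):
    // 50% CPU, 2000 MB memory, and 10 Mbps network gives
    //     0.5 * 0.1 + 2.0 * 0.05 + 0.1 * 0.02 = 0.152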

    fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
        let scaling_factor = match backend_type {
            BackendType::Kinesis => 0.1, // Auto-scaling
            BackendType::Kafka => 0.5,   // Manual partition management
            BackendType::Memory => 1.0,  // Cannot scale out
            _ => 0.3,
        };

        // Scaling only matters for workloads whose rate actually varies.
        match pattern.pattern_type {
            PatternType::Bursty | PatternType::Random => scaling_factor,
            _ => 0.0,
        }
    }

    fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
        setup_complexity as f64 / 10.0
    }
}

impl Default for MLPredictor {
    fn default() -> Self {
        Self::new()
    }
}

impl MLPredictor {
    pub fn new() -> Self {
        Self {
            performance_history: Vec::new(),
            patterns: HashMap::new(),
            _feature_weights: vec![1.0; 10], // One weight per extracted feature
            _confidence_threshold: 0.7,
        }
    }

    pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
        self.performance_history.push(data_point);

        // Keep the history bounded: drop the oldest 1000 past 10000 entries.
        if self.performance_history.len() > 10000 {
            self.performance_history.drain(0..1000);
        }

        Ok(())
    }

    /// Predicts performance by averaging historical observations, preferring
    /// data points whose workload pattern resembles the query pattern.
    pub async fn predict_performance(
        &self,
        backend_type: &BackendType,
        pattern: &WorkloadPattern,
    ) -> Result<BackendPerformance> {
        let relevant_data: Vec<&PerformanceDataPoint> = self
            .performance_history
            .iter()
            .filter(|dp| dp.backend_type == *backend_type)
            .collect();

        if relevant_data.is_empty() {
            return Err(anyhow!(
                "No historical data for backend type: {:?}",
                backend_type
            ));
        }

        // Prefer data from similar workload patterns; fall back to all data.
        let similar_data: Vec<&PerformanceDataPoint> = relevant_data
            .iter()
            .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
            .cloned()
            .collect();

        let prediction_data = if similar_data.is_empty() {
            &relevant_data
        } else {
            &similar_data
        };

        let predicted_latency = prediction_data
            .iter()
            .map(|dp| dp.actual_latency)
            .sum::<f64>()
            / prediction_data.len() as f64;

        let predicted_throughput = prediction_data
            .iter()
            .map(|dp| dp.actual_throughput)
            .sum::<f64>()
            / prediction_data.len() as f64;

        let predicted_reliability = prediction_data
            .iter()
            .map(|dp| dp.actual_reliability)
            .sum::<f64>()
            / prediction_data.len() as f64;

        Ok(BackendPerformance {
            backend_type: backend_type.clone(),
            measured_latency_p50: predicted_latency,
            measured_latency_p95: predicted_latency * 1.5, // Rough tail estimates
            measured_latency_p99: predicted_latency * 2.0,
            measured_throughput: predicted_throughput,
            reliability_score: predicted_reliability,
            resource_usage: prediction_data[0].resource_usage.clone(),
            cost_per_hour: 0.0,  // Not predicted
            setup_complexity: 5, // Neutral default
            scalability_factor: 1.0,
            last_updated: Utc::now(),
            sample_count: prediction_data.len() as u64,
        })
    }

    /// Retrains one simple linear model per backend that has enough data.
    pub async fn retrain_models(&mut self) -> Result<()> {
        for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
            let backend_data: Vec<&PerformanceDataPoint> = self
                .performance_history
                .iter()
                .filter(|dp| dp.backend_type == *backend_type)
                .collect();

            if backend_data.len() < 10 {
                continue;
            }

            let pattern_name = format!("{backend_type:?}_model");
            let model = self.train_linear_model(&backend_data).await?;
            self.patterns.insert(pattern_name, model);
        }

        info!("Retrained ML models for {} patterns", self.patterns.len());
        Ok(())
    }

    /// Trains a simplified linear model: the intercept is the mean latency and
    /// each coefficient is a scaled feature-target correlation, rather than a
    /// proper least-squares fit.
    async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
        let n = data.len() as f64;

        let features: Vec<Vec<f64>> = data
            .iter()
            .map(|dp| self.extract_features(&dp.workload_pattern))
            .collect();

        let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();

        let feature_count = features[0].len();
        let mut coefficients = vec![0.0; feature_count];
        let intercept = targets.iter().sum::<f64>() / n;

        for i in 0..feature_count {
            let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
            let correlation = self.calculate_correlation(&feature_values, &targets);
            coefficients[i] = correlation * 0.1; // Heuristic scaling
        }

        Ok(PatternModel {
            pattern_name: "latency_model".to_string(),
            coefficients,
            intercept,
            confidence: 0.8, // Fixed placeholder confidence
            last_trained: Utc::now(),
            sample_count: data.len(),
        })
    }

    /// Turns a workload pattern into a fixed-length numeric feature vector.
    fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
        vec![
            pattern.event_rate,
            pattern.batch_size as f64,
            pattern.event_size_bytes as f64,
            pattern.data_characteristics.compression_ratio,
            pattern.data_characteristics.serialization_overhead,
            if pattern.data_characteristics.has_complex_structures {
                1.0
            } else {
                0.0
            },
            if pattern.data_characteristics.requires_ordering {
                1.0
            } else {
                0.0
            },
            match pattern.pattern_type {
                PatternType::RealTime => 1.0,
                PatternType::BatchOriented => 2.0,
                PatternType::Bursty => 3.0,
                _ => 0.0,
            },
            match pattern.consistency_requirements {
                ConsistencyLevel::ExactlyOnce => 3.0,
                ConsistencyLevel::AtLeastOnce => 2.0,
                _ => 1.0,
            },
            1.0, // Bias term
        ]
    }

    /// Similarity in [0, 1]: the mean of rate, size, and type similarity.
    fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
        let rate_similarity =
            1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
        let size_similarity = 1.0
            - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
                / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
        let type_similarity = if std::mem::discriminant(&p1.pattern_type)
            == std::mem::discriminant(&p2.pattern_type)
        {
            1.0
        } else {
            0.0
        };

        (rate_similarity + size_similarity + type_similarity) / 3.0
    }
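
    // Worked example (matches `test_pattern_similarity` below): two Steady
    // patterns with rates 100 and 110 and equal event sizes give
    //     rate_similarity ≈ 1 - 10/211 ≈ 0.953, size_similarity = 1.0,
    //     type_similarity = 1.0, so overall similarity ≈ 0.98.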
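    /// Pearson correlation coefficient:
    /// r = Σ(xᵢ-x̄)(yᵢ-ȳ) / √(Σ(xᵢ-x̄)² · Σ(yᵢ-ȳ)²), with r = 0 when either
    /// series has zero variance.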
    fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
        let n = x.len() as f64;
        let mean_x = x.iter().sum::<f64>() / n;
        let mean_y = y.iter().sum::<f64>() / n;

        let numerator: f64 = x
            .iter()
            .zip(y.iter())
            .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
            .sum();

        let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
        let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();

        if denom_x * denom_y == 0.0 {
            0.0
        } else {
            numerator / (denom_x * denom_y).sqrt()
        }
    }
}

impl BackendPerformance {
    /// Conservative starting profile used before any metrics are observed.
    pub fn new(backend_type: BackendType) -> Self {
        Self {
            backend_type,
            measured_latency_p50: 100.0, // Conservative defaults
            measured_latency_p95: 200.0,
            measured_latency_p99: 500.0,
            measured_throughput: 1000.0,
            reliability_score: 0.99,
            resource_usage: ResourceUsage::default(),
            cost_per_hour: 0.1,
            setup_complexity: 5,
            scalability_factor: 1.0,
            last_updated: Utc::now(),
            sample_count: 0,
        }
    }
}

impl Default for ResourceUsage {
    fn default() -> Self {
        Self {
            cpu_usage_percent: 10.0,
            memory_usage_mb: 100.0,
            network_usage_mbps: 1.0,
            disk_io_ops_per_sec: 100.0,
            connection_count: 10,
        }
    }
}

impl Default for WorkloadPattern {
    fn default() -> Self {
        Self {
            pattern_type: PatternType::Steady,
            event_rate: 100.0,
            batch_size: 100,
            event_size_bytes: 1024,
            temporal_distribution: TemporalDistribution::Uniform,
            data_characteristics: DataCharacteristics {
                compression_ratio: 0.7,
                serialization_overhead: 0.1,
                has_complex_structures: false,
                requires_ordering: false,
                has_time_windows: false,
                requires_deduplication: true,
            },
            consistency_requirements: ConsistencyLevel::AtLeastOnce,
        }
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
    pub total_decisions: usize,
    pub backend_usage: HashMap<BackendType, usize>,
    pub average_confidence: f64,
    pub performance_improvements: HashMap<String, f64>,
    pub active_backends: usize,
    pub last_optimization: Option<DateTime<Utc>>,
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventMetadata, StreamEvent};

    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: uuid::Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: std::collections::HashMap::new(),
                checksum: None,
            },
        }
    }

    #[tokio::test]
    async fn test_backend_optimizer_creation() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);

        assert!(optimizer.ml_predictor.is_some());
        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
    }

    #[tokio::test]
    async fn test_pattern_analysis() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
        let events = vec![create_test_event(); 100];

        let pattern = analyzer.analyze_pattern(&events).await.unwrap();

        assert!(pattern.event_rate > 0.0);
        assert!(pattern.batch_size > 0);
        assert!(pattern.event_size_bytes > 0);
    }

    #[tokio::test]
    async fn test_cost_calculation() {
        let config = OptimizerConfig::default();
        let calculator = CostCalculator::new(config);
        let pattern = WorkloadPattern::default();
        let performance = BackendPerformance::new(BackendType::Kafka);

        let cost = calculator
            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
            .await
            .unwrap();

        assert!(cost.total_cost > 0.0);
        assert!(cost.latency_cost >= 0.0);
        assert!(cost.throughput_cost >= 0.0);
    }

    #[tokio::test]
    async fn test_backend_recommendation() {
        let config = OptimizerConfig {
            enable_ml_prediction: false, // Predictor has no history in this test
            ..Default::default()
        };
        let optimizer = BackendOptimizer::new(config);

        // Seed performance data for two backends.
        let metrics = StreamingMetrics::default();
        optimizer
            .update_backend_performance(BackendType::Kafka, &metrics)
            .await
            .unwrap();
        optimizer
            .update_backend_performance(BackendType::Nats, &metrics)
            .await
            .unwrap();

        let pattern = WorkloadPattern::default();
        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();

        assert!(recommendations.len() >= 2);
        assert!(recommendations[0].score >= recommendations[1].score);
    }

    #[tokio::test]
    async fn test_ml_predictor() {
        let mut predictor = MLPredictor::new();

        let data_point = PerformanceDataPoint {
            timestamp: Utc::now(),
            backend_type: BackendType::Kafka,
            workload_pattern: WorkloadPattern::default(),
            actual_latency: 50.0,
            actual_throughput: 1000.0,
            actual_reliability: 0.99,
            resource_usage: ResourceUsage::default(),
            external_factors: HashMap::new(),
        };

        predictor.add_training_data(data_point).await.unwrap();
        assert_eq!(predictor.performance_history.len(), 1);
    }

    #[test]
    fn test_pattern_similarity() {
        let predictor = MLPredictor::new();
        let pattern1 = WorkloadPattern {
            event_rate: 100.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };
        let pattern2 = WorkloadPattern {
            event_rate: 110.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };

        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
        assert!(similarity > 0.8);
    }

    #[tokio::test]
    async fn test_workload_pattern_classification() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));

        // Low event rate: ten events over the 30-second window.
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..10 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(matches!(
            pattern.pattern_type,
            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
        ));

        // High event rate: 3500 events, roughly 117 per second over the window.
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..3500 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(pattern.event_rate > 100.0);
    }

    #[test]
    fn test_backend_strengths_analysis() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);
        let pattern = WorkloadPattern {
            pattern_type: PatternType::RealTime,
            consistency_requirements: ConsistencyLevel::ExactlyOnce,
            ..Default::default()
        };

        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));

        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
    }

    #[test]
    fn test_config_serialization() {
        let config = OptimizerConfig::default();
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            config.enable_cost_modeling,
            deserialized.enable_cost_modeling
        );
        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
    }
}