1use crate::backend::BackendType;
7use crate::event::StreamEvent;
8use crate::monitoring::StreamingMetrics;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17
/// Tunables for the backend optimizer.
///
/// The four `cost_weight_*` fields weight the scoring terms in
/// `BackendOptimizer::calculate_backend_score`; scores are normalized by the
/// weight sum, so the weights need not add up to 1.0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
    // Feature toggles: cost modeling, ML-based prediction, workload pattern analysis.
    pub enable_cost_modeling: bool,
    pub enable_ml_prediction: bool,
    pub enable_pattern_analysis: bool,
    // How often optimization runs (wall-clock interval).
    pub optimization_interval: Duration,
    // Minimum training samples before `MLPredictor::retrain_models` is invoked.
    pub min_samples_for_prediction: usize,
    // Relative importance of each scoring dimension.
    pub cost_weight_latency: f64,
    pub cost_weight_throughput: f64,
    pub cost_weight_reliability: f64,
    pub cost_weight_resource_usage: f64,
}
31
32impl Default for OptimizerConfig {
33 fn default() -> Self {
34 Self {
35 enable_cost_modeling: true,
36 enable_ml_prediction: true,
37 enable_pattern_analysis: true,
38 optimization_interval: Duration::from_secs(300), min_samples_for_prediction: 100,
40 cost_weight_latency: 0.3,
41 cost_weight_throughput: 0.3,
42 cost_weight_reliability: 0.3,
43 cost_weight_resource_usage: 0.1,
44 }
45 }
46}
47
/// A classified description of the observed event workload, produced by
/// `PatternAnalyzer::analyze_pattern` and consumed by scoring/cost code.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadPattern {
    // Coarse classification (steady, bursty, real-time, ...).
    pub pattern_type: PatternType,
    // Events per second over the analysis window.
    pub event_rate: f64,
    // Suggested batch size for this rate (see `estimate_optimal_batch_size`).
    pub batch_size: u32,
    // Average estimated event payload size.
    pub event_size_bytes: u64,
    // Statistical shape of inter-event arrival times.
    pub temporal_distribution: TemporalDistribution,
    pub data_characteristics: DataCharacteristics,
    pub consistency_requirements: ConsistencyLevel,
}
59
/// Coarse workload classification used to bias backend scoring.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternType {
    // Roughly constant event rate.
    Steady,
    // Short spikes separated by quiet periods.
    Bursty,
    // Recurring time-of-day / periodic load.
    Seasonal,
    // No discernible structure.
    Random,
    // Latency-sensitive, low-rate traffic.
    RealTime,
    // High-rate bulk processing.
    BatchOriented,
}
76
/// Statistical model fitted to inter-event arrival intervals
/// (see `PatternAnalyzer::analyze_temporal_distribution`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalDistribution {
    Uniform,
    // Parameters are in milliseconds (intervals are measured in ms).
    Normal { mean: f64, std_dev: f64 },
    Exponential { lambda: f64 },
    Poisson { lambda: f64 },
    Custom { distribution_name: String },
}
86
/// Properties of the event payloads themselves, as opposed to their timing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCharacteristics {
    // Expected compressed/uncompressed size ratio (lower = more compressible).
    pub compression_ratio: f64,
    // Fractional overhead added by serialization.
    pub serialization_overhead: f64,
    // True when events include nested structures (e.g. SPARQL updates).
    pub has_complex_structures: bool,
    // True when event order must be preserved (e.g. transactions).
    pub requires_ordering: bool,
    pub has_time_windows: bool,
    pub requires_deduplication: bool,
}
97
/// Delivery/consistency guarantee required by the workload, ordered roughly
/// from weakest to strongest.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    // Messages may be lost but are never duplicated.
    AtMostOnce,
    // Messages are never lost but may be duplicated.
    AtLeastOnce,
    // Neither lost nor duplicated.
    ExactlyOnce,
    // Consistent within a single session.
    Session,
    // Strong (linearizable) consistency.
    Strong,
}
112
/// Performance record for one backend. Latency/throughput/reliability are
/// EWMA-smoothed in `BackendOptimizer::update_backend_performance`;
/// `resource_usage` holds the latest raw snapshot.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendPerformance {
    pub backend_type: BackendType,
    // Latency percentiles in milliseconds.
    pub measured_latency_p50: f64,
    pub measured_latency_p95: f64,
    pub measured_latency_p99: f64,
    // Events per second.
    pub measured_throughput: f64,
    // Success rate in [0, 1].
    pub reliability_score: f64,
    pub resource_usage: ResourceUsage,
    pub cost_per_hour: f64,
    // Operational complexity on a 0-10 scale (feeds maintenance cost).
    pub setup_complexity: u8,
    pub scalability_factor: f64,
    // Recency and volume of the data behind this record (drive confidence).
    pub last_updated: DateTime<Utc>,
    pub sample_count: u64,
}
129
/// Snapshot of system resources consumed by a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    pub cpu_usage_percent: f64,
    pub memory_usage_mb: f64,
    pub network_usage_mbps: f64,
    pub disk_io_ops_per_sec: f64,
    pub connection_count: u32,
}
139
/// Itemized cost breakdown produced by `CostCalculator::calculate_cost`.
/// `total_cost` is the weighted sum of the components plus the backend's
/// baseline cost; lower is better.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    pub total_cost: f64,
    pub latency_cost: f64,
    pub throughput_cost: f64,
    pub reliability_cost: f64,
    pub resource_cost: f64,
    pub scaling_cost: f64,
    pub maintenance_cost: f64,
}
151
/// Lightweight performance predictor backed by historical samples and
/// per-backend linear models.
#[derive(Debug, Clone)]
pub struct MLPredictor {
    // Rolling training set, capped at 10k points (see `add_training_data`).
    performance_history: Vec<PerformanceDataPoint>,
    // Trained models keyed by "{BackendType:?}_model".
    patterns: HashMap<String, PatternModel>,
    // Reserved for future feature weighting; currently unused (underscore).
    _feature_weights: Vec<f64>,
    // Reserved minimum confidence gate; currently unused (underscore).
    _confidence_threshold: f64,
}
164
/// One labeled training sample: the workload that was running plus the
/// performance actually observed on a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    pub timestamp: DateTime<Utc>,
    pub backend_type: BackendType,
    pub workload_pattern: WorkloadPattern,
    // Ground-truth observations (latency ms, events/sec, success rate).
    pub actual_latency: f64,
    pub actual_throughput: f64,
    pub actual_reliability: f64,
    pub resource_usage: ResourceUsage,
    // Free-form named factors (e.g. time of day, cluster load).
    pub external_factors: HashMap<String, f64>,
}
177
/// A trained linear model: predicted latency = intercept + coefficients · features.
#[derive(Debug, Clone)]
pub struct PatternModel {
    pub pattern_name: String,
    // One coefficient per feature from `MLPredictor::extract_features`.
    pub coefficients: Vec<f64>,
    pub intercept: f64,
    pub confidence: f64,
    pub last_trained: DateTime<Utc>,
    pub sample_count: usize,
}
188
/// Top-level optimizer: tracks per-backend performance, analyzes workload
/// patterns, and recommends the best backend for a given workload.
/// Shared state is behind `Arc<RwLock<...>>` so `&self` methods can update it.
pub struct BackendOptimizer {
    config: OptimizerConfig,
    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
    pattern_analyzer: PatternAnalyzer,
    cost_calculator: CostCalculator,
    // Present only when `config.enable_ml_prediction` is set.
    ml_predictor: Option<MLPredictor>,
    // Bounded log of past decisions (trimmed in `record_decision`).
    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
}
198
/// Classifies recent event traffic into a `WorkloadPattern`.
pub struct PatternAnalyzer {
    // Timestamped events retained for the duration of `analysis_window`.
    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
    // Reserved cache of named patterns; currently unused (underscore).
    _pattern_cache: HashMap<String, WorkloadPattern>,
    analysis_window: ChronoDuration,
}
205
/// Computes a `CostModel` for a backend/workload pair using configured
/// weights plus fixed per-backend baseline costs.
pub struct CostCalculator {
    config: OptimizerConfig,
    // Fixed baseline cost per backend; unknown backends fall back to 1.0.
    baseline_costs: HashMap<BackendType, f64>,
}
211
/// Audit record of one backend-selection decision.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationDecision {
    pub timestamp: DateTime<Utc>,
    pub selected_backend: BackendType,
    pub workload_pattern: WorkloadPattern,
    // Performance expected at decision time (not the later observed values).
    pub predicted_performance: BackendPerformance,
    pub cost_model: CostModel,
    pub confidence: f64,
    // Human-readable justification for the choice.
    pub reason: String,
}
223
/// One entry in the ranked output of `BackendOptimizer::recommend_backend`.
#[derive(Debug, Clone)]
pub struct BackendRecommendation {
    pub backend_type: BackendType,
    // Normalized composite score in [0, 1]; recommendations are sorted on it.
    pub score: f64,
    pub predicted_latency: f64,
    pub predicted_throughput: f64,
    pub predicted_cost: f64,
    pub confidence: f64,
    // Qualitative pros/cons for this backend under the analyzed workload.
    pub strengths: Vec<String>,
    pub weaknesses: Vec<String>,
}
236
237impl BackendOptimizer {
238 pub fn new(config: OptimizerConfig) -> Self {
240 let ml_predictor = if config.enable_ml_prediction {
241 Some(MLPredictor::new())
242 } else {
243 None
244 };
245
246 Self {
247 pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
248 cost_calculator: CostCalculator::new(config.clone()),
249 backend_performance: Arc::new(RwLock::new(HashMap::new())),
250 optimization_history: Arc::new(RwLock::new(Vec::new())),
251 config,
252 ml_predictor,
253 }
254 }
255
256 pub async fn update_backend_performance(
258 &self,
259 backend_type: BackendType,
260 metrics: &StreamingMetrics,
261 ) -> Result<()> {
262 let mut performance_map = self.backend_performance.write().await;
263
264 let performance = performance_map
265 .entry(backend_type.clone())
266 .or_insert_with(|| BackendPerformance::new(backend_type.clone()));
267
268 let alpha = 0.1; performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
271 + (1.0 - alpha) * performance.measured_latency_p50;
272 performance.measured_throughput = alpha * metrics.producer_throughput_eps
273 + (1.0 - alpha) * performance.measured_throughput;
274 performance.reliability_score =
275 alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;
276
277 performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
278 performance.resource_usage.memory_usage_mb =
279 metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
280 performance.resource_usage.connection_count = metrics.backend_connections_active;
281
282 performance.last_updated = Utc::now();
283 performance.sample_count += 1;
284
285 debug!(
286 "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
287 backend_type,
288 performance.measured_latency_p50,
289 performance.measured_throughput,
290 performance.reliability_score
291 );
292
293 Ok(())
294 }
295
296 pub async fn analyze_workload_pattern(
298 &mut self,
299 events: &[StreamEvent],
300 ) -> Result<WorkloadPattern> {
301 self.pattern_analyzer.analyze_pattern(events).await
302 }
303
304 pub async fn recommend_backend(
306 &self,
307 pattern: &WorkloadPattern,
308 ) -> Result<Vec<BackendRecommendation>> {
309 let mut recommendations = Vec::new();
310 let performance_map = self.backend_performance.read().await;
311
312 for (backend_type, performance) in performance_map.iter() {
313 let cost = self
314 .cost_calculator
315 .calculate_cost(backend_type, pattern, performance)
316 .await?;
317
318 let predicted_performance = if let Some(predictor) = &self.ml_predictor {
319 predictor.predict_performance(backend_type, pattern).await?
320 } else {
321 performance.clone()
322 };
323
324 let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
325 let confidence = self.calculate_confidence(&predicted_performance, pattern);
326
327 let recommendation = BackendRecommendation {
328 backend_type: backend_type.clone(),
329 score,
330 predicted_latency: predicted_performance.measured_latency_p50,
331 predicted_throughput: predicted_performance.measured_throughput,
332 predicted_cost: cost.total_cost,
333 confidence,
334 strengths: self.analyze_strengths(backend_type, pattern),
335 weaknesses: self.analyze_weaknesses(backend_type, pattern),
336 };
337
338 recommendations.push(recommendation);
339 }
340
341 recommendations.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
343
344 info!(
345 "Generated {} backend recommendations for workload pattern: {:?}",
346 recommendations.len(),
347 pattern.pattern_type
348 );
349
350 Ok(recommendations)
351 }
352
353 pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
355 if let Some(predictor) = &mut self.ml_predictor {
356 predictor.add_training_data(data_point).await?;
357
358 if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
359 predictor.retrain_models().await?;
360 }
361 }
362 Ok(())
363 }
364
365 pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
367 let mut history = self.optimization_history.write().await;
368 history.push(decision);
369
370 if history.len() > 1000 {
372 history.drain(0..100);
373 }
374 Ok(())
375 }
376
377 pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
379 let history = self.optimization_history.read().await;
380 let performance_map = self.backend_performance.read().await;
381
382 let total_decisions = history.len();
383 let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
384 *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
385 acc
386 });
387
388 let average_confidence = if total_decisions > 0 {
389 history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
390 } else {
391 0.0
392 };
393
394 let performance_improvements = self.calculate_performance_improvements(&history).await?;
395
396 Ok(OptimizationStats {
397 total_decisions,
398 backend_usage,
399 average_confidence,
400 performance_improvements,
401 active_backends: performance_map.len(),
402 last_optimization: history.last().map(|d| d.timestamp),
403 })
404 }
405
406 fn calculate_backend_score(
408 &self,
409 cost: &CostModel,
410 performance: &BackendPerformance,
411 pattern: &WorkloadPattern,
412 ) -> f64 {
413 let latency_score = match pattern.pattern_type {
414 PatternType::RealTime => {
415 if performance.measured_latency_p99 < 10.0 {
417 1.0
418 } else if performance.measured_latency_p99 < 50.0 {
419 0.7
420 } else {
421 0.3
422 }
423 }
424 _ => {
425 (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0)
427 }
428 };
429
430 let throughput_score = match pattern.pattern_type {
431 PatternType::BatchOriented => {
432 (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
434 }
435 _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
436 };
437
438 let reliability_score = performance.reliability_score;
439 let cost_score = 1.0 / (cost.total_cost + 1.0);
440
441 (latency_score * self.config.cost_weight_latency
443 + throughput_score * self.config.cost_weight_throughput
444 + reliability_score * self.config.cost_weight_reliability
445 + cost_score * self.config.cost_weight_resource_usage)
446 / (self.config.cost_weight_latency
447 + self.config.cost_weight_throughput
448 + self.config.cost_weight_reliability
449 + self.config.cost_weight_resource_usage)
450 }
451
452 fn calculate_confidence(
454 &self,
455 performance: &BackendPerformance,
456 _pattern: &WorkloadPattern,
457 ) -> f64 {
458 let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
459 let recency_confidence = {
460 let age_hours = Utc::now()
461 .signed_duration_since(performance.last_updated)
462 .num_hours() as f64;
463 (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
464 };
465
466 (sample_confidence + recency_confidence) / 2.0
467 }
468
469 fn analyze_strengths(
471 &self,
472 backend_type: &BackendType,
473 pattern: &WorkloadPattern,
474 ) -> Vec<String> {
475 let mut strengths = Vec::new();
476
477 match backend_type {
478 BackendType::Kafka => {
479 strengths.push("High throughput".to_string());
480 strengths.push("Strong durability".to_string());
481 strengths.push("Excellent ordering guarantees".to_string());
482 if matches!(
483 pattern.consistency_requirements,
484 ConsistencyLevel::ExactlyOnce
485 ) {
486 strengths.push("Exactly-once semantics".to_string());
487 }
488 }
489 BackendType::Nats => {
490 strengths.push("Low latency".to_string());
491 strengths.push("Simple setup".to_string());
492 strengths.push("Built-in clustering".to_string());
493 if matches!(pattern.pattern_type, PatternType::RealTime) {
494 strengths.push("Real-time performance".to_string());
495 }
496 }
497 BackendType::Redis => {
498 strengths.push("In-memory speed".to_string());
499 strengths.push("Low latency".to_string());
500 strengths.push("Rich data structures".to_string());
501 }
502 BackendType::Kinesis => {
503 strengths.push("AWS native integration".to_string());
504 strengths.push("Auto-scaling".to_string());
505 strengths.push("Pay-per-use model".to_string());
506 }
507 BackendType::Pulsar => {
508 strengths.push("Multi-tenancy".to_string());
509 strengths.push("Geo-replication".to_string());
510 strengths.push("Unified messaging".to_string());
511 }
512 BackendType::RabbitMQ => {
513 strengths.push("Mature and stable".to_string());
514 strengths.push("Rich routing capabilities".to_string());
515 strengths.push("Strong reliability guarantees".to_string());
516 if matches!(
517 pattern.consistency_requirements,
518 ConsistencyLevel::AtLeastOnce
519 ) {
520 strengths.push("Persistent message delivery".to_string());
521 }
522 }
523 BackendType::Memory => {
524 strengths.push("Zero latency".to_string());
525 strengths.push("Perfect for testing".to_string());
526 }
527 }
528
529 strengths
530 }
531
532 fn analyze_weaknesses(
534 &self,
535 backend_type: &BackendType,
536 pattern: &WorkloadPattern,
537 ) -> Vec<String> {
538 let mut weaknesses = Vec::new();
539
540 match backend_type {
541 BackendType::Kafka => {
542 weaknesses.push("Complex setup".to_string());
543 weaknesses.push("Higher resource usage".to_string());
544 if matches!(pattern.pattern_type, PatternType::RealTime) {
545 weaknesses.push("Higher latency than NATS".to_string());
546 }
547 }
548 BackendType::Nats => {
549 if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
550 weaknesses.push("Limited durability options".to_string());
551 }
552 if pattern.event_rate > 100000.0 {
553 weaknesses.push("May not handle extreme throughput".to_string());
554 }
555 }
556 BackendType::Redis => {
557 weaknesses.push("Memory-bound".to_string());
558 weaknesses.push("Limited durability".to_string());
559 if pattern.event_size_bytes > 1000000 {
560 weaknesses.push("Not suitable for large events".to_string());
561 }
562 }
563 BackendType::Kinesis => {
564 weaknesses.push("AWS vendor lock-in".to_string());
565 weaknesses.push("Cost can scale quickly".to_string());
566 weaknesses.push("Regional limitations".to_string());
567 }
568 BackendType::Pulsar => {
569 weaknesses.push("Newer ecosystem".to_string());
570 weaknesses.push("Complex architecture".to_string());
571 }
572 BackendType::RabbitMQ => {
573 weaknesses.push("Lower throughput than Kafka".to_string());
574 weaknesses.push("Memory-based by default".to_string());
575 if matches!(pattern.pattern_type, PatternType::BatchOriented) {
576 weaknesses.push("Not optimized for batch processing".to_string());
577 }
578 }
579 BackendType::Memory => {
580 weaknesses.push("No persistence".to_string());
581 weaknesses.push("Single node only".to_string());
582 weaknesses.push("Memory limitations".to_string());
583 }
584 }
585
586 weaknesses
587 }
588
589 async fn calculate_performance_improvements(
591 &self,
592 history: &[OptimizationDecision],
593 ) -> Result<HashMap<String, f64>> {
594 let mut improvements = HashMap::new();
595
596 if history.len() < 10 {
597 return Ok(improvements);
598 }
599
600 let recent_decisions = &history[history.len() - 10..];
601 let older_decisions = &history[0..10.min(history.len() - 10)];
602
603 let recent_avg_latency = recent_decisions
604 .iter()
605 .map(|d| d.predicted_performance.measured_latency_p50)
606 .sum::<f64>()
607 / recent_decisions.len() as f64;
608
609 let older_avg_latency = older_decisions
610 .iter()
611 .map(|d| d.predicted_performance.measured_latency_p50)
612 .sum::<f64>()
613 / older_decisions.len() as f64;
614
615 let latency_improvement =
616 (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
617 improvements.insert(
618 "latency_improvement_percent".to_string(),
619 latency_improvement,
620 );
621
622 let recent_avg_throughput = recent_decisions
623 .iter()
624 .map(|d| d.predicted_performance.measured_throughput)
625 .sum::<f64>()
626 / recent_decisions.len() as f64;
627
628 let older_avg_throughput = older_decisions
629 .iter()
630 .map(|d| d.predicted_performance.measured_throughput)
631 .sum::<f64>()
632 / older_decisions.len() as f64;
633
634 let throughput_improvement =
635 (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
636 improvements.insert(
637 "throughput_improvement_percent".to_string(),
638 throughput_improvement,
639 );
640
641 Ok(improvements)
642 }
643}
644
645impl PatternAnalyzer {
646 pub fn new(analysis_window: ChronoDuration) -> Self {
647 Self {
648 event_history: Vec::new(),
649 _pattern_cache: HashMap::new(),
650 analysis_window,
651 }
652 }
653
654 pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
655 let now = Utc::now();
657 for event in events {
658 self.event_history.push((now, event.clone()));
659 }
660
661 let cutoff = now - self.analysis_window;
663 self.event_history
664 .retain(|(timestamp, _)| *timestamp >= cutoff);
665
666 if self.event_history.is_empty() {
667 return Ok(WorkloadPattern::default());
668 }
669
670 let duration_seconds = self.analysis_window.num_seconds() as f64;
672 let event_rate = self.event_history.len() as f64 / duration_seconds;
673
674 let temporal_distribution = self.analyze_temporal_distribution().await?;
676
677 let pattern_type = self
679 .classify_pattern_type(event_rate, &temporal_distribution)
680 .await?;
681
682 let avg_event_size = self.calculate_average_event_size().await?;
684
685 let data_characteristics = self.analyze_data_characteristics().await?;
687
688 let consistency_requirements = self.determine_consistency_requirements().await?;
690
691 Ok(WorkloadPattern {
692 pattern_type,
693 event_rate,
694 batch_size: self.estimate_optimal_batch_size(event_rate),
695 event_size_bytes: avg_event_size,
696 temporal_distribution,
697 data_characteristics,
698 consistency_requirements,
699 })
700 }
701
702 async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
703 if self.event_history.len() < 10 {
704 return Ok(TemporalDistribution::Uniform);
705 }
706
707 let mut intervals = Vec::new();
709 for i in 1..self.event_history.len() {
710 let interval = self.event_history[i]
711 .0
712 .signed_duration_since(self.event_history[i - 1].0)
713 .num_milliseconds() as f64;
714 intervals.push(interval);
715 }
716
717 let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
719 let variance =
720 intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
721 let std_dev = variance.sqrt();
722
723 let cv = std_dev / mean; if cv < 0.1 {
727 Ok(TemporalDistribution::Uniform)
728 } else if cv < 0.5 {
729 Ok(TemporalDistribution::Normal { mean, std_dev })
730 } else {
731 Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
732 }
733 }
734
735 async fn classify_pattern_type(
736 &self,
737 event_rate: f64,
738 temporal_dist: &TemporalDistribution,
739 ) -> Result<PatternType> {
740 match temporal_dist {
742 TemporalDistribution::Uniform => {
743 if event_rate > 10000.0 {
744 Ok(PatternType::BatchOriented)
745 } else if event_rate > 100.0 {
746 Ok(PatternType::Steady)
747 } else {
748 Ok(PatternType::RealTime)
749 }
750 }
751 TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
752 TemporalDistribution::Normal { std_dev, mean } => {
753 if std_dev / mean > 1.0 {
754 Ok(PatternType::Random)
755 } else {
756 Ok(PatternType::Steady)
757 }
758 }
759 _ => Ok(PatternType::Steady),
760 }
761 }
762
763 async fn calculate_average_event_size(&self) -> Result<u64> {
764 if self.event_history.is_empty() {
765 return Ok(1024); }
767
768 let avg_size = self
770 .event_history
771 .iter()
772 .map(|(_, event)| self.estimate_event_size(event))
773 .sum::<u64>()
774 / self.event_history.len() as u64;
775
776 Ok(avg_size)
777 }
778
779 fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
780 match event {
782 StreamEvent::TripleAdded {
783 subject,
784 predicate,
785 object,
786 ..
787 } => (subject.len() + predicate.len() + object.len() + 100) as u64,
788 StreamEvent::TripleRemoved {
789 subject,
790 predicate,
791 object,
792 ..
793 } => (subject.len() + predicate.len() + object.len() + 100) as u64,
794 StreamEvent::GraphCreated { .. } => 200,
795 StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
796 StreamEvent::TransactionBegin { .. } => 150,
797 StreamEvent::TransactionCommit { .. } => 100,
798 StreamEvent::Heartbeat { .. } => 50,
799 _ => 300, }
801 }
802
803 async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
804 let has_complex_structures = self
805 .event_history
806 .iter()
807 .any(|(_, event)| self.is_complex_event(event));
808
809 let requires_ordering = self
810 .event_history
811 .iter()
812 .any(|(_, event)| self.requires_ordering(event));
813
814 Ok(DataCharacteristics {
815 compression_ratio: 0.7, serialization_overhead: 0.1, has_complex_structures,
818 requires_ordering,
819 has_time_windows: false, requires_deduplication: true, })
822 }
823
824 fn is_complex_event(&self, event: &StreamEvent) -> bool {
825 matches!(
826 event,
827 StreamEvent::SparqlUpdate { .. }
828 | StreamEvent::SchemaChanged { .. }
829 | StreamEvent::QueryCompleted { .. }
830 )
831 }
832
833 fn requires_ordering(&self, event: &StreamEvent) -> bool {
834 matches!(
835 event,
836 StreamEvent::TransactionBegin { .. }
837 | StreamEvent::TransactionCommit { .. }
838 | StreamEvent::TransactionAbort { .. }
839 )
840 }
841
842 async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
843 let has_transactions = self.event_history.iter().any(|(_, event)| {
844 matches!(
845 event,
846 StreamEvent::TransactionBegin { .. }
847 | StreamEvent::TransactionCommit { .. }
848 | StreamEvent::TransactionAbort { .. }
849 )
850 });
851
852 if has_transactions {
853 Ok(ConsistencyLevel::ExactlyOnce)
854 } else {
855 Ok(ConsistencyLevel::AtLeastOnce)
856 }
857 }
858
859 fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
860 if event_rate > 10000.0 {
861 1000
862 } else if event_rate > 1000.0 {
863 500
864 } else if event_rate > 100.0 {
865 100
866 } else {
867 10
868 }
869 }
870}
871
872impl CostCalculator {
873 pub fn new(config: OptimizerConfig) -> Self {
874 let mut baseline_costs = HashMap::new();
875
876 baseline_costs.insert(BackendType::Memory, 0.0);
878 baseline_costs.insert(BackendType::Redis, 0.1);
879 baseline_costs.insert(BackendType::Nats, 0.2);
880 baseline_costs.insert(BackendType::Kafka, 0.5);
881 baseline_costs.insert(BackendType::Pulsar, 0.4);
882 baseline_costs.insert(BackendType::Kinesis, 0.8);
883
884 Self {
885 config,
886 baseline_costs,
887 }
888 }
889
890 pub async fn calculate_cost(
891 &self,
892 backend_type: &BackendType,
893 pattern: &WorkloadPattern,
894 performance: &BackendPerformance,
895 ) -> Result<CostModel> {
896 let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);
897
898 let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
900 let throughput_cost =
901 self.calculate_throughput_cost(performance.measured_throughput, pattern);
902 let reliability_cost =
903 self.calculate_reliability_cost(performance.reliability_score, pattern);
904 let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
905 let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
906 let maintenance_cost =
907 self.calculate_maintenance_cost(backend_type, performance.setup_complexity);
908
909 let total_cost = base_cost
910 + latency_cost * self.config.cost_weight_latency
911 + throughput_cost * self.config.cost_weight_throughput
912 + reliability_cost * self.config.cost_weight_reliability
913 + resource_cost * self.config.cost_weight_resource_usage
914 + scaling_cost * 0.1
915 + maintenance_cost * 0.1;
916
917 Ok(CostModel {
918 total_cost,
919 latency_cost,
920 throughput_cost,
921 reliability_cost,
922 resource_cost,
923 scaling_cost,
924 maintenance_cost,
925 })
926 }
927
928 fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
929 let latency_penalty = match pattern.pattern_type {
930 PatternType::RealTime => latency / 10.0, PatternType::Bursty => latency / 50.0,
932 _ => latency / 100.0,
933 };
934 latency_penalty.min(2.0) }
936
937 fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
938 let required_throughput = pattern.event_rate * 1.5; if throughput < required_throughput {
940 (required_throughput - throughput) / required_throughput
941 } else {
942 0.0
943 }
944 }
945
946 fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
947 let required_reliability = match pattern.consistency_requirements {
948 ConsistencyLevel::ExactlyOnce => 0.999,
949 ConsistencyLevel::AtLeastOnce => 0.995,
950 _ => 0.99,
951 };
952
953 if reliability < required_reliability {
954 (required_reliability - reliability) * 10.0
955 } else {
956 0.0
957 }
958 }
959
960 fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
961 (usage.cpu_usage_percent / 100.0) * 0.1
963 + (usage.memory_usage_mb / 1000.0) * 0.05
964 + (usage.network_usage_mbps / 100.0) * 0.02
965 }
966
967 fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
968 let scaling_factor = match backend_type {
969 BackendType::Kinesis => 0.1, BackendType::Kafka => 0.5, BackendType::Memory => 1.0, _ => 0.3,
973 };
974
975 match pattern.pattern_type {
976 PatternType::Bursty | PatternType::Random => scaling_factor,
977 _ => 0.0,
978 }
979 }
980
981 fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
982 setup_complexity as f64 / 10.0
983 }
984}
985
/// `Default` simply delegates to `new()` (empty history, no trained models).
impl Default for MLPredictor {
    fn default() -> Self {
        Self::new()
    }
}
991
992impl MLPredictor {
993 pub fn new() -> Self {
994 Self {
995 performance_history: Vec::new(),
996 patterns: HashMap::new(),
997 _feature_weights: vec![1.0; 10], _confidence_threshold: 0.7,
999 }
1000 }
1001
1002 pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
1003 self.performance_history.push(data_point);
1004
1005 if self.performance_history.len() > 10000 {
1007 self.performance_history.drain(0..1000);
1008 }
1009
1010 Ok(())
1011 }
1012
1013 pub async fn predict_performance(
1014 &self,
1015 backend_type: &BackendType,
1016 pattern: &WorkloadPattern,
1017 ) -> Result<BackendPerformance> {
1018 let relevant_data: Vec<&PerformanceDataPoint> = self
1020 .performance_history
1021 .iter()
1022 .filter(|dp| dp.backend_type == *backend_type)
1023 .collect();
1024
1025 if relevant_data.is_empty() {
1026 return Err(anyhow!(
1027 "No historical data for backend type: {:?}",
1028 backend_type
1029 ));
1030 }
1031
1032 let similar_data: Vec<&PerformanceDataPoint> = relevant_data
1034 .iter()
1035 .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
1036 .cloned()
1037 .collect();
1038
1039 let prediction_data = if similar_data.is_empty() {
1040 &relevant_data
1041 } else {
1042 &similar_data
1043 };
1044
1045 let predicted_latency = prediction_data
1047 .iter()
1048 .map(|dp| dp.actual_latency)
1049 .sum::<f64>()
1050 / prediction_data.len() as f64;
1051
1052 let predicted_throughput = prediction_data
1053 .iter()
1054 .map(|dp| dp.actual_throughput)
1055 .sum::<f64>()
1056 / prediction_data.len() as f64;
1057
1058 let predicted_reliability = prediction_data
1059 .iter()
1060 .map(|dp| dp.actual_reliability)
1061 .sum::<f64>()
1062 / prediction_data.len() as f64;
1063
1064 Ok(BackendPerformance {
1065 backend_type: backend_type.clone(),
1066 measured_latency_p50: predicted_latency,
1067 measured_latency_p95: predicted_latency * 1.5,
1068 measured_latency_p99: predicted_latency * 2.0,
1069 measured_throughput: predicted_throughput,
1070 reliability_score: predicted_reliability,
1071 resource_usage: prediction_data[0].resource_usage.clone(),
1072 cost_per_hour: 0.0, setup_complexity: 5, scalability_factor: 1.0,
1075 last_updated: Utc::now(),
1076 sample_count: prediction_data.len() as u64,
1077 })
1078 }
1079
1080 pub async fn retrain_models(&mut self) -> Result<()> {
1081 for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
1083 let backend_data: Vec<&PerformanceDataPoint> = self
1084 .performance_history
1085 .iter()
1086 .filter(|dp| dp.backend_type == *backend_type)
1087 .collect();
1088
1089 if backend_data.len() < 10 {
1090 continue;
1091 }
1092
1093 let pattern_name = format!("{backend_type:?}_model");
1095 let model = self.train_linear_model(&backend_data).await?;
1096 self.patterns.insert(pattern_name, model);
1097 }
1098
1099 info!("Retrained ML models for {} patterns", self.patterns.len());
1100 Ok(())
1101 }
1102
1103 async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
1104 let n = data.len() as f64;
1106
1107 let features: Vec<Vec<f64>> = data
1109 .iter()
1110 .map(|dp| self.extract_features(&dp.workload_pattern))
1111 .collect();
1112
1113 let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();
1114
1115 let feature_count = features[0].len();
1117 let mut coefficients = vec![0.0; feature_count];
1118 let intercept = targets.iter().sum::<f64>() / n;
1119
1120 for i in 0..feature_count {
1122 let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
1123 let correlation = self.calculate_correlation(&feature_values, &targets);
1124 coefficients[i] = correlation * 0.1; }
1126
1127 Ok(PatternModel {
1128 pattern_name: "latency_model".to_string(),
1129 coefficients,
1130 intercept,
1131 confidence: 0.8, last_trained: Utc::now(),
1133 sample_count: data.len(),
1134 })
1135 }
1136
1137 fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
1138 vec![
1139 pattern.event_rate,
1140 pattern.batch_size as f64,
1141 pattern.event_size_bytes as f64,
1142 pattern.data_characteristics.compression_ratio,
1143 pattern.data_characteristics.serialization_overhead,
1144 if pattern.data_characteristics.has_complex_structures {
1145 1.0
1146 } else {
1147 0.0
1148 },
1149 if pattern.data_characteristics.requires_ordering {
1150 1.0
1151 } else {
1152 0.0
1153 },
1154 match pattern.pattern_type {
1155 PatternType::RealTime => 1.0,
1156 PatternType::BatchOriented => 2.0,
1157 PatternType::Bursty => 3.0,
1158 _ => 0.0,
1159 },
1160 match pattern.consistency_requirements {
1161 ConsistencyLevel::ExactlyOnce => 3.0,
1162 ConsistencyLevel::AtLeastOnce => 2.0,
1163 _ => 1.0,
1164 },
1165 1.0, ]
1167 }
1168
1169 fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
1170 let rate_similarity =
1171 1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
1172 let size_similarity = 1.0
1173 - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
1174 / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
1175 let type_similarity = if std::mem::discriminant(&p1.pattern_type)
1176 == std::mem::discriminant(&p2.pattern_type)
1177 {
1178 1.0
1179 } else {
1180 0.0
1181 };
1182
1183 (rate_similarity + size_similarity + type_similarity) / 3.0
1184 }
1185
1186 fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
1187 let n = x.len() as f64;
1188 let mean_x = x.iter().sum::<f64>() / n;
1189 let mean_y = y.iter().sum::<f64>() / n;
1190
1191 let numerator: f64 = x
1192 .iter()
1193 .zip(y.iter())
1194 .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
1195 .sum();
1196
1197 let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
1198 let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();
1199
1200 if denom_x * denom_y == 0.0 {
1201 0.0
1202 } else {
1203 numerator / (denom_x * denom_y).sqrt()
1204 }
1205 }
1206}
1207
1208impl BackendPerformance {
1209 pub fn new(backend_type: BackendType) -> Self {
1210 Self {
1211 backend_type,
1212 measured_latency_p50: 100.0, measured_latency_p95: 200.0,
1214 measured_latency_p99: 500.0,
1215 measured_throughput: 1000.0, reliability_score: 0.99,
1217 resource_usage: ResourceUsage::default(),
1218 cost_per_hour: 0.1,
1219 setup_complexity: 5,
1220 scalability_factor: 1.0,
1221 last_updated: Utc::now(),
1222 sample_count: 0,
1223 }
1224 }
1225}
1226
1227impl Default for ResourceUsage {
1228 fn default() -> Self {
1229 Self {
1230 cpu_usage_percent: 10.0,
1231 memory_usage_mb: 100.0,
1232 network_usage_mbps: 1.0,
1233 disk_io_ops_per_sec: 100.0,
1234 connection_count: 10,
1235 }
1236 }
1237}
1238
1239impl Default for WorkloadPattern {
1240 fn default() -> Self {
1241 Self {
1242 pattern_type: PatternType::Steady,
1243 event_rate: 100.0,
1244 batch_size: 100,
1245 event_size_bytes: 1024,
1246 temporal_distribution: TemporalDistribution::Uniform,
1247 data_characteristics: DataCharacteristics {
1248 compression_ratio: 0.7,
1249 serialization_overhead: 0.1,
1250 has_complex_structures: false,
1251 requires_ordering: false,
1252 has_time_windows: false,
1253 requires_deduplication: true,
1254 },
1255 consistency_requirements: ConsistencyLevel::AtLeastOnce,
1256 }
1257 }
1258}
1259
/// Aggregated snapshot of the optimizer's decision history and current state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
    /// Total number of backend-selection decisions recorded.
    pub total_decisions: usize,
    /// Per-backend decision counts — presumably how often each backend was
    /// chosen; confirm against the producer of this struct.
    pub backend_usage: HashMap<BackendType, usize>,
    /// Mean confidence across recorded decisions.
    pub average_confidence: f64,
    /// Named performance deltas (metric name -> improvement value).
    pub performance_improvements: HashMap<String, f64>,
    /// Number of backends currently tracked as active.
    pub active_backends: usize,
    /// Timestamp of the most recent optimization pass, if one has run.
    pub last_optimization: Option<DateTime<Utc>>,
}
1270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventMetadata, StreamEvent};

    /// Builds a minimal `TripleAdded` event with fresh metadata, used as the
    /// common fixture for all pattern/optimizer tests below.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: uuid::Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: std::collections::HashMap::new(),
                checksum: None,
            },
        }
    }

    // With the default config (ML prediction enabled) a new optimizer has a
    // predictor and no backend performance data yet.
    #[tokio::test]
    async fn test_backend_optimizer_creation() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);

        assert!(optimizer.ml_predictor.is_some());
        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
    }

    // Analyzing a batch of identical events should yield strictly positive
    // rate, batch size, and event size.
    #[tokio::test]
    async fn test_pattern_analysis() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
        let events = vec![create_test_event(); 100];

        let pattern = analyzer.analyze_pattern(&events).await.unwrap();

        assert!(pattern.event_rate > 0.0);
        assert!(pattern.batch_size > 0);
        assert!(pattern.event_size_bytes > 0);
    }

    // Cost components for a default workload against baseline Kafka
    // performance must be non-negative, with a positive total.
    #[tokio::test]
    async fn test_cost_calculation() {
        let config = OptimizerConfig::default();
        let calculator = CostCalculator::new(config);
        let pattern = WorkloadPattern::default();
        let performance = BackendPerformance::new(BackendType::Kafka);

        let cost = calculator
            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
            .await
            .unwrap();

        assert!(cost.total_cost > 0.0);
        assert!(cost.latency_cost >= 0.0);
        assert!(cost.throughput_cost >= 0.0);
    }

    // Recommendations should cover every registered backend and come back
    // sorted by descending score. ML prediction is disabled so scores derive
    // from the cost model alone.
    #[tokio::test]
    async fn test_backend_recommendation() {
        let config = OptimizerConfig {
            enable_ml_prediction: false,
            ..Default::default()
        };
        let optimizer = BackendOptimizer::new(config);

        let metrics = StreamingMetrics::default();
        optimizer
            .update_backend_performance(BackendType::Kafka, &metrics)
            .await
            .unwrap();
        optimizer
            .update_backend_performance(BackendType::Nats, &metrics)
            .await
            .unwrap();

        let pattern = WorkloadPattern::default();
        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();

        assert!(recommendations.len() >= 2);
        assert!(recommendations[0].score >= recommendations[1].score);
    }

    // Adding a training sample should land it in the predictor's history.
    #[tokio::test]
    async fn test_ml_predictor() {
        let mut predictor = MLPredictor::new();

        let data_point = PerformanceDataPoint {
            timestamp: Utc::now(),
            backend_type: BackendType::Kafka,
            workload_pattern: WorkloadPattern::default(),
            actual_latency: 50.0,
            actual_throughput: 1000.0,
            actual_reliability: 0.99,
            resource_usage: ResourceUsage::default(),
            external_factors: HashMap::new(),
        };

        predictor.add_training_data(data_point).await.unwrap();
        assert_eq!(predictor.performance_history.len(), 1);
    }

    // Two steady patterns differing only slightly in event rate should be
    // judged highly similar (same variant contributes a full 1/3 share).
    #[test]
    fn test_pattern_similarity() {
        let predictor = MLPredictor::new();
        let pattern1 = WorkloadPattern {
            event_rate: 100.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };
        let pattern2 = WorkloadPattern {
            event_rate: 110.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };

        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
        assert!(similarity > 0.8);
    }

    // Classification sanity checks: a slow one-event-per-second stream maps
    // to one of the expected low-rate variants, and a dense ~125 events/sec
    // stream is measured at a high event rate.
    #[tokio::test]
    async fn test_workload_pattern_classification() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));

        // 10 events spaced one second apart.
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..10 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(matches!(
            pattern.pattern_type,
            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
        ));

        // 3500 events spaced 8 ms apart (~125 events/sec).
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..3500 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(pattern.event_rate > 100.0);
    }

    // Strength analysis should reflect backend characteristics: Kafka for
    // exactly-once delivery, NATS for real-time workloads.
    #[test]
    fn test_backend_strengths_analysis() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);
        let pattern = WorkloadPattern {
            pattern_type: PatternType::RealTime,
            consistency_requirements: ConsistencyLevel::ExactlyOnce,
            ..Default::default()
        };

        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));

        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
    }

    // Config must survive a JSON round trip with fields intact.
    #[test]
    fn test_config_serialization() {
        let config = OptimizerConfig::default();
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            config.enable_cost_modeling,
            deserialized.enable_cost_modeling
        );
        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
    }
}