1use crate::backend::BackendType;
7use crate::event::StreamEvent;
8use crate::monitoring::StreamingMetrics;
9use anyhow::{anyhow, Result};
10use chrono::{DateTime, Duration as ChronoDuration, Utc};
11use serde::{Deserialize, Serialize};
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::sync::RwLock;
16use tracing::{debug, info};
17
/// Tunable settings controlling how the backend optimizer scores and
/// selects streaming backends.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizerConfig {
    /// Enable cost-model-based scoring of backends.
    pub enable_cost_modeling: bool,
    /// Enable ML-based performance prediction (drives `MLPredictor` creation).
    pub enable_ml_prediction: bool,
    /// Enable workload-pattern analysis of incoming events.
    pub enable_pattern_analysis: bool,
    /// How often the optimizer re-evaluates its backend choice.
    pub optimization_interval: Duration,
    /// Minimum number of training samples before ML models are retrained.
    pub min_samples_for_prediction: usize,
    /// Relative weight of latency in the combined score/cost.
    pub cost_weight_latency: f64,
    /// Relative weight of throughput in the combined score/cost.
    pub cost_weight_throughput: f64,
    /// Relative weight of reliability in the combined score/cost.
    pub cost_weight_reliability: f64,
    /// Relative weight of resource usage in the combined score/cost.
    pub cost_weight_resource_usage: f64,
}
31
32impl Default for OptimizerConfig {
33 fn default() -> Self {
34 Self {
35 enable_cost_modeling: true,
36 enable_ml_prediction: true,
37 enable_pattern_analysis: true,
38 optimization_interval: Duration::from_secs(300), min_samples_for_prediction: 100,
40 cost_weight_latency: 0.3,
41 cost_weight_throughput: 0.3,
42 cost_weight_reliability: 0.3,
43 cost_weight_resource_usage: 0.1,
44 }
45 }
46}
47
/// Summary of an observed event workload, produced by `PatternAnalyzer`
/// and consumed by cost calculation / backend recommendation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkloadPattern {
    /// Coarse classification of the workload shape.
    pub pattern_type: PatternType,
    /// Observed event rate in events per second.
    pub event_rate: f64,
    /// Suggested batch size for this rate (see `estimate_optimal_batch_size`).
    pub batch_size: u32,
    /// Average event size in bytes.
    pub event_size_bytes: u64,
    /// Statistical distribution of inter-event intervals.
    pub temporal_distribution: TemporalDistribution,
    /// Structural properties of the event payloads.
    pub data_characteristics: DataCharacteristics,
    /// Delivery-consistency level the workload requires.
    pub consistency_requirements: ConsistencyLevel,
}
59
/// Coarse workload-shape classification used for scoring heuristics.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum PatternType {
    /// Roughly constant event rate.
    Steady,
    /// Irregular bursts of activity (exponential-looking intervals).
    Bursty,
    /// Periodic / time-of-day dependent load.
    Seasonal,
    /// High variance with no clear structure.
    Random,
    /// Low-rate, latency-sensitive traffic.
    RealTime,
    /// Very high rate favoring batching over latency.
    BatchOriented,
}
76
/// Model of the distribution of intervals between consecutive events.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TemporalDistribution {
    /// Near-constant intervals (low coefficient of variation).
    Uniform,
    /// Normally distributed intervals (milliseconds).
    Normal { mean: f64, std_dev: f64 },
    /// Exponentially distributed intervals; `lambda` is 1 / mean interval.
    Exponential { lambda: f64 },
    /// Poisson arrival process with the given rate.
    Poisson { lambda: f64 },
    /// Application-defined distribution identified by name.
    Custom { distribution_name: String },
}
86
/// Structural properties of the event payloads in a workload.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DataCharacteristics {
    /// Expected compression ratio (0..1, lower = more compressible).
    pub compression_ratio: f64,
    /// Fractional overhead added by serialization.
    pub serialization_overhead: f64,
    /// Whether payloads contain nested/complex structures (e.g. SPARQL updates).
    pub has_complex_structures: bool,
    /// Whether events must be delivered in order (e.g. transactions).
    pub requires_ordering: bool,
    /// Whether the workload uses time-windowed processing.
    pub has_time_windows: bool,
    /// Whether duplicate events must be filtered out.
    pub requires_deduplication: bool,
}
97
/// Delivery/consistency guarantee a workload requires from a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ConsistencyLevel {
    /// Messages may be lost but are never duplicated.
    AtMostOnce,
    /// Messages are never lost but may be duplicated.
    AtLeastOnce,
    /// Messages are delivered exactly once.
    ExactlyOnce,
    /// Consistency guaranteed within a session/connection.
    Session,
    /// Strong (linearizable) consistency.
    Strong,
}
112
/// Measured (or predicted) performance profile of one backend, maintained
/// as an exponential moving average by `update_backend_performance`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct BackendPerformance {
    /// Backend this profile describes.
    pub backend_type: BackendType,
    /// Median latency in milliseconds.
    pub measured_latency_p50: f64,
    /// 95th-percentile latency in milliseconds.
    pub measured_latency_p95: f64,
    /// 99th-percentile latency in milliseconds.
    pub measured_latency_p99: f64,
    /// Sustained throughput in events per second.
    pub measured_throughput: f64,
    /// Success rate in 0..1.
    pub reliability_score: f64,
    /// Last observed resource consumption.
    pub resource_usage: ResourceUsage,
    /// Estimated operating cost per hour (arbitrary units).
    pub cost_per_hour: f64,
    /// Operational setup complexity on a 0-10 scale.
    pub setup_complexity: u8,
    /// Relative ease of horizontal scaling (1.0 = baseline).
    pub scalability_factor: f64,
    /// When this profile was last refreshed.
    pub last_updated: DateTime<Utc>,
    /// Number of metric samples folded into this profile.
    pub sample_count: u64,
}
129
/// Snapshot of system resources consumed by a backend.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceUsage {
    /// CPU utilization in percent (0-100).
    pub cpu_usage_percent: f64,
    /// Resident memory in megabytes.
    pub memory_usage_mb: f64,
    /// Network bandwidth in megabits per second.
    pub network_usage_mbps: f64,
    /// Disk I/O operations per second.
    pub disk_io_ops_per_sec: f64,
    /// Number of active backend connections.
    pub connection_count: u32,
}
139
/// Breakdown of the weighted cost assigned to a backend for a workload;
/// produced by `CostCalculator::calculate_cost`. Lower is better.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CostModel {
    /// Weighted sum of all component costs plus the baseline.
    pub total_cost: f64,
    /// Penalty for latency relative to the workload's sensitivity.
    pub latency_cost: f64,
    /// Penalty for throughput shortfall versus required headroom.
    pub throughput_cost: f64,
    /// Penalty for reliability below the required level.
    pub reliability_cost: f64,
    /// Cost of CPU/memory/network consumption.
    pub resource_cost: f64,
    /// Penalty for poor scaling under bursty/random load.
    pub scaling_cost: f64,
    /// Cost proportional to setup complexity.
    pub maintenance_cost: f64,
}
151
/// Lightweight ML predictor that estimates backend performance from
/// historical samples and simple per-backend linear models.
#[derive(Debug, Clone)]
pub struct MLPredictor {
    /// Bounded history of observed performance samples (oldest dropped first).
    performance_history: Vec<PerformanceDataPoint>,
    /// Trained per-backend models, keyed by "<Backend>_model".
    patterns: HashMap<String, PatternModel>,
    /// Reserved feature weights (currently unused).
    _feature_weights: Vec<f64>,
    /// Reserved confidence cutoff (currently unused).
    _confidence_threshold: f64,
}
164
/// One labeled training sample: the workload that ran, the backend it ran
/// on, and the performance actually observed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceDataPoint {
    /// When the sample was recorded.
    pub timestamp: DateTime<Utc>,
    /// Backend the workload ran on.
    pub backend_type: BackendType,
    /// Workload that produced these measurements.
    pub workload_pattern: WorkloadPattern,
    /// Observed latency in milliseconds.
    pub actual_latency: f64,
    /// Observed throughput in events per second.
    pub actual_throughput: f64,
    /// Observed success rate in 0..1.
    pub actual_reliability: f64,
    /// Resources consumed during the sample window.
    pub resource_usage: ResourceUsage,
    /// Free-form named covariates (e.g. system load at the time).
    pub external_factors: HashMap<String, f64>,
}
177
/// A trained linear model (one weight per extracted feature plus an
/// intercept) predicting latency for one backend.
#[derive(Debug, Clone)]
pub struct PatternModel {
    /// Identifier of the model/pattern.
    pub pattern_name: String,
    /// Per-feature weights, aligned with `MLPredictor::extract_features`.
    pub coefficients: Vec<f64>,
    /// Bias term (mean of training targets).
    pub intercept: f64,
    /// Self-reported confidence in 0..1.
    pub confidence: f64,
    /// When the model was last retrained.
    pub last_trained: DateTime<Utc>,
    /// Number of samples used in the last training run.
    pub sample_count: usize,
}
188
/// Top-level optimizer that tracks per-backend performance, analyzes
/// workload patterns, and recommends the best backend for a workload.
pub struct BackendOptimizer {
    /// Scoring weights and feature toggles.
    config: OptimizerConfig,
    /// Shared, concurrently updated performance profile per backend.
    backend_performance: Arc<RwLock<HashMap<BackendType, BackendPerformance>>>,
    /// Classifies recent event traffic into a `WorkloadPattern`.
    pattern_analyzer: PatternAnalyzer,
    /// Computes the weighted cost model per backend.
    cost_calculator: CostCalculator,
    /// Present only when `config.enable_ml_prediction` is set.
    ml_predictor: Option<MLPredictor>,
    /// Bounded log of past optimization decisions (trimmed at 1000).
    optimization_history: Arc<RwLock<Vec<OptimizationDecision>>>,
}
198
/// Sliding-window analyzer that derives a `WorkloadPattern` from recent
/// stream events.
pub struct PatternAnalyzer {
    /// Events observed within the analysis window, with arrival timestamps.
    event_history: Vec<(DateTime<Utc>, StreamEvent)>,
    /// Reserved cache of computed patterns (currently unused).
    _pattern_cache: HashMap<String, WorkloadPattern>,
    /// Length of the sliding window events are retained for.
    analysis_window: ChronoDuration,
}
205
/// Computes the weighted `CostModel` of running a workload on a backend.
pub struct CostCalculator {
    /// Scoring weights shared with the optimizer.
    config: OptimizerConfig,
    /// Fixed per-backend baseline cost added to every estimate.
    baseline_costs: HashMap<BackendType, f64>,
}
211
/// A recorded backend-selection decision, kept in the optimizer's history
/// for later effectiveness analysis.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationDecision {
    /// When the decision was made.
    pub timestamp: DateTime<Utc>,
    /// Backend that was chosen.
    pub selected_backend: BackendType,
    /// Workload the decision was made for.
    pub workload_pattern: WorkloadPattern,
    /// Performance expected from the chosen backend at decision time.
    pub predicted_performance: BackendPerformance,
    /// Cost breakdown that justified the choice.
    pub cost_model: CostModel,
    /// Confidence in the decision, 0..1.
    pub confidence: f64,
    /// Human-readable rationale.
    pub reason: String,
}
223
/// One ranked entry returned by `BackendOptimizer::recommend_backend`
/// (sorted by descending `score`).
#[derive(Debug, Clone)]
pub struct BackendRecommendation {
    /// Backend being recommended.
    pub backend_type: BackendType,
    /// Combined weighted score; higher is better.
    pub score: f64,
    /// Predicted median latency in milliseconds.
    pub predicted_latency: f64,
    /// Predicted throughput in events per second.
    pub predicted_throughput: f64,
    /// Predicted total cost from the cost model.
    pub predicted_cost: f64,
    /// Confidence in the prediction, 0..1.
    pub confidence: f64,
    /// Qualitative advantages for this workload.
    pub strengths: Vec<String>,
    /// Qualitative drawbacks for this workload.
    pub weaknesses: Vec<String>,
}
236
237impl BackendOptimizer {
238 pub fn new(config: OptimizerConfig) -> Self {
240 let ml_predictor = if config.enable_ml_prediction {
241 Some(MLPredictor::new())
242 } else {
243 None
244 };
245
246 Self {
247 pattern_analyzer: PatternAnalyzer::new(ChronoDuration::hours(1)),
248 cost_calculator: CostCalculator::new(config.clone()),
249 backend_performance: Arc::new(RwLock::new(HashMap::new())),
250 optimization_history: Arc::new(RwLock::new(Vec::new())),
251 config,
252 ml_predictor,
253 }
254 }
255
256 pub async fn update_backend_performance(
258 &self,
259 backend_type: BackendType,
260 metrics: &StreamingMetrics,
261 ) -> Result<()> {
262 let mut performance_map = self.backend_performance.write().await;
263
264 let performance = performance_map
265 .entry(backend_type.clone())
266 .or_insert_with(|| BackendPerformance::new(backend_type.clone()));
267
268 let alpha = 0.1; performance.measured_latency_p50 = alpha * metrics.producer_average_latency_ms
271 + (1.0 - alpha) * performance.measured_latency_p50;
272 performance.measured_throughput = alpha * metrics.producer_throughput_eps
273 + (1.0 - alpha) * performance.measured_throughput;
274 performance.reliability_score =
275 alpha * metrics.success_rate + (1.0 - alpha) * performance.reliability_score;
276
277 performance.resource_usage.cpu_usage_percent = metrics.system_cpu_usage_percent;
278 performance.resource_usage.memory_usage_mb =
279 metrics.system_memory_usage_bytes as f64 / (1024.0 * 1024.0);
280 performance.resource_usage.connection_count = metrics.backend_connections_active;
281
282 performance.last_updated = Utc::now();
283 performance.sample_count += 1;
284
285 debug!(
286 "Updated performance for {:?}: latency={:.2}ms, throughput={:.0}eps, reliability={:.3}",
287 backend_type,
288 performance.measured_latency_p50,
289 performance.measured_throughput,
290 performance.reliability_score
291 );
292
293 Ok(())
294 }
295
296 pub async fn analyze_workload_pattern(
298 &mut self,
299 events: &[StreamEvent],
300 ) -> Result<WorkloadPattern> {
301 self.pattern_analyzer.analyze_pattern(events).await
302 }
303
304 pub async fn recommend_backend(
306 &self,
307 pattern: &WorkloadPattern,
308 ) -> Result<Vec<BackendRecommendation>> {
309 let mut recommendations = Vec::new();
310 let performance_map = self.backend_performance.read().await;
311
312 for (backend_type, performance) in performance_map.iter() {
313 let cost = self
314 .cost_calculator
315 .calculate_cost(backend_type, pattern, performance)
316 .await?;
317
318 let predicted_performance = if let Some(predictor) = &self.ml_predictor {
319 predictor.predict_performance(backend_type, pattern).await?
320 } else {
321 performance.clone()
322 };
323
324 let score = self.calculate_backend_score(&cost, &predicted_performance, pattern);
325 let confidence = self.calculate_confidence(&predicted_performance, pattern);
326
327 let recommendation = BackendRecommendation {
328 backend_type: backend_type.clone(),
329 score,
330 predicted_latency: predicted_performance.measured_latency_p50,
331 predicted_throughput: predicted_performance.measured_throughput,
332 predicted_cost: cost.total_cost,
333 confidence,
334 strengths: self.analyze_strengths(backend_type, pattern),
335 weaknesses: self.analyze_weaknesses(backend_type, pattern),
336 };
337
338 recommendations.push(recommendation);
339 }
340
341 recommendations.sort_by(|a, b| {
343 b.score
344 .partial_cmp(&a.score)
345 .unwrap_or(std::cmp::Ordering::Equal)
346 });
347
348 info!(
349 "Generated {} backend recommendations for workload pattern: {:?}",
350 recommendations.len(),
351 pattern.pattern_type
352 );
353
354 Ok(recommendations)
355 }
356
357 pub async fn train_predictor(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
359 if let Some(predictor) = &mut self.ml_predictor {
360 predictor.add_training_data(data_point).await?;
361
362 if predictor.performance_history.len() >= self.config.min_samples_for_prediction {
363 predictor.retrain_models().await?;
364 }
365 }
366 Ok(())
367 }
368
369 pub async fn record_decision(&self, decision: OptimizationDecision) -> Result<()> {
371 let mut history = self.optimization_history.write().await;
372 history.push(decision);
373
374 if history.len() > 1000 {
376 history.drain(0..100);
377 }
378 Ok(())
379 }
380
381 pub async fn get_optimization_stats(&self) -> Result<OptimizationStats> {
383 let history = self.optimization_history.read().await;
384 let performance_map = self.backend_performance.read().await;
385
386 let total_decisions = history.len();
387 let backend_usage = history.iter().fold(HashMap::new(), |mut acc, decision| {
388 *acc.entry(decision.selected_backend.clone()).or_insert(0) += 1;
389 acc
390 });
391
392 let average_confidence = if total_decisions > 0 {
393 history.iter().map(|d| d.confidence).sum::<f64>() / total_decisions as f64
394 } else {
395 0.0
396 };
397
398 let performance_improvements = self.calculate_performance_improvements(&history).await?;
399
400 Ok(OptimizationStats {
401 total_decisions,
402 backend_usage,
403 average_confidence,
404 performance_improvements,
405 active_backends: performance_map.len(),
406 last_optimization: history.last().map(|d| d.timestamp),
407 })
408 }
409
410 fn calculate_backend_score(
412 &self,
413 cost: &CostModel,
414 performance: &BackendPerformance,
415 pattern: &WorkloadPattern,
416 ) -> f64 {
417 let latency_score = match pattern.pattern_type {
418 PatternType::RealTime => {
419 if performance.measured_latency_p99 < 10.0 {
421 1.0
422 } else if performance.measured_latency_p99 < 50.0 {
423 0.7
424 } else {
425 0.3
426 }
427 }
428 _ => {
429 (100.0 / (performance.measured_latency_p50 + 1.0)).min(1.0)
431 }
432 };
433
434 let throughput_score = match pattern.pattern_type {
435 PatternType::BatchOriented => {
436 (performance.measured_throughput / pattern.event_rate).min(2.0) / 2.0
438 }
439 _ => (performance.measured_throughput / (pattern.event_rate * 1.2)).min(1.0),
440 };
441
442 let reliability_score = performance.reliability_score;
443 let cost_score = 1.0 / (cost.total_cost + 1.0);
444
445 (latency_score * self.config.cost_weight_latency
447 + throughput_score * self.config.cost_weight_throughput
448 + reliability_score * self.config.cost_weight_reliability
449 + cost_score * self.config.cost_weight_resource_usage)
450 / (self.config.cost_weight_latency
451 + self.config.cost_weight_throughput
452 + self.config.cost_weight_reliability
453 + self.config.cost_weight_resource_usage)
454 }
455
456 fn calculate_confidence(
458 &self,
459 performance: &BackendPerformance,
460 _pattern: &WorkloadPattern,
461 ) -> f64 {
462 let sample_confidence = (performance.sample_count as f64 / 1000.0).min(1.0);
463 let recency_confidence = {
464 let age_hours = Utc::now()
465 .signed_duration_since(performance.last_updated)
466 .num_hours() as f64;
467 (1.0 / (age_hours / 24.0 + 1.0)).max(0.1)
468 };
469
470 (sample_confidence + recency_confidence) / 2.0
471 }
472
473 fn analyze_strengths(
475 &self,
476 backend_type: &BackendType,
477 pattern: &WorkloadPattern,
478 ) -> Vec<String> {
479 let mut strengths = Vec::new();
480
481 match backend_type {
482 BackendType::Kafka => {
483 strengths.push("High throughput".to_string());
484 strengths.push("Strong durability".to_string());
485 strengths.push("Excellent ordering guarantees".to_string());
486 if matches!(
487 pattern.consistency_requirements,
488 ConsistencyLevel::ExactlyOnce
489 ) {
490 strengths.push("Exactly-once semantics".to_string());
491 }
492 }
493 BackendType::Nats => {
494 strengths.push("Low latency".to_string());
495 strengths.push("Simple setup".to_string());
496 strengths.push("Built-in clustering".to_string());
497 if matches!(pattern.pattern_type, PatternType::RealTime) {
498 strengths.push("Real-time performance".to_string());
499 }
500 }
501 BackendType::Redis => {
502 strengths.push("In-memory speed".to_string());
503 strengths.push("Low latency".to_string());
504 strengths.push("Rich data structures".to_string());
505 }
506 BackendType::Kinesis => {
507 strengths.push("AWS native integration".to_string());
508 strengths.push("Auto-scaling".to_string());
509 strengths.push("Pay-per-use model".to_string());
510 }
511 BackendType::Pulsar => {
512 strengths.push("Multi-tenancy".to_string());
513 strengths.push("Geo-replication".to_string());
514 strengths.push("Unified messaging".to_string());
515 }
516 BackendType::RabbitMQ => {
517 strengths.push("Mature and stable".to_string());
518 strengths.push("Rich routing capabilities".to_string());
519 strengths.push("Strong reliability guarantees".to_string());
520 if matches!(
521 pattern.consistency_requirements,
522 ConsistencyLevel::AtLeastOnce
523 ) {
524 strengths.push("Persistent message delivery".to_string());
525 }
526 }
527 BackendType::Memory => {
528 strengths.push("Zero latency".to_string());
529 strengths.push("Perfect for testing".to_string());
530 }
531 }
532
533 strengths
534 }
535
536 fn analyze_weaknesses(
538 &self,
539 backend_type: &BackendType,
540 pattern: &WorkloadPattern,
541 ) -> Vec<String> {
542 let mut weaknesses = Vec::new();
543
544 match backend_type {
545 BackendType::Kafka => {
546 weaknesses.push("Complex setup".to_string());
547 weaknesses.push("Higher resource usage".to_string());
548 if matches!(pattern.pattern_type, PatternType::RealTime) {
549 weaknesses.push("Higher latency than NATS".to_string());
550 }
551 }
552 BackendType::Nats => {
553 if matches!(pattern.consistency_requirements, ConsistencyLevel::Strong) {
554 weaknesses.push("Limited durability options".to_string());
555 }
556 if pattern.event_rate > 100000.0 {
557 weaknesses.push("May not handle extreme throughput".to_string());
558 }
559 }
560 BackendType::Redis => {
561 weaknesses.push("Memory-bound".to_string());
562 weaknesses.push("Limited durability".to_string());
563 if pattern.event_size_bytes > 1000000 {
564 weaknesses.push("Not suitable for large events".to_string());
565 }
566 }
567 BackendType::Kinesis => {
568 weaknesses.push("AWS vendor lock-in".to_string());
569 weaknesses.push("Cost can scale quickly".to_string());
570 weaknesses.push("Regional limitations".to_string());
571 }
572 BackendType::Pulsar => {
573 weaknesses.push("Newer ecosystem".to_string());
574 weaknesses.push("Complex architecture".to_string());
575 }
576 BackendType::RabbitMQ => {
577 weaknesses.push("Lower throughput than Kafka".to_string());
578 weaknesses.push("Memory-based by default".to_string());
579 if matches!(pattern.pattern_type, PatternType::BatchOriented) {
580 weaknesses.push("Not optimized for batch processing".to_string());
581 }
582 }
583 BackendType::Memory => {
584 weaknesses.push("No persistence".to_string());
585 weaknesses.push("Single node only".to_string());
586 weaknesses.push("Memory limitations".to_string());
587 }
588 }
589
590 weaknesses
591 }
592
593 async fn calculate_performance_improvements(
595 &self,
596 history: &[OptimizationDecision],
597 ) -> Result<HashMap<String, f64>> {
598 let mut improvements = HashMap::new();
599
600 if history.len() < 10 {
601 return Ok(improvements);
602 }
603
604 let recent_decisions = &history[history.len() - 10..];
605 let older_decisions = &history[0..10.min(history.len() - 10)];
606
607 let recent_avg_latency = recent_decisions
608 .iter()
609 .map(|d| d.predicted_performance.measured_latency_p50)
610 .sum::<f64>()
611 / recent_decisions.len() as f64;
612
613 let older_avg_latency = older_decisions
614 .iter()
615 .map(|d| d.predicted_performance.measured_latency_p50)
616 .sum::<f64>()
617 / older_decisions.len() as f64;
618
619 let latency_improvement =
620 (older_avg_latency - recent_avg_latency) / older_avg_latency * 100.0;
621 improvements.insert(
622 "latency_improvement_percent".to_string(),
623 latency_improvement,
624 );
625
626 let recent_avg_throughput = recent_decisions
627 .iter()
628 .map(|d| d.predicted_performance.measured_throughput)
629 .sum::<f64>()
630 / recent_decisions.len() as f64;
631
632 let older_avg_throughput = older_decisions
633 .iter()
634 .map(|d| d.predicted_performance.measured_throughput)
635 .sum::<f64>()
636 / older_decisions.len() as f64;
637
638 let throughput_improvement =
639 (recent_avg_throughput - older_avg_throughput) / older_avg_throughput * 100.0;
640 improvements.insert(
641 "throughput_improvement_percent".to_string(),
642 throughput_improvement,
643 );
644
645 Ok(improvements)
646 }
647}
648
649impl PatternAnalyzer {
650 pub fn new(analysis_window: ChronoDuration) -> Self {
651 Self {
652 event_history: Vec::new(),
653 _pattern_cache: HashMap::new(),
654 analysis_window,
655 }
656 }
657
658 pub async fn analyze_pattern(&mut self, events: &[StreamEvent]) -> Result<WorkloadPattern> {
659 let now = Utc::now();
661 for event in events {
662 self.event_history.push((now, event.clone()));
663 }
664
665 let cutoff = now - self.analysis_window;
667 self.event_history
668 .retain(|(timestamp, _)| *timestamp >= cutoff);
669
670 if self.event_history.is_empty() {
671 return Ok(WorkloadPattern::default());
672 }
673
674 let duration_seconds = self.analysis_window.num_seconds() as f64;
676 let event_rate = self.event_history.len() as f64 / duration_seconds;
677
678 let temporal_distribution = self.analyze_temporal_distribution().await?;
680
681 let pattern_type = self
683 .classify_pattern_type(event_rate, &temporal_distribution)
684 .await?;
685
686 let avg_event_size = self.calculate_average_event_size().await?;
688
689 let data_characteristics = self.analyze_data_characteristics().await?;
691
692 let consistency_requirements = self.determine_consistency_requirements().await?;
694
695 Ok(WorkloadPattern {
696 pattern_type,
697 event_rate,
698 batch_size: self.estimate_optimal_batch_size(event_rate),
699 event_size_bytes: avg_event_size,
700 temporal_distribution,
701 data_characteristics,
702 consistency_requirements,
703 })
704 }
705
706 async fn analyze_temporal_distribution(&self) -> Result<TemporalDistribution> {
707 if self.event_history.len() < 10 {
708 return Ok(TemporalDistribution::Uniform);
709 }
710
711 let mut intervals = Vec::new();
713 for i in 1..self.event_history.len() {
714 let interval = self.event_history[i]
715 .0
716 .signed_duration_since(self.event_history[i - 1].0)
717 .num_milliseconds() as f64;
718 intervals.push(interval);
719 }
720
721 let mean = intervals.iter().sum::<f64>() / intervals.len() as f64;
723 let variance =
724 intervals.iter().map(|x| (x - mean).powi(2)).sum::<f64>() / intervals.len() as f64;
725 let std_dev = variance.sqrt();
726
727 let cv = std_dev / mean; if cv < 0.1 {
731 Ok(TemporalDistribution::Uniform)
732 } else if cv < 0.5 {
733 Ok(TemporalDistribution::Normal { mean, std_dev })
734 } else {
735 Ok(TemporalDistribution::Exponential { lambda: 1.0 / mean })
736 }
737 }
738
739 async fn classify_pattern_type(
740 &self,
741 event_rate: f64,
742 temporal_dist: &TemporalDistribution,
743 ) -> Result<PatternType> {
744 match temporal_dist {
746 TemporalDistribution::Uniform => {
747 if event_rate > 10000.0 {
748 Ok(PatternType::BatchOriented)
749 } else if event_rate > 100.0 {
750 Ok(PatternType::Steady)
751 } else {
752 Ok(PatternType::RealTime)
753 }
754 }
755 TemporalDistribution::Exponential { .. } => Ok(PatternType::Bursty),
756 TemporalDistribution::Normal { std_dev, mean } => {
757 if std_dev / mean > 1.0 {
758 Ok(PatternType::Random)
759 } else {
760 Ok(PatternType::Steady)
761 }
762 }
763 _ => Ok(PatternType::Steady),
764 }
765 }
766
767 async fn calculate_average_event_size(&self) -> Result<u64> {
768 if self.event_history.is_empty() {
769 return Ok(1024); }
771
772 let avg_size = self
774 .event_history
775 .iter()
776 .map(|(_, event)| self.estimate_event_size(event))
777 .sum::<u64>()
778 / self.event_history.len() as u64;
779
780 Ok(avg_size)
781 }
782
783 fn estimate_event_size(&self, event: &StreamEvent) -> u64 {
784 match event {
786 StreamEvent::TripleAdded {
787 subject,
788 predicate,
789 object,
790 ..
791 } => (subject.len() + predicate.len() + object.len() + 100) as u64,
792 StreamEvent::TripleRemoved {
793 subject,
794 predicate,
795 object,
796 ..
797 } => (subject.len() + predicate.len() + object.len() + 100) as u64,
798 StreamEvent::GraphCreated { .. } => 200,
799 StreamEvent::SparqlUpdate { query, .. } => (query.len() + 200) as u64,
800 StreamEvent::TransactionBegin { .. } => 150,
801 StreamEvent::TransactionCommit { .. } => 100,
802 StreamEvent::Heartbeat { .. } => 50,
803 _ => 300, }
805 }
806
807 async fn analyze_data_characteristics(&self) -> Result<DataCharacteristics> {
808 let has_complex_structures = self
809 .event_history
810 .iter()
811 .any(|(_, event)| self.is_complex_event(event));
812
813 let requires_ordering = self
814 .event_history
815 .iter()
816 .any(|(_, event)| self.requires_ordering(event));
817
818 Ok(DataCharacteristics {
819 compression_ratio: 0.7, serialization_overhead: 0.1, has_complex_structures,
822 requires_ordering,
823 has_time_windows: false, requires_deduplication: true, })
826 }
827
828 fn is_complex_event(&self, event: &StreamEvent) -> bool {
829 matches!(
830 event,
831 StreamEvent::SparqlUpdate { .. }
832 | StreamEvent::SchemaChanged { .. }
833 | StreamEvent::QueryCompleted { .. }
834 )
835 }
836
837 fn requires_ordering(&self, event: &StreamEvent) -> bool {
838 matches!(
839 event,
840 StreamEvent::TransactionBegin { .. }
841 | StreamEvent::TransactionCommit { .. }
842 | StreamEvent::TransactionAbort { .. }
843 )
844 }
845
846 async fn determine_consistency_requirements(&self) -> Result<ConsistencyLevel> {
847 let has_transactions = self.event_history.iter().any(|(_, event)| {
848 matches!(
849 event,
850 StreamEvent::TransactionBegin { .. }
851 | StreamEvent::TransactionCommit { .. }
852 | StreamEvent::TransactionAbort { .. }
853 )
854 });
855
856 if has_transactions {
857 Ok(ConsistencyLevel::ExactlyOnce)
858 } else {
859 Ok(ConsistencyLevel::AtLeastOnce)
860 }
861 }
862
863 fn estimate_optimal_batch_size(&self, event_rate: f64) -> u32 {
864 if event_rate > 10000.0 {
865 1000
866 } else if event_rate > 1000.0 {
867 500
868 } else if event_rate > 100.0 {
869 100
870 } else {
871 10
872 }
873 }
874}
875
876impl CostCalculator {
877 pub fn new(config: OptimizerConfig) -> Self {
878 let mut baseline_costs = HashMap::new();
879
880 baseline_costs.insert(BackendType::Memory, 0.0);
882 baseline_costs.insert(BackendType::Redis, 0.1);
883 baseline_costs.insert(BackendType::Nats, 0.2);
884 baseline_costs.insert(BackendType::Kafka, 0.5);
885 baseline_costs.insert(BackendType::Pulsar, 0.4);
886 baseline_costs.insert(BackendType::Kinesis, 0.8);
887
888 Self {
889 config,
890 baseline_costs,
891 }
892 }
893
894 pub async fn calculate_cost(
895 &self,
896 backend_type: &BackendType,
897 pattern: &WorkloadPattern,
898 performance: &BackendPerformance,
899 ) -> Result<CostModel> {
900 let base_cost = self.baseline_costs.get(backend_type).unwrap_or(&1.0);
901
902 let latency_cost = self.calculate_latency_cost(performance.measured_latency_p50, pattern);
904 let throughput_cost =
905 self.calculate_throughput_cost(performance.measured_throughput, pattern);
906 let reliability_cost =
907 self.calculate_reliability_cost(performance.reliability_score, pattern);
908 let resource_cost = self.calculate_resource_cost(&performance.resource_usage, pattern);
909 let scaling_cost = self.calculate_scaling_cost(backend_type, pattern);
910 let maintenance_cost =
911 self.calculate_maintenance_cost(backend_type, performance.setup_complexity);
912
913 let total_cost = base_cost
914 + latency_cost * self.config.cost_weight_latency
915 + throughput_cost * self.config.cost_weight_throughput
916 + reliability_cost * self.config.cost_weight_reliability
917 + resource_cost * self.config.cost_weight_resource_usage
918 + scaling_cost * 0.1
919 + maintenance_cost * 0.1;
920
921 Ok(CostModel {
922 total_cost,
923 latency_cost,
924 throughput_cost,
925 reliability_cost,
926 resource_cost,
927 scaling_cost,
928 maintenance_cost,
929 })
930 }
931
932 fn calculate_latency_cost(&self, latency: f64, pattern: &WorkloadPattern) -> f64 {
933 let latency_penalty = match pattern.pattern_type {
934 PatternType::RealTime => latency / 10.0, PatternType::Bursty => latency / 50.0,
936 _ => latency / 100.0,
937 };
938 latency_penalty.min(2.0) }
940
941 fn calculate_throughput_cost(&self, throughput: f64, pattern: &WorkloadPattern) -> f64 {
942 let required_throughput = pattern.event_rate * 1.5; if throughput < required_throughput {
944 (required_throughput - throughput) / required_throughput
945 } else {
946 0.0
947 }
948 }
949
950 fn calculate_reliability_cost(&self, reliability: f64, pattern: &WorkloadPattern) -> f64 {
951 let required_reliability = match pattern.consistency_requirements {
952 ConsistencyLevel::ExactlyOnce => 0.999,
953 ConsistencyLevel::AtLeastOnce => 0.995,
954 _ => 0.99,
955 };
956
957 if reliability < required_reliability {
958 (required_reliability - reliability) * 10.0
959 } else {
960 0.0
961 }
962 }
963
964 fn calculate_resource_cost(&self, usage: &ResourceUsage, _pattern: &WorkloadPattern) -> f64 {
965 (usage.cpu_usage_percent / 100.0) * 0.1
967 + (usage.memory_usage_mb / 1000.0) * 0.05
968 + (usage.network_usage_mbps / 100.0) * 0.02
969 }
970
971 fn calculate_scaling_cost(&self, backend_type: &BackendType, pattern: &WorkloadPattern) -> f64 {
972 let scaling_factor = match backend_type {
973 BackendType::Kinesis => 0.1, BackendType::Kafka => 0.5, BackendType::Memory => 1.0, _ => 0.3,
977 };
978
979 match pattern.pattern_type {
980 PatternType::Bursty | PatternType::Random => scaling_factor,
981 _ => 0.0,
982 }
983 }
984
985 fn calculate_maintenance_cost(&self, _backend_type: &BackendType, setup_complexity: u8) -> f64 {
986 setup_complexity as f64 / 10.0
987 }
988}
989
990impl Default for MLPredictor {
991 fn default() -> Self {
992 Self::new()
993 }
994}
995
996impl MLPredictor {
997 pub fn new() -> Self {
998 Self {
999 performance_history: Vec::new(),
1000 patterns: HashMap::new(),
1001 _feature_weights: vec![1.0; 10], _confidence_threshold: 0.7,
1003 }
1004 }
1005
1006 pub async fn add_training_data(&mut self, data_point: PerformanceDataPoint) -> Result<()> {
1007 self.performance_history.push(data_point);
1008
1009 if self.performance_history.len() > 10000 {
1011 self.performance_history.drain(0..1000);
1012 }
1013
1014 Ok(())
1015 }
1016
1017 pub async fn predict_performance(
1018 &self,
1019 backend_type: &BackendType,
1020 pattern: &WorkloadPattern,
1021 ) -> Result<BackendPerformance> {
1022 let relevant_data: Vec<&PerformanceDataPoint> = self
1024 .performance_history
1025 .iter()
1026 .filter(|dp| dp.backend_type == *backend_type)
1027 .collect();
1028
1029 if relevant_data.is_empty() {
1030 return Err(anyhow!(
1031 "No historical data for backend type: {:?}",
1032 backend_type
1033 ));
1034 }
1035
1036 let similar_data: Vec<&PerformanceDataPoint> = relevant_data
1038 .iter()
1039 .filter(|dp| self.pattern_similarity(&dp.workload_pattern, pattern) > 0.7)
1040 .cloned()
1041 .collect();
1042
1043 let prediction_data = if similar_data.is_empty() {
1044 &relevant_data
1045 } else {
1046 &similar_data
1047 };
1048
1049 let predicted_latency = prediction_data
1051 .iter()
1052 .map(|dp| dp.actual_latency)
1053 .sum::<f64>()
1054 / prediction_data.len() as f64;
1055
1056 let predicted_throughput = prediction_data
1057 .iter()
1058 .map(|dp| dp.actual_throughput)
1059 .sum::<f64>()
1060 / prediction_data.len() as f64;
1061
1062 let predicted_reliability = prediction_data
1063 .iter()
1064 .map(|dp| dp.actual_reliability)
1065 .sum::<f64>()
1066 / prediction_data.len() as f64;
1067
1068 Ok(BackendPerformance {
1069 backend_type: backend_type.clone(),
1070 measured_latency_p50: predicted_latency,
1071 measured_latency_p95: predicted_latency * 1.5,
1072 measured_latency_p99: predicted_latency * 2.0,
1073 measured_throughput: predicted_throughput,
1074 reliability_score: predicted_reliability,
1075 resource_usage: prediction_data[0].resource_usage.clone(),
1076 cost_per_hour: 0.0, setup_complexity: 5, scalability_factor: 1.0,
1079 last_updated: Utc::now(),
1080 sample_count: prediction_data.len() as u64,
1081 })
1082 }
1083
1084 pub async fn retrain_models(&mut self) -> Result<()> {
1085 for backend_type in [BackendType::Kafka, BackendType::Nats, BackendType::Redis].iter() {
1087 let backend_data: Vec<&PerformanceDataPoint> = self
1088 .performance_history
1089 .iter()
1090 .filter(|dp| dp.backend_type == *backend_type)
1091 .collect();
1092
1093 if backend_data.len() < 10 {
1094 continue;
1095 }
1096
1097 let pattern_name = format!("{backend_type:?}_model");
1099 let model = self.train_linear_model(&backend_data).await?;
1100 self.patterns.insert(pattern_name, model);
1101 }
1102
1103 info!("Retrained ML models for {} patterns", self.patterns.len());
1104 Ok(())
1105 }
1106
1107 async fn train_linear_model(&self, data: &[&PerformanceDataPoint]) -> Result<PatternModel> {
1108 let n = data.len() as f64;
1110
1111 let features: Vec<Vec<f64>> = data
1113 .iter()
1114 .map(|dp| self.extract_features(&dp.workload_pattern))
1115 .collect();
1116
1117 let targets: Vec<f64> = data.iter().map(|dp| dp.actual_latency).collect();
1118
1119 let feature_count = features[0].len();
1121 let mut coefficients = vec![0.0; feature_count];
1122 let intercept = targets.iter().sum::<f64>() / n;
1123
1124 for i in 0..feature_count {
1126 let feature_values: Vec<f64> = features.iter().map(|f| f[i]).collect();
1127 let correlation = self.calculate_correlation(&feature_values, &targets);
1128 coefficients[i] = correlation * 0.1; }
1130
1131 Ok(PatternModel {
1132 pattern_name: "latency_model".to_string(),
1133 coefficients,
1134 intercept,
1135 confidence: 0.8, last_trained: Utc::now(),
1137 sample_count: data.len(),
1138 })
1139 }
1140
1141 fn extract_features(&self, pattern: &WorkloadPattern) -> Vec<f64> {
1142 vec![
1143 pattern.event_rate,
1144 pattern.batch_size as f64,
1145 pattern.event_size_bytes as f64,
1146 pattern.data_characteristics.compression_ratio,
1147 pattern.data_characteristics.serialization_overhead,
1148 if pattern.data_characteristics.has_complex_structures {
1149 1.0
1150 } else {
1151 0.0
1152 },
1153 if pattern.data_characteristics.requires_ordering {
1154 1.0
1155 } else {
1156 0.0
1157 },
1158 match pattern.pattern_type {
1159 PatternType::RealTime => 1.0,
1160 PatternType::BatchOriented => 2.0,
1161 PatternType::Bursty => 3.0,
1162 _ => 0.0,
1163 },
1164 match pattern.consistency_requirements {
1165 ConsistencyLevel::ExactlyOnce => 3.0,
1166 ConsistencyLevel::AtLeastOnce => 2.0,
1167 _ => 1.0,
1168 },
1169 1.0, ]
1171 }
1172
1173 fn pattern_similarity(&self, p1: &WorkloadPattern, p2: &WorkloadPattern) -> f64 {
1174 let rate_similarity =
1175 1.0 - (p1.event_rate - p2.event_rate).abs() / (p1.event_rate + p2.event_rate + 1.0);
1176 let size_similarity = 1.0
1177 - (p1.event_size_bytes as f64 - p2.event_size_bytes as f64).abs()
1178 / (p1.event_size_bytes as f64 + p2.event_size_bytes as f64 + 1.0);
1179 let type_similarity = if std::mem::discriminant(&p1.pattern_type)
1180 == std::mem::discriminant(&p2.pattern_type)
1181 {
1182 1.0
1183 } else {
1184 0.0
1185 };
1186
1187 (rate_similarity + size_similarity + type_similarity) / 3.0
1188 }
1189
1190 fn calculate_correlation(&self, x: &[f64], y: &[f64]) -> f64 {
1191 let n = x.len() as f64;
1192 let mean_x = x.iter().sum::<f64>() / n;
1193 let mean_y = y.iter().sum::<f64>() / n;
1194
1195 let numerator: f64 = x
1196 .iter()
1197 .zip(y.iter())
1198 .map(|(xi, yi)| (xi - mean_x) * (yi - mean_y))
1199 .sum();
1200
1201 let denom_x: f64 = x.iter().map(|xi| (xi - mean_x).powi(2)).sum();
1202 let denom_y: f64 = y.iter().map(|yi| (yi - mean_y).powi(2)).sum();
1203
1204 if denom_x * denom_y == 0.0 {
1205 0.0
1206 } else {
1207 numerator / (denom_x * denom_y).sqrt()
1208 }
1209 }
1210}
1211
impl BackendPerformance {
    /// Creates a performance profile for `backend_type` seeded with
    /// conservative baseline estimates. Real measurements are expected to
    /// overwrite these over time — `sample_count` starts at 0 to mark that
    /// nothing has been measured yet.
    pub fn new(backend_type: BackendType) -> Self {
        Self {
            backend_type,
            // Baseline latency percentiles (presumably milliseconds — TODO
            // confirm unit against the metrics pipeline).
            measured_latency_p50: 100.0, measured_latency_p95: 200.0,
            measured_latency_p99: 500.0,
            // Baseline throughput (presumably events/sec — TODO confirm) and
            // an optimistic reliability score.
            measured_throughput: 1000.0, reliability_score: 0.99,
            resource_usage: ResourceUsage::default(),
            cost_per_hour: 0.1,
            // Mid-range value — looks like a 1-10 complexity scale; confirm.
            setup_complexity: 5,
            scalability_factor: 1.0,
            last_updated: Utc::now(),
            // No real measurements folded in yet.
            sample_count: 0,
        }
    }
}
1230
impl Default for ResourceUsage {
    /// Conservative baseline resource footprint used before any real
    /// measurements are collected for a backend.
    fn default() -> Self {
        Self {
            cpu_usage_percent: 10.0,
            memory_usage_mb: 100.0,
            network_usage_mbps: 1.0,
            disk_io_ops_per_sec: 100.0,
            connection_count: 10,
        }
    }
}
1242
impl Default for WorkloadPattern {
    /// A neutral "typical" workload: steady 100 events/sec, 1 KiB events in
    /// batches of 100, uniformly distributed in time, with at-least-once
    /// delivery requirements.
    fn default() -> Self {
        Self {
            pattern_type: PatternType::Steady,
            event_rate: 100.0,
            batch_size: 100,
            event_size_bytes: 1024,
            temporal_distribution: TemporalDistribution::Uniform,
            data_characteristics: DataCharacteristics {
                compression_ratio: 0.7,
                serialization_overhead: 0.1,
                has_complex_structures: false,
                requires_ordering: false,
                has_time_windows: false,
                // NOTE(review): deduplication defaults to true while every
                // other flag defaults to false — looks intentional for
                // at-least-once delivery, but worth confirming.
                requires_deduplication: true,
            },
            consistency_requirements: ConsistencyLevel::AtLeastOnce,
        }
    }
}
1263
/// Aggregate statistics describing the optimizer's decision history.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationStats {
    /// Total number of backend-selection decisions recorded.
    pub total_decisions: usize,
    /// How many times each backend was selected.
    pub backend_usage: HashMap<BackendType, usize>,
    /// Mean decision confidence — presumably averaged over all decisions;
    /// confirm against the producer of this struct.
    pub average_confidence: f64,
    /// Named performance deltas (metric name -> improvement value).
    pub performance_improvements: HashMap<String, f64>,
    /// Number of backends currently active.
    pub active_backends: usize,
    /// When the last optimization pass ran, if one has run at all.
    pub last_optimization: Option<DateTime<Utc>>,
}
1274
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{EventMetadata, StreamEvent};

    /// Builds a minimal `TripleAdded` event (fresh UUID and current
    /// timestamp) to use as test input.
    fn create_test_event() -> StreamEvent {
        StreamEvent::TripleAdded {
            subject: "http://example.org/subject".to_string(),
            predicate: "http://example.org/predicate".to_string(),
            object: "http://example.org/object".to_string(),
            graph: None,
            metadata: EventMetadata {
                event_id: uuid::Uuid::new_v4().to_string(),
                timestamp: Utc::now(),
                source: "test".to_string(),
                user: None,
                context: None,
                caused_by: None,
                version: "1.0".to_string(),
                properties: std::collections::HashMap::new(),
                checksum: None,
            },
        }
    }

    /// A default-configured optimizer starts with the ML predictor enabled
    /// and no per-backend performance data.
    #[tokio::test]
    async fn test_backend_optimizer_creation() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);

        assert!(optimizer.ml_predictor.is_some());
        assert_eq!(optimizer.backend_performance.read().await.len(), 0);
    }

    /// Analyzing a batch of 100 identical events yields a pattern with
    /// positive event rate, batch size, and event size.
    #[tokio::test]
    async fn test_pattern_analysis() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::minutes(10));
        let events = vec![create_test_event(); 100];

        let pattern = analyzer.analyze_pattern(&events).await.unwrap();

        assert!(pattern.event_rate > 0.0);
        assert!(pattern.batch_size > 0);
        assert!(pattern.event_size_bytes > 0);
    }

    /// Cost components for a default workload on the Kafka baseline profile
    /// are all non-negative and sum to a positive total.
    #[tokio::test]
    async fn test_cost_calculation() {
        let config = OptimizerConfig::default();
        let calculator = CostCalculator::new(config);
        let pattern = WorkloadPattern::default();
        let performance = BackendPerformance::new(BackendType::Kafka);

        let cost = calculator
            .calculate_cost(&BackendType::Kafka, &pattern, &performance)
            .await
            .unwrap();

        assert!(cost.total_cost > 0.0);
        assert!(cost.latency_cost >= 0.0);
        assert!(cost.throughput_cost >= 0.0);
    }

    /// With ML prediction disabled, recommendations cover all registered
    /// backends and come back sorted by descending score.
    #[tokio::test]
    async fn test_backend_recommendation() {
        let config = OptimizerConfig {
            enable_ml_prediction: false, ..Default::default()
        };
        let optimizer = BackendOptimizer::new(config);

        // Register two backends with identical default metrics so both are
        // candidates for recommendation.
        let metrics = StreamingMetrics::default();
        optimizer
            .update_backend_performance(BackendType::Kafka, &metrics)
            .await
            .unwrap();
        optimizer
            .update_backend_performance(BackendType::Nats, &metrics)
            .await
            .unwrap();

        let pattern = WorkloadPattern::default();
        let recommendations = optimizer.recommend_backend(&pattern).await.unwrap();

        assert!(recommendations.len() >= 2);
        // Sorted best-first: the top score dominates the runner-up.
        assert!(recommendations[0].score >= recommendations[1].score);
    }

    /// Adding a single training sample is recorded in the predictor's
    /// performance history.
    #[tokio::test]
    async fn test_ml_predictor() {
        let mut predictor = MLPredictor::new();

        let data_point = PerformanceDataPoint {
            timestamp: Utc::now(),
            backend_type: BackendType::Kafka,
            workload_pattern: WorkloadPattern::default(),
            actual_latency: 50.0,
            actual_throughput: 1000.0,
            actual_reliability: 0.99,
            resource_usage: ResourceUsage::default(),
            external_factors: HashMap::new(),
        };

        predictor.add_training_data(data_point).await.unwrap();
        assert_eq!(predictor.performance_history.len(), 1);
    }

    /// Two patterns that differ only slightly in event rate (100 vs 110,
    /// same type and size) should score as highly similar (> 0.8).
    #[test]
    fn test_pattern_similarity() {
        let predictor = MLPredictor::new();
        let pattern1 = WorkloadPattern {
            event_rate: 100.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };
        let pattern2 = WorkloadPattern {
            event_rate: 110.0,
            pattern_type: PatternType::Steady,
            ..Default::default()
        };

        let similarity = predictor.pattern_similarity(&pattern1, &pattern2);
        assert!(similarity > 0.8);
    }

    /// Pattern classification: a slow 1 event/sec stream maps to one of the
    /// expected variants, while a ~125 events/sec stream is detected as
    /// high-rate (> 100 events/sec).
    #[tokio::test]
    async fn test_workload_pattern_classification() {
        let mut analyzer = PatternAnalyzer::new(ChronoDuration::seconds(30));

        // 10 events spaced one second apart (1 event/sec).
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..10 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::seconds(i as i64);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(matches!(
            pattern.pattern_type,
            PatternType::RealTime | PatternType::Steady | PatternType::Bursty | PatternType::Random
        ));

        // 3500 events spaced 8 ms apart (~125 events/sec).
        let mut events = Vec::new();
        let base_time = Utc::now();
        for i in 0..3500 {
            let mut event = create_test_event();
            if let StreamEvent::TripleAdded { metadata, .. } = &mut event {
                metadata.timestamp = base_time + ChronoDuration::milliseconds(i as i64 * 8);
            }
            events.push(event);
        }
        let pattern = analyzer.analyze_pattern(&events).await.unwrap();
        assert!(pattern.event_rate > 100.0);
    }

    /// Strength analysis surfaces the expected backend-specific advantages
    /// for a real-time, exactly-once workload.
    #[test]
    fn test_backend_strengths_analysis() {
        let config = OptimizerConfig::default();
        let optimizer = BackendOptimizer::new(config);
        let pattern = WorkloadPattern {
            pattern_type: PatternType::RealTime,
            consistency_requirements: ConsistencyLevel::ExactlyOnce,
            ..Default::default()
        };

        let kafka_strengths = optimizer.analyze_strengths(&BackendType::Kafka, &pattern);
        assert!(kafka_strengths.contains(&"Exactly-once semantics".to_string()));

        let nats_strengths = optimizer.analyze_strengths(&BackendType::Nats, &pattern);
        assert!(nats_strengths.contains(&"Real-time performance".to_string()));
    }

    /// OptimizerConfig round-trips through serde JSON without losing the
    /// sampled fields.
    #[test]
    fn test_config_serialization() {
        let config = OptimizerConfig::default();
        let serialized = serde_json::to_string(&config).unwrap();
        let deserialized: OptimizerConfig = serde_json::from_str(&serialized).unwrap();

        assert_eq!(
            config.enable_cost_modeling,
            deserialized.enable_cost_modeling
        );
        assert_eq!(config.cost_weight_latency, deserialized.cost_weight_latency);
    }
}