oxirs_vec/tiering/tier_optimizer.rs

1//! Tier optimization using ML and statistical analysis
2
3use super::config::TieringConfig;
4use super::policies::{PolicyEvaluator, TierTransitionReason};
5use super::types::{IndexMetadata, StorageTier, TierStatistics};
6use scirs2_core::ndarray_ext::{Array1, Array2};
7use std::collections::HashMap;
8use std::time::SystemTime;
9
10/// Tier optimizer using ML and statistical analysis
11pub struct TierOptimizer {
12    /// Policy evaluator
13    policy_evaluator: PolicyEvaluator,
14    /// Configuration
15    config: TieringConfig,
16    /// Historical decisions
17    decision_history: Vec<OptimizationDecision>,
18    /// Feature importance weights (learned)
19    feature_weights: Array1<f64>,
20}
21
22/// Optimization decision record
23#[derive(Debug, Clone)]
24struct OptimizationDecision {
25    index_id: String,
26    from_tier: StorageTier,
27    to_tier: StorageTier,
28    reason: TierTransitionReason,
29    timestamp: SystemTime,
30    features: Array1<f64>,
31    outcome_score: f64,
32}
33
34impl TierOptimizer {
35    /// Create a new tier optimizer
36    pub fn new(config: TieringConfig) -> Self {
37        let policy = config.policy;
38        let feature_weights = Array1::from_vec(vec![1.0; 10]); // 10 features
39
40        Self {
41            policy_evaluator: PolicyEvaluator::new(policy),
42            config,
43            decision_history: Vec::new(),
44            feature_weights,
45        }
46    }
47
48    /// Optimize tier placements for all indices
49    pub fn optimize_tier_placements(
50        &mut self,
51        indices: &[IndexMetadata],
52        tier_stats: &[TierStatistics; 3],
53    ) -> Vec<TierOptimizationRecommendation> {
54        let mut recommendations = Vec::new();
55
56        for metadata in indices {
57            if let Some(recommendation) = self.evaluate_index(metadata, tier_stats) {
58                recommendations.push(recommendation);
59            }
60        }
61
62        // Sort by priority (highest first)
63        recommendations.sort_by(|a, b| b.priority.partial_cmp(&a.priority).unwrap());
64
65        recommendations
66    }
67
68    /// Evaluate a single index for tier optimization
69    fn evaluate_index(
70        &mut self,
71        metadata: &IndexMetadata,
72        tier_stats: &[TierStatistics; 3],
73    ) -> Option<TierOptimizationRecommendation> {
74        let current_tier = metadata.current_tier;
75        let (optimal_tier, reason) =
76            self.policy_evaluator
77                .evaluate_optimal_tier(metadata, tier_stats, SystemTime::now());
78
79        // Check if transition is needed
80        if optimal_tier == current_tier {
81            return None;
82        }
83
84        // Extract features
85        let features = self.extract_features(metadata, tier_stats);
86
87        // Calculate priority using learned weights
88        let priority = self.calculate_priority(&features, current_tier, optimal_tier);
89
90        // Estimate benefit
91        let benefit = self.estimate_benefit(metadata, current_tier, optimal_tier);
92
93        Some(TierOptimizationRecommendation {
94            index_id: metadata.index_id.clone(),
95            current_tier,
96            recommended_tier: optimal_tier,
97            reason,
98            priority,
99            estimated_benefit: benefit,
100            confidence: self.calculate_confidence(&features),
101        })
102    }
103
104    /// Extract features for ML-based optimization
105    fn extract_features(
106        &self,
107        metadata: &IndexMetadata,
108        tier_stats: &[TierStatistics; 3],
109    ) -> Array1<f64> {
110        let mut features = Vec::with_capacity(10);
111
112        // Feature 1: Access frequency (QPS)
113        features.push(metadata.access_stats.avg_qps.ln_1p());
114
115        // Feature 2: Index size (GB)
116        let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
117        features.push(size_gb.ln_1p());
118
119        // Feature 3: Recency (hours since last access)
120        let recency = metadata
121            .access_stats
122            .last_access_time
123            .and_then(|t| SystemTime::now().duration_since(t).ok())
124            .map(|d| d.as_secs() as f64 / 3600.0)
125            .unwrap_or(1000.0);
126        features.push(recency.ln_1p());
127
128        // Feature 4: Query latency (p95)
129        features.push((metadata.access_stats.query_latencies.p95 as f64).ln_1p());
130
131        // Feature 5: Current tier utilization
132        let tier_idx = match metadata.current_tier {
133            StorageTier::Hot => 0,
134            StorageTier::Warm => 1,
135            StorageTier::Cold => 2,
136        };
137        features.push(tier_stats[tier_idx].utilization());
138
139        // Feature 6: Peak QPS
140        features.push(metadata.access_stats.peak_qps.ln_1p());
141
142        // Feature 7: Total queries
143        features.push((metadata.access_stats.total_queries as f64).ln_1p());
144
145        // Feature 8: Memory footprint
146        let memory_gb =
147            metadata.performance_metrics.memory_footprint_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
148        features.push(memory_gb.ln_1p());
149
150        // Feature 9: Cache hit rate
151        features.push(metadata.performance_metrics.cache_hit_rate);
152
153        // Feature 10: Time in current tier (hours)
154        let time_in_tier = metadata
155            .last_modified
156            .elapsed()
157            .map(|d| d.as_secs() as f64 / 3600.0)
158            .unwrap_or(0.0);
159        features.push(time_in_tier.ln_1p());
160
161        Array1::from_vec(features)
162    }
163
164    /// Calculate priority for tier transition
165    fn calculate_priority(
166        &self,
167        features: &Array1<f64>,
168        from_tier: StorageTier,
169        to_tier: StorageTier,
170    ) -> f64 {
171        // Weighted sum of features
172        let mut priority = features.dot(&self.feature_weights);
173
174        // Boost priority for promotions (moving to faster tier)
175        if matches!(
176            (from_tier, to_tier),
177            (StorageTier::Cold, StorageTier::Warm)
178                | (StorageTier::Cold, StorageTier::Hot)
179                | (StorageTier::Warm, StorageTier::Hot)
180        ) {
181            priority *= 1.5;
182        }
183
184        // Reduce priority for demotions
185        if matches!(
186            (from_tier, to_tier),
187            (StorageTier::Hot, StorageTier::Warm)
188                | (StorageTier::Hot, StorageTier::Cold)
189                | (StorageTier::Warm, StorageTier::Cold)
190        ) {
191            priority *= 0.7;
192        }
193
194        priority.max(0.0)
195    }
196
197    /// Estimate benefit of tier transition
198    fn estimate_benefit(
199        &self,
200        metadata: &IndexMetadata,
201        from_tier: StorageTier,
202        to_tier: StorageTier,
203    ) -> f64 {
204        // Latency improvement
205        let latency_improvement = from_tier.typical_latency().as_micros() as f64
206            - to_tier.typical_latency().as_micros() as f64;
207        let latency_benefit = latency_improvement * metadata.access_stats.avg_qps;
208
209        // Cost change
210        let from_cost = from_tier.cost_factor();
211        let to_cost = to_tier.cost_factor();
212        let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
213        let cost_change = (from_cost - to_cost) * size_gb;
214
215        // Combined benefit (latency savings - cost increase)
216        latency_benefit + cost_change * 1000.0 // Weight cost changes
217    }
218
219    /// Calculate confidence in recommendation
220    fn calculate_confidence(&self, features: &Array1<f64>) -> f64 {
221        // Use variance of features as uncertainty measure
222        let mean = features.mean().unwrap_or(0.0);
223        let variance =
224            features.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / features.len() as f64;
225
226        // Higher variance = lower confidence
227        let confidence = 1.0 / (1.0 + variance);
228        confidence.clamp(0.0, 1.0)
229    }
230
231    /// Learn from optimization outcomes
232    pub fn update_from_feedback(
233        &mut self,
234        index_id: &str,
235        from_tier: StorageTier,
236        to_tier: StorageTier,
237        reason: TierTransitionReason,
238        features: Array1<f64>,
239        outcome_score: f64,
240    ) {
241        let decision = OptimizationDecision {
242            index_id: index_id.to_string(),
243            from_tier,
244            to_tier,
245            reason,
246            timestamp: SystemTime::now(),
247            features,
248            outcome_score,
249        };
250
251        self.decision_history.push(decision);
252
253        // Update feature weights periodically
254        if self.decision_history.len() % 100 == 0 {
255            self.update_feature_weights();
256        }
257    }
258
259    /// Update feature weights based on historical decisions
260    fn update_feature_weights(&mut self) {
261        if self.decision_history.len() < 10 {
262            return; // Need minimum history
263        }
264
265        // Build feature matrix and outcome vector
266        let n = self.decision_history.len();
267        let mut features = Array2::zeros((n, 10));
268        let mut outcomes = Array1::zeros(n);
269
270        for (i, decision) in self.decision_history.iter().enumerate() {
271            for (j, &feature) in decision.features.iter().enumerate() {
272                features[[i, j]] = feature;
273            }
274            outcomes[i] = decision.outcome_score;
275        }
276
277        // Simple gradient descent update (learning rate = 0.01)
278        let learning_rate = 0.01;
279        for i in 0..10 {
280            let feature_col = features.column(i);
281            let correlation = feature_col.dot(&outcomes) / n as f64;
282            self.feature_weights[i] += learning_rate * correlation;
283        }
284
285        // Normalize weights
286        let sum: f64 = self.feature_weights.iter().map(|w| w.abs()).sum();
287        if sum > 0.0 {
288            self.feature_weights /= sum;
289        }
290    }
291
292    /// Predict optimal tier for a new index
293    pub fn predict_optimal_tier(
294        &mut self,
295        metadata: &IndexMetadata,
296        tier_stats: &[TierStatistics; 3],
297    ) -> (StorageTier, f64) {
298        let features = self.extract_features(metadata, tier_stats);
299
300        // Calculate scores for each tier
301        let mut best_tier = StorageTier::Cold;
302        let mut best_score = f64::NEG_INFINITY;
303
304        for &tier in &[StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
305            let score = self.calculate_tier_score(&features, tier);
306            if score > best_score {
307                best_score = score;
308                best_tier = tier;
309            }
310        }
311
312        (best_tier, best_score)
313    }
314
315    /// Calculate score for placing index in a tier
316    fn calculate_tier_score(&self, features: &Array1<f64>, tier: StorageTier) -> f64 {
317        let base_score = features.dot(&self.feature_weights);
318
319        // Adjust score based on tier characteristics
320        let tier_multiplier = match tier {
321            StorageTier::Hot => 1.2,
322            StorageTier::Warm => 1.0,
323            StorageTier::Cold => 0.8,
324        };
325
326        base_score * tier_multiplier
327    }
328
329    /// Get optimization statistics
330    pub fn get_optimization_stats(&self) -> OptimizationStats {
331        let total_decisions = self.decision_history.len();
332        let avg_outcome = if total_decisions > 0 {
333            self.decision_history
334                .iter()
335                .map(|d| d.outcome_score)
336                .sum::<f64>()
337                / total_decisions as f64
338        } else {
339            0.0
340        };
341
342        let tier_transitions = self.count_tier_transitions();
343
344        OptimizationStats {
345            total_decisions,
346            avg_outcome_score: avg_outcome,
347            tier_transitions,
348            feature_importance: self.feature_weights.clone(),
349        }
350    }
351
352    /// Count transitions between tiers
353    fn count_tier_transitions(&self) -> HashMap<(StorageTier, StorageTier), usize> {
354        let mut transitions = HashMap::new();
355
356        for decision in &self.decision_history {
357            *transitions
358                .entry((decision.from_tier, decision.to_tier))
359                .or_insert(0) += 1;
360        }
361
362        transitions
363    }
364}
365
366/// Tier optimization recommendation
367#[derive(Debug, Clone)]
368pub struct TierOptimizationRecommendation {
369    /// Index identifier
370    pub index_id: String,
371    /// Current tier
372    pub current_tier: StorageTier,
373    /// Recommended tier
374    pub recommended_tier: StorageTier,
375    /// Reason for recommendation
376    pub reason: TierTransitionReason,
377    /// Priority (higher = more important)
378    pub priority: f64,
379    /// Estimated benefit of transition
380    pub estimated_benefit: f64,
381    /// Confidence in recommendation (0.0 - 1.0)
382    pub confidence: f64,
383}
384
385/// Optimization statistics
386#[derive(Debug, Clone)]
387pub struct OptimizationStats {
388    /// Total number of decisions made
389    pub total_decisions: usize,
390    /// Average outcome score
391    pub avg_outcome_score: f64,
392    /// Transition counts between tiers
393    pub tier_transitions: HashMap<(StorageTier, StorageTier), usize>,
394    /// Learned feature importance weights
395    pub feature_importance: Array1<f64>,
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiering::types::{
        AccessPattern, AccessStatistics, IndexType, LatencyPercentiles, PerformanceMetrics,
    };
    use std::collections::HashMap;

    /// A freshly accessed 1 GiB HNSW index sitting in the warm tier.
    fn create_test_metadata() -> IndexMetadata {
        let now = SystemTime::now();

        let access_stats = AccessStatistics {
            total_queries: 100_000,
            queries_last_hour: 3600,
            queries_last_day: 86400,
            queries_last_week: 604800,
            avg_qps: 10.0,
            peak_qps: 20.0,
            last_access_time: Some(now),
            access_pattern: AccessPattern::Hot,
            query_latencies: LatencyPercentiles::default(),
        };

        IndexMetadata {
            index_id: "test_index".to_string(),
            current_tier: StorageTier::Warm,
            size_bytes: 1 << 30,
            compressed_size_bytes: 512 << 20,
            vector_count: 1_000_000,
            dimension: 768,
            index_type: IndexType::Hnsw,
            created_at: now,
            last_accessed: now,
            last_modified: now,
            access_stats,
            performance_metrics: PerformanceMetrics::default(),
            storage_path: None,
            custom_metadata: HashMap::new(),
        }
    }

    /// Hot/warm/cold tiers at 50%, 50% and 25% utilization respectively.
    fn create_test_tier_stats() -> [TierStatistics; 3] {
        let hot = TierStatistics {
            capacity_bytes: 16 << 30,
            used_bytes: 8 << 30,
            ..Default::default()
        };
        let warm = TierStatistics {
            capacity_bytes: 128 << 30,
            used_bytes: 64 << 30,
            ..Default::default()
        };
        let cold = TierStatistics {
            capacity_bytes: 1024 << 30,
            used_bytes: 256 << 30,
            ..Default::default()
        };

        [hot, warm, cold]
    }

    #[test]
    fn test_tier_optimizer_creation() {
        let optimizer = TierOptimizer::new(TieringConfig::default());

        assert_eq!(optimizer.feature_weights.len(), 10);
    }

    #[test]
    fn test_feature_extraction() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        let metadata = create_test_metadata();
        let tier_stats = create_test_tier_stats();

        let features = optimizer.extract_features(&metadata, &tier_stats);

        assert_eq!(features.len(), 10);
        assert!(features.iter().all(|value| value.is_finite()));
    }

    #[test]
    fn test_optimization_recommendations() {
        let mut optimizer = TierOptimizer::new(TieringConfig::default());
        let metadata = create_test_metadata();
        let tier_stats = create_test_tier_stats();

        let recommendations = optimizer.optimize_tier_placements(&[metadata], &tier_stats);

        assert!(recommendations.len() <= 1);
    }
}