Skip to main content

oxirs_vec/tiering/
tier_optimizer.rs

1//! Tier optimization using ML and statistical analysis
2
3use super::config::TieringConfig;
4use super::policies::{PolicyEvaluator, TierTransitionReason};
5use super::types::{IndexMetadata, StorageTier, TierStatistics};
6use scirs2_core::ndarray_ext::{Array1, Array2};
7use std::collections::HashMap;
8use std::time::SystemTime;
9
/// Tier optimizer using ML and statistical analysis.
///
/// Combines a rule-based [`PolicyEvaluator`] (which decides *whether* an index
/// should move) with a small learned linear model over per-index features
/// (which decides *how urgent* each move is).
pub struct TierOptimizer {
    /// Policy evaluator, built from `config.policy`; proposes the optimal tier
    /// for each index.
    policy_evaluator: PolicyEvaluator,
    /// Configuration this optimizer was constructed with.
    config: TieringConfig,
    /// Every decision reported through feedback, in arrival order. Used to
    /// re-learn `feature_weights`.
    decision_history: Vec<OptimizationDecision>,
    /// Feature importance weights (learned). One weight per extracted feature;
    /// initialised to 1.0 and periodically re-learned from `decision_history`.
    feature_weights: Array1<f64>,
}
21
22/// Optimization decision record
23#[derive(Debug, Clone)]
24struct OptimizationDecision {
25    index_id: String,
26    from_tier: StorageTier,
27    to_tier: StorageTier,
28    reason: TierTransitionReason,
29    timestamp: SystemTime,
30    features: Array1<f64>,
31    outcome_score: f64,
32}
33
34impl TierOptimizer {
35    /// Create a new tier optimizer
36    pub fn new(config: TieringConfig) -> Self {
37        let policy = config.policy;
38        let feature_weights = Array1::from_vec(vec![1.0; 10]); // 10 features
39
40        Self {
41            policy_evaluator: PolicyEvaluator::new(policy),
42            config,
43            decision_history: Vec::new(),
44            feature_weights,
45        }
46    }
47
48    /// Optimize tier placements for all indices
49    pub fn optimize_tier_placements(
50        &mut self,
51        indices: &[IndexMetadata],
52        tier_stats: &[TierStatistics; 3],
53    ) -> Vec<TierOptimizationRecommendation> {
54        let mut recommendations = Vec::new();
55
56        for metadata in indices {
57            if let Some(recommendation) = self.evaluate_index(metadata, tier_stats) {
58                recommendations.push(recommendation);
59            }
60        }
61
62        // Sort by priority (highest first)
63        recommendations.sort_by(|a, b| {
64            b.priority
65                .partial_cmp(&a.priority)
66                .unwrap_or(std::cmp::Ordering::Equal)
67        });
68
69        recommendations
70    }
71
72    /// Evaluate a single index for tier optimization
73    fn evaluate_index(
74        &mut self,
75        metadata: &IndexMetadata,
76        tier_stats: &[TierStatistics; 3],
77    ) -> Option<TierOptimizationRecommendation> {
78        let current_tier = metadata.current_tier;
79        let (optimal_tier, reason) =
80            self.policy_evaluator
81                .evaluate_optimal_tier(metadata, tier_stats, SystemTime::now());
82
83        // Check if transition is needed
84        if optimal_tier == current_tier {
85            return None;
86        }
87
88        // Extract features
89        let features = self.extract_features(metadata, tier_stats);
90
91        // Calculate priority using learned weights
92        let priority = self.calculate_priority(&features, current_tier, optimal_tier);
93
94        // Estimate benefit
95        let benefit = self.estimate_benefit(metadata, current_tier, optimal_tier);
96
97        Some(TierOptimizationRecommendation {
98            index_id: metadata.index_id.clone(),
99            current_tier,
100            recommended_tier: optimal_tier,
101            reason,
102            priority,
103            estimated_benefit: benefit,
104            confidence: self.calculate_confidence(&features),
105        })
106    }
107
108    /// Extract features for ML-based optimization
109    fn extract_features(
110        &self,
111        metadata: &IndexMetadata,
112        tier_stats: &[TierStatistics; 3],
113    ) -> Array1<f64> {
114        let mut features = Vec::with_capacity(10);
115
116        // Feature 1: Access frequency (QPS)
117        features.push(metadata.access_stats.avg_qps.ln_1p());
118
119        // Feature 2: Index size (GB)
120        let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
121        features.push(size_gb.ln_1p());
122
123        // Feature 3: Recency (hours since last access)
124        let recency = metadata
125            .access_stats
126            .last_access_time
127            .and_then(|t| SystemTime::now().duration_since(t).ok())
128            .map(|d| d.as_secs() as f64 / 3600.0)
129            .unwrap_or(1000.0);
130        features.push(recency.ln_1p());
131
132        // Feature 4: Query latency (p95)
133        features.push((metadata.access_stats.query_latencies.p95 as f64).ln_1p());
134
135        // Feature 5: Current tier utilization
136        let tier_idx = match metadata.current_tier {
137            StorageTier::Hot => 0,
138            StorageTier::Warm => 1,
139            StorageTier::Cold => 2,
140        };
141        features.push(tier_stats[tier_idx].utilization());
142
143        // Feature 6: Peak QPS
144        features.push(metadata.access_stats.peak_qps.ln_1p());
145
146        // Feature 7: Total queries
147        features.push((metadata.access_stats.total_queries as f64).ln_1p());
148
149        // Feature 8: Memory footprint
150        let memory_gb =
151            metadata.performance_metrics.memory_footprint_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
152        features.push(memory_gb.ln_1p());
153
154        // Feature 9: Cache hit rate
155        features.push(metadata.performance_metrics.cache_hit_rate);
156
157        // Feature 10: Time in current tier (hours)
158        let time_in_tier = metadata
159            .last_modified
160            .elapsed()
161            .map(|d| d.as_secs() as f64 / 3600.0)
162            .unwrap_or(0.0);
163        features.push(time_in_tier.ln_1p());
164
165        Array1::from_vec(features)
166    }
167
168    /// Calculate priority for tier transition
169    fn calculate_priority(
170        &self,
171        features: &Array1<f64>,
172        from_tier: StorageTier,
173        to_tier: StorageTier,
174    ) -> f64 {
175        // Weighted sum of features
176        let mut priority = features.dot(&self.feature_weights);
177
178        // Boost priority for promotions (moving to faster tier)
179        if matches!(
180            (from_tier, to_tier),
181            (StorageTier::Cold, StorageTier::Warm)
182                | (StorageTier::Cold, StorageTier::Hot)
183                | (StorageTier::Warm, StorageTier::Hot)
184        ) {
185            priority *= 1.5;
186        }
187
188        // Reduce priority for demotions
189        if matches!(
190            (from_tier, to_tier),
191            (StorageTier::Hot, StorageTier::Warm)
192                | (StorageTier::Hot, StorageTier::Cold)
193                | (StorageTier::Warm, StorageTier::Cold)
194        ) {
195            priority *= 0.7;
196        }
197
198        priority.max(0.0)
199    }
200
201    /// Estimate benefit of tier transition
202    fn estimate_benefit(
203        &self,
204        metadata: &IndexMetadata,
205        from_tier: StorageTier,
206        to_tier: StorageTier,
207    ) -> f64 {
208        // Latency improvement
209        let latency_improvement = from_tier.typical_latency().as_micros() as f64
210            - to_tier.typical_latency().as_micros() as f64;
211        let latency_benefit = latency_improvement * metadata.access_stats.avg_qps;
212
213        // Cost change
214        let from_cost = from_tier.cost_factor();
215        let to_cost = to_tier.cost_factor();
216        let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
217        let cost_change = (from_cost - to_cost) * size_gb;
218
219        // Combined benefit (latency savings - cost increase)
220        latency_benefit + cost_change * 1000.0 // Weight cost changes
221    }
222
223    /// Calculate confidence in recommendation
224    fn calculate_confidence(&self, features: &Array1<f64>) -> f64 {
225        // Use variance of features as uncertainty measure
226        let mean = features.mean().unwrap_or(0.0);
227        let variance =
228            features.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / features.len() as f64;
229
230        // Higher variance = lower confidence
231        let confidence = 1.0 / (1.0 + variance);
232        confidence.clamp(0.0, 1.0)
233    }
234
235    /// Learn from optimization outcomes
236    pub fn update_from_feedback(
237        &mut self,
238        index_id: &str,
239        from_tier: StorageTier,
240        to_tier: StorageTier,
241        reason: TierTransitionReason,
242        features: Array1<f64>,
243        outcome_score: f64,
244    ) {
245        let decision = OptimizationDecision {
246            index_id: index_id.to_string(),
247            from_tier,
248            to_tier,
249            reason,
250            timestamp: SystemTime::now(),
251            features,
252            outcome_score,
253        };
254
255        self.decision_history.push(decision);
256
257        // Update feature weights periodically
258        if self.decision_history.len() % 100 == 0 {
259            self.update_feature_weights();
260        }
261    }
262
263    /// Update feature weights based on historical decisions
264    fn update_feature_weights(&mut self) {
265        if self.decision_history.len() < 10 {
266            return; // Need minimum history
267        }
268
269        // Build feature matrix and outcome vector
270        let n = self.decision_history.len();
271        let mut features = Array2::zeros((n, 10));
272        let mut outcomes = Array1::zeros(n);
273
274        for (i, decision) in self.decision_history.iter().enumerate() {
275            for (j, &feature) in decision.features.iter().enumerate() {
276                features[[i, j]] = feature;
277            }
278            outcomes[i] = decision.outcome_score;
279        }
280
281        // Simple gradient descent update (learning rate = 0.01)
282        let learning_rate = 0.01;
283        for i in 0..10 {
284            let feature_col = features.column(i);
285            let correlation = feature_col.dot(&outcomes) / n as f64;
286            self.feature_weights[i] += learning_rate * correlation;
287        }
288
289        // Normalize weights
290        let sum: f64 = self.feature_weights.iter().map(|w| w.abs()).sum();
291        if sum > 0.0 {
292            self.feature_weights /= sum;
293        }
294    }
295
296    /// Predict optimal tier for a new index
297    pub fn predict_optimal_tier(
298        &mut self,
299        metadata: &IndexMetadata,
300        tier_stats: &[TierStatistics; 3],
301    ) -> (StorageTier, f64) {
302        let features = self.extract_features(metadata, tier_stats);
303
304        // Calculate scores for each tier
305        let mut best_tier = StorageTier::Cold;
306        let mut best_score = f64::NEG_INFINITY;
307
308        for &tier in &[StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
309            let score = self.calculate_tier_score(&features, tier);
310            if score > best_score {
311                best_score = score;
312                best_tier = tier;
313            }
314        }
315
316        (best_tier, best_score)
317    }
318
319    /// Calculate score for placing index in a tier
320    fn calculate_tier_score(&self, features: &Array1<f64>, tier: StorageTier) -> f64 {
321        let base_score = features.dot(&self.feature_weights);
322
323        // Adjust score based on tier characteristics
324        let tier_multiplier = match tier {
325            StorageTier::Hot => 1.2,
326            StorageTier::Warm => 1.0,
327            StorageTier::Cold => 0.8,
328        };
329
330        base_score * tier_multiplier
331    }
332
333    /// Get optimization statistics
334    pub fn get_optimization_stats(&self) -> OptimizationStats {
335        let total_decisions = self.decision_history.len();
336        let avg_outcome = if total_decisions > 0 {
337            self.decision_history
338                .iter()
339                .map(|d| d.outcome_score)
340                .sum::<f64>()
341                / total_decisions as f64
342        } else {
343            0.0
344        };
345
346        let tier_transitions = self.count_tier_transitions();
347
348        OptimizationStats {
349            total_decisions,
350            avg_outcome_score: avg_outcome,
351            tier_transitions,
352            feature_importance: self.feature_weights.clone(),
353        }
354    }
355
356    /// Count transitions between tiers
357    fn count_tier_transitions(&self) -> HashMap<(StorageTier, StorageTier), usize> {
358        let mut transitions = HashMap::new();
359
360        for decision in &self.decision_history {
361            *transitions
362                .entry((decision.from_tier, decision.to_tier))
363                .or_insert(0) += 1;
364        }
365
366        transitions
367    }
368}
369
370/// Tier optimization recommendation
371#[derive(Debug, Clone)]
372pub struct TierOptimizationRecommendation {
373    /// Index identifier
374    pub index_id: String,
375    /// Current tier
376    pub current_tier: StorageTier,
377    /// Recommended tier
378    pub recommended_tier: StorageTier,
379    /// Reason for recommendation
380    pub reason: TierTransitionReason,
381    /// Priority (higher = more important)
382    pub priority: f64,
383    /// Estimated benefit of transition
384    pub estimated_benefit: f64,
385    /// Confidence in recommendation (0.0 - 1.0)
386    pub confidence: f64,
387}
388
389/// Optimization statistics
390#[derive(Debug, Clone)]
391pub struct OptimizationStats {
392    /// Total number of decisions made
393    pub total_decisions: usize,
394    /// Average outcome score
395    pub avg_outcome_score: f64,
396    /// Transition counts between tiers
397    pub tier_transitions: HashMap<(StorageTier, StorageTier), usize>,
398    /// Learned feature importance weights
399    pub feature_importance: Array1<f64>,
400}
401
402#[cfg(test)]
403mod tests {
404    use super::*;
405    use crate::tiering::types::{
406        AccessPattern, AccessStatistics, IndexType, LatencyPercentiles, PerformanceMetrics,
407    };
408    use std::collections::HashMap;
409
410    fn create_test_metadata() -> IndexMetadata {
411        IndexMetadata {
412            index_id: "test_index".to_string(),
413            current_tier: StorageTier::Warm,
414            size_bytes: 1024 * 1024 * 1024, // 1 GB
415            compressed_size_bytes: 512 * 1024 * 1024,
416            vector_count: 1_000_000,
417            dimension: 768,
418            index_type: IndexType::Hnsw,
419            created_at: SystemTime::now(),
420            last_accessed: SystemTime::now(),
421            last_modified: SystemTime::now(),
422            access_stats: AccessStatistics {
423                total_queries: 100_000,
424                queries_last_hour: 3600,
425                queries_last_day: 86400,
426                queries_last_week: 604800,
427                avg_qps: 10.0,
428                peak_qps: 20.0,
429                last_access_time: Some(SystemTime::now()),
430                access_pattern: AccessPattern::Hot,
431                query_latencies: LatencyPercentiles::default(),
432            },
433            performance_metrics: PerformanceMetrics::default(),
434            storage_path: None,
435            custom_metadata: HashMap::new(),
436        }
437    }
438
439    fn create_test_tier_stats() -> [TierStatistics; 3] {
440        [
441            TierStatistics {
442                capacity_bytes: 16 * 1024 * 1024 * 1024,
443                used_bytes: 8 * 1024 * 1024 * 1024,
444                ..Default::default()
445            },
446            TierStatistics {
447                capacity_bytes: 128 * 1024 * 1024 * 1024,
448                used_bytes: 64 * 1024 * 1024 * 1024,
449                ..Default::default()
450            },
451            TierStatistics {
452                capacity_bytes: 1024 * 1024 * 1024 * 1024,
453                used_bytes: 256 * 1024 * 1024 * 1024,
454                ..Default::default()
455            },
456        ]
457    }
458
459    #[test]
460    fn test_tier_optimizer_creation() {
461        let config = TieringConfig::default();
462        let optimizer = TierOptimizer::new(config);
463
464        assert_eq!(optimizer.feature_weights.len(), 10);
465    }
466
467    #[test]
468    fn test_feature_extraction() {
469        let config = TieringConfig::default();
470        let optimizer = TierOptimizer::new(config);
471
472        let metadata = create_test_metadata();
473        let tier_stats = create_test_tier_stats();
474
475        let features = optimizer.extract_features(&metadata, &tier_stats);
476        assert_eq!(features.len(), 10);
477        assert!(features.iter().all(|&f| f.is_finite()));
478    }
479
480    #[test]
481    fn test_optimization_recommendations() {
482        let config = TieringConfig::default();
483        let mut optimizer = TierOptimizer::new(config);
484
485        let metadata = create_test_metadata();
486        let tier_stats = create_test_tier_stats();
487
488        let recommendations = optimizer.optimize_tier_placements(&[metadata], &tier_stats);
489        assert!(recommendations.len() <= 1);
490    }
491}