use super::config::TieringConfig;
use super::policies::{PolicyEvaluator, TierTransitionReason};
use super::types::{IndexMetadata, StorageTier, TierStatistics};
use scirs2_core::ndarray_ext::{Array1, Array2};
use std::collections::HashMap;
use std::time::SystemTime;
/// Recommends and learns storage-tier placements for indices.
///
/// Combines a rule-based [`PolicyEvaluator`] with a simple linear model
/// (`feature_weights` over the 10-element vector built by
/// `extract_features`) that is refitted from transition feedback.
pub struct TierOptimizer {
    /// Evaluates the configured tiering policy to pick an optimal tier.
    policy_evaluator: PolicyEvaluator,
    /// Tiering configuration supplied at construction.
    config: TieringConfig,
    /// Past transition decisions plus observed outcomes; training data for
    /// `feature_weights`.
    decision_history: Vec<OptimizationDecision>,
    /// Linear weights over the 10 extracted features; starts uniform and is
    /// updated from feedback.
    feature_weights: Array1<f64>,
}
/// One recorded tier transition and its observed outcome, retained so the
/// optimizer can refit its feature weights from real results.
#[derive(Debug, Clone)]
struct OptimizationDecision {
    /// Identifier of the index that was moved.
    index_id: String,
    /// Tier the index occupied before the transition.
    from_tier: StorageTier,
    /// Tier the index was moved to.
    to_tier: StorageTier,
    /// Why the transition was recommended.
    reason: TierTransitionReason,
    /// When the decision was recorded.
    timestamp: SystemTime,
    /// Feature vector the decision was based on (see `extract_features`).
    features: Array1<f64>,
    /// Caller-supplied measure of how well the move worked; higher is better.
    outcome_score: f64,
}
impl TierOptimizer {
pub fn new(config: TieringConfig) -> Self {
let policy = config.policy;
let feature_weights = Array1::from_vec(vec![1.0; 10]);
Self {
policy_evaluator: PolicyEvaluator::new(policy),
config,
decision_history: Vec::new(),
feature_weights,
}
}
pub fn optimize_tier_placements(
&mut self,
indices: &[IndexMetadata],
tier_stats: &[TierStatistics; 3],
) -> Vec<TierOptimizationRecommendation> {
let mut recommendations = Vec::new();
for metadata in indices {
if let Some(recommendation) = self.evaluate_index(metadata, tier_stats) {
recommendations.push(recommendation);
}
}
recommendations.sort_by(|a, b| {
b.priority
.partial_cmp(&a.priority)
.unwrap_or(std::cmp::Ordering::Equal)
});
recommendations
}
fn evaluate_index(
&mut self,
metadata: &IndexMetadata,
tier_stats: &[TierStatistics; 3],
) -> Option<TierOptimizationRecommendation> {
let current_tier = metadata.current_tier;
let (optimal_tier, reason) =
self.policy_evaluator
.evaluate_optimal_tier(metadata, tier_stats, SystemTime::now());
if optimal_tier == current_tier {
return None;
}
let features = self.extract_features(metadata, tier_stats);
let priority = self.calculate_priority(&features, current_tier, optimal_tier);
let benefit = self.estimate_benefit(metadata, current_tier, optimal_tier);
Some(TierOptimizationRecommendation {
index_id: metadata.index_id.clone(),
current_tier,
recommended_tier: optimal_tier,
reason,
priority,
estimated_benefit: benefit,
confidence: self.calculate_confidence(&features),
})
}
fn extract_features(
&self,
metadata: &IndexMetadata,
tier_stats: &[TierStatistics; 3],
) -> Array1<f64> {
let mut features = Vec::with_capacity(10);
features.push(metadata.access_stats.avg_qps.ln_1p());
let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
features.push(size_gb.ln_1p());
let recency = metadata
.access_stats
.last_access_time
.and_then(|t| SystemTime::now().duration_since(t).ok())
.map(|d| d.as_secs() as f64 / 3600.0)
.unwrap_or(1000.0);
features.push(recency.ln_1p());
features.push((metadata.access_stats.query_latencies.p95 as f64).ln_1p());
let tier_idx = match metadata.current_tier {
StorageTier::Hot => 0,
StorageTier::Warm => 1,
StorageTier::Cold => 2,
};
features.push(tier_stats[tier_idx].utilization());
features.push(metadata.access_stats.peak_qps.ln_1p());
features.push((metadata.access_stats.total_queries as f64).ln_1p());
let memory_gb =
metadata.performance_metrics.memory_footprint_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
features.push(memory_gb.ln_1p());
features.push(metadata.performance_metrics.cache_hit_rate);
let time_in_tier = metadata
.last_modified
.elapsed()
.map(|d| d.as_secs() as f64 / 3600.0)
.unwrap_or(0.0);
features.push(time_in_tier.ln_1p());
Array1::from_vec(features)
}
fn calculate_priority(
&self,
features: &Array1<f64>,
from_tier: StorageTier,
to_tier: StorageTier,
) -> f64 {
let mut priority = features.dot(&self.feature_weights);
if matches!(
(from_tier, to_tier),
(StorageTier::Cold, StorageTier::Warm)
| (StorageTier::Cold, StorageTier::Hot)
| (StorageTier::Warm, StorageTier::Hot)
) {
priority *= 1.5;
}
if matches!(
(from_tier, to_tier),
(StorageTier::Hot, StorageTier::Warm)
| (StorageTier::Hot, StorageTier::Cold)
| (StorageTier::Warm, StorageTier::Cold)
) {
priority *= 0.7;
}
priority.max(0.0)
}
fn estimate_benefit(
&self,
metadata: &IndexMetadata,
from_tier: StorageTier,
to_tier: StorageTier,
) -> f64 {
let latency_improvement = from_tier.typical_latency().as_micros() as f64
- to_tier.typical_latency().as_micros() as f64;
let latency_benefit = latency_improvement * metadata.access_stats.avg_qps;
let from_cost = from_tier.cost_factor();
let to_cost = to_tier.cost_factor();
let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
let cost_change = (from_cost - to_cost) * size_gb;
latency_benefit + cost_change * 1000.0 }
fn calculate_confidence(&self, features: &Array1<f64>) -> f64 {
let mean = features.mean().unwrap_or(0.0);
let variance =
features.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / features.len() as f64;
let confidence = 1.0 / (1.0 + variance);
confidence.clamp(0.0, 1.0)
}
pub fn update_from_feedback(
&mut self,
index_id: &str,
from_tier: StorageTier,
to_tier: StorageTier,
reason: TierTransitionReason,
features: Array1<f64>,
outcome_score: f64,
) {
let decision = OptimizationDecision {
index_id: index_id.to_string(),
from_tier,
to_tier,
reason,
timestamp: SystemTime::now(),
features,
outcome_score,
};
self.decision_history.push(decision);
if self.decision_history.len() % 100 == 0 {
self.update_feature_weights();
}
}
fn update_feature_weights(&mut self) {
if self.decision_history.len() < 10 {
return; }
let n = self.decision_history.len();
let mut features = Array2::zeros((n, 10));
let mut outcomes = Array1::zeros(n);
for (i, decision) in self.decision_history.iter().enumerate() {
for (j, &feature) in decision.features.iter().enumerate() {
features[[i, j]] = feature;
}
outcomes[i] = decision.outcome_score;
}
let learning_rate = 0.01;
for i in 0..10 {
let feature_col = features.column(i);
let correlation = feature_col.dot(&outcomes) / n as f64;
self.feature_weights[i] += learning_rate * correlation;
}
let sum: f64 = self.feature_weights.iter().map(|w| w.abs()).sum();
if sum > 0.0 {
self.feature_weights /= sum;
}
}
pub fn predict_optimal_tier(
&mut self,
metadata: &IndexMetadata,
tier_stats: &[TierStatistics; 3],
) -> (StorageTier, f64) {
let features = self.extract_features(metadata, tier_stats);
let mut best_tier = StorageTier::Cold;
let mut best_score = f64::NEG_INFINITY;
for &tier in &[StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
let score = self.calculate_tier_score(&features, tier);
if score > best_score {
best_score = score;
best_tier = tier;
}
}
(best_tier, best_score)
}
fn calculate_tier_score(&self, features: &Array1<f64>, tier: StorageTier) -> f64 {
let base_score = features.dot(&self.feature_weights);
let tier_multiplier = match tier {
StorageTier::Hot => 1.2,
StorageTier::Warm => 1.0,
StorageTier::Cold => 0.8,
};
base_score * tier_multiplier
}
pub fn get_optimization_stats(&self) -> OptimizationStats {
let total_decisions = self.decision_history.len();
let avg_outcome = if total_decisions > 0 {
self.decision_history
.iter()
.map(|d| d.outcome_score)
.sum::<f64>()
/ total_decisions as f64
} else {
0.0
};
let tier_transitions = self.count_tier_transitions();
OptimizationStats {
total_decisions,
avg_outcome_score: avg_outcome,
tier_transitions,
feature_importance: self.feature_weights.clone(),
}
}
fn count_tier_transitions(&self) -> HashMap<(StorageTier, StorageTier), usize> {
let mut transitions = HashMap::new();
for decision in &self.decision_history {
*transitions
.entry((decision.from_tier, decision.to_tier))
.or_insert(0) += 1;
}
transitions
}
}
/// A suggested tier transition for one index, produced by
/// `TierOptimizer::optimize_tier_placements`.
#[derive(Debug, Clone)]
pub struct TierOptimizationRecommendation {
    /// Identifier of the index to move.
    pub index_id: String,
    /// Tier the index currently occupies.
    pub current_tier: StorageTier,
    /// Tier the policy recommends moving it to.
    pub recommended_tier: StorageTier,
    /// Why the transition is recommended.
    pub reason: TierTransitionReason,
    /// Urgency score used to order recommendations; non-negative.
    pub priority: f64,
    /// Estimated net benefit of the move (latency plus cost components).
    pub estimated_benefit: f64,
    /// Confidence in the recommendation, in `[0, 1]`.
    pub confidence: f64,
}
/// Summary of the optimizer's accumulated decisions and learned state,
/// returned by `TierOptimizer::get_optimization_stats`.
#[derive(Debug, Clone)]
pub struct OptimizationStats {
    /// Number of feedback decisions recorded so far.
    pub total_decisions: usize,
    /// Mean outcome score across all recorded decisions (0.0 when empty).
    pub avg_outcome_score: f64,
    /// Count of recorded transitions per `(from, to)` tier pair.
    pub tier_transitions: HashMap<(StorageTier, StorageTier), usize>,
    /// Current learned feature weights (a clone of the live vector).
    pub feature_importance: Array1<f64>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiering::types::{
        AccessPattern, AccessStatistics, IndexType, LatencyPercentiles, PerformanceMetrics,
    };
    use std::collections::HashMap;

    /// Fixture: a warm, 1 GiB HNSW index receiving steady traffic.
    fn create_test_metadata() -> IndexMetadata {
        IndexMetadata {
            index_id: "test_index".to_string(),
            current_tier: StorageTier::Warm,
            size_bytes: 1024 * 1024 * 1024,
            compressed_size_bytes: 512 * 1024 * 1024,
            vector_count: 1_000_000,
            dimension: 768,
            index_type: IndexType::Hnsw,
            created_at: SystemTime::now(),
            last_accessed: SystemTime::now(),
            last_modified: SystemTime::now(),
            access_stats: AccessStatistics {
                total_queries: 100_000,
                queries_last_hour: 3600,
                queries_last_day: 86400,
                queries_last_week: 604800,
                avg_qps: 10.0,
                peak_qps: 20.0,
                last_access_time: Some(SystemTime::now()),
                access_pattern: AccessPattern::Hot,
                query_latencies: LatencyPercentiles::default(),
            },
            performance_metrics: PerformanceMetrics::default(),
            storage_path: None,
            custom_metadata: HashMap::new(),
        }
    }

    /// Fixture: hot/warm/cold tiers at 50%, 50%, and 25% utilization.
    fn create_test_tier_stats() -> [TierStatistics; 3] {
        let stats = |capacity_bytes, used_bytes| TierStatistics {
            capacity_bytes,
            used_bytes,
            ..Default::default()
        };
        [
            stats(16 * 1024 * 1024 * 1024, 8 * 1024 * 1024 * 1024),
            stats(128 * 1024 * 1024 * 1024, 64 * 1024 * 1024 * 1024),
            stats(1024 * 1024 * 1024 * 1024, 256 * 1024 * 1024 * 1024),
        ]
    }

    #[test]
    fn test_tier_optimizer_creation() {
        // A fresh optimizer starts with one weight per extracted feature.
        let optimizer = TierOptimizer::new(TieringConfig::default());
        assert_eq!(optimizer.feature_weights.len(), 10);
    }

    #[test]
    fn test_feature_extraction() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        let features =
            optimizer.extract_features(&create_test_metadata(), &create_test_tier_stats());
        // Exactly 10 features, none of them NaN or infinite.
        assert_eq!(features.len(), 10);
        assert!(features.iter().all(|&f| f.is_finite()));
    }

    #[test]
    fn test_optimization_recommendations() {
        let mut optimizer = TierOptimizer::new(TieringConfig::default());
        let recommendations = optimizer
            .optimize_tier_placements(&[create_test_metadata()], &create_test_tier_stats());
        // At most one recommendation can come out of a single index.
        assert!(recommendations.len() <= 1);
    }
}