use super::types::{AccessStatistics, IndexMetadata, StorageTier, TierStatistics};
use serde::{Deserialize, Serialize};
use std::time::{Duration, SystemTime};
/// Strategy used by `PolicyEvaluator` to decide which storage tier an index belongs in.
///
/// The default policy is `Adaptive`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
pub enum TieringPolicy {
/// Recency-based: places an index by the time elapsed since its last access
/// (under an hour → Hot, under a day → Warm, otherwise Cold).
Lru,
/// Frequency-based: places an index by its average QPS
/// (above 10 → Hot, above 1 → Warm, otherwise Cold).
Lfu,
/// Places an index according to the estimated storage-plus-query cost of each tier.
CostBased,
/// Places an index by size: under 1 GiB prefers Hot, under 10 GiB prefers Warm,
/// larger indexes go to Cold.
SizeBased,
/// Keeps any index with an average QPS above 0.1 in the fastest tier that has room;
/// idle indexes go to Cold.
LatencyOptimized,
/// Weighted score over access rate, recency, per-tier cost and per-tier latency.
#[default]
Adaptive,
/// Placeholder for user-defined policies; currently evaluated exactly like `Adaptive`.
Custom,
}
/// Explanation attached to a tier placement decision.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum TierTransitionReason {
/// The index is accessed often or recently enough to justify a hotter tier.
HighAccessFrequency,
/// The index is accessed rarely or not recently, so a colder tier suffices.
/// The adaptive evaluator also uses this for its catch-all Cold result.
LowAccessFrequency,
/// The preferred tier did not have room for the index, so a fallback tier was chosen.
CapacityExceeded,
/// The tier was chosen on storage/query cost grounds.
CostOptimization,
/// The tier was chosen to keep query latency low.
LatencyOptimization,
/// Not emitted by `PolicyEvaluator` in this module; presumably set for
/// operator-requested moves (TODO confirm with callers).
Manual,
/// Not emitted by `PolicyEvaluator` in this module; presumably set by a
/// forecasting component (TODO confirm with callers).
Predictive,
/// Not emitted by `PolicyEvaluator` in this module; presumably set when an SLA
/// forces a placement (TODO confirm with callers).
SlaRequirement,
/// Not emitted by `PolicyEvaluator` in this module; presumably set for urgent,
/// out-of-band moves (TODO confirm with callers).
Emergency,
}
/// Chooses storage tiers for indexes according to a single [`TieringPolicy`].
///
/// The evaluator holds nothing but its (`Copy`) policy, so cloning it is trivial.
#[derive(Debug, Clone)]
pub struct PolicyEvaluator {
    /// Policy applied by every call to `evaluate_optimal_tier`.
    policy: TieringPolicy,
}
impl PolicyEvaluator {
    /// LRU: an index accessed more recently than this prefers the Hot tier.
    const LRU_HOT_WINDOW: Duration = Duration::from_secs(60 * 60);
    /// LRU: an index accessed more recently than this (but outside the hot window)
    /// prefers the Warm tier.
    const LRU_WARM_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
    /// LFU: an average QPS strictly above this prefers the Hot tier.
    const LFU_HOT_QPS: f64 = 10.0;
    /// LFU: an average QPS strictly above this (but not above the hot threshold)
    /// prefers the Warm tier.
    const LFU_WARM_QPS: f64 = 1.0;
    /// Latency-optimized: an average QPS strictly above this keeps the index out of Cold.
    const LATENCY_MIN_QPS: f64 = 0.1;
    /// Size-based: indexes strictly smaller than this many GiB prefer the Hot tier.
    const SIZE_HOT_MAX_GIB: f64 = 1.0;
    /// Size-based: indexes strictly smaller than this many GiB prefer the Warm tier.
    const SIZE_WARM_MAX_GIB: f64 = 10.0;
    /// Bytes per GiB, used to express index sizes for cost and size thresholds.
    const BYTES_PER_GIB: f64 = 1024.0 * 1024.0 * 1024.0;

    /// Creates an evaluator that applies `policy` to every placement decision.
    pub fn new(policy: TieringPolicy) -> Self {
        Self { policy }
    }

    /// Chooses the tier for the index described by `metadata` under the configured policy.
    ///
    /// `tier_stats` is indexed as `[hot, warm, cold]`. Each entry's `available_bytes()`
    /// decides whether the whole index fits in that tier. `current_time` is used only
    /// by the recency-aware policies: LRU and adaptive (which `Custom` maps to).
    ///
    /// Returns the selected tier together with the reason for choosing it.
    pub fn evaluate_optimal_tier(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
        current_time: SystemTime,
    ) -> (StorageTier, TierTransitionReason) {
        match self.policy {
            TieringPolicy::Lru => self.evaluate_lru(metadata, tier_stats, current_time),
            TieringPolicy::Lfu => self.evaluate_lfu(metadata, tier_stats),
            TieringPolicy::CostBased => self.evaluate_cost_based(metadata, tier_stats),
            TieringPolicy::SizeBased => self.evaluate_size_based(metadata, tier_stats),
            TieringPolicy::LatencyOptimized => {
                self.evaluate_latency_optimized(metadata, tier_stats)
            }
            // Custom policies have no dedicated evaluator yet; they reuse the adaptive one.
            TieringPolicy::Adaptive | TieringPolicy::Custom => {
                self.evaluate_adaptive(metadata, tier_stats, current_time)
            }
        }
    }

    /// Recency policy.
    ///
    /// - Accessed within the last hour: prefers Hot.
    /// - Accessed within the last day: prefers Warm.
    /// - Otherwise, or never accessed: Cold.
    ///
    /// A preferred tier without room overflows to the next colder tier that fits.
    fn evaluate_lru(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
        current_time: SystemTime,
    ) -> (StorageTier, TierTransitionReason) {
        // A never-accessed index is treated as infinitely stale.
        let time_since_access =
            Self::time_since_last_access(metadata, current_time).unwrap_or(Duration::MAX);
        if time_since_access < Self::LRU_HOT_WINDOW {
            self.place_with_fallback(
                metadata,
                tier_stats,
                StorageTier::Hot,
                TierTransitionReason::HighAccessFrequency,
            )
        } else if time_since_access < Self::LRU_WARM_WINDOW {
            self.place_with_fallback(
                metadata,
                tier_stats,
                StorageTier::Warm,
                TierTransitionReason::LowAccessFrequency,
            )
        } else {
            (StorageTier::Cold, TierTransitionReason::LowAccessFrequency)
        }
    }

    /// Frequency policy.
    ///
    /// - Average QPS above 10: prefers Hot.
    /// - Average QPS above 1: prefers Warm.
    /// - Anything else: Cold.
    ///
    /// A preferred tier without room overflows to the next colder tier that fits.
    fn evaluate_lfu(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
    ) -> (StorageTier, TierTransitionReason) {
        let qps = metadata.access_stats.avg_qps;
        if qps > Self::LFU_HOT_QPS {
            self.place_with_fallback(
                metadata,
                tier_stats,
                StorageTier::Hot,
                TierTransitionReason::HighAccessFrequency,
            )
        } else if qps > Self::LFU_WARM_QPS {
            self.place_with_fallback(
                metadata,
                tier_stats,
                StorageTier::Warm,
                TierTransitionReason::HighAccessFrequency,
            )
        } else {
            (StorageTier::Cold, TierTransitionReason::LowAccessFrequency)
        }
    }

    /// Cost policy: picks the cheapest tier (per `calculate_tier_cost`) that has room.
    ///
    /// On equal cost the colder tier wins. If no tier has room, the result is Cold with
    /// `CapacityExceeded`.
    fn evaluate_cost_based(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
    ) -> (StorageTier, TierTransitionReason) {
        let mut candidates = [StorageTier::Cold, StorageTier::Warm, StorageTier::Hot]
            .map(|tier| (tier, self.calculate_tier_cost(metadata, tier)));
        // `sort_by` is stable, so equal costs keep the cold-first order listed above.
        candidates.sort_by(|a, b| a.1.total_cmp(&b.1));
        candidates
            .iter()
            .copied()
            .find(|&(tier, _)| self.fits(tier_stats, tier, metadata))
            .map_or(
                (StorageTier::Cold, TierTransitionReason::CapacityExceeded),
                |(tier, _)| (tier, TierTransitionReason::CostOptimization),
            )
    }

    /// Size policy.
    ///
    /// - Under 1 GiB and fits in Hot: Hot.
    /// - Under 10 GiB and fits in Warm: Warm.
    /// - Otherwise: Cold.
    fn evaluate_size_based(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
    ) -> (StorageTier, TierTransitionReason) {
        let size_gib = Self::size_gib(metadata);
        if size_gib < Self::SIZE_HOT_MAX_GIB && self.fits(tier_stats, StorageTier::Hot, metadata)
        {
            (StorageTier::Hot, TierTransitionReason::LatencyOptimization)
        } else if size_gib < Self::SIZE_WARM_MAX_GIB
            && self.fits(tier_stats, StorageTier::Warm, metadata)
        {
            (StorageTier::Warm, TierTransitionReason::CostOptimization)
        } else {
            (StorageTier::Cold, TierTransitionReason::CostOptimization)
        }
    }

    /// Latency policy: any index with an average QPS above 0.1 goes to the fastest tier
    /// that has room. Idle indexes (including a NaN QPS) go to Cold.
    fn evaluate_latency_optimized(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
    ) -> (StorageTier, TierTransitionReason) {
        if metadata.access_stats.avg_qps > Self::LATENCY_MIN_QPS {
            if self.fits(tier_stats, StorageTier::Hot, metadata) {
                (StorageTier::Hot, TierTransitionReason::LatencyOptimization)
            } else if self.fits(tier_stats, StorageTier::Warm, metadata) {
                (StorageTier::Warm, TierTransitionReason::LatencyOptimization)
            } else {
                (StorageTier::Cold, TierTransitionReason::CapacityExceeded)
            }
        } else {
            (StorageTier::Cold, TierTransitionReason::LowAccessFrequency)
        }
    }

    /// Adaptive policy.
    ///
    /// Picks the highest-scoring tier (per `calculate_adaptive_score`), subject to
    /// capacity for Hot and Warm. Cold is the catch-all.
    fn evaluate_adaptive(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
        current_time: SystemTime,
    ) -> (StorageTier, TierTransitionReason) {
        let hot_score = self.calculate_adaptive_score(metadata, StorageTier::Hot, current_time);
        let warm_score = self.calculate_adaptive_score(metadata, StorageTier::Warm, current_time);
        let cold_score = self.calculate_adaptive_score(metadata, StorageTier::Cold, current_time);
        if hot_score >= warm_score
            && hot_score >= cold_score
            && self.fits(tier_stats, StorageTier::Hot, metadata)
        {
            (StorageTier::Hot, TierTransitionReason::HighAccessFrequency)
        } else if warm_score >= cold_score && self.fits(tier_stats, StorageTier::Warm, metadata) {
            (StorageTier::Warm, TierTransitionReason::CostOptimization)
        } else {
            (StorageTier::Cold, TierTransitionReason::LowAccessFrequency)
        }
    }

    /// Weighted desirability of placing the index in `tier`.
    ///
    /// The score is 0.4 × access rate, plus 0.3 × recency, plus 0.2 × inverse cost,
    /// plus 0.1 × tier latency.
    ///
    /// NOTE(review): the access-rate and recency terms do not depend on `tier`. When
    /// `evaluate_adaptive` compares tiers, only the cost and latency terms decide.
    fn calculate_adaptive_score(
        &self,
        metadata: &IndexMetadata,
        tier: StorageTier,
        current_time: SystemTime,
    ) -> f64 {
        let qps_factor = metadata.access_stats.avg_qps.min(100.0) / 100.0;
        // Decays as 1 / (1 + hours since last access); a never-accessed index scores 0.
        let recency_factor = Self::time_since_last_access(metadata, current_time)
            .map(|elapsed| 1.0 / (1.0 + elapsed.as_secs() as f64 / 3600.0))
            .unwrap_or(0.0);
        let cost_factor = 1.0 / (1.0 + self.calculate_tier_cost(metadata, tier));
        let latency_factor = match tier {
            StorageTier::Hot => 1.0,
            StorageTier::Warm => 0.5,
            StorageTier::Cold => 0.1,
        };
        qps_factor * 0.4 + recency_factor * 0.3 + cost_factor * 0.2 + latency_factor * 0.1
    }

    /// Estimated cost of keeping the index in `tier`.
    ///
    /// The cost is size in GiB × the tier's cost factor, plus average QPS × the tier's
    /// typical latency in seconds.
    fn calculate_tier_cost(&self, metadata: &IndexMetadata, tier: StorageTier) -> f64 {
        let storage_cost = Self::size_gib(metadata) * tier.cost_factor();
        let query_cost = metadata.access_stats.avg_qps * tier.typical_latency().as_secs_f64();
        storage_cost + query_cost
    }

    /// Returns `true` if the tier described by `tier_stats` has at least `size_bytes` free.
    fn has_capacity(&self, tier_stats: &TierStatistics, size_bytes: u64) -> bool {
        tier_stats.available_bytes() >= size_bytes
    }

    /// Returns `true` if `tier` (looked up in the `[hot, warm, cold]` stats array) can
    /// hold the whole index.
    fn fits(
        &self,
        tier_stats: &[TierStatistics; 3],
        tier: StorageTier,
        metadata: &IndexMetadata,
    ) -> bool {
        let stats = match tier {
            StorageTier::Hot => &tier_stats[0],
            StorageTier::Warm => &tier_stats[1],
            StorageTier::Cold => &tier_stats[2],
        };
        self.has_capacity(stats, metadata.size_bytes)
    }

    /// Places the index in `preferred` (tagged with `reason`) if it fits. Otherwise it
    /// goes to the next colder tier that fits, tagged with `CapacityExceeded`.
    fn place_with_fallback(
        &self,
        metadata: &IndexMetadata,
        tier_stats: &[TierStatistics; 3],
        preferred: StorageTier,
        reason: TierTransitionReason,
    ) -> (StorageTier, TierTransitionReason) {
        // Cold is the overflow tier of last resort and is not capacity-checked here.
        if matches!(preferred, StorageTier::Cold) || self.fits(tier_stats, preferred, metadata) {
            return (preferred, reason);
        }
        if matches!(preferred, StorageTier::Hot)
            && self.fits(tier_stats, StorageTier::Warm, metadata)
        {
            return (StorageTier::Warm, TierTransitionReason::CapacityExceeded);
        }
        (StorageTier::Cold, TierTransitionReason::CapacityExceeded)
    }

    /// Time elapsed between the index's last access and `current_time`.
    ///
    /// `None` means the index has never been accessed. An access timestamp later than
    /// `current_time` (for example, from clock skew) counts as zero elapsed time rather
    /// than as "never accessed".
    fn time_since_last_access(
        metadata: &IndexMetadata,
        current_time: SystemTime,
    ) -> Option<Duration> {
        metadata
            .access_stats
            .last_access_time
            .map(|accessed| current_time.duration_since(accessed).unwrap_or(Duration::ZERO))
    }

    /// Uncompressed index size in GiB.
    fn size_gib(metadata: &IndexMetadata) -> f64 {
        metadata.size_bytes as f64 / Self::BYTES_PER_GIB
    }
}
pub fn calculate_access_score(stats: &AccessStatistics) -> f64 {
let mut score = 0.0;
score += (stats.avg_qps.min(100.0) / 100.0) * 0.5;
score += (stats.peak_qps.min(1000.0) / 1000.0) * 0.2;
let total_queries_normalized = stats.total_queries.min(1_000_000) as f64 / 1_000_000.0;
score += total_queries_normalized * 0.15;
let recent_activity =
(stats.queries_last_hour as f64).max((stats.queries_last_day as f64) / 24.0);
score += (recent_activity.min(1000.0) / 1000.0) * 0.15;
score.clamp(0.0, 1.0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiering::types::{AccessPattern, IndexType, LatencyPercentiles, PerformanceMetrics};
    use std::collections::HashMap;

    /// Builds a `size_gb`-GiB index with a steady `qps` load, last accessed "now".
    fn create_test_metadata(qps: f64, size_gb: f64) -> IndexMetadata {
        IndexMetadata {
            index_id: "test_index".to_string(),
            current_tier: StorageTier::Warm,
            size_bytes: (size_gb * 1024.0 * 1024.0 * 1024.0) as u64,
            compressed_size_bytes: (size_gb * 512.0 * 1024.0 * 1024.0) as u64,
            vector_count: 1_000_000,
            dimension: 768,
            index_type: IndexType::Hnsw,
            created_at: SystemTime::now(),
            last_accessed: SystemTime::now(),
            last_modified: SystemTime::now(),
            access_stats: AccessStatistics {
                total_queries: 100_000,
                queries_last_hour: (qps * 3600.0) as u64,
                queries_last_day: (qps * 86400.0) as u64,
                queries_last_week: (qps * 604800.0) as u64,
                avg_qps: qps,
                peak_qps: qps * 2.0,
                last_access_time: Some(SystemTime::now()),
                access_pattern: AccessPattern::Hot,
                query_latencies: LatencyPercentiles::default(),
            },
            performance_metrics: PerformanceMetrics::default(),
            storage_path: None,
            custom_metadata: HashMap::new(),
        }
    }

    /// Builds a 1 GiB, 1 QPS index whose last access was `ago` before `now`.
    /// `None` means it has never been accessed.
    fn metadata_last_accessed(now: SystemTime, ago: Option<Duration>) -> IndexMetadata {
        let mut metadata = create_test_metadata(1.0, 1.0);
        metadata.access_stats.last_access_time = ago.map(|ago| now - ago);
        metadata
    }

    /// Tier usage:
    /// - hot: 8 of 16 GiB used
    /// - warm: 64 of 128 GiB used
    /// - cold: 256 GiB of 1 TiB used
    fn create_test_tier_stats() -> [TierStatistics; 3] {
        [
            TierStatistics {
                capacity_bytes: 16 * 1024 * 1024 * 1024,
                used_bytes: 8 * 1024 * 1024 * 1024,
                ..Default::default()
            },
            TierStatistics {
                capacity_bytes: 128 * 1024 * 1024 * 1024,
                used_bytes: 64 * 1024 * 1024 * 1024,
                ..Default::default()
            },
            TierStatistics {
                capacity_bytes: 1024 * 1024 * 1024 * 1024,
                used_bytes: 256 * 1024 * 1024 * 1024,
                ..Default::default()
            },
        ]
    }

    #[test]
    fn test_lfu_policy_high_qps() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lfu);
        let metadata = create_test_metadata(20.0, 1.0);
        let tier_stats = create_test_tier_stats();
        let (tier, _reason) = evaluator.evaluate_lfu(&metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Hot);
    }

    #[test]
    fn test_lfu_policy_medium_qps() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lfu);
        let metadata = create_test_metadata(5.0, 1.0);
        let tier_stats = create_test_tier_stats();
        let (tier, _reason) = evaluator.evaluate_lfu(&metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Warm);
    }

    #[test]
    fn test_lfu_policy_low_qps() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lfu);
        let metadata = create_test_metadata(0.5, 1.0);
        let tier_stats = create_test_tier_stats();
        let (tier, _reason) = evaluator.evaluate_lfu(&metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Cold);
    }

    #[test]
    fn test_lfu_policy_hot_full_falls_back_to_warm() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lfu);
        let metadata = create_test_metadata(20.0, 1.0);
        let mut tier_stats = create_test_tier_stats();
        tier_stats[0].used_bytes = tier_stats[0].capacity_bytes;
        assert_eq!(
            evaluator.evaluate_lfu(&metadata, &tier_stats),
            (StorageTier::Warm, TierTransitionReason::CapacityExceeded)
        );
    }

    #[test]
    fn test_lru_policy_by_recency() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lru);
        let tier_stats = create_test_tier_stats();
        // A fixed `now` keeps the test independent of wall-clock movement.
        let now = SystemTime::now();
        let cases = [
            (Some(Duration::ZERO), StorageTier::Hot),
            (Some(Duration::from_secs(2 * 3600)), StorageTier::Warm),
            (Some(Duration::from_secs(2 * 86_400)), StorageTier::Cold),
            (None, StorageTier::Cold),
        ];
        for (ago, expected) in cases {
            let metadata = metadata_last_accessed(now, ago);
            let (tier, _reason) = evaluator.evaluate_lru(&metadata, &tier_stats, now);
            assert_eq!(tier, expected, "last accessed {ago:?} ago");
        }
    }

    #[test]
    fn test_size_based_policy() {
        let evaluator = PolicyEvaluator::new(TieringPolicy::SizeBased);
        let tier_stats = create_test_tier_stats();

        let small_metadata = create_test_metadata(1.0, 0.5);
        let (tier, _) = evaluator.evaluate_size_based(&small_metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Hot);

        let medium_metadata = create_test_metadata(1.0, 5.0);
        let (tier, _) = evaluator.evaluate_size_based(&medium_metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Warm);

        let large_metadata = create_test_metadata(1.0, 20.0);
        let (tier, _) = evaluator.evaluate_size_based(&large_metadata, &tier_stats);
        assert_eq!(tier, StorageTier::Cold);
    }

    #[test]
    fn test_evaluate_optimal_tier_dispatches_to_policy() {
        let metadata = create_test_metadata(20.0, 1.0);
        let tier_stats = create_test_tier_stats();
        let now = SystemTime::now();
        let evaluator = PolicyEvaluator::new(TieringPolicy::Lfu);
        assert_eq!(
            evaluator.evaluate_optimal_tier(&metadata, &tier_stats, now),
            evaluator.evaluate_lfu(&metadata, &tier_stats)
        );
    }

    #[test]
    fn test_custom_policy_matches_adaptive() {
        let metadata = create_test_metadata(5.0, 2.0);
        let tier_stats = create_test_tier_stats();
        let now = SystemTime::now();
        let custom = PolicyEvaluator::new(TieringPolicy::Custom);
        let adaptive = PolicyEvaluator::new(TieringPolicy::Adaptive);
        assert_eq!(
            custom.evaluate_optimal_tier(&metadata, &tier_stats, now),
            adaptive.evaluate_optimal_tier(&metadata, &tier_stats, now)
        );
    }

    #[test]
    fn test_access_score_calculation() {
        let stats = AccessStatistics {
            total_queries: 500_000,
            queries_last_hour: 3600,
            queries_last_day: 86400,
            queries_last_week: 604800,
            avg_qps: 10.0,
            peak_qps: 20.0,
            last_access_time: Some(SystemTime::now()),
            access_pattern: AccessPattern::Hot,
            query_latencies: LatencyPercentiles::default(),
        };
        let score = calculate_access_score(&stats);
        // Components: 0.05 (avg), 0.004 (peak), 0.075 (lifetime), 0.15 (recent, saturated).
        assert!((score - 0.279).abs() < 1e-9, "unexpected score {score}");
    }

    #[test]
    fn test_access_score_idle_index_is_zero() {
        let stats = AccessStatistics {
            total_queries: 0,
            queries_last_hour: 0,
            queries_last_day: 0,
            queries_last_week: 0,
            avg_qps: 0.0,
            peak_qps: 0.0,
            last_access_time: None,
            access_pattern: AccessPattern::Hot,
            query_latencies: LatencyPercentiles::default(),
        };
        assert_eq!(calculate_access_score(&stats), 0.0);
    }
}