1use super::config::TieringConfig;
4use super::policies::{PolicyEvaluator, TierTransitionReason};
5use super::types::{IndexMetadata, StorageTier, TierStatistics};
6use scirs2_core::ndarray_ext::{Array1, Array2};
7use std::collections::HashMap;
8use std::time::SystemTime;
9
10pub struct TierOptimizer {
12 policy_evaluator: PolicyEvaluator,
14 config: TieringConfig,
16 decision_history: Vec<OptimizationDecision>,
18 feature_weights: Array1<f64>,
20}
21
22#[derive(Debug, Clone)]
24struct OptimizationDecision {
25 index_id: String,
26 from_tier: StorageTier,
27 to_tier: StorageTier,
28 reason: TierTransitionReason,
29 timestamp: SystemTime,
30 features: Array1<f64>,
31 outcome_score: f64,
32}
33
34impl TierOptimizer {
35 pub fn new(config: TieringConfig) -> Self {
37 let policy = config.policy;
38 let feature_weights = Array1::from_vec(vec![1.0; 10]); Self {
41 policy_evaluator: PolicyEvaluator::new(policy),
42 config,
43 decision_history: Vec::new(),
44 feature_weights,
45 }
46 }
47
48 pub fn optimize_tier_placements(
50 &mut self,
51 indices: &[IndexMetadata],
52 tier_stats: &[TierStatistics; 3],
53 ) -> Vec<TierOptimizationRecommendation> {
54 let mut recommendations = Vec::new();
55
56 for metadata in indices {
57 if let Some(recommendation) = self.evaluate_index(metadata, tier_stats) {
58 recommendations.push(recommendation);
59 }
60 }
61
62 recommendations.sort_by(|a, b| b.priority.partial_cmp(&a.priority).unwrap());
64
65 recommendations
66 }
67
68 fn evaluate_index(
70 &mut self,
71 metadata: &IndexMetadata,
72 tier_stats: &[TierStatistics; 3],
73 ) -> Option<TierOptimizationRecommendation> {
74 let current_tier = metadata.current_tier;
75 let (optimal_tier, reason) =
76 self.policy_evaluator
77 .evaluate_optimal_tier(metadata, tier_stats, SystemTime::now());
78
79 if optimal_tier == current_tier {
81 return None;
82 }
83
84 let features = self.extract_features(metadata, tier_stats);
86
87 let priority = self.calculate_priority(&features, current_tier, optimal_tier);
89
90 let benefit = self.estimate_benefit(metadata, current_tier, optimal_tier);
92
93 Some(TierOptimizationRecommendation {
94 index_id: metadata.index_id.clone(),
95 current_tier,
96 recommended_tier: optimal_tier,
97 reason,
98 priority,
99 estimated_benefit: benefit,
100 confidence: self.calculate_confidence(&features),
101 })
102 }
103
104 fn extract_features(
106 &self,
107 metadata: &IndexMetadata,
108 tier_stats: &[TierStatistics; 3],
109 ) -> Array1<f64> {
110 let mut features = Vec::with_capacity(10);
111
112 features.push(metadata.access_stats.avg_qps.ln_1p());
114
115 let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
117 features.push(size_gb.ln_1p());
118
119 let recency = metadata
121 .access_stats
122 .last_access_time
123 .and_then(|t| SystemTime::now().duration_since(t).ok())
124 .map(|d| d.as_secs() as f64 / 3600.0)
125 .unwrap_or(1000.0);
126 features.push(recency.ln_1p());
127
128 features.push((metadata.access_stats.query_latencies.p95 as f64).ln_1p());
130
131 let tier_idx = match metadata.current_tier {
133 StorageTier::Hot => 0,
134 StorageTier::Warm => 1,
135 StorageTier::Cold => 2,
136 };
137 features.push(tier_stats[tier_idx].utilization());
138
139 features.push(metadata.access_stats.peak_qps.ln_1p());
141
142 features.push((metadata.access_stats.total_queries as f64).ln_1p());
144
145 let memory_gb =
147 metadata.performance_metrics.memory_footprint_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
148 features.push(memory_gb.ln_1p());
149
150 features.push(metadata.performance_metrics.cache_hit_rate);
152
153 let time_in_tier = metadata
155 .last_modified
156 .elapsed()
157 .map(|d| d.as_secs() as f64 / 3600.0)
158 .unwrap_or(0.0);
159 features.push(time_in_tier.ln_1p());
160
161 Array1::from_vec(features)
162 }
163
164 fn calculate_priority(
166 &self,
167 features: &Array1<f64>,
168 from_tier: StorageTier,
169 to_tier: StorageTier,
170 ) -> f64 {
171 let mut priority = features.dot(&self.feature_weights);
173
174 if matches!(
176 (from_tier, to_tier),
177 (StorageTier::Cold, StorageTier::Warm)
178 | (StorageTier::Cold, StorageTier::Hot)
179 | (StorageTier::Warm, StorageTier::Hot)
180 ) {
181 priority *= 1.5;
182 }
183
184 if matches!(
186 (from_tier, to_tier),
187 (StorageTier::Hot, StorageTier::Warm)
188 | (StorageTier::Hot, StorageTier::Cold)
189 | (StorageTier::Warm, StorageTier::Cold)
190 ) {
191 priority *= 0.7;
192 }
193
194 priority.max(0.0)
195 }
196
197 fn estimate_benefit(
199 &self,
200 metadata: &IndexMetadata,
201 from_tier: StorageTier,
202 to_tier: StorageTier,
203 ) -> f64 {
204 let latency_improvement = from_tier.typical_latency().as_micros() as f64
206 - to_tier.typical_latency().as_micros() as f64;
207 let latency_benefit = latency_improvement * metadata.access_stats.avg_qps;
208
209 let from_cost = from_tier.cost_factor();
211 let to_cost = to_tier.cost_factor();
212 let size_gb = metadata.size_bytes as f64 / (1024.0 * 1024.0 * 1024.0);
213 let cost_change = (from_cost - to_cost) * size_gb;
214
215 latency_benefit + cost_change * 1000.0 }
218
219 fn calculate_confidence(&self, features: &Array1<f64>) -> f64 {
221 let mean = features.mean().unwrap_or(0.0);
223 let variance =
224 features.iter().map(|&x| (x - mean).powi(2)).sum::<f64>() / features.len() as f64;
225
226 let confidence = 1.0 / (1.0 + variance);
228 confidence.clamp(0.0, 1.0)
229 }
230
231 pub fn update_from_feedback(
233 &mut self,
234 index_id: &str,
235 from_tier: StorageTier,
236 to_tier: StorageTier,
237 reason: TierTransitionReason,
238 features: Array1<f64>,
239 outcome_score: f64,
240 ) {
241 let decision = OptimizationDecision {
242 index_id: index_id.to_string(),
243 from_tier,
244 to_tier,
245 reason,
246 timestamp: SystemTime::now(),
247 features,
248 outcome_score,
249 };
250
251 self.decision_history.push(decision);
252
253 if self.decision_history.len() % 100 == 0 {
255 self.update_feature_weights();
256 }
257 }
258
259 fn update_feature_weights(&mut self) {
261 if self.decision_history.len() < 10 {
262 return; }
264
265 let n = self.decision_history.len();
267 let mut features = Array2::zeros((n, 10));
268 let mut outcomes = Array1::zeros(n);
269
270 for (i, decision) in self.decision_history.iter().enumerate() {
271 for (j, &feature) in decision.features.iter().enumerate() {
272 features[[i, j]] = feature;
273 }
274 outcomes[i] = decision.outcome_score;
275 }
276
277 let learning_rate = 0.01;
279 for i in 0..10 {
280 let feature_col = features.column(i);
281 let correlation = feature_col.dot(&outcomes) / n as f64;
282 self.feature_weights[i] += learning_rate * correlation;
283 }
284
285 let sum: f64 = self.feature_weights.iter().map(|w| w.abs()).sum();
287 if sum > 0.0 {
288 self.feature_weights /= sum;
289 }
290 }
291
292 pub fn predict_optimal_tier(
294 &mut self,
295 metadata: &IndexMetadata,
296 tier_stats: &[TierStatistics; 3],
297 ) -> (StorageTier, f64) {
298 let features = self.extract_features(metadata, tier_stats);
299
300 let mut best_tier = StorageTier::Cold;
302 let mut best_score = f64::NEG_INFINITY;
303
304 for &tier in &[StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
305 let score = self.calculate_tier_score(&features, tier);
306 if score > best_score {
307 best_score = score;
308 best_tier = tier;
309 }
310 }
311
312 (best_tier, best_score)
313 }
314
315 fn calculate_tier_score(&self, features: &Array1<f64>, tier: StorageTier) -> f64 {
317 let base_score = features.dot(&self.feature_weights);
318
319 let tier_multiplier = match tier {
321 StorageTier::Hot => 1.2,
322 StorageTier::Warm => 1.0,
323 StorageTier::Cold => 0.8,
324 };
325
326 base_score * tier_multiplier
327 }
328
329 pub fn get_optimization_stats(&self) -> OptimizationStats {
331 let total_decisions = self.decision_history.len();
332 let avg_outcome = if total_decisions > 0 {
333 self.decision_history
334 .iter()
335 .map(|d| d.outcome_score)
336 .sum::<f64>()
337 / total_decisions as f64
338 } else {
339 0.0
340 };
341
342 let tier_transitions = self.count_tier_transitions();
343
344 OptimizationStats {
345 total_decisions,
346 avg_outcome_score: avg_outcome,
347 tier_transitions,
348 feature_importance: self.feature_weights.clone(),
349 }
350 }
351
352 fn count_tier_transitions(&self) -> HashMap<(StorageTier, StorageTier), usize> {
354 let mut transitions = HashMap::new();
355
356 for decision in &self.decision_history {
357 *transitions
358 .entry((decision.from_tier, decision.to_tier))
359 .or_insert(0) += 1;
360 }
361
362 transitions
363 }
364}
365
366#[derive(Debug, Clone)]
368pub struct TierOptimizationRecommendation {
369 pub index_id: String,
371 pub current_tier: StorageTier,
373 pub recommended_tier: StorageTier,
375 pub reason: TierTransitionReason,
377 pub priority: f64,
379 pub estimated_benefit: f64,
381 pub confidence: f64,
383}
384
385#[derive(Debug, Clone)]
387pub struct OptimizationStats {
388 pub total_decisions: usize,
390 pub avg_outcome_score: f64,
392 pub tier_transitions: HashMap<(StorageTier, StorageTier), usize>,
394 pub feature_importance: Array1<f64>,
396}
397
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiering::types::{
        AccessPattern, AccessStatistics, IndexType, LatencyPercentiles, PerformanceMetrics,
    };
    use std::collections::HashMap;

    /// A 1 GiB warm HNSW index with steady traffic, accessed "now".
    fn create_test_metadata() -> IndexMetadata {
        IndexMetadata {
            index_id: "test_index".to_string(),
            current_tier: StorageTier::Warm,
            size_bytes: 1024 * 1024 * 1024,
            compressed_size_bytes: 512 * 1024 * 1024,
            vector_count: 1_000_000,
            dimension: 768,
            index_type: IndexType::Hnsw,
            created_at: SystemTime::now(),
            last_accessed: SystemTime::now(),
            last_modified: SystemTime::now(),
            access_stats: AccessStatistics {
                total_queries: 100_000,
                queries_last_hour: 3600,
                queries_last_day: 86400,
                queries_last_week: 604800,
                avg_qps: 10.0,
                peak_qps: 20.0,
                last_access_time: Some(SystemTime::now()),
                access_pattern: AccessPattern::Hot,
                query_latencies: LatencyPercentiles::default(),
            },
            performance_metrics: PerformanceMetrics::default(),
            storage_path: None,
            custom_metadata: HashMap::new(),
        }
    }

    /// Hot/Warm/Cold tier statistics with partial utilization.
    fn create_test_tier_stats() -> [TierStatistics; 3] {
        [
            TierStatistics {
                capacity_bytes: 16 * 1024 * 1024 * 1024,
                used_bytes: 8 * 1024 * 1024 * 1024,
                ..Default::default()
            },
            TierStatistics {
                capacity_bytes: 128 * 1024 * 1024 * 1024,
                used_bytes: 64 * 1024 * 1024 * 1024,
                ..Default::default()
            },
            TierStatistics {
                capacity_bytes: 1024 * 1024 * 1024 * 1024,
                used_bytes: 256 * 1024 * 1024 * 1024,
                ..Default::default()
            },
        ]
    }

    #[test]
    fn test_tier_optimizer_creation() {
        let config = TieringConfig::default();
        let optimizer = TierOptimizer::new(config);

        assert_eq!(optimizer.feature_weights.len(), 10);
        assert!(optimizer.feature_weights.iter().all(|&w| w == 1.0));
    }

    #[test]
    fn test_feature_extraction() {
        let config = TieringConfig::default();
        let optimizer = TierOptimizer::new(config);

        let metadata = create_test_metadata();
        let tier_stats = create_test_tier_stats();

        let features = optimizer.extract_features(&metadata, &tier_stats);
        assert_eq!(features.len(), 10);
        assert!(features.iter().all(|&f| f.is_finite()));
    }

    #[test]
    fn test_optimization_recommendations() {
        let config = TieringConfig::default();
        let mut optimizer = TierOptimizer::new(config);

        let metadata = create_test_metadata();
        let tier_stats = create_test_tier_stats();

        let recommendations = optimizer.optimize_tier_placements(&[metadata], &tier_stats);
        assert!(recommendations.len() <= 1);
        for rec in &recommendations {
            assert_eq!(rec.index_id, "test_index");
            assert_ne!(rec.recommended_tier, rec.current_tier);
            assert!(rec.priority >= 0.0);
            assert!((0.0..=1.0).contains(&rec.confidence));
        }
    }

    #[test]
    fn test_priority_scales_with_move_direction() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        // With the initial all-ones weights the weighted sum is 10.0.
        let features = Array1::from_vec(vec![1.0; 10]);

        let same = optimizer.calculate_priority(&features, StorageTier::Warm, StorageTier::Warm);
        let promote = optimizer.calculate_priority(&features, StorageTier::Cold, StorageTier::Hot);
        let demote = optimizer.calculate_priority(&features, StorageTier::Hot, StorageTier::Cold);

        assert!((same - 10.0).abs() < 1e-9);
        assert!((promote - 15.0).abs() < 1e-9);
        assert!((demote - 7.0).abs() < 1e-9);
    }

    #[test]
    fn test_confidence_is_one_for_uniform_features() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        let features = Array1::from_vec(vec![2.0; 10]);

        assert_eq!(optimizer.calculate_confidence(&features), 1.0);
    }

    #[test]
    fn test_same_tier_move_has_zero_benefit() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        let metadata = create_test_metadata();

        let benefit = optimizer.estimate_benefit(&metadata, StorageTier::Warm, StorageTier::Warm);
        assert_eq!(benefit, 0.0);
    }

    #[test]
    fn test_stats_start_empty() {
        let optimizer = TierOptimizer::new(TieringConfig::default());
        let stats = optimizer.get_optimization_stats();

        assert_eq!(stats.total_decisions, 0);
        assert_eq!(stats.avg_outcome_score, 0.0);
        assert!(stats.tier_transitions.is_empty());
        assert_eq!(stats.feature_importance.len(), 10);
    }
}