1use super::access_tracker::AccessTracker;
4use super::config::TieringConfig;
5use super::metrics::TierMetrics;
6use super::policies::TierTransitionReason;
7use super::storage_backends::{ColdTierStorage, HotTierStorage, StorageBackend, WarmTierStorage};
8use super::tier_optimizer::{TierOptimizationRecommendation, TierOptimizer};
9use super::types::{IndexMetadata, StorageTier, TierStatistics, TierTransition};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
16pub struct TieringManager {
18 config: TieringConfig,
20 indices: Arc<RwLock<HashMap<String, IndexMetadata>>>,
22 hot_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
24 warm_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
26 cold_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
28 access_tracker: Arc<RwLock<AccessTracker>>,
30 metrics: Arc<TierMetrics>,
32 optimizer: Arc<RwLock<TierOptimizer>>,
34 _background_task: Option<std::thread::JoinHandle<()>>,
36}
37
38impl TieringManager {
39 pub fn new(config: TieringConfig) -> Result<Self> {
41 config.validate()?;
42
43 let hot_storage: Box<dyn StorageBackend> = Box::new(HotTierStorage::new());
45
46 let warm_storage: Box<dyn StorageBackend> = Box::new(WarmTierStorage::new(
47 config.storage_base_path.join("warm"),
48 config.warm_tier_compression,
49 config.warm_tier_compression_level,
50 )?);
51
52 let cold_storage: Box<dyn StorageBackend> = Box::new(ColdTierStorage::new(
53 config.storage_base_path.join("cold"),
54 config.cold_tier_compression_level,
55 )?);
56
57 let access_tracker = AccessTracker::new(10000, Duration::from_secs(86400));
59
60 let optimizer = TierOptimizer::new(config.clone());
62
63 let metrics = Arc::new(TierMetrics::new());
65
66 metrics.update_tier_usage(StorageTier::Hot, 0, config.hot_tier_capacity_bytes());
68 metrics.update_tier_usage(StorageTier::Warm, 0, config.warm_tier_capacity_bytes());
69 metrics.update_tier_usage(StorageTier::Cold, 0, config.cold_tier_capacity_bytes());
70
71 Ok(Self {
72 config,
73 indices: Arc::new(RwLock::new(HashMap::new())),
74 hot_storage: Arc::new(RwLock::new(hot_storage)),
75 warm_storage: Arc::new(RwLock::new(warm_storage)),
76 cold_storage: Arc::new(RwLock::new(cold_storage)),
77 access_tracker: Arc::new(RwLock::new(access_tracker)),
78 metrics,
79 optimizer: Arc::new(RwLock::new(optimizer)),
80 _background_task: None,
81 })
82 }
83
84 pub fn register_index(&self, index_id: String, metadata: IndexMetadata) -> Result<()> {
86 let mut indices = self.indices.write();
87 indices.insert(index_id, metadata);
88 Ok(())
89 }
90
91 pub fn store_index(&self, index_id: &str, data: &[u8], tier: StorageTier) -> Result<()> {
93 let start = SystemTime::now();
94
95 let storage = match tier {
97 StorageTier::Hot => self.hot_storage.clone(),
98 StorageTier::Warm => self.warm_storage.clone(),
99 StorageTier::Cold => self.cold_storage.clone(),
100 };
101
102 storage.write().save_index(index_id, data)?;
103
104 let mut indices = self.indices.write();
106 if let Some(metadata) = indices.get_mut(index_id) {
107 metadata.current_tier = tier;
108 metadata.size_bytes = data.len() as u64;
109 metadata.last_modified = SystemTime::now();
110 }
111
112 let duration = start.elapsed().unwrap_or(Duration::ZERO);
114 self.metrics.record_bytes_written(tier, data.len() as u64);
115
116 tracing::info!(
117 "Stored index {} in {:?} tier ({} bytes, {:?})",
118 index_id,
119 tier,
120 data.len(),
121 duration
122 );
123
124 Ok(())
125 }
126
127 pub fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
129 let start = SystemTime::now();
130
131 let tier = {
133 let indices = self.indices.read();
134 indices
135 .get(index_id)
136 .map(|m| m.current_tier)
137 .ok_or_else(|| anyhow::anyhow!("Index {} not found", index_id))?
138 };
139
140 let storage = match tier {
142 StorageTier::Hot => self.hot_storage.clone(),
143 StorageTier::Warm => self.warm_storage.clone(),
144 StorageTier::Cold => self.cold_storage.clone(),
145 };
146
147 let data = storage.read().load_index(index_id)?;
148
149 let latency_us = start.elapsed().unwrap_or(Duration::ZERO).as_micros() as u64;
151 self.access_tracker
152 .write()
153 .record_access(index_id, latency_us);
154
155 self.metrics.record_query(tier, latency_us, true);
157 self.metrics.record_bytes_read(tier, data.len() as u64);
158
159 {
161 let mut indices = self.indices.write();
162 if let Some(metadata) = indices.get_mut(index_id) {
163 self.access_tracker.read().update_metadata(metadata);
164 }
165 }
166
167 Ok(data)
168 }
169
170 pub fn transition_index(
172 &self,
173 index_id: &str,
174 target_tier: StorageTier,
175 reason: TierTransitionReason,
176 ) -> Result<()> {
177 let start = SystemTime::now();
178
179 let (current_tier, size_bytes) = {
181 let indices = self.indices.read();
182 let metadata = indices
183 .get(index_id)
184 .ok_or_else(|| anyhow::anyhow!("Index {} not found", index_id))?;
185 (metadata.current_tier, metadata.size_bytes)
186 };
187
188 if current_tier == target_tier {
189 return Ok(()); }
191
192 let source_storage = match current_tier {
194 StorageTier::Hot => self.hot_storage.clone(),
195 StorageTier::Warm => self.warm_storage.clone(),
196 StorageTier::Cold => self.cold_storage.clone(),
197 };
198
199 let data = source_storage.read().load_index(index_id)?;
200
201 let target_storage = match target_tier {
203 StorageTier::Hot => self.hot_storage.clone(),
204 StorageTier::Warm => self.warm_storage.clone(),
205 StorageTier::Cold => self.cold_storage.clone(),
206 };
207
208 target_storage.write().save_index(index_id, &data)?;
209
210 if !self.config.gradual_transition.enabled {
212 source_storage.write().delete_index(index_id)?;
213 }
214
215 {
217 let mut indices = self.indices.write();
218 if let Some(metadata) = indices.get_mut(index_id) {
219 metadata.current_tier = target_tier;
220 metadata.last_modified = SystemTime::now();
221 }
222 }
223
224 let duration = start.elapsed().unwrap_or(Duration::ZERO);
226 let transition = TierTransition {
227 index_id: index_id.to_string(),
228 from_tier: current_tier,
229 to_tier: target_tier,
230 reason: format!("{:?}", reason),
231 timestamp: SystemTime::now(),
232 duration,
233 success: true,
234 error: None,
235 };
236
237 self.metrics.record_transition(transition);
238
239 self.update_tier_usage_metrics();
241
242 tracing::info!(
243 "Transitioned index {} from {:?} to {:?} ({} bytes, {:?})",
244 index_id,
245 current_tier,
246 target_tier,
247 size_bytes,
248 duration
249 );
250
251 Ok(())
252 }
253
254 pub fn optimize_tiers(&self) -> Result<Vec<TierOptimizationRecommendation>> {
256 let indices: Vec<IndexMetadata> = {
257 let indices = self.indices.read();
258 indices.values().cloned().collect()
259 };
260
261 let tier_stats = self.get_tier_statistics_array();
262
263 let mut optimizer = self.optimizer.write();
264 let recommendations = optimizer.optimize_tier_placements(&indices, &tier_stats);
265
266 Ok(recommendations)
267 }
268
269 pub fn apply_optimizations(&self, limit: Option<usize>) -> Result<Vec<String>> {
271 let recommendations = self.optimize_tiers()?;
272
273 let mut applied = Vec::new();
274 let limit = limit.unwrap_or(usize::MAX);
275
276 for (i, rec) in recommendations.iter().enumerate() {
277 if i >= limit {
278 break;
279 }
280
281 if rec.priority < 0.5 {
283 continue; }
285
286 match self.transition_index(&rec.index_id, rec.recommended_tier, rec.reason.clone()) {
287 Ok(_) => {
288 applied.push(rec.index_id.clone());
289 }
290 Err(e) => {
291 tracing::warn!("Failed to transition index {}: {}", rec.index_id, e);
292 }
293 }
294 }
295
296 Ok(applied)
297 }
298
299 pub fn get_tier_statistics(&self) -> HashMap<StorageTier, TierStatistics> {
301 self.metrics.get_all_tier_statistics()
302 }
303
304 fn get_tier_statistics_array(&self) -> [TierStatistics; 3] {
306 [
307 self.metrics.get_tier_statistics(StorageTier::Hot),
308 self.metrics.get_tier_statistics(StorageTier::Warm),
309 self.metrics.get_tier_statistics(StorageTier::Cold),
310 ]
311 }
312
313 fn update_tier_usage_metrics(&self) {
315 let indices = self.indices.read();
317
318 let mut hot_usage = 0u64;
319 let mut warm_usage = 0u64;
320 let mut cold_usage = 0u64;
321
322 let mut hot_count = 0;
323 let mut warm_count = 0;
324 let mut cold_count = 0;
325
326 for metadata in indices.values() {
327 match metadata.current_tier {
328 StorageTier::Hot => {
329 hot_usage += metadata.size_bytes;
330 hot_count += 1;
331 }
332 StorageTier::Warm => {
333 warm_usage += metadata.size_bytes;
334 warm_count += 1;
335 }
336 StorageTier::Cold => {
337 cold_usage += metadata.size_bytes;
338 cold_count += 1;
339 }
340 }
341 }
342
343 self.metrics.update_tier_usage(
344 StorageTier::Hot,
345 hot_usage,
346 self.config.hot_tier_capacity_bytes(),
347 );
348 self.metrics.update_tier_usage(
349 StorageTier::Warm,
350 warm_usage,
351 self.config.warm_tier_capacity_bytes(),
352 );
353 self.metrics.update_tier_usage(
354 StorageTier::Cold,
355 cold_usage,
356 self.config.cold_tier_capacity_bytes(),
357 );
358
359 self.metrics.update_index_count(StorageTier::Hot, hot_count);
360 self.metrics
361 .update_index_count(StorageTier::Warm, warm_count);
362 self.metrics
363 .update_index_count(StorageTier::Cold, cold_count);
364 }
365
366 pub fn get_index_metadata(&self, index_id: &str) -> Option<IndexMetadata> {
368 let indices = self.indices.read();
369 indices.get(index_id).cloned()
370 }
371
372 pub fn list_indices(&self) -> Vec<String> {
374 let indices = self.indices.read();
375 indices.keys().cloned().collect()
376 }
377
378 pub fn get_metrics(&self) -> Arc<TierMetrics> {
380 self.metrics.clone()
381 }
382
383 pub fn cleanup_history(&self) {
385 let mut tracker = self.access_tracker.write();
386 tracker.cleanup_old_entries(self.config.metrics_retention);
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use super::*;
393 use crate::tiering::types::{AccessStatistics, IndexType, PerformanceMetrics};
394 use std::collections::HashMap;
395
396 fn create_test_config() -> TieringConfig {
397 TieringConfig::development()
398 }
399
400 fn create_test_metadata(id: &str, tier: StorageTier) -> IndexMetadata {
401 IndexMetadata {
402 index_id: id.to_string(),
403 current_tier: tier,
404 size_bytes: 1024 * 1024, compressed_size_bytes: 512 * 1024,
406 vector_count: 10_000,
407 dimension: 128,
408 index_type: IndexType::Hnsw,
409 created_at: SystemTime::now(),
410 last_accessed: SystemTime::now(),
411 last_modified: SystemTime::now(),
412 access_stats: AccessStatistics::default(),
413 performance_metrics: PerformanceMetrics::default(),
414 storage_path: None,
415 custom_metadata: HashMap::new(),
416 }
417 }
418
419 #[test]
420 fn test_tiering_manager_creation() {
421 let config = create_test_config();
422 let manager = TieringManager::new(config).unwrap();
423
424 let stats = manager.get_tier_statistics();
425 assert!(stats.contains_key(&StorageTier::Hot));
426 assert!(stats.contains_key(&StorageTier::Warm));
427 assert!(stats.contains_key(&StorageTier::Cold));
428 }
429
430 #[test]
431 fn test_register_and_store_index() {
432 let config = create_test_config();
433 let manager = TieringManager::new(config).unwrap();
434
435 let metadata = create_test_metadata("test_index", StorageTier::Hot);
436 manager
437 .register_index("test_index".to_string(), metadata)
438 .unwrap();
439
440 let data = vec![1, 2, 3, 4, 5];
441 manager
442 .store_index("test_index", &data, StorageTier::Hot)
443 .unwrap();
444
445 let loaded = manager.load_index("test_index").unwrap();
446 assert_eq!(loaded, data);
447 }
448
449 #[test]
450 fn test_tier_transition() {
451 let config = create_test_config();
452 let manager = TieringManager::new(config).unwrap();
453
454 let metadata = create_test_metadata("test_index", StorageTier::Hot);
455 manager
456 .register_index("test_index".to_string(), metadata)
457 .unwrap();
458
459 let data = vec![1, 2, 3, 4, 5];
460 manager
461 .store_index("test_index", &data, StorageTier::Hot)
462 .unwrap();
463
464 manager
466 .transition_index(
467 "test_index",
468 StorageTier::Warm,
469 TierTransitionReason::CostOptimization,
470 )
471 .unwrap();
472
473 let metadata = manager.get_index_metadata("test_index").unwrap();
474 assert_eq!(metadata.current_tier, StorageTier::Warm);
475 }
476
477 #[test]
478 fn test_list_indices() {
479 let config = create_test_config();
480 let manager = TieringManager::new(config).unwrap();
481
482 let metadata1 = create_test_metadata("index1", StorageTier::Hot);
483 let metadata2 = create_test_metadata("index2", StorageTier::Warm);
484
485 manager
486 .register_index("index1".to_string(), metadata1)
487 .unwrap();
488 manager
489 .register_index("index2".to_string(), metadata2)
490 .unwrap();
491
492 let indices = manager.list_indices();
493 assert_eq!(indices.len(), 2);
494 assert!(indices.contains(&"index1".to_string()));
495 assert!(indices.contains(&"index2".to_string()));
496 }
497}