oxirs_vec/tiering/
manager.rs

1//! Main tiering manager that coordinates tier operations
2
3use super::access_tracker::AccessTracker;
4use super::config::TieringConfig;
5use super::metrics::TierMetrics;
6use super::policies::TierTransitionReason;
7use super::storage_backends::{ColdTierStorage, HotTierStorage, StorageBackend, WarmTierStorage};
8use super::tier_optimizer::{TierOptimizationRecommendation, TierOptimizer};
9use super::types::{IndexMetadata, StorageTier, TierStatistics, TierTransition};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
16/// Main tiering manager
17pub struct TieringManager {
18    /// Configuration
19    config: TieringConfig,
20    /// Index metadata registry
21    indices: Arc<RwLock<HashMap<String, IndexMetadata>>>,
22    /// Hot tier storage
23    hot_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
24    /// Warm tier storage
25    warm_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
26    /// Cold tier storage
27    cold_storage: Arc<RwLock<Box<dyn StorageBackend>>>,
28    /// Access tracker
29    access_tracker: Arc<RwLock<AccessTracker>>,
30    /// Metrics collector
31    metrics: Arc<TierMetrics>,
32    /// Tier optimizer
33    optimizer: Arc<RwLock<TierOptimizer>>,
34    /// Background task handle (for cleanup and optimization)
35    _background_task: Option<std::thread::JoinHandle<()>>,
36}
37
38impl TieringManager {
39    /// Create a new tiering manager
40    pub fn new(config: TieringConfig) -> Result<Self> {
41        config.validate()?;
42
43        // Create storage backends
44        let hot_storage: Box<dyn StorageBackend> = Box::new(HotTierStorage::new());
45
46        let warm_storage: Box<dyn StorageBackend> = Box::new(WarmTierStorage::new(
47            config.storage_base_path.join("warm"),
48            config.warm_tier_compression,
49            config.warm_tier_compression_level,
50        )?);
51
52        let cold_storage: Box<dyn StorageBackend> = Box::new(ColdTierStorage::new(
53            config.storage_base_path.join("cold"),
54            config.cold_tier_compression_level,
55        )?);
56
57        // Create access tracker
58        let access_tracker = AccessTracker::new(10000, Duration::from_secs(86400));
59
60        // Create tier optimizer
61        let optimizer = TierOptimizer::new(config.clone());
62
63        // Initialize metrics
64        let metrics = Arc::new(TierMetrics::new());
65
66        // Set initial tier capacities
67        metrics.update_tier_usage(StorageTier::Hot, 0, config.hot_tier_capacity_bytes());
68        metrics.update_tier_usage(StorageTier::Warm, 0, config.warm_tier_capacity_bytes());
69        metrics.update_tier_usage(StorageTier::Cold, 0, config.cold_tier_capacity_bytes());
70
71        Ok(Self {
72            config,
73            indices: Arc::new(RwLock::new(HashMap::new())),
74            hot_storage: Arc::new(RwLock::new(hot_storage)),
75            warm_storage: Arc::new(RwLock::new(warm_storage)),
76            cold_storage: Arc::new(RwLock::new(cold_storage)),
77            access_tracker: Arc::new(RwLock::new(access_tracker)),
78            metrics,
79            optimizer: Arc::new(RwLock::new(optimizer)),
80            _background_task: None,
81        })
82    }
83
84    /// Register a new index with metadata
85    pub fn register_index(&self, index_id: String, metadata: IndexMetadata) -> Result<()> {
86        let mut indices = self.indices.write();
87        indices.insert(index_id, metadata);
88        Ok(())
89    }
90
91    /// Store index data in appropriate tier
92    pub fn store_index(&self, index_id: &str, data: &[u8], tier: StorageTier) -> Result<()> {
93        let start = SystemTime::now();
94
95        // Store in appropriate backend
96        let storage = match tier {
97            StorageTier::Hot => self.hot_storage.clone(),
98            StorageTier::Warm => self.warm_storage.clone(),
99            StorageTier::Cold => self.cold_storage.clone(),
100        };
101
102        storage.write().save_index(index_id, data)?;
103
104        // Update metadata
105        let mut indices = self.indices.write();
106        if let Some(metadata) = indices.get_mut(index_id) {
107            metadata.current_tier = tier;
108            metadata.size_bytes = data.len() as u64;
109            metadata.last_modified = SystemTime::now();
110        }
111
112        // Update metrics
113        let duration = start.elapsed().unwrap_or(Duration::ZERO);
114        self.metrics.record_bytes_written(tier, data.len() as u64);
115
116        tracing::info!(
117            "Stored index {} in {:?} tier ({} bytes, {:?})",
118            index_id,
119            tier,
120            data.len(),
121            duration
122        );
123
124        Ok(())
125    }
126
127    /// Load index data from its current tier
128    pub fn load_index(&self, index_id: &str) -> Result<Vec<u8>> {
129        let start = SystemTime::now();
130
131        // Get current tier
132        let tier = {
133            let indices = self.indices.read();
134            indices
135                .get(index_id)
136                .map(|m| m.current_tier)
137                .ok_or_else(|| anyhow::anyhow!("Index {} not found", index_id))?
138        };
139
140        // Load from appropriate backend
141        let storage = match tier {
142            StorageTier::Hot => self.hot_storage.clone(),
143            StorageTier::Warm => self.warm_storage.clone(),
144            StorageTier::Cold => self.cold_storage.clone(),
145        };
146
147        let data = storage.read().load_index(index_id)?;
148
149        // Record access
150        let latency_us = start.elapsed().unwrap_or(Duration::ZERO).as_micros() as u64;
151        self.access_tracker
152            .write()
153            .record_access(index_id, latency_us);
154
155        // Update metrics
156        self.metrics.record_query(tier, latency_us, true);
157        self.metrics.record_bytes_read(tier, data.len() as u64);
158
159        // Update metadata
160        {
161            let mut indices = self.indices.write();
162            if let Some(metadata) = indices.get_mut(index_id) {
163                self.access_tracker.read().update_metadata(metadata);
164            }
165        }
166
167        Ok(data)
168    }
169
170    /// Transition an index between tiers
171    pub fn transition_index(
172        &self,
173        index_id: &str,
174        target_tier: StorageTier,
175        reason: TierTransitionReason,
176    ) -> Result<()> {
177        let start = SystemTime::now();
178
179        // Get current state
180        let (current_tier, size_bytes) = {
181            let indices = self.indices.read();
182            let metadata = indices
183                .get(index_id)
184                .ok_or_else(|| anyhow::anyhow!("Index {} not found", index_id))?;
185            (metadata.current_tier, metadata.size_bytes)
186        };
187
188        if current_tier == target_tier {
189            return Ok(()); // Already in target tier
190        }
191
192        // Load from current tier
193        let source_storage = match current_tier {
194            StorageTier::Hot => self.hot_storage.clone(),
195            StorageTier::Warm => self.warm_storage.clone(),
196            StorageTier::Cold => self.cold_storage.clone(),
197        };
198
199        let data = source_storage.read().load_index(index_id)?;
200
201        // Save to target tier
202        let target_storage = match target_tier {
203            StorageTier::Hot => self.hot_storage.clone(),
204            StorageTier::Warm => self.warm_storage.clone(),
205            StorageTier::Cold => self.cold_storage.clone(),
206        };
207
208        target_storage.write().save_index(index_id, &data)?;
209
210        // Delete from source tier (unless it's a promotion and we want gradual transition)
211        if !self.config.gradual_transition.enabled {
212            source_storage.write().delete_index(index_id)?;
213        }
214
215        // Update metadata
216        {
217            let mut indices = self.indices.write();
218            if let Some(metadata) = indices.get_mut(index_id) {
219                metadata.current_tier = target_tier;
220                metadata.last_modified = SystemTime::now();
221            }
222        }
223
224        // Record transition
225        let duration = start.elapsed().unwrap_or(Duration::ZERO);
226        let transition = TierTransition {
227            index_id: index_id.to_string(),
228            from_tier: current_tier,
229            to_tier: target_tier,
230            reason: format!("{:?}", reason),
231            timestamp: SystemTime::now(),
232            duration,
233            success: true,
234            error: None,
235        };
236
237        self.metrics.record_transition(transition);
238
239        // Update tier usage
240        self.update_tier_usage_metrics();
241
242        tracing::info!(
243            "Transitioned index {} from {:?} to {:?} ({} bytes, {:?})",
244            index_id,
245            current_tier,
246            target_tier,
247            size_bytes,
248            duration
249        );
250
251        Ok(())
252    }
253
254    /// Run tier optimization and return recommendations
255    pub fn optimize_tiers(&self) -> Result<Vec<TierOptimizationRecommendation>> {
256        let indices: Vec<IndexMetadata> = {
257            let indices = self.indices.read();
258            indices.values().cloned().collect()
259        };
260
261        let tier_stats = self.get_tier_statistics_array();
262
263        let mut optimizer = self.optimizer.write();
264        let recommendations = optimizer.optimize_tier_placements(&indices, &tier_stats);
265
266        Ok(recommendations)
267    }
268
269    /// Apply optimization recommendations automatically
270    pub fn apply_optimizations(&self, limit: Option<usize>) -> Result<Vec<String>> {
271        let recommendations = self.optimize_tiers()?;
272
273        let mut applied = Vec::new();
274        let limit = limit.unwrap_or(usize::MAX);
275
276        for (i, rec) in recommendations.iter().enumerate() {
277            if i >= limit {
278                break;
279            }
280
281            // Check if transition is worthwhile
282            if rec.priority < 0.5 {
283                continue; // Skip low-priority transitions
284            }
285
286            match self.transition_index(&rec.index_id, rec.recommended_tier, rec.reason.clone()) {
287                Ok(_) => {
288                    applied.push(rec.index_id.clone());
289                }
290                Err(e) => {
291                    tracing::warn!("Failed to transition index {}: {}", rec.index_id, e);
292                }
293            }
294        }
295
296        Ok(applied)
297    }
298
299    /// Get statistics for all tiers
300    pub fn get_tier_statistics(&self) -> HashMap<StorageTier, TierStatistics> {
301        self.metrics.get_all_tier_statistics()
302    }
303
304    /// Get tier statistics as array [Hot, Warm, Cold]
305    fn get_tier_statistics_array(&self) -> [TierStatistics; 3] {
306        [
307            self.metrics.get_tier_statistics(StorageTier::Hot),
308            self.metrics.get_tier_statistics(StorageTier::Warm),
309            self.metrics.get_tier_statistics(StorageTier::Cold),
310        ]
311    }
312
313    /// Update tier usage metrics
314    fn update_tier_usage_metrics(&self) {
315        // Calculate usage for each tier
316        let indices = self.indices.read();
317
318        let mut hot_usage = 0u64;
319        let mut warm_usage = 0u64;
320        let mut cold_usage = 0u64;
321
322        let mut hot_count = 0;
323        let mut warm_count = 0;
324        let mut cold_count = 0;
325
326        for metadata in indices.values() {
327            match metadata.current_tier {
328                StorageTier::Hot => {
329                    hot_usage += metadata.size_bytes;
330                    hot_count += 1;
331                }
332                StorageTier::Warm => {
333                    warm_usage += metadata.size_bytes;
334                    warm_count += 1;
335                }
336                StorageTier::Cold => {
337                    cold_usage += metadata.size_bytes;
338                    cold_count += 1;
339                }
340            }
341        }
342
343        self.metrics.update_tier_usage(
344            StorageTier::Hot,
345            hot_usage,
346            self.config.hot_tier_capacity_bytes(),
347        );
348        self.metrics.update_tier_usage(
349            StorageTier::Warm,
350            warm_usage,
351            self.config.warm_tier_capacity_bytes(),
352        );
353        self.metrics.update_tier_usage(
354            StorageTier::Cold,
355            cold_usage,
356            self.config.cold_tier_capacity_bytes(),
357        );
358
359        self.metrics.update_index_count(StorageTier::Hot, hot_count);
360        self.metrics
361            .update_index_count(StorageTier::Warm, warm_count);
362        self.metrics
363            .update_index_count(StorageTier::Cold, cold_count);
364    }
365
366    /// Get index metadata
367    pub fn get_index_metadata(&self, index_id: &str) -> Option<IndexMetadata> {
368        let indices = self.indices.read();
369        indices.get(index_id).cloned()
370    }
371
372    /// List all registered indices
373    pub fn list_indices(&self) -> Vec<String> {
374        let indices = self.indices.read();
375        indices.keys().cloned().collect()
376    }
377
378    /// Get metrics
379    pub fn get_metrics(&self) -> Arc<TierMetrics> {
380        self.metrics.clone()
381    }
382
383    /// Cleanup old access history
384    pub fn cleanup_history(&self) {
385        let mut tracker = self.access_tracker.write();
386        tracker.cleanup_old_entries(self.config.metrics_retention);
387    }
388}
389
#[cfg(test)]
mod tests {
    use super::*;
    use crate::tiering::types::{AccessStatistics, IndexType, PerformanceMetrics};
    use std::collections::HashMap;

    /// Configuration used by every test in this module.
    fn create_test_config() -> TieringConfig {
        TieringConfig::development()
    }

    /// Metadata for a 1 MB HNSW index with id `id`, placed in `tier`.
    fn create_test_metadata(id: &str, tier: StorageTier) -> IndexMetadata {
        IndexMetadata {
            index_id: id.to_string(),
            current_tier: tier,
            size_bytes: 1024 * 1024, // 1 MB
            compressed_size_bytes: 512 * 1024,
            vector_count: 10_000,
            dimension: 128,
            index_type: IndexType::Hnsw,
            created_at: SystemTime::now(),
            last_accessed: SystemTime::now(),
            last_modified: SystemTime::now(),
            access_stats: AccessStatistics::default(),
            performance_metrics: PerformanceMetrics::default(),
            storage_path: None,
            custom_metadata: HashMap::new(),
        }
    }

    /// Manager built from the shared test configuration.
    fn new_manager() -> TieringManager {
        TieringManager::new(create_test_config()).unwrap()
    }

    /// Register test metadata for `id` in `tier` on `manager`.
    fn register(manager: &TieringManager, id: &str, tier: StorageTier) {
        manager
            .register_index(id.to_string(), create_test_metadata(id, tier))
            .unwrap();
    }

    #[test]
    fn test_tiering_manager_creation() {
        let manager = new_manager();

        // Every tier must have a statistics entry right after construction.
        let stats = manager.get_tier_statistics();
        for tier in [StorageTier::Hot, StorageTier::Warm, StorageTier::Cold] {
            assert!(stats.contains_key(&tier));
        }
    }

    #[test]
    fn test_register_and_store_index() {
        let manager = new_manager();
        register(&manager, "test_index", StorageTier::Hot);

        let payload = vec![1, 2, 3, 4, 5];
        manager
            .store_index("test_index", &payload, StorageTier::Hot)
            .unwrap();

        // What was stored must come back unchanged.
        let round_tripped = manager.load_index("test_index").unwrap();
        assert_eq!(round_tripped, payload);
    }

    #[test]
    fn test_tier_transition() {
        let manager = new_manager();
        register(&manager, "test_index", StorageTier::Hot);

        let payload = vec![1, 2, 3, 4, 5];
        manager
            .store_index("test_index", &payload, StorageTier::Hot)
            .unwrap();

        // Move the index from the hot tier to the warm tier.
        manager
            .transition_index(
                "test_index",
                StorageTier::Warm,
                TierTransitionReason::CostOptimization,
            )
            .unwrap();

        let after = manager.get_index_metadata("test_index").unwrap();
        assert_eq!(after.current_tier, StorageTier::Warm);
    }

    #[test]
    fn test_list_indices() {
        let manager = new_manager();
        register(&manager, "index1", StorageTier::Hot);
        register(&manager, "index2", StorageTier::Warm);

        let listed = manager.list_indices();
        assert_eq!(listed.len(), 2);
        assert!(listed.contains(&"index1".to_string()));
        assert!(listed.contains(&"index2".to_string()));
    }
}
497}