oxirs_vec/tiering/
metrics.rs

1//! Metrics collection for tiering system
2
3use super::types::{StorageTier, TierStatistics, TierTransition};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, SystemTime};
8
9/// Tier metrics collector
10#[derive(Debug, Clone)]
11pub struct TierMetrics {
12    /// Statistics for each tier
13    tier_stats: Arc<Mutex<HashMap<StorageTier, TierStatistics>>>,
14    /// Transition history
15    transition_history: Arc<Mutex<Vec<TierTransition>>>,
16    /// Performance metrics
17    performance_metrics: Arc<Mutex<PerformanceMetrics>>,
18    /// Cost metrics
19    cost_metrics: Arc<Mutex<CostMetrics>>,
20}
21
22impl Default for TierMetrics {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl TierMetrics {
29    /// Create a new metrics collector
30    pub fn new() -> Self {
31        let mut tier_stats = HashMap::new();
32        tier_stats.insert(StorageTier::Hot, TierStatistics::default());
33        tier_stats.insert(StorageTier::Warm, TierStatistics::default());
34        tier_stats.insert(StorageTier::Cold, TierStatistics::default());
35
36        Self {
37            tier_stats: Arc::new(Mutex::new(tier_stats)),
38            transition_history: Arc::new(Mutex::new(Vec::new())),
39            performance_metrics: Arc::new(Mutex::new(PerformanceMetrics::default())),
40            cost_metrics: Arc::new(Mutex::new(CostMetrics::default())),
41        }
42    }
43
44    /// Record a query to a tier
45    pub fn record_query(&self, tier: StorageTier, latency_us: u64, hit: bool) {
46        let mut stats = self.tier_stats.lock().expect("lock poisoned");
47        if let Some(tier_stat) = stats.get_mut(&tier) {
48            tier_stat.total_queries += 1;
49
50            // Update average latency (exponential moving average)
51            let alpha = 0.1;
52            tier_stat.avg_query_latency_us = ((1.0 - alpha) * tier_stat.avg_query_latency_us as f64
53                + alpha * latency_us as f64) as u64;
54
55            // Update hit rate
56            if hit {
57                tier_stat.hit_rate = (tier_stat.hit_rate * (tier_stat.total_queries - 1) as f64
58                    + 1.0)
59                    / tier_stat.total_queries as f64;
60            } else {
61                tier_stat.hit_rate = (tier_stat.hit_rate * (tier_stat.total_queries - 1) as f64)
62                    / tier_stat.total_queries as f64;
63            }
64
65            tier_stat.last_updated = SystemTime::now();
66        }
67    }
68
69    /// Record a tier transition
70    pub fn record_transition(&self, transition: TierTransition) {
71        // Extract tier information before moving transition
72        let from_tier = transition.from_tier;
73        let to_tier = transition.to_tier;
74
75        let mut history = self.transition_history.lock().expect("lock poisoned");
76        history.push(transition);
77
78        // Update tier statistics
79        let mut stats = self.tier_stats.lock().expect("lock poisoned");
80        if let Some(from_tier_stats) = stats.get_mut(&from_tier) {
81            from_tier_stats.promotions += 1;
82        }
83        if let Some(to_tier_stats) = stats.get_mut(&to_tier) {
84            to_tier_stats.demotions += 1;
85        }
86    }
87
88    /// Update tier capacity and usage
89    pub fn update_tier_usage(&self, tier: StorageTier, used_bytes: u64, capacity_bytes: u64) {
90        let mut stats = self.tier_stats.lock().expect("lock poisoned");
91        if let Some(tier_stat) = stats.get_mut(&tier) {
92            tier_stat.used_bytes = used_bytes;
93            tier_stat.capacity_bytes = capacity_bytes;
94            tier_stat.last_updated = SystemTime::now();
95        }
96    }
97
98    /// Update index count for a tier
99    pub fn update_index_count(&self, tier: StorageTier, count: usize) {
100        let mut stats = self.tier_stats.lock().expect("lock poisoned");
101        if let Some(tier_stat) = stats.get_mut(&tier) {
102            tier_stat.index_count = count;
103        }
104    }
105
106    /// Record bytes read from a tier
107    pub fn record_bytes_read(&self, tier: StorageTier, bytes: u64) {
108        let mut stats = self.tier_stats.lock().expect("lock poisoned");
109        if let Some(tier_stat) = stats.get_mut(&tier) {
110            tier_stat.bytes_read += bytes;
111        }
112    }
113
114    /// Record bytes written to a tier
115    pub fn record_bytes_written(&self, tier: StorageTier, bytes: u64) {
116        let mut stats = self.tier_stats.lock().expect("lock poisoned");
117        if let Some(tier_stat) = stats.get_mut(&tier) {
118            tier_stat.bytes_written += bytes;
119        }
120    }
121
122    /// Get statistics for a tier
123    pub fn get_tier_statistics(&self, tier: StorageTier) -> TierStatistics {
124        let stats = self.tier_stats.lock().expect("lock poisoned");
125        stats.get(&tier).cloned().unwrap_or_default()
126    }
127
128    /// Get all tier statistics
129    pub fn get_all_tier_statistics(&self) -> HashMap<StorageTier, TierStatistics> {
130        self.tier_stats.lock().expect("lock poisoned").clone()
131    }
132
133    /// Get transition history
134    pub fn get_transition_history(&self, limit: Option<usize>) -> Vec<TierTransition> {
135        let history = self.transition_history.lock().expect("lock poisoned");
136        if let Some(lim) = limit {
137            history.iter().rev().take(lim).cloned().collect()
138        } else {
139            history.clone()
140        }
141    }
142
143    /// Get performance metrics
144    pub fn get_performance_metrics(&self) -> PerformanceMetrics {
145        self.performance_metrics
146            .lock()
147            .expect("lock poisoned")
148            .clone()
149    }
150
151    /// Update performance metrics
152    pub fn update_performance_metrics<F>(&self, update_fn: F)
153    where
154        F: FnOnce(&mut PerformanceMetrics),
155    {
156        let mut metrics = self.performance_metrics.lock().expect("lock poisoned");
157        update_fn(&mut metrics);
158    }
159
160    /// Get cost metrics
161    pub fn get_cost_metrics(&self) -> CostMetrics {
162        self.cost_metrics.lock().expect("lock poisoned").clone()
163    }
164
165    /// Update cost metrics
166    pub fn update_cost_metrics<F>(&self, update_fn: F)
167    where
168        F: FnOnce(&mut CostMetrics),
169    {
170        let mut metrics = self.cost_metrics.lock().expect("lock poisoned");
171        update_fn(&mut metrics);
172    }
173
174    /// Calculate overall system efficiency
175    pub fn calculate_efficiency(&self) -> TieringEfficiency {
176        let stats = self.tier_stats.lock().expect("lock poisoned");
177        let perf = self.performance_metrics.lock().expect("lock poisoned");
178        let cost = self.cost_metrics.lock().expect("lock poisoned");
179
180        // Overall hit rate (weighted by tier)
181        let hot_stat = stats
182            .get(&StorageTier::Hot)
183            .expect("Hot tier should exist in stats");
184        let warm_stat = stats
185            .get(&StorageTier::Warm)
186            .expect("Warm tier should exist in stats");
187        let cold_stat = stats
188            .get(&StorageTier::Cold)
189            .expect("Cold tier should exist in stats");
190
191        let total_queries =
192            hot_stat.total_queries + warm_stat.total_queries + cold_stat.total_queries;
193        let overall_hit_rate = if total_queries > 0 {
194            (hot_stat.hit_rate * hot_stat.total_queries as f64
195                + warm_stat.hit_rate * warm_stat.total_queries as f64
196                + cold_stat.hit_rate * cold_stat.total_queries as f64)
197                / total_queries as f64
198        } else {
199            0.0
200        };
201
202        // Average latency (weighted by tier)
203        let avg_latency = if total_queries > 0 {
204            (hot_stat.avg_query_latency_us * hot_stat.total_queries
205                + warm_stat.avg_query_latency_us * warm_stat.total_queries
206                + cold_stat.avg_query_latency_us * cold_stat.total_queries)
207                / total_queries
208        } else {
209            0
210        };
211
212        // Utilization efficiency
213        let total_capacity =
214            hot_stat.capacity_bytes + warm_stat.capacity_bytes + cold_stat.capacity_bytes;
215        let total_used = hot_stat.used_bytes + warm_stat.used_bytes + cold_stat.used_bytes;
216        let utilization_efficiency = if total_capacity > 0 {
217            total_used as f64 / total_capacity as f64
218        } else {
219            0.0
220        };
221
222        TieringEfficiency {
223            overall_hit_rate,
224            avg_query_latency_us: avg_latency,
225            utilization_efficiency,
226            cost_per_query: cost.total_query_cost / total_queries.max(1) as f64,
227            cost_per_gb_hour: cost.total_storage_cost
228                / ((total_used as f64 / (1024.0 * 1024.0 * 1024.0)) * perf.uptime_hours),
229            transitions_per_hour: perf.total_transitions as f64 / perf.uptime_hours,
230        }
231    }
232
233    /// Reset all metrics
234    pub fn reset(&self) {
235        let mut stats = self.tier_stats.lock().expect("lock poisoned");
236        for tier_stat in stats.values_mut() {
237            *tier_stat = TierStatistics::default();
238        }
239
240        let mut history = self.transition_history.lock().expect("lock poisoned");
241        history.clear();
242
243        let mut perf = self.performance_metrics.lock().expect("lock poisoned");
244        *perf = PerformanceMetrics::default();
245
246        let mut cost = self.cost_metrics.lock().expect("lock poisoned");
247        *cost = CostMetrics::default();
248    }
249}
250
251/// Performance metrics
252#[derive(Debug, Clone, Default, Serialize, Deserialize)]
253pub struct PerformanceMetrics {
254    /// Total number of tier transitions
255    pub total_transitions: u64,
256    /// Successful transitions
257    pub successful_transitions: u64,
258    /// Failed transitions
259    pub failed_transitions: u64,
260    /// Average transition duration
261    pub avg_transition_duration: Duration,
262    /// Total system uptime in hours
263    pub uptime_hours: f64,
264    /// Peak memory usage in bytes
265    pub peak_memory_usage_bytes: u64,
266    /// Average CPU utilization (0.0 - 1.0)
267    pub avg_cpu_utilization: f64,
268}
269
270/// Cost metrics
271#[derive(Debug, Clone, Default, Serialize, Deserialize)]
272pub struct CostMetrics {
273    /// Total storage cost
274    pub total_storage_cost: f64,
275    /// Total query cost
276    pub total_query_cost: f64,
277    /// Total transition cost
278    pub total_transition_cost: f64,
279    /// Cost by tier
280    pub cost_by_tier: HashMap<String, f64>,
281}
282
283/// Overall tiering efficiency
284#[derive(Debug, Clone, Serialize, Deserialize)]
285pub struct TieringEfficiency {
286    /// Overall hit rate across all tiers
287    pub overall_hit_rate: f64,
288    /// Average query latency in microseconds
289    pub avg_query_latency_us: u64,
290    /// Storage utilization efficiency (0.0 - 1.0)
291    pub utilization_efficiency: f64,
292    /// Cost per query
293    pub cost_per_query: f64,
294    /// Cost per GB-hour
295    pub cost_per_gb_hour: f64,
296    /// Transitions per hour
297    pub transitions_per_hour: f64,
298}
299
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_tier_metrics_basic() {
        let collector = TierMetrics::new();

        // Two hits followed by one miss on the hot tier.
        for (latency_us, hit) in [(100, true), (200, true), (150, false)] {
            collector.record_query(StorageTier::Hot, latency_us, hit);
        }

        let hot = collector.get_tier_statistics(StorageTier::Hot);
        assert_eq!(hot.total_queries, 3);
        assert!(hot.avg_query_latency_us > 0);
        assert!(hot.hit_rate > 0.0);
    }

    #[test]
    fn test_transition_recording() {
        let collector = TierMetrics::new();

        collector.record_transition(TierTransition {
            index_id: String::from("test_index"),
            from_tier: StorageTier::Warm,
            to_tier: StorageTier::Hot,
            reason: String::from("High access frequency"),
            timestamp: SystemTime::now(),
            duration: Duration::from_secs(5),
            success: true,
            error: None,
        });

        let recent = collector.get_transition_history(Some(10));
        assert_eq!(recent.len(), 1);
    }

    #[test]
    fn test_efficiency_calculation() {
        const GIB: u64 = 1024 * 1024 * 1024;
        let collector = TierMetrics::new();

        // Hot tier half full, with two hits recorded.
        collector.update_tier_usage(StorageTier::Hot, 8 * GIB, 16 * GIB);
        for latency_us in [100, 150] {
            collector.record_query(StorageTier::Hot, latency_us, true);
        }

        let summary = collector.calculate_efficiency();
        assert!(summary.overall_hit_rate > 0.0);
        assert!(summary.avg_query_latency_us > 0);
    }
}