// oxirs_vec/tiering/metrics.rs

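//! Metrics collection for the storage tiering system: per-tier query statistics, tier
//! transition history, and aggregate performance and cost metrics.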
2
3use super::types::{StorageTier, TierStatistics, TierTransition};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::{Arc, Mutex};
7use std::time::{Duration, SystemTime};
8
9/// Tier metrics collector
10#[derive(Debug, Clone)]
11pub struct TierMetrics {
12    /// Statistics for each tier
13    tier_stats: Arc<Mutex<HashMap<StorageTier, TierStatistics>>>,
14    /// Transition history
15    transition_history: Arc<Mutex<Vec<TierTransition>>>,
16    /// Performance metrics
17    performance_metrics: Arc<Mutex<PerformanceMetrics>>,
18    /// Cost metrics
19    cost_metrics: Arc<Mutex<CostMetrics>>,
20}
21
22impl Default for TierMetrics {
23    fn default() -> Self {
24        Self::new()
25    }
26}
27
28impl TierMetrics {
29    /// Create a new metrics collector
30    pub fn new() -> Self {
31        let mut tier_stats = HashMap::new();
32        tier_stats.insert(StorageTier::Hot, TierStatistics::default());
33        tier_stats.insert(StorageTier::Warm, TierStatistics::default());
34        tier_stats.insert(StorageTier::Cold, TierStatistics::default());
35
36        Self {
37            tier_stats: Arc::new(Mutex::new(tier_stats)),
38            transition_history: Arc::new(Mutex::new(Vec::new())),
39            performance_metrics: Arc::new(Mutex::new(PerformanceMetrics::default())),
40            cost_metrics: Arc::new(Mutex::new(CostMetrics::default())),
41        }
42    }
43
44    /// Record a query to a tier
45    pub fn record_query(&self, tier: StorageTier, latency_us: u64, hit: bool) {
46        let mut stats = self.tier_stats.lock().expect("lock poisoned");
47        if let Some(tier_stat) = stats.get_mut(&tier) {
48            tier_stat.total_queries += 1;
49
50            // Update average latency (exponential moving average)
51            let alpha = 0.1;
52            tier_stat.avg_query_latency_us = ((1.0 - alpha) * tier_stat.avg_query_latency_us as f64
53                + alpha * latency_us as f64) as u64;
54
55            // Update hit rate
56            if hit {
57                tier_stat.hit_rate = (tier_stat.hit_rate * (tier_stat.total_queries - 1) as f64
58                    + 1.0)
59                    / tier_stat.total_queries as f64;
60            } else {
61                tier_stat.hit_rate = (tier_stat.hit_rate * (tier_stat.total_queries - 1) as f64)
62                    / tier_stat.total_queries as f64;
63            }
64
65            tier_stat.last_updated = SystemTime::now();
66        }
67    }
68
69    /// Record a tier transition
70    pub fn record_transition(&self, transition: TierTransition) {
71        // Extract tier information before moving transition
72        let from_tier = transition.from_tier;
73        let to_tier = transition.to_tier;
74
75        let mut history = self.transition_history.lock().expect("lock poisoned");
76        history.push(transition);
77
78        // Update tier statistics
79        let mut stats = self.tier_stats.lock().expect("lock poisoned");
80        if let Some(from_tier_stats) = stats.get_mut(&from_tier) {
81            from_tier_stats.promotions += 1;
82        }
83        if let Some(to_tier_stats) = stats.get_mut(&to_tier) {
84            to_tier_stats.demotions += 1;
85        }
86    }
87
88    /// Update tier capacity and usage
89    pub fn update_tier_usage(&self, tier: StorageTier, used_bytes: u64, capacity_bytes: u64) {
90        let mut stats = self.tier_stats.lock().expect("lock poisoned");
91        if let Some(tier_stat) = stats.get_mut(&tier) {
92            tier_stat.used_bytes = used_bytes;
93            tier_stat.capacity_bytes = capacity_bytes;
94            tier_stat.last_updated = SystemTime::now();
95        }
96    }
97
98    /// Update index count for a tier
99    pub fn update_index_count(&self, tier: StorageTier, count: usize) {
100        let mut stats = self.tier_stats.lock().expect("lock poisoned");
101        if let Some(tier_stat) = stats.get_mut(&tier) {
102            tier_stat.index_count = count;
103        }
104    }
105
106    /// Record bytes read from a tier
107    pub fn record_bytes_read(&self, tier: StorageTier, bytes: u64) {
108        let mut stats = self.tier_stats.lock().expect("lock poisoned");
109        if let Some(tier_stat) = stats.get_mut(&tier) {
110            tier_stat.bytes_read += bytes;
111        }
112    }
113
114    /// Record bytes written to a tier
115    pub fn record_bytes_written(&self, tier: StorageTier, bytes: u64) {
116        let mut stats = self.tier_stats.lock().expect("lock poisoned");
117        if let Some(tier_stat) = stats.get_mut(&tier) {
118            tier_stat.bytes_written += bytes;
119        }
120    }
121
122    /// Get statistics for a tier
123    pub fn get_tier_statistics(&self, tier: StorageTier) -> TierStatistics {
124        let stats = self.tier_stats.lock().expect("lock poisoned");
125        stats.get(&tier).cloned().unwrap_or_default()
126    }
127
128    /// Get all tier statistics
129    pub fn get_all_tier_statistics(&self) -> HashMap<StorageTier, TierStatistics> {
130        self.tier_stats.lock().expect("lock poisoned").clone()
131    }
132
133    /// Get transition history
134    pub fn get_transition_history(&self, limit: Option<usize>) -> Vec<TierTransition> {
135        let history = self.transition_history.lock().expect("lock poisoned");
136        if let Some(lim) = limit {
137            history.iter().rev().take(lim).cloned().collect()
138        } else {
139            history.clone()
140        }
141    }
142
143    /// Get performance metrics
144    pub fn get_performance_metrics(&self) -> PerformanceMetrics {
145        self.performance_metrics
146            .lock()
147            .expect("lock poisoned")
148            .clone()
149    }
150
151    /// Update performance metrics
152    pub fn update_performance_metrics<F>(&self, update_fn: F)
153    where
154        F: FnOnce(&mut PerformanceMetrics),
155    {
156        let mut metrics = self.performance_metrics.lock().expect("lock poisoned");
157        update_fn(&mut metrics);
158    }
159
160    /// Get cost metrics
161    pub fn get_cost_metrics(&self) -> CostMetrics {
162        self.cost_metrics.lock().expect("lock poisoned").clone()
163    }
164
165    /// Update cost metrics
166    pub fn update_cost_metrics<F>(&self, update_fn: F)
167    where
168        F: FnOnce(&mut CostMetrics),
169    {
170        let mut metrics = self.cost_metrics.lock().expect("lock poisoned");
171        update_fn(&mut metrics);
172    }
173
174    /// Calculate overall system efficiency
175    pub fn calculate_efficiency(&self) -> TieringEfficiency {
176        let stats = self.tier_stats.lock().expect("lock poisoned");
177        let perf = self.performance_metrics.lock().expect("lock poisoned");
178        let cost = self.cost_metrics.lock().expect("lock poisoned");
179
180        // Overall hit rate (weighted by tier)
181        let hot_stat = stats
182            .get(&StorageTier::Hot)
183            .expect("Hot tier should exist in stats");
184        let warm_stat = stats
185            .get(&StorageTier::Warm)
186            .expect("Warm tier should exist in stats");
187        let cold_stat = stats
188            .get(&StorageTier::Cold)
189            .expect("Cold tier should exist in stats");
190
191        let total_queries =
192            hot_stat.total_queries + warm_stat.total_queries + cold_stat.total_queries;
193        let overall_hit_rate = if total_queries > 0 {
194            (hot_stat.hit_rate * hot_stat.total_queries as f64
195                + warm_stat.hit_rate * warm_stat.total_queries as f64
196                + cold_stat.hit_rate * cold_stat.total_queries as f64)
197                / total_queries as f64
198        } else {
199            0.0
200        };
201
202        // Average latency (weighted by tier)
203        let avg_latency = (hot_stat.avg_query_latency_us * hot_stat.total_queries
204            + warm_stat.avg_query_latency_us * warm_stat.total_queries
205            + cold_stat.avg_query_latency_us * cold_stat.total_queries)
206            .checked_div(total_queries)
207            .unwrap_or(0);
208
209        // Utilization efficiency
210        let total_capacity =
211            hot_stat.capacity_bytes + warm_stat.capacity_bytes + cold_stat.capacity_bytes;
212        let total_used = hot_stat.used_bytes + warm_stat.used_bytes + cold_stat.used_bytes;
213        let utilization_efficiency = if total_capacity > 0 {
214            total_used as f64 / total_capacity as f64
215        } else {
216            0.0
217        };
218
219        TieringEfficiency {
220            overall_hit_rate,
221            avg_query_latency_us: avg_latency,
222            utilization_efficiency,
223            cost_per_query: cost.total_query_cost / total_queries.max(1) as f64,
224            cost_per_gb_hour: cost.total_storage_cost
225                / ((total_used as f64 / (1024.0 * 1024.0 * 1024.0)) * perf.uptime_hours),
226            transitions_per_hour: perf.total_transitions as f64 / perf.uptime_hours,
227        }
228    }
229
230    /// Reset all metrics
231    pub fn reset(&self) {
232        let mut stats = self.tier_stats.lock().expect("lock poisoned");
233        for tier_stat in stats.values_mut() {
234            *tier_stat = TierStatistics::default();
235        }
236
237        let mut history = self.transition_history.lock().expect("lock poisoned");
238        history.clear();
239
240        let mut perf = self.performance_metrics.lock().expect("lock poisoned");
241        *perf = PerformanceMetrics::default();
242
243        let mut cost = self.cost_metrics.lock().expect("lock poisoned");
244        *cost = CostMetrics::default();
245    }
246}
247
248/// Performance metrics
249#[derive(Debug, Clone, Default, Serialize, Deserialize)]
250pub struct PerformanceMetrics {
251    /// Total number of tier transitions
252    pub total_transitions: u64,
253    /// Successful transitions
254    pub successful_transitions: u64,
255    /// Failed transitions
256    pub failed_transitions: u64,
257    /// Average transition duration
258    pub avg_transition_duration: Duration,
259    /// Total system uptime in hours
260    pub uptime_hours: f64,
261    /// Peak memory usage in bytes
262    pub peak_memory_usage_bytes: u64,
263    /// Average CPU utilization (0.0 - 1.0)
264    pub avg_cpu_utilization: f64,
265}
266
267/// Cost metrics
268#[derive(Debug, Clone, Default, Serialize, Deserialize)]
269pub struct CostMetrics {
270    /// Total storage cost
271    pub total_storage_cost: f64,
272    /// Total query cost
273    pub total_query_cost: f64,
274    /// Total transition cost
275    pub total_transition_cost: f64,
276    /// Cost by tier
277    pub cost_by_tier: HashMap<String, f64>,
278}
279
280/// Overall tiering efficiency
281#[derive(Debug, Clone, Serialize, Deserialize)]
282pub struct TieringEfficiency {
283    /// Overall hit rate across all tiers
284    pub overall_hit_rate: f64,
285    /// Average query latency in microseconds
286    pub avg_query_latency_us: u64,
287    /// Storage utilization efficiency (0.0 - 1.0)
288    pub utilization_efficiency: f64,
289    /// Cost per query
290    pub cost_per_query: f64,
291    /// Cost per GB-hour
292    pub cost_per_gb_hour: f64,
293    /// Transitions per hour
294    pub transitions_per_hour: f64,
295}
296
#[cfg(test)]
mod tests {
    use super::*;

    /// Tolerance for comparing derived floating-point rates.
    const EPS: f64 = 1e-9;

    #[test]
    fn test_tier_metrics_basic() {
        let metrics = TierMetrics::new();

        // Two hits and one miss against the hot tier.
        metrics.record_query(StorageTier::Hot, 100, true);
        metrics.record_query(StorageTier::Hot, 200, true);
        metrics.record_query(StorageTier::Hot, 150, false);

        let stats = metrics.get_tier_statistics(StorageTier::Hot);
        assert_eq!(stats.total_queries, 3);
        assert!(stats.avg_query_latency_us > 0);
        assert!(
            (stats.hit_rate - 2.0 / 3.0).abs() < EPS,
            "hit_rate = {}",
            stats.hit_rate
        );

        // Queries against one tier must not leak into another.
        assert_eq!(metrics.get_tier_statistics(StorageTier::Warm).total_queries, 0);
    }

    #[test]
    fn test_transition_recording() {
        let metrics = TierMetrics::new();

        let transition = TierTransition {
            index_id: "test_index".to_string(),
            from_tier: StorageTier::Warm,
            to_tier: StorageTier::Hot,
            reason: "High access frequency".to_string(),
            timestamp: SystemTime::now(),
            duration: Duration::from_secs(5),
            success: true,
            error: None,
        };

        metrics.record_transition(transition);

        let history = metrics.get_transition_history(Some(10));
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].index_id, "test_index");
        assert_eq!(history[0].from_tier, StorageTier::Warm);
        assert_eq!(history[0].to_tier, StorageTier::Hot);
        assert_eq!(metrics.get_transition_history(None).len(), 1);
    }

    #[test]
    fn test_efficiency_calculation() {
        let metrics = TierMetrics::new();

        // Configure every tier explicitly so the expected utilization does not depend on
        // `TierStatistics::default()`.
        metrics.update_tier_usage(
            StorageTier::Hot,
            8 * 1024 * 1024 * 1024,
            16 * 1024 * 1024 * 1024,
        );
        metrics.update_tier_usage(StorageTier::Warm, 0, 0);
        metrics.update_tier_usage(StorageTier::Cold, 0, 0);
        metrics.record_query(StorageTier::Hot, 100, true);
        metrics.record_query(StorageTier::Hot, 150, true);

        let efficiency = metrics.calculate_efficiency();
        assert!((efficiency.overall_hit_rate - 1.0).abs() < EPS);
        assert!(efficiency.avg_query_latency_us > 0);
        // 8 GiB used out of 16 GiB of total capacity.
        assert!((efficiency.utilization_efficiency - 0.5).abs() < EPS);
        // No query cost has been recorded.
        assert!(efficiency.cost_per_query.abs() < EPS);
    }

    #[test]
    fn test_reset_clears_state() {
        let metrics = TierMetrics::new();
        metrics.record_query(StorageTier::Cold, 500, true);
        metrics.update_tier_usage(StorageTier::Cold, 10, 100);

        metrics.reset();

        let cold = metrics.get_tier_statistics(StorageTier::Cold);
        let fresh = TierStatistics::default();
        assert_eq!(cold.total_queries, fresh.total_queries);
        assert_eq!(cold.used_bytes, fresh.used_bytes);
        assert_eq!(cold.capacity_bytes, fresh.capacity_bytes);
        assert!(metrics.get_transition_history(None).is_empty());
    }
}