oxirs_vec/compaction/
metrics.rs

//! Metrics collection for the compaction system.
2
3use super::types::{CompactionResult, CompactionState, CompactionStatistics};
4use serde::{Deserialize, Serialize};
5use std::collections::VecDeque;
6use std::sync::{Arc, Mutex};
7use std::time::Duration;
8
9/// Compaction metrics collector
10#[derive(Debug, Clone)]
11pub struct CompactionMetrics {
12    /// Current state
13    state: Arc<Mutex<CompactionState>>,
14    /// Statistics
15    statistics: Arc<Mutex<CompactionStatistics>>,
16    /// Recent compaction history
17    history: Arc<Mutex<VecDeque<CompactionResult>>>,
18    /// Maximum history size
19    max_history_size: usize,
20}
21
22impl Default for CompactionMetrics {
23    fn default() -> Self {
24        Self::new(100)
25    }
26}
27
28impl CompactionMetrics {
29    /// Create a new metrics collector
30    pub fn new(max_history_size: usize) -> Self {
31        Self {
32            state: Arc::new(Mutex::new(CompactionState::Idle)),
33            statistics: Arc::new(Mutex::new(CompactionStatistics::default())),
34            history: Arc::new(Mutex::new(VecDeque::new())),
35            max_history_size,
36        }
37    }
38
39    /// Update state
40    pub fn update_state(&self, state: CompactionState) {
41        let mut s = self
42            .state
43            .lock()
44            .expect("mutex lock should not be poisoned");
45        *s = state;
46    }
47
48    /// Get current state
49    pub fn get_state(&self) -> CompactionState {
50        *self
51            .state
52            .lock()
53            .expect("mutex lock should not be poisoned")
54    }
55
56    /// Record compaction result
57    pub fn record_compaction(&self, result: CompactionResult) {
58        let mut stats = self
59            .statistics
60            .lock()
61            .expect("mutex lock should not be poisoned");
62        let mut history = self
63            .history
64            .lock()
65            .expect("mutex lock should not be poisoned");
66
67        // Update statistics
68        stats.total_compactions += 1;
69        if result.success {
70            stats.successful_compactions += 1;
71        } else {
72            stats.failed_compactions += 1;
73        }
74
75        stats.total_vectors_processed += result.vectors_processed;
76        stats.total_vectors_removed += result.vectors_removed;
77        stats.total_bytes_reclaimed += result.bytes_reclaimed;
78        stats.current_fragmentation = result.fragmentation_after;
79        stats.last_compaction_time = Some(result.end_time);
80        stats.last_compaction_result = Some(result.clone());
81
82        // Update average duration
83        if stats.total_compactions > 0 {
84            let total_duration = stats.avg_compaction_duration.as_secs_f64()
85                * (stats.total_compactions - 1) as f64
86                + result.duration.as_secs_f64();
87            stats.avg_compaction_duration =
88                Duration::from_secs_f64(total_duration / stats.total_compactions as f64);
89        } else {
90            stats.avg_compaction_duration = result.duration;
91        }
92
93        // Add to history
94        history.push_back(result);
95        while history.len() > self.max_history_size {
96            history.pop_front();
97        }
98    }
99
100    /// Update fragmentation
101    pub fn update_fragmentation(&self, fragmentation: f64) {
102        let mut stats = self
103            .statistics
104            .lock()
105            .expect("mutex lock should not be poisoned");
106        stats.current_fragmentation = fragmentation;
107    }
108
109    /// Get statistics
110    pub fn get_statistics(&self) -> CompactionStatistics {
111        self.statistics
112            .lock()
113            .expect("mutex lock should not be poisoned")
114            .clone()
115    }
116
117    /// Get compaction history
118    pub fn get_history(&self, limit: Option<usize>) -> Vec<CompactionResult> {
119        let history = self
120            .history
121            .lock()
122            .expect("mutex lock should not be poisoned");
123        if let Some(lim) = limit {
124            history.iter().rev().take(lim).cloned().collect()
125        } else {
126            history.iter().cloned().collect()
127        }
128    }
129
130    /// Calculate compaction efficiency
131    pub fn calculate_efficiency(&self) -> CompactionEfficiency {
132        let stats = self
133            .statistics
134            .lock()
135            .expect("mutex lock should not be poisoned");
136
137        let success_rate = if stats.total_compactions > 0 {
138            stats.successful_compactions as f64 / stats.total_compactions as f64
139        } else {
140            0.0
141        };
142
143        let avg_space_reclaimed = if stats.successful_compactions > 0 {
144            stats.total_bytes_reclaimed as f64 / stats.successful_compactions as f64
145        } else {
146            0.0
147        };
148
149        let avg_vectors_removed = if stats.successful_compactions > 0 {
150            stats.total_vectors_removed as f64 / stats.successful_compactions as f64
151        } else {
152            0.0
153        };
154
155        CompactionEfficiency {
156            success_rate,
157            avg_space_reclaimed_bytes: avg_space_reclaimed as u64,
158            avg_vectors_removed: avg_vectors_removed as usize,
159            avg_duration: stats.avg_compaction_duration,
160            current_fragmentation: stats.current_fragmentation,
161        }
162    }
163
164    /// Reset metrics
165    pub fn reset(&self) {
166        let mut stats = self
167            .statistics
168            .lock()
169            .expect("mutex lock should not be poisoned");
170        *stats = CompactionStatistics::default();
171
172        let mut history = self
173            .history
174            .lock()
175            .expect("mutex lock should not be poisoned");
176        history.clear();
177
178        let mut state = self
179            .state
180            .lock()
181            .expect("mutex lock should not be poisoned");
182        *state = CompactionState::Idle;
183    }
184}
185
186/// Compaction efficiency metrics
187#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct CompactionEfficiency {
189    /// Success rate (0.0 - 1.0)
190    pub success_rate: f64,
191    /// Average space reclaimed per compaction
192    pub avg_space_reclaimed_bytes: u64,
193    /// Average vectors removed per compaction
194    pub avg_vectors_removed: usize,
195    /// Average duration
196    pub avg_duration: Duration,
197    /// Current fragmentation
198    pub current_fragmentation: f64,
199}
200
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compaction::types::CompactionResult;
    use std::time::SystemTime;

    /// Builds a ten-second result with fixed vector counts and fragmentation;
    /// only the outcome flag and the reclaimed byte count vary.
    fn create_test_result(success: bool, bytes_reclaimed: u64) -> CompactionResult {
        let started = SystemTime::now();
        let finished = SystemTime::now();
        CompactionResult {
            start_time: started,
            end_time: finished,
            duration: Duration::from_secs(10),
            vectors_processed: 1000,
            vectors_removed: 100,
            bytes_reclaimed,
            fragmentation_before: 0.4,
            fragmentation_after: 0.1,
            success,
            error: None,
        }
    }

    /// A single successful result shows up in the aggregate counters.
    #[test]
    fn test_metrics_recording() {
        let collector = CompactionMetrics::new(10);
        collector.record_compaction(create_test_result(true, 1_000_000));

        let snapshot = collector.get_statistics();
        assert_eq!(snapshot.total_compactions, 1);
        assert_eq!(snapshot.successful_compactions, 1);
        assert_eq!(snapshot.total_bytes_reclaimed, 1_000_000);
    }

    /// Two successes and one failure give a 2/3 success rate and an average
    /// taken over the successful runs only.
    #[test]
    fn test_efficiency_calculation() {
        let collector = CompactionMetrics::new(10);
        let runs = [(true, 1_000_000), (true, 2_000_000), (false, 0)];
        for (succeeded, reclaimed) in runs {
            collector.record_compaction(create_test_result(succeeded, reclaimed));
        }

        let summary = collector.calculate_efficiency();
        assert!((summary.success_rate - 0.666).abs() < 0.01);
        assert_eq!(summary.avg_space_reclaimed_bytes, 1_500_000);
    }

    /// Recording more results than the cap keeps only `max_history_size`.
    #[test]
    fn test_history_limit() {
        let collector = CompactionMetrics::new(5);
        (0..10)
            .map(|i| create_test_result(true, i * 1000))
            .for_each(|entry| collector.record_compaction(entry));

        let retained = collector.get_history(None);
        assert_eq!(retained.len(), 5);
    }

    /// The state starts as `Idle` and reflects each subsequent update.
    #[test]
    fn test_state_updates() {
        let collector = CompactionMetrics::new(10);
        assert_eq!(collector.get_state(), CompactionState::Idle);

        for next in [CompactionState::Running, CompactionState::Completed] {
            collector.update_state(next);
            assert_eq!(collector.get_state(), next);
        }
    }
}