oxirs_vec/compaction/
manager.rs

1//! Main compaction manager
2
3use super::config::CompactionConfig;
4use super::metrics::CompactionMetrics;
5use super::strategies::StrategyEvaluator;
6use super::types::{
7    CompactionBatch, CompactionCandidate, CompactionPhase, CompactionProgress, CompactionReason,
8    CompactionResult, CompactionState, CompactionStatistics, FragmentInfo,
9};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
16/// Main compaction manager
17pub struct CompactionManager {
18    /// Configuration
19    config: CompactionConfig,
20    /// Metrics collector
21    metrics: Arc<CompactionMetrics>,
22    /// Strategy evaluator
23    strategy: Arc<RwLock<StrategyEvaluator>>,
24    /// Fragment map (vector_id -> fragment info)
25    fragments: Arc<RwLock<HashMap<String, FragmentInfo>>>,
26    /// Current progress
27    progress: Arc<RwLock<Option<CompactionProgress>>>,
28    /// Compaction enabled flag
29    enabled: Arc<RwLock<bool>>,
30}
31
32impl CompactionManager {
33    /// Create a new compaction manager
34    pub fn new(config: CompactionConfig) -> Result<Self> {
35        config.validate()?;
36
37        let strategy = StrategyEvaluator::new(config.strategy);
38
39        Ok(Self {
40            config,
41            metrics: Arc::new(CompactionMetrics::default()),
42            strategy: Arc::new(RwLock::new(strategy)),
43            fragments: Arc::new(RwLock::new(HashMap::new())),
44            progress: Arc::new(RwLock::new(None)),
45            enabled: Arc::new(RwLock::new(true)),
46        })
47    }
48
49    /// Register a vector fragment
50    pub fn register_fragment(&self, vector_id: String, offset: usize, size: usize) {
51        let mut fragments = self.fragments.write();
52        fragments.insert(
53            vector_id,
54            FragmentInfo {
55                offset,
56                size,
57                is_free: false,
58                age: Duration::ZERO,
59            },
60        );
61    }
62
63    /// Mark a vector as deleted (creates free fragment)
64    pub fn mark_deleted(&self, vector_id: &str) -> Result<()> {
65        let mut fragments = self.fragments.write();
66        if let Some(fragment) = fragments.get_mut(vector_id) {
67            fragment.is_free = true;
68            Ok(())
69        } else {
70            anyhow::bail!("Vector {} not found", vector_id)
71        }
72    }
73
74    /// Calculate current fragmentation ratio
75    pub fn calculate_fragmentation(&self) -> f64 {
76        let fragments = self.fragments.read();
77
78        if fragments.is_empty() {
79            return 0.0;
80        }
81
82        let total_size: usize = fragments.values().map(|f| f.size).sum();
83        let free_size: usize = fragments
84            .values()
85            .filter(|f| f.is_free)
86            .map(|f| f.size)
87            .sum();
88
89        if total_size == 0 {
90            0.0
91        } else {
92            free_size as f64 / total_size as f64
93        }
94    }
95
96    /// Check if compaction should be triggered
97    pub fn should_compact(&self) -> bool {
98        if !*self.enabled.read() {
99            return false;
100        }
101
102        let fragmentation = self.calculate_fragmentation();
103        let wasted_bytes = self.calculate_wasted_bytes();
104        let time_since_last = self.strategy.read().time_since_last_compaction();
105
106        let strategy = self.strategy.read();
107        strategy.should_compact(
108            fragmentation,
109            wasted_bytes,
110            time_since_last,
111            self.config.compaction_interval,
112            self.config.fragmentation_threshold,
113            self.config.min_free_space_bytes,
114        )
115    }
116
117    /// Calculate wasted bytes (from deleted vectors)
118    fn calculate_wasted_bytes(&self) -> u64 {
119        let fragments = self.fragments.read();
120        fragments
121            .values()
122            .filter(|f| f.is_free)
123            .map(|f| f.size as u64)
124            .sum()
125    }
126
127    /// Trigger manual compaction
128    pub fn compact_now(&self) -> Result<CompactionResult> {
129        if !*self.enabled.read() {
130            anyhow::bail!("Compaction is disabled");
131        }
132
133        self.perform_compaction()
134    }
135
136    /// Perform compaction
137    fn perform_compaction(&self) -> Result<CompactionResult> {
138        let start_time = SystemTime::now();
139
140        // Update state
141        self.metrics.update_state(CompactionState::Running);
142
143        let fragmentation_before = self.calculate_fragmentation();
144
145        // Phase 1: Analyze and identify candidates
146        self.update_progress(CompactionPhase::IdentifyingCandidates, 0.0);
147        let candidates = self.identify_candidates()?;
148
149        if candidates.is_empty() {
150            // Nothing to compact
151            return Ok(CompactionResult {
152                start_time,
153                end_time: SystemTime::now(),
154                duration: start_time.elapsed().unwrap_or(Duration::ZERO),
155                vectors_processed: 0,
156                vectors_removed: 0,
157                bytes_reclaimed: 0,
158                fragmentation_before,
159                fragmentation_after: fragmentation_before,
160                success: true,
161                error: None,
162            });
163        }
164
165        // Phase 2: Create batches
166        let batches = self.create_batches(candidates);
167
168        // Phase 3: Process batches
169        let mut vectors_processed = 0;
170        let mut vectors_removed = 0;
171        let mut bytes_reclaimed = 0u64;
172
173        for (i, batch) in batches.iter().enumerate() {
174            self.update_progress(
175                CompactionPhase::MovingVectors,
176                i as f64 / batches.len() as f64,
177            );
178
179            let result = self.process_batch(batch)?;
180            vectors_processed += result.0;
181            vectors_removed += result.1;
182            bytes_reclaimed += result.2;
183
184            // Pause between batches
185            std::thread::sleep(self.config.pause_between_batches);
186        }
187
188        // Phase 4: Reclaim space
189        self.update_progress(CompactionPhase::ReclaimingSpace, 0.9);
190        self.reclaim_space();
191
192        // Phase 5: Verify (if enabled)
193        if self.config.enable_verification {
194            self.update_progress(CompactionPhase::Verifying, 0.95);
195            self.verify_integrity()?;
196        }
197
198        let end_time = SystemTime::now();
199        let duration = end_time
200            .duration_since(start_time)
201            .unwrap_or(Duration::ZERO);
202        let fragmentation_after = self.calculate_fragmentation();
203
204        // Update state
205        self.metrics.update_state(CompactionState::Completed);
206        self.update_progress(CompactionPhase::Completed, 1.0);
207
208        // Record compaction
209        self.strategy.write().record_compaction();
210
211        let result = CompactionResult {
212            start_time,
213            end_time,
214            duration,
215            vectors_processed,
216            vectors_removed,
217            bytes_reclaimed,
218            fragmentation_before,
219            fragmentation_after,
220            success: true,
221            error: None,
222        };
223
224        self.metrics.record_compaction(result.clone());
225
226        Ok(result)
227    }
228
229    /// Identify compaction candidates
230    fn identify_candidates(&self) -> Result<Vec<CompactionCandidate>> {
231        let fragments = self.fragments.read();
232        let mut candidates = Vec::new();
233
234        for (vector_id, fragment) in fragments.iter() {
235            if fragment.is_free {
236                candidates.push(CompactionCandidate {
237                    vector_id: vector_id.clone(),
238                    current_offset: fragment.offset,
239                    size_bytes: fragment.size,
240                    priority: 1.0, // Highest priority for deleted vectors
241                    reason: CompactionReason::DeletedCleanup,
242                });
243            }
244        }
245
246        Ok(candidates)
247    }
248
249    /// Create batches from candidates
250    fn create_batches(&self, mut candidates: Vec<CompactionCandidate>) -> Vec<CompactionBatch> {
251        // Sort by priority (highest first)
252        candidates.sort_by(|a, b| b.priority.partial_cmp(&a.priority).unwrap());
253
254        let mut batches = Vec::new();
255        let mut current_batch = Vec::new();
256        let mut current_size = 0;
257
258        for candidate in candidates {
259            current_batch.push(candidate.clone());
260            current_size += candidate.size_bytes;
261
262            if current_batch.len() >= self.config.batch_size {
263                batches.push(CompactionBatch {
264                    batch_id: batches.len() as u64,
265                    candidates: current_batch.clone(),
266                    total_size_bytes: current_size,
267                    estimated_duration: Duration::from_millis(100), // Estimate
268                });
269                current_batch.clear();
270                current_size = 0;
271            }
272        }
273
274        // Add remaining candidates
275        if !current_batch.is_empty() {
276            batches.push(CompactionBatch {
277                batch_id: batches.len() as u64,
278                candidates: current_batch,
279                total_size_bytes: current_size,
280                estimated_duration: Duration::from_millis(100),
281            });
282        }
283
284        batches
285    }
286
287    /// Process a single batch
288    fn process_batch(&self, batch: &CompactionBatch) -> Result<(usize, usize, u64)> {
289        let mut vectors_processed = 0;
290        let mut vectors_removed = 0;
291        let mut bytes_reclaimed = 0u64;
292
293        let mut fragments = self.fragments.write();
294
295        for candidate in &batch.candidates {
296            if let Some(_fragment) = fragments.get(&candidate.vector_id) {
297                // Remove the fragment
298                fragments.remove(&candidate.vector_id);
299                vectors_processed += 1;
300                vectors_removed += 1;
301                bytes_reclaimed += candidate.size_bytes as u64;
302            }
303        }
304
305        Ok((vectors_processed, vectors_removed, bytes_reclaimed))
306    }
307
308    /// Reclaim space (cleanup internal structures)
309    fn reclaim_space(&self) {
310        // In a real implementation, this would:
311        // 1. Compact the underlying storage
312        // 2. Update offsets
313        // 3. Rebuild indices
314        // For now, we just update fragmentation
315        let fragmentation = self.calculate_fragmentation();
316        self.metrics.update_fragmentation(fragmentation);
317    }
318
319    /// Verify integrity after compaction
320    fn verify_integrity(&self) -> Result<()> {
321        // In a real implementation, this would:
322        // 1. Verify all vector IDs are accessible
323        // 2. Check index consistency
324        // 3. Validate checksums
325        Ok(())
326    }
327
328    /// Update progress
329    fn update_progress(&self, phase: CompactionPhase, progress: f64) {
330        let mut prog = self.progress.write();
331        *prog = Some(CompactionProgress {
332            phase,
333            phase_progress: progress,
334            overall_progress: self.calculate_overall_progress(phase, progress),
335            estimated_time_remaining: None,
336            throughput: 0.0,
337        });
338    }
339
340    /// Calculate overall progress
341    fn calculate_overall_progress(&self, phase: CompactionPhase, phase_progress: f64) -> f64 {
342        if matches!(phase, CompactionPhase::Completed) {
343            return 1.0;
344        }
345
346        let phase_weight = match phase {
347            CompactionPhase::Analyzing => 0.05,
348            CompactionPhase::IdentifyingCandidates => 0.1,
349            CompactionPhase::MovingVectors => 0.6,
350            CompactionPhase::UpdatingIndices => 0.1,
351            CompactionPhase::ReclaimingSpace => 0.1,
352            CompactionPhase::Verifying => 0.05,
353            CompactionPhase::Completed => 0.0,
354        };
355
356        let base_progress = match phase {
357            CompactionPhase::Analyzing => 0.0,
358            CompactionPhase::IdentifyingCandidates => 0.05,
359            CompactionPhase::MovingVectors => 0.15,
360            CompactionPhase::UpdatingIndices => 0.75,
361            CompactionPhase::ReclaimingSpace => 0.85,
362            CompactionPhase::Verifying => 0.95,
363            CompactionPhase::Completed => 1.0,
364        };
365
366        let progress = base_progress + (phase_progress * phase_weight);
367        progress.min(1.0) // Clamp to max 1.0
368    }
369
370    /// Get current progress
371    pub fn get_progress(&self) -> Option<CompactionProgress> {
372        self.progress.read().clone()
373    }
374
375    /// Get statistics
376    pub fn get_statistics(&self) -> CompactionStatistics {
377        self.metrics.get_statistics()
378    }
379
380    /// Enable/disable compaction
381    pub fn set_enabled(&self, enabled: bool) {
382        *self.enabled.write() = enabled;
383    }
384
385    /// Check if compaction is enabled
386    pub fn is_enabled(&self) -> bool {
387        *self.enabled.read()
388    }
389
390    /// Get metrics
391    pub fn get_metrics(&self) -> Arc<CompactionMetrics> {
392        self.metrics.clone()
393    }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default configuration.
    fn default_manager() -> CompactionManager {
        CompactionManager::new(CompactionConfig::default()).expect("default config is valid")
    }

    /// Registers `count` back-to-back 1024-byte fragments named `vec1`, `vec2`, ...
    fn register_kib_fragments(manager: &CompactionManager, count: usize) {
        for index in 0..count {
            manager.register_fragment(format!("vec{}", index + 1), index * 1024, 1024);
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        assert!(default_manager().is_enabled());
    }

    #[test]
    fn test_fragment_registration() {
        let manager = default_manager();
        register_kib_fragments(&manager, 2);

        // Live fragments only: nothing is wasted yet.
        assert_eq!(manager.calculate_fragmentation(), 0.0);
    }

    #[test]
    fn test_fragmentation_calculation() {
        let manager = default_manager();
        register_kib_fragments(&manager, 3);

        // Free one of the three equally sized fragments.
        manager.mark_deleted("vec2").unwrap();

        // 1024 of 3072 bytes are free, i.e. roughly a third.
        let fragmentation = manager.calculate_fragmentation();
        assert!((fragmentation - 0.333).abs() < 0.01);
    }

    #[test]
    fn test_should_compact_threshold() {
        let manager = CompactionManager::new(CompactionConfig {
            strategy: super::super::strategies::CompactionStrategy::ThresholdBased,
            fragmentation_threshold: 0.3,
            ..Default::default()
        })
        .unwrap();

        register_kib_fragments(&manager, 2);

        // No free space yet, so the threshold is not reached.
        assert!(!manager.should_compact());

        for id in ["vec1", "vec2"] {
            manager.mark_deleted(id).unwrap();
        }

        // Everything is free now: fragmentation is well above the threshold.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_compact_empty() {
        let outcome = default_manager().compact_now().unwrap();

        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 0);
    }

    #[test]
    fn test_compact_with_deletions() {
        let manager = default_manager();
        register_kib_fragments(&manager, 3);

        for id in ["vec1", "vec3"] {
            manager.mark_deleted(id).unwrap();
        }

        let outcome = manager.compact_now().unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 2);
        assert_eq!(outcome.bytes_reclaimed, 2048);
    }
}