Skip to main content

oxirs_vec/compaction/
manager.rs

1//! Main compaction manager
2
3use super::config::CompactionConfig;
4use super::metrics::CompactionMetrics;
5use super::strategies::StrategyEvaluator;
6use super::types::{
7    CompactionBatch, CompactionCandidate, CompactionPhase, CompactionProgress, CompactionReason,
8    CompactionResult, CompactionState, CompactionStatistics, FragmentInfo,
9};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
/// Main compaction manager.
///
/// Bundles the compaction configuration with the shared state that the
/// methods of this type read and update: the fragment map, the most recent
/// progress snapshot, the enabled flag, the strategy evaluator and the
/// metrics collector.
pub struct CompactionManager {
    /// Configuration (validated in `new` before the manager is built)
    config: CompactionConfig,
    /// Metrics collector; `get_metrics` hands out a shared handle to it
    metrics: Arc<CompactionMetrics>,
    /// Strategy evaluator consulted by `should_compact`
    strategy: Arc<RwLock<StrategyEvaluator>>,
    /// Fragment map (vector_id -> fragment info)
    fragments: Arc<RwLock<HashMap<String, FragmentInfo>>>,
    /// Most recent progress snapshot; starts out as `None`
    progress: Arc<RwLock<Option<CompactionProgress>>>,
    /// Compaction enabled flag (starts out `true`)
    enabled: Arc<RwLock<bool>>,
}
31
32impl CompactionManager {
33    /// Create a new compaction manager
34    pub fn new(config: CompactionConfig) -> Result<Self> {
35        config.validate()?;
36
37        let strategy = StrategyEvaluator::new(config.strategy);
38
39        Ok(Self {
40            config,
41            metrics: Arc::new(CompactionMetrics::default()),
42            strategy: Arc::new(RwLock::new(strategy)),
43            fragments: Arc::new(RwLock::new(HashMap::new())),
44            progress: Arc::new(RwLock::new(None)),
45            enabled: Arc::new(RwLock::new(true)),
46        })
47    }
48
49    /// Register a vector fragment
50    pub fn register_fragment(&self, vector_id: String, offset: usize, size: usize) {
51        let mut fragments = self.fragments.write();
52        fragments.insert(
53            vector_id,
54            FragmentInfo {
55                offset,
56                size,
57                is_free: false,
58                age: Duration::ZERO,
59            },
60        );
61    }
62
63    /// Mark a vector as deleted (creates free fragment)
64    pub fn mark_deleted(&self, vector_id: &str) -> Result<()> {
65        let mut fragments = self.fragments.write();
66        if let Some(fragment) = fragments.get_mut(vector_id) {
67            fragment.is_free = true;
68            Ok(())
69        } else {
70            anyhow::bail!("Vector {} not found", vector_id)
71        }
72    }
73
74    /// Calculate current fragmentation ratio
75    pub fn calculate_fragmentation(&self) -> f64 {
76        let fragments = self.fragments.read();
77
78        if fragments.is_empty() {
79            return 0.0;
80        }
81
82        let total_size: usize = fragments.values().map(|f| f.size).sum();
83        let free_size: usize = fragments
84            .values()
85            .filter(|f| f.is_free)
86            .map(|f| f.size)
87            .sum();
88
89        if total_size == 0 {
90            0.0
91        } else {
92            free_size as f64 / total_size as f64
93        }
94    }
95
96    /// Check if compaction should be triggered
97    pub fn should_compact(&self) -> bool {
98        if !*self.enabled.read() {
99            return false;
100        }
101
102        let fragmentation = self.calculate_fragmentation();
103        let wasted_bytes = self.calculate_wasted_bytes();
104        let time_since_last = self.strategy.read().time_since_last_compaction();
105
106        let strategy = self.strategy.read();
107        strategy.should_compact(
108            fragmentation,
109            wasted_bytes,
110            time_since_last,
111            self.config.compaction_interval,
112            self.config.fragmentation_threshold,
113            self.config.min_free_space_bytes,
114        )
115    }
116
117    /// Calculate wasted bytes (from deleted vectors)
118    fn calculate_wasted_bytes(&self) -> u64 {
119        let fragments = self.fragments.read();
120        fragments
121            .values()
122            .filter(|f| f.is_free)
123            .map(|f| f.size as u64)
124            .sum()
125    }
126
127    /// Trigger manual compaction
128    pub fn compact_now(&self) -> Result<CompactionResult> {
129        if !*self.enabled.read() {
130            anyhow::bail!("Compaction is disabled");
131        }
132
133        self.perform_compaction()
134    }
135
136    /// Perform compaction
137    fn perform_compaction(&self) -> Result<CompactionResult> {
138        let start_time = SystemTime::now();
139
140        // Update state
141        self.metrics.update_state(CompactionState::Running);
142
143        let fragmentation_before = self.calculate_fragmentation();
144
145        // Phase 1: Analyze and identify candidates
146        self.update_progress(CompactionPhase::IdentifyingCandidates, 0.0);
147        let candidates = self.identify_candidates()?;
148
149        if candidates.is_empty() {
150            // Nothing to compact
151            return Ok(CompactionResult {
152                start_time,
153                end_time: SystemTime::now(),
154                duration: start_time.elapsed().unwrap_or(Duration::ZERO),
155                vectors_processed: 0,
156                vectors_removed: 0,
157                bytes_reclaimed: 0,
158                fragmentation_before,
159                fragmentation_after: fragmentation_before,
160                success: true,
161                error: None,
162            });
163        }
164
165        // Phase 2: Create batches
166        let batches = self.create_batches(candidates);
167
168        // Phase 3: Process batches
169        let mut vectors_processed = 0;
170        let mut vectors_removed = 0;
171        let mut bytes_reclaimed = 0u64;
172
173        for (i, batch) in batches.iter().enumerate() {
174            self.update_progress(
175                CompactionPhase::MovingVectors,
176                i as f64 / batches.len() as f64,
177            );
178
179            let result = self.process_batch(batch)?;
180            vectors_processed += result.0;
181            vectors_removed += result.1;
182            bytes_reclaimed += result.2;
183
184            // Pause between batches
185            std::thread::sleep(self.config.pause_between_batches);
186        }
187
188        // Phase 4: Reclaim space
189        self.update_progress(CompactionPhase::ReclaimingSpace, 0.9);
190        self.reclaim_space();
191
192        // Phase 5: Verify (if enabled)
193        if self.config.enable_verification {
194            self.update_progress(CompactionPhase::Verifying, 0.95);
195            self.verify_integrity()?;
196        }
197
198        let end_time = SystemTime::now();
199        let duration = end_time
200            .duration_since(start_time)
201            .unwrap_or(Duration::ZERO);
202        let fragmentation_after = self.calculate_fragmentation();
203
204        // Update state
205        self.metrics.update_state(CompactionState::Completed);
206        self.update_progress(CompactionPhase::Completed, 1.0);
207
208        // Record compaction
209        self.strategy.write().record_compaction();
210
211        let result = CompactionResult {
212            start_time,
213            end_time,
214            duration,
215            vectors_processed,
216            vectors_removed,
217            bytes_reclaimed,
218            fragmentation_before,
219            fragmentation_after,
220            success: true,
221            error: None,
222        };
223
224        self.metrics.record_compaction(result.clone());
225
226        Ok(result)
227    }
228
229    /// Identify compaction candidates
230    fn identify_candidates(&self) -> Result<Vec<CompactionCandidate>> {
231        let fragments = self.fragments.read();
232        let mut candidates = Vec::new();
233
234        for (vector_id, fragment) in fragments.iter() {
235            if fragment.is_free {
236                candidates.push(CompactionCandidate {
237                    vector_id: vector_id.clone(),
238                    current_offset: fragment.offset,
239                    size_bytes: fragment.size,
240                    priority: 1.0, // Highest priority for deleted vectors
241                    reason: CompactionReason::DeletedCleanup,
242                });
243            }
244        }
245
246        Ok(candidates)
247    }
248
249    /// Create batches from candidates
250    fn create_batches(&self, mut candidates: Vec<CompactionCandidate>) -> Vec<CompactionBatch> {
251        // Sort by priority (highest first)
252        candidates.sort_by(|a, b| {
253            b.priority
254                .partial_cmp(&a.priority)
255                .unwrap_or(std::cmp::Ordering::Equal)
256        });
257
258        let mut batches = Vec::new();
259        let mut current_batch = Vec::new();
260        let mut current_size = 0;
261
262        for candidate in candidates {
263            current_batch.push(candidate.clone());
264            current_size += candidate.size_bytes;
265
266            if current_batch.len() >= self.config.batch_size {
267                batches.push(CompactionBatch {
268                    batch_id: batches.len() as u64,
269                    candidates: current_batch.clone(),
270                    total_size_bytes: current_size,
271                    estimated_duration: Duration::from_millis(100), // Estimate
272                });
273                current_batch.clear();
274                current_size = 0;
275            }
276        }
277
278        // Add remaining candidates
279        if !current_batch.is_empty() {
280            batches.push(CompactionBatch {
281                batch_id: batches.len() as u64,
282                candidates: current_batch,
283                total_size_bytes: current_size,
284                estimated_duration: Duration::from_millis(100),
285            });
286        }
287
288        batches
289    }
290
291    /// Process a single batch
292    fn process_batch(&self, batch: &CompactionBatch) -> Result<(usize, usize, u64)> {
293        let mut vectors_processed = 0;
294        let mut vectors_removed = 0;
295        let mut bytes_reclaimed = 0u64;
296
297        let mut fragments = self.fragments.write();
298
299        for candidate in &batch.candidates {
300            if let Some(_fragment) = fragments.get(&candidate.vector_id) {
301                // Remove the fragment
302                fragments.remove(&candidate.vector_id);
303                vectors_processed += 1;
304                vectors_removed += 1;
305                bytes_reclaimed += candidate.size_bytes as u64;
306            }
307        }
308
309        Ok((vectors_processed, vectors_removed, bytes_reclaimed))
310    }
311
312    /// Reclaim space (cleanup internal structures)
313    fn reclaim_space(&self) {
314        // In a real implementation, this would:
315        // 1. Compact the underlying storage
316        // 2. Update offsets
317        // 3. Rebuild indices
318        // For now, we just update fragmentation
319        let fragmentation = self.calculate_fragmentation();
320        self.metrics.update_fragmentation(fragmentation);
321    }
322
323    /// Verify integrity after compaction
324    fn verify_integrity(&self) -> Result<()> {
325        // In a real implementation, this would:
326        // 1. Verify all vector IDs are accessible
327        // 2. Check index consistency
328        // 3. Validate checksums
329        Ok(())
330    }
331
332    /// Update progress
333    fn update_progress(&self, phase: CompactionPhase, progress: f64) {
334        let mut prog = self.progress.write();
335        *prog = Some(CompactionProgress {
336            phase,
337            phase_progress: progress,
338            overall_progress: self.calculate_overall_progress(phase, progress),
339            estimated_time_remaining: None,
340            throughput: 0.0,
341        });
342    }
343
344    /// Calculate overall progress
345    fn calculate_overall_progress(&self, phase: CompactionPhase, phase_progress: f64) -> f64 {
346        if matches!(phase, CompactionPhase::Completed) {
347            return 1.0;
348        }
349
350        let phase_weight = match phase {
351            CompactionPhase::Analyzing => 0.05,
352            CompactionPhase::IdentifyingCandidates => 0.1,
353            CompactionPhase::MovingVectors => 0.6,
354            CompactionPhase::UpdatingIndices => 0.1,
355            CompactionPhase::ReclaimingSpace => 0.1,
356            CompactionPhase::Verifying => 0.05,
357            CompactionPhase::Completed => 0.0,
358        };
359
360        let base_progress = match phase {
361            CompactionPhase::Analyzing => 0.0,
362            CompactionPhase::IdentifyingCandidates => 0.05,
363            CompactionPhase::MovingVectors => 0.15,
364            CompactionPhase::UpdatingIndices => 0.75,
365            CompactionPhase::ReclaimingSpace => 0.85,
366            CompactionPhase::Verifying => 0.95,
367            CompactionPhase::Completed => 1.0,
368        };
369
370        let progress = base_progress + (phase_progress * phase_weight);
371        progress.min(1.0) // Clamp to max 1.0
372    }
373
374    /// Get current progress
375    pub fn get_progress(&self) -> Option<CompactionProgress> {
376        self.progress.read().clone()
377    }
378
379    /// Get statistics
380    pub fn get_statistics(&self) -> CompactionStatistics {
381        self.metrics.get_statistics()
382    }
383
384    /// Enable/disable compaction
385    pub fn set_enabled(&self, enabled: bool) {
386        *self.enabled.write() = enabled;
387    }
388
389    /// Check if compaction is enabled
390    pub fn is_enabled(&self) -> bool {
391        *self.enabled.read()
392    }
393
394    /// Get metrics
395    pub fn get_metrics(&self) -> Arc<CompactionMetrics> {
396        self.metrics.clone()
397    }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of every fragment registered by these tests.
    const FRAGMENT_BYTES: usize = 1024;

    /// Manager built from the default configuration.
    fn default_manager() -> CompactionManager {
        CompactionManager::new(CompactionConfig::default()).unwrap()
    }

    /// Registers `ids` back to back, each `FRAGMENT_BYTES` long.
    fn register_all(manager: &CompactionManager, ids: &[&str]) {
        for (slot, id) in ids.iter().enumerate() {
            manager.register_fragment(id.to_string(), slot * FRAGMENT_BYTES, FRAGMENT_BYTES);
        }
    }

    #[test]
    fn test_compaction_manager_creation() {
        let manager = default_manager();
        assert!(manager.is_enabled());
    }

    #[test]
    fn test_fragment_registration() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2"]);

        // Nothing deleted yet, so nothing is fragmented.
        assert_eq!(manager.calculate_fragmentation(), 0.0);
    }

    #[test]
    fn test_fragmentation_calculation() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2", "vec3"]);

        // Delete one of the three equally sized fragments.
        manager.mark_deleted("vec2").unwrap();

        // Expect roughly one third (1024 / 3072).
        let ratio = manager.calculate_fragmentation();
        assert!((ratio - 0.333).abs() < 0.01);
    }

    #[test]
    fn test_should_compact_threshold() {
        let config = CompactionConfig {
            strategy: super::super::strategies::CompactionStrategy::ThresholdBased,
            fragmentation_threshold: 0.3,
            ..Default::default()
        };
        let manager = CompactionManager::new(config).unwrap();
        register_all(&manager, &["vec1", "vec2"]);

        // No deletions yet: below the threshold.
        assert!(!manager.should_compact());

        for id in ["vec1", "vec2"] {
            manager.mark_deleted(id).unwrap();
        }

        // Everything deleted: well above the threshold.
        assert!(manager.should_compact());
    }

    #[test]
    fn test_compact_empty() {
        let manager = default_manager();

        let outcome = manager.compact_now().unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 0);
    }

    #[test]
    fn test_compact_with_deletions() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2", "vec3"]);

        for id in ["vec1", "vec3"] {
            manager.mark_deleted(id).unwrap();
        }

        let outcome = manager.compact_now().unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 2);
        assert_eq!(outcome.bytes_reclaimed, 2048);
    }
}