1use super::config::CompactionConfig;
4use super::metrics::CompactionMetrics;
5use super::strategies::StrategyEvaluator;
6use super::types::{
7 CompactionBatch, CompactionCandidate, CompactionPhase, CompactionProgress, CompactionReason,
8 CompactionResult, CompactionState, CompactionStatistics, FragmentInfo,
9};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
/// Tracks storage fragments by vector id and compacts away the ones that have
/// been marked as deleted.
///
/// Every field except `config` sits behind an `Arc<RwLock<..>>`, so all of the
/// manager's methods take `&self`.
pub struct CompactionManager {
    /// Settings checked by `CompactionConfig::validate` in `new`; supplies the
    /// strategy, thresholds, batch size, inter-batch pause and verification flag.
    config: CompactionConfig,
    /// Metrics sink that receives state changes, fragmentation updates and
    /// finished compaction results.
    metrics: Arc<CompactionMetrics>,
    /// Strategy that decides whether a compaction is due; `record_compaction`
    /// is called on it after a pass that processed candidates.
    strategy: Arc<RwLock<StrategyEvaluator>>,
    /// Registered fragments keyed by vector id.
    fragments: Arc<RwLock<HashMap<String, FragmentInfo>>>,
    /// Latest progress snapshot; `None` until the first compaction starts.
    progress: Arc<RwLock<Option<CompactionProgress>>>,
    /// On/off switch consulted by `should_compact` and `compact_now`.
    enabled: Arc<RwLock<bool>>,
}
31
32impl CompactionManager {
33 pub fn new(config: CompactionConfig) -> Result<Self> {
35 config.validate()?;
36
37 let strategy = StrategyEvaluator::new(config.strategy);
38
39 Ok(Self {
40 config,
41 metrics: Arc::new(CompactionMetrics::default()),
42 strategy: Arc::new(RwLock::new(strategy)),
43 fragments: Arc::new(RwLock::new(HashMap::new())),
44 progress: Arc::new(RwLock::new(None)),
45 enabled: Arc::new(RwLock::new(true)),
46 })
47 }
48
49 pub fn register_fragment(&self, vector_id: String, offset: usize, size: usize) {
51 let mut fragments = self.fragments.write();
52 fragments.insert(
53 vector_id,
54 FragmentInfo {
55 offset,
56 size,
57 is_free: false,
58 age: Duration::ZERO,
59 },
60 );
61 }
62
63 pub fn mark_deleted(&self, vector_id: &str) -> Result<()> {
65 let mut fragments = self.fragments.write();
66 if let Some(fragment) = fragments.get_mut(vector_id) {
67 fragment.is_free = true;
68 Ok(())
69 } else {
70 anyhow::bail!("Vector {} not found", vector_id)
71 }
72 }
73
74 pub fn calculate_fragmentation(&self) -> f64 {
76 let fragments = self.fragments.read();
77
78 if fragments.is_empty() {
79 return 0.0;
80 }
81
82 let total_size: usize = fragments.values().map(|f| f.size).sum();
83 let free_size: usize = fragments
84 .values()
85 .filter(|f| f.is_free)
86 .map(|f| f.size)
87 .sum();
88
89 if total_size == 0 {
90 0.0
91 } else {
92 free_size as f64 / total_size as f64
93 }
94 }
95
96 pub fn should_compact(&self) -> bool {
98 if !*self.enabled.read() {
99 return false;
100 }
101
102 let fragmentation = self.calculate_fragmentation();
103 let wasted_bytes = self.calculate_wasted_bytes();
104 let time_since_last = self.strategy.read().time_since_last_compaction();
105
106 let strategy = self.strategy.read();
107 strategy.should_compact(
108 fragmentation,
109 wasted_bytes,
110 time_since_last,
111 self.config.compaction_interval,
112 self.config.fragmentation_threshold,
113 self.config.min_free_space_bytes,
114 )
115 }
116
117 fn calculate_wasted_bytes(&self) -> u64 {
119 let fragments = self.fragments.read();
120 fragments
121 .values()
122 .filter(|f| f.is_free)
123 .map(|f| f.size as u64)
124 .sum()
125 }
126
127 pub fn compact_now(&self) -> Result<CompactionResult> {
129 if !*self.enabled.read() {
130 anyhow::bail!("Compaction is disabled");
131 }
132
133 self.perform_compaction()
134 }
135
136 fn perform_compaction(&self) -> Result<CompactionResult> {
138 let start_time = SystemTime::now();
139
140 self.metrics.update_state(CompactionState::Running);
142
143 let fragmentation_before = self.calculate_fragmentation();
144
145 self.update_progress(CompactionPhase::IdentifyingCandidates, 0.0);
147 let candidates = self.identify_candidates()?;
148
149 if candidates.is_empty() {
150 return Ok(CompactionResult {
152 start_time,
153 end_time: SystemTime::now(),
154 duration: start_time.elapsed().unwrap_or(Duration::ZERO),
155 vectors_processed: 0,
156 vectors_removed: 0,
157 bytes_reclaimed: 0,
158 fragmentation_before,
159 fragmentation_after: fragmentation_before,
160 success: true,
161 error: None,
162 });
163 }
164
165 let batches = self.create_batches(candidates);
167
168 let mut vectors_processed = 0;
170 let mut vectors_removed = 0;
171 let mut bytes_reclaimed = 0u64;
172
173 for (i, batch) in batches.iter().enumerate() {
174 self.update_progress(
175 CompactionPhase::MovingVectors,
176 i as f64 / batches.len() as f64,
177 );
178
179 let result = self.process_batch(batch)?;
180 vectors_processed += result.0;
181 vectors_removed += result.1;
182 bytes_reclaimed += result.2;
183
184 std::thread::sleep(self.config.pause_between_batches);
186 }
187
188 self.update_progress(CompactionPhase::ReclaimingSpace, 0.9);
190 self.reclaim_space();
191
192 if self.config.enable_verification {
194 self.update_progress(CompactionPhase::Verifying, 0.95);
195 self.verify_integrity()?;
196 }
197
198 let end_time = SystemTime::now();
199 let duration = end_time
200 .duration_since(start_time)
201 .unwrap_or(Duration::ZERO);
202 let fragmentation_after = self.calculate_fragmentation();
203
204 self.metrics.update_state(CompactionState::Completed);
206 self.update_progress(CompactionPhase::Completed, 1.0);
207
208 self.strategy.write().record_compaction();
210
211 let result = CompactionResult {
212 start_time,
213 end_time,
214 duration,
215 vectors_processed,
216 vectors_removed,
217 bytes_reclaimed,
218 fragmentation_before,
219 fragmentation_after,
220 success: true,
221 error: None,
222 };
223
224 self.metrics.record_compaction(result.clone());
225
226 Ok(result)
227 }
228
229 fn identify_candidates(&self) -> Result<Vec<CompactionCandidate>> {
231 let fragments = self.fragments.read();
232 let mut candidates = Vec::new();
233
234 for (vector_id, fragment) in fragments.iter() {
235 if fragment.is_free {
236 candidates.push(CompactionCandidate {
237 vector_id: vector_id.clone(),
238 current_offset: fragment.offset,
239 size_bytes: fragment.size,
240 priority: 1.0, reason: CompactionReason::DeletedCleanup,
242 });
243 }
244 }
245
246 Ok(candidates)
247 }
248
249 fn create_batches(&self, mut candidates: Vec<CompactionCandidate>) -> Vec<CompactionBatch> {
251 candidates.sort_by(|a, b| b.priority.partial_cmp(&a.priority).unwrap());
253
254 let mut batches = Vec::new();
255 let mut current_batch = Vec::new();
256 let mut current_size = 0;
257
258 for candidate in candidates {
259 current_batch.push(candidate.clone());
260 current_size += candidate.size_bytes;
261
262 if current_batch.len() >= self.config.batch_size {
263 batches.push(CompactionBatch {
264 batch_id: batches.len() as u64,
265 candidates: current_batch.clone(),
266 total_size_bytes: current_size,
267 estimated_duration: Duration::from_millis(100), });
269 current_batch.clear();
270 current_size = 0;
271 }
272 }
273
274 if !current_batch.is_empty() {
276 batches.push(CompactionBatch {
277 batch_id: batches.len() as u64,
278 candidates: current_batch,
279 total_size_bytes: current_size,
280 estimated_duration: Duration::from_millis(100),
281 });
282 }
283
284 batches
285 }
286
287 fn process_batch(&self, batch: &CompactionBatch) -> Result<(usize, usize, u64)> {
289 let mut vectors_processed = 0;
290 let mut vectors_removed = 0;
291 let mut bytes_reclaimed = 0u64;
292
293 let mut fragments = self.fragments.write();
294
295 for candidate in &batch.candidates {
296 if let Some(_fragment) = fragments.get(&candidate.vector_id) {
297 fragments.remove(&candidate.vector_id);
299 vectors_processed += 1;
300 vectors_removed += 1;
301 bytes_reclaimed += candidate.size_bytes as u64;
302 }
303 }
304
305 Ok((vectors_processed, vectors_removed, bytes_reclaimed))
306 }
307
308 fn reclaim_space(&self) {
310 let fragmentation = self.calculate_fragmentation();
316 self.metrics.update_fragmentation(fragmentation);
317 }
318
319 fn verify_integrity(&self) -> Result<()> {
321 Ok(())
326 }
327
328 fn update_progress(&self, phase: CompactionPhase, progress: f64) {
330 let mut prog = self.progress.write();
331 *prog = Some(CompactionProgress {
332 phase,
333 phase_progress: progress,
334 overall_progress: self.calculate_overall_progress(phase, progress),
335 estimated_time_remaining: None,
336 throughput: 0.0,
337 });
338 }
339
340 fn calculate_overall_progress(&self, phase: CompactionPhase, phase_progress: f64) -> f64 {
342 if matches!(phase, CompactionPhase::Completed) {
343 return 1.0;
344 }
345
346 let phase_weight = match phase {
347 CompactionPhase::Analyzing => 0.05,
348 CompactionPhase::IdentifyingCandidates => 0.1,
349 CompactionPhase::MovingVectors => 0.6,
350 CompactionPhase::UpdatingIndices => 0.1,
351 CompactionPhase::ReclaimingSpace => 0.1,
352 CompactionPhase::Verifying => 0.05,
353 CompactionPhase::Completed => 0.0,
354 };
355
356 let base_progress = match phase {
357 CompactionPhase::Analyzing => 0.0,
358 CompactionPhase::IdentifyingCandidates => 0.05,
359 CompactionPhase::MovingVectors => 0.15,
360 CompactionPhase::UpdatingIndices => 0.75,
361 CompactionPhase::ReclaimingSpace => 0.85,
362 CompactionPhase::Verifying => 0.95,
363 CompactionPhase::Completed => 1.0,
364 };
365
366 let progress = base_progress + (phase_progress * phase_weight);
367 progress.min(1.0) }
369
370 pub fn get_progress(&self) -> Option<CompactionProgress> {
372 self.progress.read().clone()
373 }
374
375 pub fn get_statistics(&self) -> CompactionStatistics {
377 self.metrics.get_statistics()
378 }
379
380 pub fn set_enabled(&self, enabled: bool) {
382 *self.enabled.write() = enabled;
383 }
384
385 pub fn is_enabled(&self) -> bool {
387 *self.enabled.read()
388 }
389
390 pub fn get_metrics(&self) -> Arc<CompactionMetrics> {
392 self.metrics.clone()
393 }
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a manager from the default configuration.
    fn default_manager() -> CompactionManager {
        CompactionManager::new(CompactionConfig::default()).unwrap()
    }

    #[test]
    fn test_compaction_manager_creation() {
        let manager = default_manager();
        assert!(manager.is_enabled());
        assert_eq!(manager.calculate_fragmentation(), 0.0);
        assert!(manager.get_progress().is_none());
    }

    #[test]
    fn test_fragment_registration() {
        let manager = default_manager();

        manager.register_fragment("vec1".to_string(), 0, 1024);
        manager.register_fragment("vec2".to_string(), 1024, 1024);

        assert_eq!(manager.calculate_fragmentation(), 0.0);
    }

    #[test]
    fn test_fragmentation_calculation() {
        let manager = default_manager();

        manager.register_fragment("vec1".to_string(), 0, 1024);
        manager.register_fragment("vec2".to_string(), 1024, 1024);
        manager.register_fragment("vec3".to_string(), 2048, 1024);

        manager.mark_deleted("vec2").unwrap();

        // One of three equally sized fragments is free.
        let frag = manager.calculate_fragmentation();
        assert!((frag - 1.0 / 3.0).abs() < 1e-12);
    }

    #[test]
    fn test_mark_deleted_unknown_vector() {
        let manager = default_manager();
        assert!(manager.mark_deleted("missing").is_err());
    }

    #[test]
    fn test_should_compact_threshold() {
        let config = CompactionConfig {
            strategy: super::super::strategies::CompactionStrategy::ThresholdBased,
            fragmentation_threshold: 0.3,
            ..Default::default()
        };
        let manager = CompactionManager::new(config).unwrap();

        manager.register_fragment("vec1".to_string(), 0, 1024);
        manager.register_fragment("vec2".to_string(), 1024, 1024);

        assert!(!manager.should_compact());

        manager.mark_deleted("vec1").unwrap();
        manager.mark_deleted("vec2").unwrap();

        assert!(manager.should_compact());
    }

    #[test]
    fn test_disabled_manager_refuses_compaction() {
        let manager = default_manager();
        manager.register_fragment("vec1".to_string(), 0, 1024);
        manager.mark_deleted("vec1").unwrap();

        manager.set_enabled(false);
        assert!(!manager.is_enabled());
        assert!(!manager.should_compact());
        assert!(manager.compact_now().is_err());

        manager.set_enabled(true);
        assert!(manager.is_enabled());
        assert_eq!(manager.compact_now().unwrap().vectors_removed, 1);
    }

    #[test]
    fn test_compact_empty() {
        let manager = default_manager();

        let result = manager.compact_now().unwrap();
        assert!(result.success);
        assert_eq!(result.vectors_processed, 0);
        assert_eq!(result.vectors_removed, 0);
        assert_eq!(result.bytes_reclaimed, 0);
    }

    #[test]
    fn test_compact_with_deletions() {
        let manager = default_manager();

        manager.register_fragment("vec1".to_string(), 0, 1024);
        manager.register_fragment("vec2".to_string(), 1024, 1024);
        manager.register_fragment("vec3".to_string(), 2048, 1024);

        manager.mark_deleted("vec1").unwrap();
        manager.mark_deleted("vec3").unwrap();

        let result = manager.compact_now().unwrap();
        assert!(result.success);
        assert_eq!(result.vectors_processed, 2);
        assert_eq!(result.vectors_removed, 2);
        assert_eq!(result.bytes_reclaimed, 2048);
        assert!((result.fragmentation_before - 2.0 / 3.0).abs() < 1e-12);
        assert_eq!(result.fragmentation_after, 0.0);

        // Freed fragments are gone; the live one is untouched.
        assert!(manager.mark_deleted("vec1").is_err());
        assert!(manager.mark_deleted("vec3").is_err());
        assert!(manager.mark_deleted("vec2").is_ok());

        let progress = manager.get_progress().expect("progress recorded by compaction");
        assert!(matches!(progress.phase, CompactionPhase::Completed));
        assert!((progress.overall_progress - 1.0).abs() < f64::EPSILON);
    }
}