1use super::config::CompactionConfig;
4use super::metrics::CompactionMetrics;
5use super::strategies::StrategyEvaluator;
6use super::types::{
7 CompactionBatch, CompactionCandidate, CompactionPhase, CompactionProgress, CompactionReason,
8 CompactionResult, CompactionState, CompactionStatistics, FragmentInfo,
9};
10use anyhow::Result;
11use parking_lot::RwLock;
12use std::collections::HashMap;
13use std::sync::Arc;
14use std::time::{Duration, SystemTime};
15
/// Tracks registered storage fragments and runs compaction passes over them.
///
/// The strategy evaluator, fragment table, progress snapshot and enabled flag
/// each sit behind their own `RwLock`, which is why the methods take `&self`.
pub struct CompactionManager {
    /// Configuration handed to `new`, stored after `validate()` succeeded.
    config: CompactionConfig,
    /// Metrics sink: receives state changes, fragmentation ratios and run results.
    metrics: Arc<CompactionMetrics>,
    /// Strategy evaluator consulted by `should_compact` and notified after a run.
    strategy: Arc<RwLock<StrategyEvaluator>>,
    /// Registered fragments keyed by vector id.
    fragments: Arc<RwLock<HashMap<String, FragmentInfo>>>,
    /// Latest progress snapshot; starts as `None`.
    progress: Arc<RwLock<Option<CompactionProgress>>>,
    /// Whether compaction is currently allowed; starts as `true`.
    enabled: Arc<RwLock<bool>>,
}
31
32impl CompactionManager {
33 pub fn new(config: CompactionConfig) -> Result<Self> {
35 config.validate()?;
36
37 let strategy = StrategyEvaluator::new(config.strategy);
38
39 Ok(Self {
40 config,
41 metrics: Arc::new(CompactionMetrics::default()),
42 strategy: Arc::new(RwLock::new(strategy)),
43 fragments: Arc::new(RwLock::new(HashMap::new())),
44 progress: Arc::new(RwLock::new(None)),
45 enabled: Arc::new(RwLock::new(true)),
46 })
47 }
48
49 pub fn register_fragment(&self, vector_id: String, offset: usize, size: usize) {
51 let mut fragments = self.fragments.write();
52 fragments.insert(
53 vector_id,
54 FragmentInfo {
55 offset,
56 size,
57 is_free: false,
58 age: Duration::ZERO,
59 },
60 );
61 }
62
63 pub fn mark_deleted(&self, vector_id: &str) -> Result<()> {
65 let mut fragments = self.fragments.write();
66 if let Some(fragment) = fragments.get_mut(vector_id) {
67 fragment.is_free = true;
68 Ok(())
69 } else {
70 anyhow::bail!("Vector {} not found", vector_id)
71 }
72 }
73
74 pub fn calculate_fragmentation(&self) -> f64 {
76 let fragments = self.fragments.read();
77
78 if fragments.is_empty() {
79 return 0.0;
80 }
81
82 let total_size: usize = fragments.values().map(|f| f.size).sum();
83 let free_size: usize = fragments
84 .values()
85 .filter(|f| f.is_free)
86 .map(|f| f.size)
87 .sum();
88
89 if total_size == 0 {
90 0.0
91 } else {
92 free_size as f64 / total_size as f64
93 }
94 }
95
96 pub fn should_compact(&self) -> bool {
98 if !*self.enabled.read() {
99 return false;
100 }
101
102 let fragmentation = self.calculate_fragmentation();
103 let wasted_bytes = self.calculate_wasted_bytes();
104 let time_since_last = self.strategy.read().time_since_last_compaction();
105
106 let strategy = self.strategy.read();
107 strategy.should_compact(
108 fragmentation,
109 wasted_bytes,
110 time_since_last,
111 self.config.compaction_interval,
112 self.config.fragmentation_threshold,
113 self.config.min_free_space_bytes,
114 )
115 }
116
117 fn calculate_wasted_bytes(&self) -> u64 {
119 let fragments = self.fragments.read();
120 fragments
121 .values()
122 .filter(|f| f.is_free)
123 .map(|f| f.size as u64)
124 .sum()
125 }
126
127 pub fn compact_now(&self) -> Result<CompactionResult> {
129 if !*self.enabled.read() {
130 anyhow::bail!("Compaction is disabled");
131 }
132
133 self.perform_compaction()
134 }
135
136 fn perform_compaction(&self) -> Result<CompactionResult> {
138 let start_time = SystemTime::now();
139
140 self.metrics.update_state(CompactionState::Running);
142
143 let fragmentation_before = self.calculate_fragmentation();
144
145 self.update_progress(CompactionPhase::IdentifyingCandidates, 0.0);
147 let candidates = self.identify_candidates()?;
148
149 if candidates.is_empty() {
150 return Ok(CompactionResult {
152 start_time,
153 end_time: SystemTime::now(),
154 duration: start_time.elapsed().unwrap_or(Duration::ZERO),
155 vectors_processed: 0,
156 vectors_removed: 0,
157 bytes_reclaimed: 0,
158 fragmentation_before,
159 fragmentation_after: fragmentation_before,
160 success: true,
161 error: None,
162 });
163 }
164
165 let batches = self.create_batches(candidates);
167
168 let mut vectors_processed = 0;
170 let mut vectors_removed = 0;
171 let mut bytes_reclaimed = 0u64;
172
173 for (i, batch) in batches.iter().enumerate() {
174 self.update_progress(
175 CompactionPhase::MovingVectors,
176 i as f64 / batches.len() as f64,
177 );
178
179 let result = self.process_batch(batch)?;
180 vectors_processed += result.0;
181 vectors_removed += result.1;
182 bytes_reclaimed += result.2;
183
184 std::thread::sleep(self.config.pause_between_batches);
186 }
187
188 self.update_progress(CompactionPhase::ReclaimingSpace, 0.9);
190 self.reclaim_space();
191
192 if self.config.enable_verification {
194 self.update_progress(CompactionPhase::Verifying, 0.95);
195 self.verify_integrity()?;
196 }
197
198 let end_time = SystemTime::now();
199 let duration = end_time
200 .duration_since(start_time)
201 .unwrap_or(Duration::ZERO);
202 let fragmentation_after = self.calculate_fragmentation();
203
204 self.metrics.update_state(CompactionState::Completed);
206 self.update_progress(CompactionPhase::Completed, 1.0);
207
208 self.strategy.write().record_compaction();
210
211 let result = CompactionResult {
212 start_time,
213 end_time,
214 duration,
215 vectors_processed,
216 vectors_removed,
217 bytes_reclaimed,
218 fragmentation_before,
219 fragmentation_after,
220 success: true,
221 error: None,
222 };
223
224 self.metrics.record_compaction(result.clone());
225
226 Ok(result)
227 }
228
229 fn identify_candidates(&self) -> Result<Vec<CompactionCandidate>> {
231 let fragments = self.fragments.read();
232 let mut candidates = Vec::new();
233
234 for (vector_id, fragment) in fragments.iter() {
235 if fragment.is_free {
236 candidates.push(CompactionCandidate {
237 vector_id: vector_id.clone(),
238 current_offset: fragment.offset,
239 size_bytes: fragment.size,
240 priority: 1.0, reason: CompactionReason::DeletedCleanup,
242 });
243 }
244 }
245
246 Ok(candidates)
247 }
248
249 fn create_batches(&self, mut candidates: Vec<CompactionCandidate>) -> Vec<CompactionBatch> {
251 candidates.sort_by(|a, b| {
253 b.priority
254 .partial_cmp(&a.priority)
255 .unwrap_or(std::cmp::Ordering::Equal)
256 });
257
258 let mut batches = Vec::new();
259 let mut current_batch = Vec::new();
260 let mut current_size = 0;
261
262 for candidate in candidates {
263 current_batch.push(candidate.clone());
264 current_size += candidate.size_bytes;
265
266 if current_batch.len() >= self.config.batch_size {
267 batches.push(CompactionBatch {
268 batch_id: batches.len() as u64,
269 candidates: current_batch.clone(),
270 total_size_bytes: current_size,
271 estimated_duration: Duration::from_millis(100), });
273 current_batch.clear();
274 current_size = 0;
275 }
276 }
277
278 if !current_batch.is_empty() {
280 batches.push(CompactionBatch {
281 batch_id: batches.len() as u64,
282 candidates: current_batch,
283 total_size_bytes: current_size,
284 estimated_duration: Duration::from_millis(100),
285 });
286 }
287
288 batches
289 }
290
291 fn process_batch(&self, batch: &CompactionBatch) -> Result<(usize, usize, u64)> {
293 let mut vectors_processed = 0;
294 let mut vectors_removed = 0;
295 let mut bytes_reclaimed = 0u64;
296
297 let mut fragments = self.fragments.write();
298
299 for candidate in &batch.candidates {
300 if let Some(_fragment) = fragments.get(&candidate.vector_id) {
301 fragments.remove(&candidate.vector_id);
303 vectors_processed += 1;
304 vectors_removed += 1;
305 bytes_reclaimed += candidate.size_bytes as u64;
306 }
307 }
308
309 Ok((vectors_processed, vectors_removed, bytes_reclaimed))
310 }
311
312 fn reclaim_space(&self) {
314 let fragmentation = self.calculate_fragmentation();
320 self.metrics.update_fragmentation(fragmentation);
321 }
322
    /// Post-compaction verification hook.
    ///
    /// Currently performs no checks and always returns `Ok(())`.
    fn verify_integrity(&self) -> Result<()> {
        Ok(())
    }
331
332 fn update_progress(&self, phase: CompactionPhase, progress: f64) {
334 let mut prog = self.progress.write();
335 *prog = Some(CompactionProgress {
336 phase,
337 phase_progress: progress,
338 overall_progress: self.calculate_overall_progress(phase, progress),
339 estimated_time_remaining: None,
340 throughput: 0.0,
341 });
342 }
343
344 fn calculate_overall_progress(&self, phase: CompactionPhase, phase_progress: f64) -> f64 {
346 if matches!(phase, CompactionPhase::Completed) {
347 return 1.0;
348 }
349
350 let phase_weight = match phase {
351 CompactionPhase::Analyzing => 0.05,
352 CompactionPhase::IdentifyingCandidates => 0.1,
353 CompactionPhase::MovingVectors => 0.6,
354 CompactionPhase::UpdatingIndices => 0.1,
355 CompactionPhase::ReclaimingSpace => 0.1,
356 CompactionPhase::Verifying => 0.05,
357 CompactionPhase::Completed => 0.0,
358 };
359
360 let base_progress = match phase {
361 CompactionPhase::Analyzing => 0.0,
362 CompactionPhase::IdentifyingCandidates => 0.05,
363 CompactionPhase::MovingVectors => 0.15,
364 CompactionPhase::UpdatingIndices => 0.75,
365 CompactionPhase::ReclaimingSpace => 0.85,
366 CompactionPhase::Verifying => 0.95,
367 CompactionPhase::Completed => 1.0,
368 };
369
370 let progress = base_progress + (phase_progress * phase_weight);
371 progress.min(1.0) }
373
374 pub fn get_progress(&self) -> Option<CompactionProgress> {
376 self.progress.read().clone()
377 }
378
379 pub fn get_statistics(&self) -> CompactionStatistics {
381 self.metrics.get_statistics()
382 }
383
384 pub fn set_enabled(&self, enabled: bool) {
386 *self.enabled.write() = enabled;
387 }
388
389 pub fn is_enabled(&self) -> bool {
391 *self.enabled.read()
392 }
393
394 pub fn get_metrics(&self) -> Arc<CompactionMetrics> {
396 self.metrics.clone()
397 }
398}
399
#[cfg(test)]
mod tests {
    use super::*;

    /// Size of every fragment registered by these tests.
    const FRAGMENT_BYTES: usize = 1024;

    /// Builds a manager from the default configuration.
    fn default_manager() -> CompactionManager {
        CompactionManager::new(CompactionConfig::default()).unwrap()
    }

    /// Registers one fragment per id, laid out back to back from offset 0.
    fn register_all(manager: &CompactionManager, ids: &[&str]) {
        for (index, id) in ids.iter().enumerate() {
            manager.register_fragment((*id).to_string(), index * FRAGMENT_BYTES, FRAGMENT_BYTES);
        }
    }

    /// A freshly created manager starts enabled.
    #[test]
    fn test_compaction_manager_creation() {
        assert!(default_manager().is_enabled());
    }

    /// Live fragments alone produce no fragmentation.
    #[test]
    fn test_fragment_registration() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2"]);

        assert_eq!(manager.calculate_fragmentation(), 0.0);
    }

    /// One free fragment out of three equally sized ones is about a third.
    #[test]
    fn test_fragmentation_calculation() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2", "vec3"]);

        manager.mark_deleted("vec2").unwrap();

        let ratio = manager.calculate_fragmentation();
        assert!((ratio - 0.333).abs() < 0.01);
    }

    /// The threshold strategy fires only once enough space is free.
    #[test]
    fn test_should_compact_threshold() {
        let config = CompactionConfig {
            strategy: super::super::strategies::CompactionStrategy::ThresholdBased,
            fragmentation_threshold: 0.3,
            ..Default::default()
        };
        let manager = CompactionManager::new(config).unwrap();
        register_all(&manager, &["vec1", "vec2"]);

        assert!(!manager.should_compact());

        for id in &["vec1", "vec2"] {
            manager.mark_deleted(id).unwrap();
        }

        assert!(manager.should_compact());
    }

    /// Compacting with nothing registered succeeds and removes nothing.
    #[test]
    fn test_compact_empty() {
        let outcome = default_manager().compact_now().unwrap();

        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 0);
    }

    /// Both deleted fragments are removed and their bytes are reported.
    #[test]
    fn test_compact_with_deletions() {
        let manager = default_manager();
        register_all(&manager, &["vec1", "vec2", "vec3"]);

        manager.mark_deleted("vec1").unwrap();
        manager.mark_deleted("vec3").unwrap();

        let outcome = manager.compact_now().unwrap();
        assert!(outcome.success);
        assert_eq!(outcome.vectors_removed, 2);
        assert_eq!(outcome.bytes_reclaimed, 2048);
    }
}