siftdb_core/
compaction.rs

1use crate::tombstone::{TombstoneManager, TombstoneStats};
2use crate::types::Manifest;
3use anyhow::{Context, Result};
4use std::collections::HashMap;
5use std::fs;
6use std::path::{Path, PathBuf};
7use std::time::{SystemTime, UNIX_EPOCH};
8use serde::{Deserialize, Serialize};
9
10/// Compaction statistics
11#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CompactionStats {
13    pub started_at: u64,
14    pub completed_at: u64,
15    pub duration_secs: u64,
16    pub tombstones_removed: usize,
17    pub segments_compacted: usize,
18    pub space_reclaimed_bytes: u64,
19    pub before_epoch: u64,
20    pub after_epoch: u64,
21}
22
/// Tuning knobs that control when and how a collection is compacted.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Age threshold, in epochs.
    ///
    /// The candidate cutoff is the current manifest epoch minus this value
    /// (saturating at zero). Only tombstones older than that cutoff are compacted.
    pub min_tombstone_age_epochs: u64,
    /// Minimum tombstone count, applied both to the total and to the eligible
    /// candidates, before `needs_compaction` reports that compaction is due.
    pub min_tombstone_count: usize,
    /// Intended upper bound on compaction wall time, in seconds.
    ///
    /// NOTE(review): `CollectionCompactor::compact` does not currently read this
    /// value, so it is not enforced there.
    pub max_duration_secs: u64,
}

impl Default for CompactionConfig {
    /// Defaults: compact tombstones at least 5 epochs old, require at least
    /// 100 tombstones, and budget 300 seconds (5 minutes).
    fn default() -> Self {
        let five_minutes_in_secs = 5 * 60;
        CompactionConfig {
            max_duration_secs: five_minutes_in_secs,
            min_tombstone_count: 100,
            min_tombstone_age_epochs: 5,
        }
    }
}
43
44/// Collection compactor for cleaning up tombstones and optimizing storage
45pub struct CollectionCompactor {
46    collection_path: PathBuf,
47    config: CompactionConfig,
48}
49
50impl CollectionCompactor {
51    pub fn new(collection_path: &Path) -> Self {
52        Self {
53            collection_path: collection_path.to_path_buf(),
54            config: CompactionConfig::default(),
55        }
56    }
57
58    pub fn with_config(mut self, config: CompactionConfig) -> Self {
59        self.config = config;
60        self
61    }
62
63    /// Check if compaction is needed based on current state
64    pub fn needs_compaction(&self) -> Result<bool> {
65        let tombstone_manager = TombstoneManager::new(&self.collection_path);
66        let stats = tombstone_manager.get_stats()?;
67        
68        // Load current manifest to get current epoch
69        let manifest_path = self.collection_path.join("MANIFEST.a");
70        let manifest = Manifest::read_from_file(&manifest_path)?;
71        
72        // Check if we have enough old tombstones to warrant compaction
73        let cutoff_epoch = manifest.epoch.saturating_sub(self.config.min_tombstone_age_epochs);
74        let old_tombstones = tombstone_manager.get_compaction_candidates(cutoff_epoch)?;
75        
76        Ok(stats.total_count >= self.config.min_tombstone_count && 
77           old_tombstones.len() >= self.config.min_tombstone_count)
78    }
79
80    /// Perform collection compaction
81    pub fn compact(&self) -> Result<CompactionStats> {
82        let started_at = SystemTime::now()
83            .duration_since(UNIX_EPOCH)
84            .unwrap()
85            .as_secs();
86
87        println!("๐Ÿงน Starting collection compaction...");
88
89        // Load current state
90        let manifest_path = self.collection_path.join("MANIFEST.a");
91        let manifest = Manifest::read_from_file(&manifest_path)?;
92        let before_epoch = manifest.epoch;
93
94        let tombstone_manager = TombstoneManager::new(&self.collection_path);
95        let before_stats = tombstone_manager.get_stats()?;
96
97        println!("   Current epoch: {}", before_epoch);
98        println!("   Tombstones before: {}", before_stats.total_count);
99
100        // Determine compaction cutoff epoch
101        let cutoff_epoch = before_epoch.saturating_sub(self.config.min_tombstone_age_epochs);
102        
103        // Get tombstones to compact
104        let candidates = tombstone_manager.get_compaction_candidates(cutoff_epoch)?;
105        
106        if candidates.is_empty() {
107            println!("   No tombstones eligible for compaction");
108            let completed_at = SystemTime::now()
109                .duration_since(UNIX_EPOCH)
110                .unwrap()
111                .as_secs();
112
113            return Ok(CompactionStats {
114                started_at,
115                completed_at,
116                duration_secs: completed_at - started_at,
117                tombstones_removed: 0,
118                segments_compacted: 0,
119                space_reclaimed_bytes: 0,
120                before_epoch,
121                after_epoch: before_epoch,
122            });
123        }
124
125        println!("   Compacting {} tombstones older than epoch {}", candidates.len(), cutoff_epoch);
126
127        // Group tombstones by segment for efficient processing
128        let mut segments_to_compact: HashMap<u32, Vec<_>> = HashMap::new();
129        for tombstone in &candidates {
130            segments_to_compact
131                .entry(tombstone.segment_id)
132                .or_insert_with(Vec::new)
133                .push(tombstone);
134        }
135
136        println!("   Segments affected: {}", segments_to_compact.len());
137
138        // For now, just remove the tombstone records (full segment rewriting would be more complex)
139        let tombstones_removed = tombstone_manager.compact_tombstones(cutoff_epoch)?;
140        
141        // Calculate space reclaimed (estimate based on average tombstone overhead)
142        let space_reclaimed_bytes = (tombstones_removed * 256) as u64; // Rough estimate
143
144        // Update manifest with new epoch to mark compaction
145        let new_epoch = before_epoch + 1;
146        let mut new_manifest = manifest;
147        new_manifest.epoch = new_epoch;
148        new_manifest.write_to_file(&manifest_path)?;
149
150        let completed_at = SystemTime::now()
151            .duration_since(UNIX_EPOCH)
152            .unwrap()
153            .as_secs();
154
155        let stats = CompactionStats {
156            started_at,
157            completed_at,
158            duration_secs: completed_at - started_at,
159            tombstones_removed,
160            segments_compacted: segments_to_compact.len(),
161            space_reclaimed_bytes,
162            before_epoch,
163            after_epoch: new_epoch,
164        };
165
166        println!("โœ… Compaction completed:");
167        println!("   Duration: {}s", stats.duration_secs);
168        println!("   Tombstones removed: {}", stats.tombstones_removed);
169        println!("   Segments compacted: {}", stats.segments_compacted);
170        println!("   Space reclaimed: ~{} bytes", stats.space_reclaimed_bytes);
171        println!("   New epoch: {}", stats.after_epoch);
172
173        // Save compaction stats
174        self.save_compaction_stats(&stats)?;
175
176        Ok(stats)
177    }
178
179    /// Get compaction history
180    pub fn get_compaction_history(&self) -> Result<Vec<CompactionStats>> {
181        let stats_dir = self.collection_path.join("gc");
182        let mut history = Vec::new();
183
184        if !stats_dir.exists() {
185            return Ok(history);
186        }
187
188        for entry in fs::read_dir(&stats_dir)? {
189            let entry = entry?;
190            let path = entry.path();
191            
192            if let Some(filename) = path.file_name() {
193                if let Some(filename_str) = filename.to_str() {
194                    if filename_str.starts_with("compaction-") && filename_str.ends_with(".json") {
195                        if let Ok(content) = fs::read_to_string(&path) {
196                            if let Ok(stats) = serde_json::from_str::<CompactionStats>(&content) {
197                                history.push(stats);
198                            }
199                        }
200                    }
201                }
202            }
203        }
204
205        // Sort by completion time
206        history.sort_by_key(|s| s.completed_at);
207        Ok(history)
208    }
209
210    /// Auto-compaction check - returns true if compaction was performed
211    pub fn auto_compact_if_needed(&self) -> Result<bool> {
212        if self.needs_compaction()? {
213            println!("๐Ÿ”„ Auto-compaction triggered");
214            self.compact()?;
215            Ok(true)
216        } else {
217            Ok(false)
218        }
219    }
220
221    fn save_compaction_stats(&self, stats: &CompactionStats) -> Result<()> {
222        let stats_dir = self.collection_path.join("gc");
223        fs::create_dir_all(&stats_dir)?;
224        
225        let stats_file = stats_dir.join(format!("compaction-{:010}.json", stats.completed_at));
226        let json = serde_json::to_string_pretty(stats)
227            .context("Failed to serialize compaction stats")?;
228        
229        fs::write(&stats_file, json)
230            .context("Failed to write compaction stats")?;
231        
232        Ok(())
233    }
234}
235
236/// Compaction manager for scheduling and coordinating compaction operations
237pub struct CompactionManager {
238    collection_path: PathBuf,
239}
240
241impl CompactionManager {
242    pub fn new(collection_path: &Path) -> Self {
243        Self {
244            collection_path: collection_path.to_path_buf(),
245        }
246    }
247
248    /// Run compaction with custom configuration
249    pub fn compact_with_config(&self, config: CompactionConfig) -> Result<CompactionStats> {
250        let compactor = CollectionCompactor::new(&self.collection_path)
251            .with_config(config);
252        compactor.compact()
253    }
254
255    /// Quick compaction status check
256    pub fn status(&self) -> Result<CompactionStatus> {
257        let tombstone_manager = TombstoneManager::new(&self.collection_path);
258        let tombstone_stats = tombstone_manager.get_stats()?;
259        
260        let compactor = CollectionCompactor::new(&self.collection_path);
261        let needs_compaction = compactor.needs_compaction()?;
262        let history = compactor.get_compaction_history()?;
263        
264        let last_compaction = history.last().map(|s| s.completed_at);
265        
266        Ok(CompactionStatus {
267            needs_compaction,
268            total_tombstones: tombstone_stats.total_count,
269            oldest_tombstone_epoch: tombstone_stats.oldest_epoch,
270            newest_tombstone_epoch: tombstone_stats.newest_epoch,
271            last_compaction_at: last_compaction,
272            compaction_count: history.len(),
273        })
274    }
275}
276
277#[derive(Debug, Serialize, Deserialize)]
278pub struct CompactionStatus {
279    pub needs_compaction: bool,
280    pub total_tombstones: usize,
281    pub oldest_tombstone_epoch: u64,
282    pub newest_tombstone_epoch: u64,
283    pub last_compaction_at: Option<u64>,
284    pub compaction_count: usize,
285}