// siftdb_core/compaction.rs
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::{Path, PathBuf};
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};

use crate::tombstone::{TombstoneManager, TombstoneStats};
use crate::types::Manifest;
9
10#[derive(Debug, Clone, Serialize, Deserialize)]
12pub struct CompactionStats {
13 pub started_at: u64,
14 pub completed_at: u64,
15 pub duration_secs: u64,
16 pub tombstones_removed: usize,
17 pub segments_compacted: usize,
18 pub space_reclaimed_bytes: u64,
19 pub before_epoch: u64,
20 pub after_epoch: u64,
21}
22
/// Thresholds that control when and how a collection is compacted.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompactionConfig {
    /// Number of epochs subtracted from the current manifest epoch to obtain
    /// the compaction cutoff epoch.
    pub min_tombstone_age_epochs: u64,
    /// Minimum tombstone count (both in total and among compaction
    /// candidates) before compaction is considered necessary.
    pub min_tombstone_count: usize,
    /// Upper bound for a compaction run, in seconds.
    ///
    /// NOTE(review): nothing visible in this module enforces this limit yet;
    /// confirm whether it is consumed elsewhere.
    pub max_duration_secs: u64,
}

impl Default for CompactionConfig {
    /// Returns the default thresholds: a 5-epoch minimum age, at least 100
    /// tombstones, and a 300-second run limit.
    fn default() -> Self {
        Self {
            min_tombstone_age_epochs: 5,
            min_tombstone_count: 100,
            max_duration_secs: 300,
        }
    }
}
43
44pub struct CollectionCompactor {
46 collection_path: PathBuf,
47 config: CompactionConfig,
48}
49
50impl CollectionCompactor {
51 pub fn new(collection_path: &Path) -> Self {
52 Self {
53 collection_path: collection_path.to_path_buf(),
54 config: CompactionConfig::default(),
55 }
56 }
57
58 pub fn with_config(mut self, config: CompactionConfig) -> Self {
59 self.config = config;
60 self
61 }
62
63 pub fn needs_compaction(&self) -> Result<bool> {
65 let tombstone_manager = TombstoneManager::new(&self.collection_path);
66 let stats = tombstone_manager.get_stats()?;
67
68 let manifest_path = self.collection_path.join("MANIFEST.a");
70 let manifest = Manifest::read_from_file(&manifest_path)?;
71
72 let cutoff_epoch = manifest.epoch.saturating_sub(self.config.min_tombstone_age_epochs);
74 let old_tombstones = tombstone_manager.get_compaction_candidates(cutoff_epoch)?;
75
76 Ok(stats.total_count >= self.config.min_tombstone_count &&
77 old_tombstones.len() >= self.config.min_tombstone_count)
78 }
79
80 pub fn compact(&self) -> Result<CompactionStats> {
82 let started_at = SystemTime::now()
83 .duration_since(UNIX_EPOCH)
84 .unwrap()
85 .as_secs();
86
87 println!("๐งน Starting collection compaction...");
88
89 let manifest_path = self.collection_path.join("MANIFEST.a");
91 let manifest = Manifest::read_from_file(&manifest_path)?;
92 let before_epoch = manifest.epoch;
93
94 let tombstone_manager = TombstoneManager::new(&self.collection_path);
95 let before_stats = tombstone_manager.get_stats()?;
96
97 println!(" Current epoch: {}", before_epoch);
98 println!(" Tombstones before: {}", before_stats.total_count);
99
100 let cutoff_epoch = before_epoch.saturating_sub(self.config.min_tombstone_age_epochs);
102
103 let candidates = tombstone_manager.get_compaction_candidates(cutoff_epoch)?;
105
106 if candidates.is_empty() {
107 println!(" No tombstones eligible for compaction");
108 let completed_at = SystemTime::now()
109 .duration_since(UNIX_EPOCH)
110 .unwrap()
111 .as_secs();
112
113 return Ok(CompactionStats {
114 started_at,
115 completed_at,
116 duration_secs: completed_at - started_at,
117 tombstones_removed: 0,
118 segments_compacted: 0,
119 space_reclaimed_bytes: 0,
120 before_epoch,
121 after_epoch: before_epoch,
122 });
123 }
124
125 println!(" Compacting {} tombstones older than epoch {}", candidates.len(), cutoff_epoch);
126
127 let mut segments_to_compact: HashMap<u32, Vec<_>> = HashMap::new();
129 for tombstone in &candidates {
130 segments_to_compact
131 .entry(tombstone.segment_id)
132 .or_insert_with(Vec::new)
133 .push(tombstone);
134 }
135
136 println!(" Segments affected: {}", segments_to_compact.len());
137
138 let tombstones_removed = tombstone_manager.compact_tombstones(cutoff_epoch)?;
140
141 let space_reclaimed_bytes = (tombstones_removed * 256) as u64; let new_epoch = before_epoch + 1;
146 let mut new_manifest = manifest;
147 new_manifest.epoch = new_epoch;
148 new_manifest.write_to_file(&manifest_path)?;
149
150 let completed_at = SystemTime::now()
151 .duration_since(UNIX_EPOCH)
152 .unwrap()
153 .as_secs();
154
155 let stats = CompactionStats {
156 started_at,
157 completed_at,
158 duration_secs: completed_at - started_at,
159 tombstones_removed,
160 segments_compacted: segments_to_compact.len(),
161 space_reclaimed_bytes,
162 before_epoch,
163 after_epoch: new_epoch,
164 };
165
166 println!("โ
Compaction completed:");
167 println!(" Duration: {}s", stats.duration_secs);
168 println!(" Tombstones removed: {}", stats.tombstones_removed);
169 println!(" Segments compacted: {}", stats.segments_compacted);
170 println!(" Space reclaimed: ~{} bytes", stats.space_reclaimed_bytes);
171 println!(" New epoch: {}", stats.after_epoch);
172
173 self.save_compaction_stats(&stats)?;
175
176 Ok(stats)
177 }
178
179 pub fn get_compaction_history(&self) -> Result<Vec<CompactionStats>> {
181 let stats_dir = self.collection_path.join("gc");
182 let mut history = Vec::new();
183
184 if !stats_dir.exists() {
185 return Ok(history);
186 }
187
188 for entry in fs::read_dir(&stats_dir)? {
189 let entry = entry?;
190 let path = entry.path();
191
192 if let Some(filename) = path.file_name() {
193 if let Some(filename_str) = filename.to_str() {
194 if filename_str.starts_with("compaction-") && filename_str.ends_with(".json") {
195 if let Ok(content) = fs::read_to_string(&path) {
196 if let Ok(stats) = serde_json::from_str::<CompactionStats>(&content) {
197 history.push(stats);
198 }
199 }
200 }
201 }
202 }
203 }
204
205 history.sort_by_key(|s| s.completed_at);
207 Ok(history)
208 }
209
210 pub fn auto_compact_if_needed(&self) -> Result<bool> {
212 if self.needs_compaction()? {
213 println!("๐ Auto-compaction triggered");
214 self.compact()?;
215 Ok(true)
216 } else {
217 Ok(false)
218 }
219 }
220
221 fn save_compaction_stats(&self, stats: &CompactionStats) -> Result<()> {
222 let stats_dir = self.collection_path.join("gc");
223 fs::create_dir_all(&stats_dir)?;
224
225 let stats_file = stats_dir.join(format!("compaction-{:010}.json", stats.completed_at));
226 let json = serde_json::to_string_pretty(stats)
227 .context("Failed to serialize compaction stats")?;
228
229 fs::write(&stats_file, json)
230 .context("Failed to write compaction stats")?;
231
232 Ok(())
233 }
234}
235
/// Entry point for compaction operations on one collection.
#[derive(Debug, Clone)]
pub struct CompactionManager {
    /// Root directory of the managed collection.
    collection_path: PathBuf,
}
240
241impl CompactionManager {
242 pub fn new(collection_path: &Path) -> Self {
243 Self {
244 collection_path: collection_path.to_path_buf(),
245 }
246 }
247
248 pub fn compact_with_config(&self, config: CompactionConfig) -> Result<CompactionStats> {
250 let compactor = CollectionCompactor::new(&self.collection_path)
251 .with_config(config);
252 compactor.compact()
253 }
254
255 pub fn status(&self) -> Result<CompactionStatus> {
257 let tombstone_manager = TombstoneManager::new(&self.collection_path);
258 let tombstone_stats = tombstone_manager.get_stats()?;
259
260 let compactor = CollectionCompactor::new(&self.collection_path);
261 let needs_compaction = compactor.needs_compaction()?;
262 let history = compactor.get_compaction_history()?;
263
264 let last_compaction = history.last().map(|s| s.completed_at);
265
266 Ok(CompactionStatus {
267 needs_compaction,
268 total_tombstones: tombstone_stats.total_count,
269 oldest_tombstone_epoch: tombstone_stats.oldest_epoch,
270 newest_tombstone_epoch: tombstone_stats.newest_epoch,
271 last_compaction_at: last_compaction,
272 compaction_count: history.len(),
273 })
274 }
275}
276
277#[derive(Debug, Serialize, Deserialize)]
278pub struct CompactionStatus {
279 pub needs_compaction: bool,
280 pub total_tombstones: usize,
281 pub oldest_tombstone_epoch: u64,
282 pub newest_tombstone_epoch: u64,
283 pub last_compaction_at: Option<u64>,
284 pub compaction_count: usize,
285}