1use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::collections::BinaryHeap;
10use std::cmp::Ordering;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14
/// Tuning knobs for leveled compaction.
#[derive(Clone, Debug)]
pub struct CompactionConfig {
    /// Number of level-0 SSTables at which a level-0 compaction is triggered.
    pub level0_trigger: usize,
    /// Factor by which each level's byte budget grows relative to the previous one.
    pub level_multiplier: usize,
    /// Byte budget of level 1; deeper levels scale from this value.
    pub level1_max_size: u64,
    /// Number of levels, counting level 0.
    pub max_levels: u32,
    /// Approximate size, in bytes, after which a compaction output file is
    /// closed and a new one is started.
    pub target_file_size: u64,
}

impl Default for CompactionConfig {
    /// Four level-0 files trigger compaction, a 10 MiB level-1 budget growing
    /// tenfold per level, seven levels, and 2 MiB output files.
    fn default() -> Self {
        const MIB: u64 = 1024 * 1024;
        CompactionConfig {
            level0_trigger: 4,
            level_multiplier: 10,
            level1_max_size: 10 * MIB,
            max_levels: 7,
            target_file_size: 2 * MIB,
        }
    }
}
41
/// Running totals accumulated by a compaction worker.
#[derive(Clone, Debug, Default)]
pub struct CompactionStats {
    /// Sum of the file sizes of every input SSTable opened for merging.
    pub bytes_read: u64,
    /// Sum of the file sizes of every output SSTable produced.
    pub bytes_written: u64,
    /// Number of completed compactions.
    pub compaction_count: u64,
    /// Duplicate-key entries discarded while merging (one version per key is kept).
    pub entries_removed: u64,
}
54
55#[derive(Debug)]
57struct MergeEntry {
58 key: Vec<u8>,
59 entry: SSTableEntry,
60 source_idx: usize,
61}
62
63impl PartialEq for MergeEntry {
64 fn eq(&self, other: &Self) -> bool {
65 self.key == other.key && self.source_idx == other.source_idx
66 }
67}
68
69impl Eq for MergeEntry {}
70
71impl PartialOrd for MergeEntry {
72 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
77impl Ord for MergeEntry {
78 fn cmp(&self, other: &Self) -> Ordering {
79 match other.key.cmp(&self.key) {
82 Ordering::Equal => self.source_idx.cmp(&other.source_idx),
83 ord => ord,
84 }
85 }
86}
87
88pub struct CompactionWorker {
90 dir: PathBuf,
92 config: CompactionConfig,
94 stats: CompactionStats,
96 file_counter: AtomicU64,
98 stop_flag: Arc<AtomicBool>,
100}
101
102impl CompactionWorker {
103 pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
105 Self {
106 dir: dir.as_ref().to_path_buf(),
107 config,
108 stats: CompactionStats::default(),
109 file_counter: AtomicU64::new(0),
110 stop_flag: Arc::new(AtomicBool::new(false)),
111 }
112 }
113
114 pub fn stop_flag(&self) -> Arc<AtomicBool> {
116 Arc::clone(&self.stop_flag)
117 }
118
119 pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
121 let level0_count = manifest.sstables_at_level(0).len();
122 level0_count >= self.config.level0_trigger
123 }
124
125 pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
127 let level0_count = manifest.sstables_at_level(0).len();
129 if level0_count >= self.config.level0_trigger {
130 return Some(0);
131 }
132
133 for level in 1..self.config.max_levels {
135 let level_size: u64 = manifest.sstables_at_level(level)
136 .iter()
137 .map(|s| s.file_size)
138 .sum();
139
140 let max_size = self.max_size_for_level(level);
141 if level_size > max_size {
142 return Some(level);
143 }
144 }
145
146 None
147 }
148
149 fn max_size_for_level(&self, level: u32) -> u64 {
151 if level == 0 {
152 return u64::MAX; }
154
155 let mut size = self.config.level1_max_size;
156 for _ in 1..level {
157 size *= self.config.level_multiplier as u64;
158 }
159 size
160 }
161
162 fn next_sstable_path(&self, level: u32) -> PathBuf {
164 let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
165 let timestamp = std::time::SystemTime::now()
166 .duration_since(std::time::UNIX_EPOCH)
167 .unwrap_or_default()
168 .as_millis();
169
170 self.dir.join("sst").join(format!(
171 "L{}_{}_{}.sst",
172 level, timestamp, counter
173 ))
174 }
175
176 pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
178 let level0_sstables = manifest.sstables_at_level(0);
179 if level0_sstables.is_empty() {
180 return Ok(());
181 }
182
183 let input_paths: Vec<PathBuf> = level0_sstables
185 .iter()
186 .map(|s| PathBuf::from(&s.path))
187 .collect();
188
189 let level1_sstables = manifest.sstables_at_level(1);
191
192 let mut all_inputs: Vec<PathBuf> = input_paths.clone();
194
195 let min_key: Vec<u8> = level0_sstables.iter()
197 .map(|s| s.min_key.clone())
198 .min()
199 .unwrap_or_default();
200 let max_key: Vec<u8> = level0_sstables.iter()
201 .map(|s| s.max_key.clone())
202 .max()
203 .unwrap_or_default();
204
205 for sst in level1_sstables {
207 if sst.max_key >= min_key && sst.min_key <= max_key {
208 all_inputs.push(PathBuf::from(&sst.path));
209 }
210 }
211
212 let outputs = self.merge_sstables(&all_inputs, 1)?;
214
215 manifest.record_compaction(0, all_inputs.clone(), outputs)?;
217
218 for path in all_inputs {
220 let _ = delete_sstable(&path);
221 }
222
223 self.stats.compaction_count += 1;
224
225 Ok(())
226 }
227
228 fn merge_sstables(&mut self, inputs: &[PathBuf], target_level: u32) -> Result<Vec<SSTableMeta>> {
230 if inputs.is_empty() {
231 return Ok(Vec::new());
232 }
233
234 let sst_dir = self.dir.join("sst");
236 std::fs::create_dir_all(&sst_dir)?;
237
238 let mut readers: Vec<SSTableReader> = Vec::new();
240 for path in inputs {
241 if path.exists() {
242 match SSTableReader::open(path) {
243 Ok(reader) => {
244 self.stats.bytes_read += reader.metadata().file_size;
245 readers.push(reader);
246 }
247 Err(_) => continue, }
249 }
250 }
251
252 if readers.is_empty() {
253 return Ok(Vec::new());
254 }
255
256 let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
258 let mut iterators: Vec<_> = readers.iter_mut()
259 .map(|r| r.iter())
260 .collect::<Result<Vec<_>>>()?;
261
262 for (idx, iter) in iterators.iter_mut().enumerate() {
264 if let Some(entry) = iter.next_entry()? {
265 heap.push(MergeEntry {
266 key: entry.key.clone(),
267 entry,
268 source_idx: idx,
269 });
270 }
271 }
272
273 let mut outputs: Vec<SSTableMeta> = Vec::new();
275 let mut current_writer: Option<SSTableWriter> = None;
276 let mut current_size: u64 = 0;
277 let mut last_key: Option<Vec<u8>> = None;
278
279 while let Some(merge_entry) = heap.pop() {
280 if last_key.as_ref() == Some(&merge_entry.key) {
282 self.stats.entries_removed += 1;
283 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
285 heap.push(MergeEntry {
286 key: next.key.clone(),
287 entry: next,
288 source_idx: merge_entry.source_idx,
289 });
290 }
291 continue;
292 }
293
294 if current_writer.is_none() || current_size >= self.config.target_file_size {
296 if let Some(writer) = current_writer.take() {
298 let meta = writer.finish()?;
299 self.stats.bytes_written += meta.file_size;
300 outputs.push(meta);
301 }
302
303 let path = self.next_sstable_path(target_level);
305 current_writer = Some(SSTableWriter::new(&path)?);
306 current_size = 0;
307 }
308
309 if let Some(ref mut writer) = current_writer {
311 let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
312 writer.add(merge_entry.entry.clone())?;
313 current_size += entry_size as u64;
314 }
315
316 last_key = Some(merge_entry.key);
317
318 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
320 heap.push(MergeEntry {
321 key: next.key.clone(),
322 entry: next,
323 source_idx: merge_entry.source_idx,
324 });
325 }
326 }
327
328 if let Some(writer) = current_writer {
330 let meta = writer.finish()?;
331 self.stats.bytes_written += meta.file_size;
332 outputs.push(meta);
333 }
334
335 let outputs: Vec<SSTableMeta> = outputs.into_iter()
337 .map(|mut m| {
338 m.level = target_level;
339 m
340 })
341 .collect();
342
343 Ok(outputs)
344 }
345
346 pub fn stats(&self) -> &CompactionStats {
348 &self.stats
349 }
350
351 pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
353 if self.stop_flag.load(AtomicOrdering::Relaxed) {
354 return Ok(false);
355 }
356
357 if let Some(level) = self.pick_compaction_level(manifest) {
358 if level == 0 {
359 self.compact_level0(manifest)?;
360 return Ok(true);
361 }
362 }
364
365 Ok(false)
366 }
367}
368
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    /// Writes `(key, value)` pairs, already in ascending key order, as a new
    /// SSTable at `path`.
    fn write_sstable(path: &PathBuf, pairs: &[(&str, &str)]) {
        let mut writer = SSTableWriter::new(path).unwrap();
        for (key, value) in pairs {
            writer
                .add(SSTableEntry::value(key.as_bytes().to_vec(), value.as_bytes().to_vec()))
                .unwrap();
        }
        writer.finish().unwrap();
    }

    #[test]
    fn test_compaction_config_default() {
        let defaults = CompactionConfig::default();
        assert_eq!(defaults.level0_trigger, 4);
        assert_eq!(defaults.max_levels, 7);
    }

    #[test]
    fn test_merge_entry_ordering() {
        fn merge_entry(key: &[u8], value: &[u8]) -> MergeEntry {
            MergeEntry {
                key: key.to_vec(),
                entry: SSTableEntry::value(key.to_vec(), value.to_vec()),
                source_idx: 0,
            }
        }

        let lower = merge_entry(b"a", b"1");
        let higher = merge_entry(b"b", b"2");

        // The heap is a min-heap on key, so the smaller key compares greater.
        assert!(lower > higher);
    }

    #[test]
    fn test_needs_compaction() {
        let tmp = tempdir().unwrap();
        let worker = CompactionWorker::new(
            tmp.path(),
            CompactionConfig {
                level0_trigger: 2,
                ..Default::default()
            },
        );
        let mut manifest = Manifest::open(tmp.path()).unwrap();

        assert!(!worker.needs_compaction(&manifest));

        for n in 0..2 {
            manifest
                .add_sstable(&SSTableMeta {
                    path: PathBuf::from(format!("test{}.sst", n)),
                    min_key: vec![],
                    max_key: vec![],
                    entry_count: 0,
                    file_size: 0,
                    level: 0,
                    sequence: 0,
                })
                .unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    #[test]
    fn test_merge_sstables() {
        let tmp = tempdir().unwrap();
        let sst_dir = tmp.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        let older = sst_dir.join("test1.sst");
        write_sstable(&older, &[("a", "1"), ("c", "3")]);

        let newer = sst_dir.join("test2.sst");
        write_sstable(&newer, &[("b", "2"), ("c", "3-new")]);

        let mut worker = CompactionWorker::new(tmp.path(), CompactionConfig::default());
        let outputs = worker.merge_sstables(&[older, newer], 1).unwrap();

        assert!(!outputs.is_empty());

        let mut merged = SSTableReader::open(&outputs[0].path).unwrap();
        assert_eq!(merged.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(merged.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        assert_eq!(merged.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}