1use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14
/// Tuning knobs for the leveled-compaction strategy.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
 /// Number of level-0 SSTables that triggers a level-0 -> level-1 compaction.
 pub level0_trigger: usize,
 /// Size-growth factor between consecutive levels (level N+1 allows
 /// `level_multiplier` times the bytes of level N).
 pub level_multiplier: usize,
 /// Maximum total bytes allowed at level 1 before it is compacted down.
 pub level1_max_size: u64,
 /// Highest level the tree may grow to.
 pub max_levels: u32,
 /// Approximate size at which an output SSTable is rotated during a merge.
 pub target_file_size: u64,
}
29
30impl Default for CompactionConfig {
31 fn default() -> Self {
32 Self {
33 level0_trigger: 4,
34 level_multiplier: 10,
35 level1_max_size: 10 * 1024 * 1024, max_levels: 7,
37 target_file_size: 2 * 1024 * 1024, }
39 }
40}
41
/// Running counters describing the work performed by a [`CompactionWorker`].
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
 /// Total bytes of input SSTables opened for merging.
 pub bytes_read: u64,
 /// Total bytes of output SSTables produced.
 pub bytes_written: u64,
 /// Number of completed compaction runs.
 pub compaction_count: u64,
 /// Entries dropped during merges (duplicate versions of the same key).
 pub entries_removed: u64,
}
54
/// Heap element used during the k-way merge: one pending entry from one input.
#[derive(Debug)]
struct MergeEntry {
 // Copy of `entry.key`, kept separately so the `Ord` impl can read it cheaply.
 key: Vec<u8>,
 entry: SSTableEntry,
 // Index of the input iterator this entry came from; used as an ordering
 // tie-breaker and to know which iterator to advance after a pop.
 source_idx: usize,
}
62
63impl PartialEq for MergeEntry {
64 fn eq(&self, other: &Self) -> bool {
65 self.key == other.key && self.source_idx == other.source_idx
66 }
67}
68
69impl Eq for MergeEntry {}
70
71impl PartialOrd for MergeEntry {
72 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
73 Some(self.cmp(other))
74 }
75}
76
impl Ord for MergeEntry {
    // Intentionally reversed key comparison: `BinaryHeap` is a max-heap, so
    // ranking the *smaller* key as "greater" makes the heap pop keys in
    // ascending order (a min-heap by key).
    fn cmp(&self, other: &Self) -> Ordering {
        match other.key.cmp(&self.key) {
            // Equal keys: the entry with the larger `source_idx` compares
            // greater, pops first, and therefore wins deduplication in
            // `merge_sstables`.
            // NOTE(review): `compact_level0` appends level-1 inputs *after*
            // the level-0 ones, giving the older files the larger indices —
            // confirm this later-input-wins tie-break really matches the
            // intended newest-version-wins semantics.
            Ordering::Equal => self.source_idx.cmp(&other.source_idx),
            ord => ord,
        }
    }
}
87
/// Drives compaction of SSTables under a database directory.
pub struct CompactionWorker {
 // Database root; output SSTables are created under `<dir>/sst`.
 dir: PathBuf,
 config: CompactionConfig,
 stats: CompactionStats,
 // Monotonic counter folded into output file names to keep them unique.
 file_counter: AtomicU64,
 // Cooperative shutdown signal, shared with the owner via `stop_flag()`.
 stop_flag: Arc<AtomicBool>,
}
101
102impl CompactionWorker {
103 pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
105 Self {
106 dir: dir.as_ref().to_path_buf(),
107 config,
108 stats: CompactionStats::default(),
109 file_counter: AtomicU64::new(0),
110 stop_flag: Arc::new(AtomicBool::new(false)),
111 }
112 }
113
114 pub fn stop_flag(&self) -> Arc<AtomicBool> {
116 Arc::clone(&self.stop_flag)
117 }
118
119 pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
121 let level0_count = manifest.sstables_at_level(0).len();
122 level0_count >= self.config.level0_trigger
123 }
124
125 pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
127 let level0_count = manifest.sstables_at_level(0).len();
129 if level0_count >= self.config.level0_trigger {
130 return Some(0);
131 }
132
133 for level in 1..self.config.max_levels {
135 let level_size: u64 = manifest
136 .sstables_at_level(level)
137 .iter()
138 .map(|s| s.file_size)
139 .sum();
140
141 let max_size = self.max_size_for_level(level);
142 if level_size > max_size {
143 return Some(level);
144 }
145 }
146
147 None
148 }
149
150 fn max_size_for_level(&self, level: u32) -> u64 {
152 if level == 0 {
153 return u64::MAX; }
155
156 let mut size = self.config.level1_max_size;
157 for _ in 1..level {
158 size *= self.config.level_multiplier as u64;
159 }
160 size
161 }
162
163 fn next_sstable_path(&self, level: u32) -> PathBuf {
165 let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
166 let timestamp = std::time::SystemTime::now()
167 .duration_since(std::time::UNIX_EPOCH)
168 .unwrap_or_default()
169 .as_millis();
170
171 self.dir
172 .join("sst")
173 .join(format!("L{}_{}_{}.sst", level, timestamp, counter))
174 }
175
176 pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
178 let level0_sstables = manifest.sstables_at_level(0);
179 if level0_sstables.is_empty() {
180 return Ok(());
181 }
182
183 let input_paths: Vec<PathBuf> = level0_sstables
185 .iter()
186 .map(|s| PathBuf::from(&s.path))
187 .collect();
188
189 let level1_sstables = manifest.sstables_at_level(1);
191
192 let mut all_inputs: Vec<PathBuf> = input_paths.clone();
194
195 let min_key: Vec<u8> = level0_sstables
197 .iter()
198 .map(|s| s.min_key.clone())
199 .min()
200 .unwrap_or_default();
201 let max_key: Vec<u8> = level0_sstables
202 .iter()
203 .map(|s| s.max_key.clone())
204 .max()
205 .unwrap_or_default();
206
207 for sst in level1_sstables {
209 if sst.max_key >= min_key && sst.min_key <= max_key {
210 all_inputs.push(PathBuf::from(&sst.path));
211 }
212 }
213
214 let outputs = self.merge_sstables(&all_inputs, 1)?;
216
217 manifest.record_compaction(0, all_inputs.clone(), outputs)?;
219
220 for path in all_inputs {
222 let _ = delete_sstable(&path);
223 }
224
225 self.stats.compaction_count += 1;
226
227 Ok(())
228 }
229
230 fn merge_sstables(
232 &mut self,
233 inputs: &[PathBuf],
234 target_level: u32,
235 ) -> Result<Vec<SSTableMeta>> {
236 if inputs.is_empty() {
237 return Ok(Vec::new());
238 }
239
240 let sst_dir = self.dir.join("sst");
242 std::fs::create_dir_all(&sst_dir)?;
243
244 let mut readers: Vec<SSTableReader> = Vec::new();
246 for path in inputs {
247 if path.exists() {
248 match SSTableReader::open(path) {
249 Ok(reader) => {
250 self.stats.bytes_read += reader.metadata().file_size;
251 readers.push(reader);
252 }
253 Err(_) => continue, }
255 }
256 }
257
258 if readers.is_empty() {
259 return Ok(Vec::new());
260 }
261
262 let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
264 let mut iterators: Vec<_> = readers
265 .iter_mut()
266 .map(|r| r.iter())
267 .collect::<Result<Vec<_>>>()?;
268
269 for (idx, iter) in iterators.iter_mut().enumerate() {
271 if let Some(entry) = iter.next_entry()? {
272 heap.push(MergeEntry {
273 key: entry.key.clone(),
274 entry,
275 source_idx: idx,
276 });
277 }
278 }
279
280 let mut outputs: Vec<SSTableMeta> = Vec::new();
282 let mut current_writer: Option<SSTableWriter> = None;
283 let mut current_size: u64 = 0;
284 let mut last_key: Option<Vec<u8>> = None;
285
286 while let Some(merge_entry) = heap.pop() {
287 if last_key.as_ref() == Some(&merge_entry.key) {
289 self.stats.entries_removed += 1;
290 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
292 heap.push(MergeEntry {
293 key: next.key.clone(),
294 entry: next,
295 source_idx: merge_entry.source_idx,
296 });
297 }
298 continue;
299 }
300
301 if current_writer.is_none() || current_size >= self.config.target_file_size {
303 if let Some(writer) = current_writer.take() {
305 let meta = writer.finish()?;
306 self.stats.bytes_written += meta.file_size;
307 outputs.push(meta);
308 }
309
310 let path = self.next_sstable_path(target_level);
312 current_writer = Some(SSTableWriter::new(&path)?);
313 current_size = 0;
314 }
315
316 if let Some(ref mut writer) = current_writer {
318 let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
319 writer.add(merge_entry.entry.clone())?;
320 current_size += entry_size as u64;
321 }
322
323 last_key = Some(merge_entry.key);
324
325 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
327 heap.push(MergeEntry {
328 key: next.key.clone(),
329 entry: next,
330 source_idx: merge_entry.source_idx,
331 });
332 }
333 }
334
335 if let Some(writer) = current_writer {
337 let meta = writer.finish()?;
338 self.stats.bytes_written += meta.file_size;
339 outputs.push(meta);
340 }
341
342 let outputs: Vec<SSTableMeta> = outputs
344 .into_iter()
345 .map(|mut m| {
346 m.level = target_level;
347 m
348 })
349 .collect();
350
351 Ok(outputs)
352 }
353
354 pub fn stats(&self) -> &CompactionStats {
356 &self.stats
357 }
358
359 pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
361 if self.stop_flag.load(AtomicOrdering::Relaxed) {
362 return Ok(false);
363 }
364
365 if let Some(level) = self.pick_compaction_level(manifest) {
366 if level == 0 {
367 self.compact_level0(manifest)?;
368 return Ok(true);
369 }
370 }
372
373 Ok(false)
374 }
375}
376
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    // Default config carries the documented trigger and level count.
    #[test]
    fn test_compaction_config_default() {
        let defaults = CompactionConfig::default();
        assert_eq!(defaults.level0_trigger, 4);
        assert_eq!(defaults.max_levels, 7);
    }

    // The reversed Ord makes the smaller key rank higher, so BinaryHeap
    // pops keys in ascending order.
    #[test]
    fn test_merge_entry_ordering() {
        let make = |key: &[u8], val: &[u8]| MergeEntry {
            key: key.to_vec(),
            entry: SSTableEntry::value(key.to_vec(), val.to_vec()),
            source_idx: 0,
        };

        let lo = make(b"a", b"1");
        let hi = make(b"b", b"2");
        assert!(lo > hi);
    }

    // Compaction is needed exactly when the L0 file count reaches the trigger.
    #[test]
    fn test_needs_compaction() {
        let dir = tempdir().unwrap();
        let worker = CompactionWorker::new(
            dir.path(),
            CompactionConfig {
                level0_trigger: 2,
                ..Default::default()
            },
        );
        let mut manifest = Manifest::open(dir.path()).unwrap();

        assert!(!worker.needs_compaction(&manifest));

        for i in 0..2 {
            manifest
                .add_sstable(&SSTableMeta {
                    path: PathBuf::from(format!("test{}.sst", i)),
                    min_key: vec![],
                    max_key: vec![],
                    entry_count: 0,
                    file_size: 0,
                    level: 0,
                    sequence: 0,
                })
                .unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    // Merging overlapping tables keeps every key once, with the later
    // input's value winning the duplicate key "c".
    #[test]
    fn test_merge_sstables() {
        let dir = tempdir().unwrap();
        let sst_dir = dir.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        let first = sst_dir.join("test1.sst");
        {
            let mut w = SSTableWriter::new(&first).unwrap();
            w.add(SSTableEntry::value(b"a".to_vec(), b"1".to_vec()))
                .unwrap();
            w.add(SSTableEntry::value(b"c".to_vec(), b"3".to_vec()))
                .unwrap();
            w.finish().unwrap();
        }

        let second = sst_dir.join("test2.sst");
        {
            let mut w = SSTableWriter::new(&second).unwrap();
            w.add(SSTableEntry::value(b"b".to_vec(), b"2".to_vec()))
                .unwrap();
            w.add(SSTableEntry::value(b"c".to_vec(), b"3-new".to_vec()))
                .unwrap();
            w.finish().unwrap();
        }

        let mut worker = CompactionWorker::new(dir.path(), CompactionConfig::default());
        let merged = worker.merge_sstables(&[first, second], 1).unwrap();
        assert!(!merged.is_empty());

        let mut reader = SSTableReader::open(&merged[0].path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(reader.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        assert_eq!(reader.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}
478}