1use crate::manifest::Manifest;
7use crate::sstable::{delete_sstable, SSTableEntry, SSTableMeta, SSTableReader, SSTableWriter};
8use rustlite_core::Result;
9use std::cmp::Ordering;
10use std::collections::BinaryHeap;
11use std::path::{Path, PathBuf};
12use std::sync::atomic::{AtomicBool, AtomicU64, Ordering as AtomicOrdering};
13use std::sync::Arc;
14use tracing::{info, instrument, warn};
15
/// Tuning knobs for the leveled-compaction policy.
#[derive(Debug, Clone)]
pub struct CompactionConfig {
    /// Number of level-0 SSTables that triggers a compaction into level 1.
    pub level0_trigger: usize,
    /// Size fan-out between consecutive levels: level N+1's budget is
    /// `level_multiplier` times level N's.
    pub level_multiplier: usize,
    /// Maximum total size, in bytes, of level 1 before it is considered
    /// over budget.
    pub level1_max_size: u64,
    /// Total number of levels in the tree (level 0 included).
    pub max_levels: u32,
    /// Target size, in bytes, of each SSTable file produced by a compaction.
    pub target_file_size: u64,
}
30
31impl Default for CompactionConfig {
32 fn default() -> Self {
33 Self {
34 level0_trigger: 4,
35 level_multiplier: 10,
36 level1_max_size: 10 * 1024 * 1024, max_levels: 7,
38 target_file_size: 2 * 1024 * 1024, }
40 }
41}
42
/// Cumulative counters maintained by a `CompactionWorker` across its lifetime.
#[derive(Debug, Clone, Default)]
pub struct CompactionStats {
    /// Total bytes read from input SSTables across all compactions.
    pub bytes_read: u64,
    /// Total bytes written to output SSTables across all compactions.
    pub bytes_written: u64,
    /// Number of compactions completed by this worker.
    pub compaction_count: u64,
    /// Number of duplicate (shadowed) entries dropped during merges.
    pub entries_removed: u64,
}
55
/// One candidate entry in the k-way merge heap used by `merge_sstables`.
#[derive(Debug)]
struct MergeEntry {
    // Copy of `entry.key`, kept separately so heap ordering can compare
    // keys without borrowing `entry`.
    key: Vec<u8>,
    entry: SSTableEntry,
    // Index of the input iterator this entry came from; also serves as the
    // tie-breaker when the same key appears in multiple inputs.
    source_idx: usize,
}
63
64impl PartialEq for MergeEntry {
65 fn eq(&self, other: &Self) -> bool {
66 self.key == other.key && self.source_idx == other.source_idx
67 }
68}
69
// Total equivalence is sound: `eq` compares only `Vec<u8>` and `usize`,
// both of which implement `Eq`.
impl Eq for MergeEntry {}
71
impl PartialOrd for MergeEntry {
    // Delegates to the total order in `Ord`, as the trait contract requires.
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}
77
impl Ord for MergeEntry {
    /// Ordering tailored for `BinaryHeap` (a max-heap) acting as a min-heap:
    /// the key comparison is REVERSED so the smallest key pops first.
    ///
    /// For equal keys, the entry with the larger `source_idx` is "greater"
    /// and pops first, and `merge_sstables` keeps only the first-popped
    /// entry per key — so the later input file wins ties (the convention
    /// pinned by `test_merge_sstables`). Callers must therefore order
    /// inputs oldest-first.
    fn cmp(&self, other: &Self) -> Ordering {
        // `other` vs `self`: reversed so smaller keys rank higher in the heap.
        match other.key.cmp(&self.key) {
            // Tie-break: higher source index pops (and wins) first.
            Ordering::Equal => self.source_idx.cmp(&other.source_idx),
            ord => ord,
        }
    }
}
88
/// Background worker that merges SSTables down the level hierarchy.
pub struct CompactionWorker {
    // Database root directory; SSTables live under `<dir>/sst`.
    dir: PathBuf,
    config: CompactionConfig,
    stats: CompactionStats,
    // Monotonic counter that keeps generated SSTable file names unique even
    // within the same millisecond.
    file_counter: AtomicU64,
    // Shared flag other threads set to ask the worker to stop.
    stop_flag: Arc<AtomicBool>,
}
102
103impl CompactionWorker {
104 pub fn new(dir: impl AsRef<Path>, config: CompactionConfig) -> Self {
106 Self {
107 dir: dir.as_ref().to_path_buf(),
108 config,
109 stats: CompactionStats::default(),
110 file_counter: AtomicU64::new(0),
111 stop_flag: Arc::new(AtomicBool::new(false)),
112 }
113 }
114
115 pub fn stop_flag(&self) -> Arc<AtomicBool> {
117 Arc::clone(&self.stop_flag)
118 }
119
120 pub fn needs_compaction(&self, manifest: &Manifest) -> bool {
122 let level0_count = manifest.sstables_at_level(0).len();
123 level0_count >= self.config.level0_trigger
124 }
125
126 pub fn pick_compaction_level(&self, manifest: &Manifest) -> Option<u32> {
128 let level0_count = manifest.sstables_at_level(0).len();
130 if level0_count >= self.config.level0_trigger {
131 return Some(0);
132 }
133
134 for level in 1..self.config.max_levels {
136 let level_size: u64 = manifest
137 .sstables_at_level(level)
138 .iter()
139 .map(|s| s.file_size)
140 .sum();
141
142 let max_size = self.max_size_for_level(level);
143 if level_size > max_size {
144 return Some(level);
145 }
146 }
147
148 None
149 }
150
151 fn max_size_for_level(&self, level: u32) -> u64 {
153 if level == 0 {
154 return u64::MAX; }
156
157 let mut size = self.config.level1_max_size;
158 for _ in 1..level {
159 size *= self.config.level_multiplier as u64;
160 }
161 size
162 }
163
164 fn next_sstable_path(&self, level: u32) -> PathBuf {
166 let counter = self.file_counter.fetch_add(1, AtomicOrdering::SeqCst);
167 let timestamp = std::time::SystemTime::now()
168 .duration_since(std::time::UNIX_EPOCH)
169 .unwrap_or_default()
170 .as_millis();
171
172 self.dir
173 .join("sst")
174 .join(format!("L{}_{}_{}.sst", level, timestamp, counter))
175 }
176
177 #[instrument(skip(self, manifest))]
179 pub fn compact_level0(&mut self, manifest: &mut Manifest) -> Result<()> {
180 let level0_sstables = manifest.sstables_at_level(0);
181 if level0_sstables.is_empty() {
182 return Ok(());
183 }
184
185 info!(
186 level0_count = level0_sstables.len(),
187 "Starting level 0 compaction"
188 );
189
190 let input_paths: Vec<PathBuf> = level0_sstables
192 .iter()
193 .map(|s| PathBuf::from(&s.path))
194 .collect();
195
196 let level1_sstables = manifest.sstables_at_level(1);
198
199 let mut all_inputs: Vec<PathBuf> = input_paths.clone();
201
202 let min_key: Vec<u8> = level0_sstables
204 .iter()
205 .map(|s| s.min_key.clone())
206 .min()
207 .unwrap_or_default();
208 let max_key: Vec<u8> = level0_sstables
209 .iter()
210 .map(|s| s.max_key.clone())
211 .max()
212 .unwrap_or_default();
213
214 for sst in level1_sstables {
216 if sst.max_key >= min_key && sst.min_key <= max_key {
217 all_inputs.push(PathBuf::from(&sst.path));
218 }
219 }
220
221 let outputs = self.merge_sstables(&all_inputs, 1)?;
223
224 manifest.record_compaction(0, all_inputs.clone(), outputs)?;
226
227 for path in all_inputs {
229 let _ = delete_sstable(&path);
230 }
231
232 self.stats.compaction_count += 1;
233
234 Ok(())
235 }
236
237 fn merge_sstables(
239 &mut self,
240 inputs: &[PathBuf],
241 target_level: u32,
242 ) -> Result<Vec<SSTableMeta>> {
243 if inputs.is_empty() {
244 return Ok(Vec::new());
245 }
246
247 let sst_dir = self.dir.join("sst");
249 std::fs::create_dir_all(&sst_dir)?;
250
251 let mut readers: Vec<SSTableReader> = Vec::new();
253 for path in inputs {
254 if path.exists() {
255 match SSTableReader::open(path) {
256 Ok(reader) => {
257 self.stats.bytes_read += reader.metadata().file_size;
258 readers.push(reader);
259 }
260 Err(_) => continue, }
262 }
263 }
264
265 if readers.is_empty() {
266 return Ok(Vec::new());
267 }
268
269 let mut heap: BinaryHeap<MergeEntry> = BinaryHeap::new();
271 let mut iterators: Vec<_> = readers
272 .iter_mut()
273 .map(|r| r.iter())
274 .collect::<Result<Vec<_>>>()?;
275
276 for (idx, iter) in iterators.iter_mut().enumerate() {
278 if let Some(entry) = iter.next_entry()? {
279 heap.push(MergeEntry {
280 key: entry.key.clone(),
281 entry,
282 source_idx: idx,
283 });
284 }
285 }
286
287 let mut outputs: Vec<SSTableMeta> = Vec::new();
289 let mut current_writer: Option<SSTableWriter> = None;
290 let mut current_size: u64 = 0;
291 let mut last_key: Option<Vec<u8>> = None;
292
293 while let Some(merge_entry) = heap.pop() {
294 if last_key.as_ref() == Some(&merge_entry.key) {
296 self.stats.entries_removed += 1;
297 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
299 heap.push(MergeEntry {
300 key: next.key.clone(),
301 entry: next,
302 source_idx: merge_entry.source_idx,
303 });
304 }
305 continue;
306 }
307
308 if current_writer.is_none() || current_size >= self.config.target_file_size {
310 if let Some(writer) = current_writer.take() {
312 let meta = writer.finish()?;
313 self.stats.bytes_written += meta.file_size;
314 outputs.push(meta);
315 }
316
317 let path = self.next_sstable_path(target_level);
319 current_writer = Some(SSTableWriter::new(&path)?);
320 current_size = 0;
321 }
322
323 if let Some(ref mut writer) = current_writer {
325 let entry_size = merge_entry.entry.key.len() + merge_entry.entry.value.len() + 10;
326 writer.add(merge_entry.entry.clone())?;
327 current_size += entry_size as u64;
328 }
329
330 last_key = Some(merge_entry.key);
331
332 if let Some(next) = iterators[merge_entry.source_idx].next_entry()? {
334 heap.push(MergeEntry {
335 key: next.key.clone(),
336 entry: next,
337 source_idx: merge_entry.source_idx,
338 });
339 }
340 }
341
342 if let Some(writer) = current_writer {
344 let meta = writer.finish()?;
345 self.stats.bytes_written += meta.file_size;
346 outputs.push(meta);
347 }
348
349 let outputs: Vec<SSTableMeta> = outputs
351 .into_iter()
352 .map(|mut m| {
353 m.level = target_level;
354 m
355 })
356 .collect();
357
358 Ok(outputs)
359 }
360
361 pub fn stats(&self) -> &CompactionStats {
363 &self.stats
364 }
365
366 pub fn run_once(&mut self, manifest: &mut Manifest) -> Result<bool> {
368 if self.stop_flag.load(AtomicOrdering::Relaxed) {
369 return Ok(false);
370 }
371
372 if let Some(level) = self.pick_compaction_level(manifest) {
373 if level == 0 {
374 self.compact_level0(manifest)?;
375 return Ok(true);
376 }
377 }
379
380 Ok(false)
381 }
382}
383
#[cfg(test)]
mod tests {
    use super::*;
    use crate::sstable::SSTableWriter;
    use tempfile::tempdir;

    // Defaults should match the documented policy values.
    #[test]
    fn test_compaction_config_default() {
        let defaults = CompactionConfig::default();
        assert_eq!(defaults.level0_trigger, 4);
        assert_eq!(defaults.max_levels, 7);
    }

    // The heap ordering is reversed: the SMALLER key must compare greater
    // so it pops first from the max-heap.
    #[test]
    fn test_merge_entry_ordering() {
        let make = |key: &[u8], val: &[u8]| MergeEntry {
            key: key.to_vec(),
            entry: SSTableEntry::value(key.to_vec(), val.to_vec()),
            source_idx: 0,
        };

        let lower = make(b"a", b"1");
        let higher = make(b"b", b"2");

        assert!(lower > higher);
    }

    #[test]
    fn test_needs_compaction() {
        let dir = tempdir().unwrap();
        let worker = CompactionWorker::new(
            dir.path(),
            CompactionConfig {
                level0_trigger: 2,
                ..Default::default()
            },
        );
        let mut manifest = Manifest::open(dir.path()).unwrap();

        // Empty level 0: nothing to compact yet.
        assert!(!worker.needs_compaction(&manifest));

        // Register exactly `level0_trigger` empty tables at level 0.
        for i in 0..2 {
            let meta = SSTableMeta {
                path: PathBuf::from(format!("test{}.sst", i)),
                min_key: vec![],
                max_key: vec![],
                entry_count: 0,
                file_size: 0,
                level: 0,
                sequence: 0,
            };
            manifest.add_sstable(&meta).unwrap();
        }

        assert!(worker.needs_compaction(&manifest));
    }

    #[test]
    fn test_merge_sstables() {
        let dir = tempdir().unwrap();
        let sst_dir = dir.path().join("sst");
        std::fs::create_dir_all(&sst_dir).unwrap();

        // Helper: write one SSTable containing the given key/value pairs.
        let write_table = |name: &str, pairs: &[(&[u8], &[u8])]| -> PathBuf {
            let path = sst_dir.join(name);
            let mut writer = SSTableWriter::new(&path).unwrap();
            for (key, value) in pairs {
                writer
                    .add(SSTableEntry::value(key.to_vec(), value.to_vec()))
                    .unwrap();
            }
            writer.finish().unwrap();
            path
        };

        let first = write_table("test1.sst", &[(b"a", b"1"), (b"c", b"3")]);
        let second = write_table("test2.sst", &[(b"b", b"2"), (b"c", b"3-new")]);

        let mut worker = CompactionWorker::new(dir.path(), CompactionConfig::default());
        let outputs = worker.merge_sstables(&[first, second], 1).unwrap();

        assert!(!outputs.is_empty());

        // The later input wins duplicate keys, so "c" carries the new value.
        let mut reader = SSTableReader::open(&outputs[0].path).unwrap();
        assert_eq!(reader.get(b"a").unwrap().unwrap().value, b"1".to_vec());
        assert_eq!(reader.get(b"b").unwrap().unwrap().value, b"2".to_vec());
        assert_eq!(reader.get(b"c").unwrap().unwrap().value, b"3-new".to_vec());
    }
}