use crate::{Error, Result, Record, sst::{SstWriter, SstReader}};
use std::collections::BTreeMap;
use std::path::PathBuf;
use std::fs;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

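/// Default number of SSTs in a stripe before compaction is triggered.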
pub const DEFAULT_SST_THRESHOLD: usize = 10;

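/// Smallest useful compaction input; [`CompactionConfig::with_sst_threshold`]
/// clamps thresholds to at least this many SSTs.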
pub const MIN_SSTS_TO_COMPACT: usize = 2;

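/// SST count at which [`CompactionManager::needs_compaction`] reports true.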
pub const COMPACTION_THRESHOLD: usize = DEFAULT_SST_THRESHOLD;

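/// Configuration for background compaction.
///
/// Construct with the builder-style setters below (illustrative):
///
/// ```ignore
/// let config = CompactionConfig::new()
///     .with_sst_threshold(5)
///     .with_check_interval(30)
///     .with_max_concurrent(2);
/// ```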
#[derive(Clone, Debug)]
pub struct CompactionConfig {
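    /// Whether background compaction runs at all.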
    pub enabled: bool,

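    /// Number of SSTs in a stripe that triggers a compaction.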
    pub sst_threshold: usize,

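    /// Seconds between checks for compaction work.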
    pub check_interval_secs: u64,

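    /// Maximum number of compactions that may run concurrently.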
    pub max_concurrent_compactions: usize,
}

impl Default for CompactionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            sst_threshold: DEFAULT_SST_THRESHOLD,
            check_interval_secs: 60,
            max_concurrent_compactions: 4,
        }
    }
}

impl CompactionConfig {
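    /// Creates a configuration with default settings.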
    pub fn new() -> Self {
        Self::default()
    }

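    /// Creates a configuration with background compaction turned off.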
    pub fn disabled() -> Self {
        Self {
            enabled: false,
            ..Default::default()
        }
    }

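    /// Sets the SST threshold, clamped to at least [`MIN_SSTS_TO_COMPACT`].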
    pub fn with_sst_threshold(mut self, threshold: usize) -> Self {
        self.sst_threshold = threshold.max(MIN_SSTS_TO_COMPACT);
        self
    }

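    /// Sets the interval, in seconds, between compaction checks.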
    pub fn with_check_interval(mut self, seconds: u64) -> Self {
        self.check_interval_secs = seconds;
        self
    }

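    /// Sets the concurrency limit, clamped to at least 1.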
    pub fn with_max_concurrent(mut self, max: usize) -> Self {
        self.max_concurrent_compactions = max.max(1);
        self
    }
}

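/// Point-in-time snapshot of compaction activity, as returned by
/// [`CompactionStatsAtomic::snapshot`].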
#[derive(Clone, Debug, Default)]
pub struct CompactionStats {
    /// Compactions started since these stats were created.
    pub total_compactions: u64,

    /// Input SSTs consumed across all compactions.
    pub total_ssts_merged: u64,

    /// Output SSTs produced across all compactions.
    pub total_ssts_created: u64,

    /// Bytes read from compaction inputs.
    pub total_bytes_read: u64,

    /// Bytes written to compaction outputs.
    pub total_bytes_written: u64,

    /// Disk space freed by deleting obsolete SSTs.
    pub total_bytes_reclaimed: u64,

    /// Superseded record versions dropped during merges.
    pub total_records_deduplicated: u64,

    /// Tombstones dropped during merges.
    pub total_tombstones_removed: u64,

    /// Compactions currently in flight.
    pub active_compactions: u64,
}

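/// Thread-safe counterpart of [`CompactionStats`].
///
/// `Clone` is cheap: clones share the same underlying counters through `Arc`,
/// so stats can be updated from multiple compaction threads at once.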
#[derive(Clone)]
pub struct CompactionStatsAtomic {
    total_compactions: Arc<AtomicU64>,
    total_ssts_merged: Arc<AtomicU64>,
    total_ssts_created: Arc<AtomicU64>,
    total_bytes_read: Arc<AtomicU64>,
    total_bytes_written: Arc<AtomicU64>,
    total_bytes_reclaimed: Arc<AtomicU64>,
    total_records_deduplicated: Arc<AtomicU64>,
    total_tombstones_removed: Arc<AtomicU64>,
    active_compactions: Arc<AtomicU64>,
}

impl Default for CompactionStatsAtomic {
    fn default() -> Self {
        Self::new()
    }
}

impl CompactionStatsAtomic {
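    /// Creates a new set of counters, all zeroed.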
    pub fn new() -> Self {
        Self {
            total_compactions: Arc::new(AtomicU64::new(0)),
            total_ssts_merged: Arc::new(AtomicU64::new(0)),
            total_ssts_created: Arc::new(AtomicU64::new(0)),
            total_bytes_read: Arc::new(AtomicU64::new(0)),
            total_bytes_written: Arc::new(AtomicU64::new(0)),
            total_bytes_reclaimed: Arc::new(AtomicU64::new(0)),
            total_records_deduplicated: Arc::new(AtomicU64::new(0)),
            total_tombstones_removed: Arc::new(AtomicU64::new(0)),
            active_compactions: Arc::new(AtomicU64::new(0)),
        }
    }

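    /// Copies the current counters into a plain [`CompactionStats`].
    ///
    /// Loads use `Ordering::Relaxed`, so counters updated concurrently may be
    /// momentarily inconsistent with one another.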
    pub fn snapshot(&self) -> CompactionStats {
        CompactionStats {
            total_compactions: self.total_compactions.load(Ordering::Relaxed),
            total_ssts_merged: self.total_ssts_merged.load(Ordering::Relaxed),
            total_ssts_created: self.total_ssts_created.load(Ordering::Relaxed),
            total_bytes_read: self.total_bytes_read.load(Ordering::Relaxed),
            total_bytes_written: self.total_bytes_written.load(Ordering::Relaxed),
            total_bytes_reclaimed: self.total_bytes_reclaimed.load(Ordering::Relaxed),
            total_records_deduplicated: self.total_records_deduplicated.load(Ordering::Relaxed),
            total_tombstones_removed: self.total_tombstones_removed.load(Ordering::Relaxed),
            active_compactions: self.active_compactions.load(Ordering::Relaxed),
        }
    }

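    /// Marks the start of a compaction: bumps `total_compactions` and
    /// `active_compactions`, returning a [`CompactionGuard`] that decrements
    /// the active count when dropped.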
    pub fn start_compaction(&self) -> CompactionGuard {
        self.total_compactions.fetch_add(1, Ordering::Relaxed);
        self.active_compactions.fetch_add(1, Ordering::Relaxed);
        CompactionGuard {
            stats: self.clone(),
        }
    }

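    /// Adds `count` input SSTs to the merged total.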
    pub fn record_ssts_merged(&self, count: u64) {
        self.total_ssts_merged.fetch_add(count, Ordering::Relaxed);
    }

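    /// Adds `count` output SSTs to the created total.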
    pub fn record_ssts_created(&self, count: u64) {
        self.total_ssts_created.fetch_add(count, Ordering::Relaxed);
    }

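    /// Adds `bytes` to the total read from compaction inputs.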
    pub fn record_bytes_read(&self, bytes: u64) {
        self.total_bytes_read.fetch_add(bytes, Ordering::Relaxed);
    }

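    /// Adds `bytes` to the total written to compaction outputs.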
    pub fn record_bytes_written(&self, bytes: u64) {
        self.total_bytes_written.fetch_add(bytes, Ordering::Relaxed);
    }

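    /// Adds `bytes` to the total reclaimed by deleting old SSTs.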
    pub fn record_bytes_reclaimed(&self, bytes: u64) {
        self.total_bytes_reclaimed.fetch_add(bytes, Ordering::Relaxed);
    }

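    /// Adds `count` superseded record versions to the deduplication total.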
    pub fn record_records_deduplicated(&self, count: u64) {
        self.total_records_deduplicated
            .fetch_add(count, Ordering::Relaxed);
    }

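    /// Adds `count` dropped tombstones to the removal total.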
    pub fn record_tombstones_removed(&self, count: u64) {
        self.total_tombstones_removed
            .fetch_add(count, Ordering::Relaxed);
    }
}

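/// RAII guard returned by [`CompactionStatsAtomic::start_compaction`].
///
/// Dropping the guard decrements `active_compactions`, so the active count
/// stays correct even if a compaction exits early.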
pub struct CompactionGuard {
    stats: CompactionStatsAtomic,
}

impl Drop for CompactionGuard {
    fn drop(&mut self) {
        self.stats
            .active_compactions
            .fetch_sub(1, Ordering::Relaxed);
    }
}

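/// Merges a stripe's SSTs into a single compacted SST and cleans up the inputs.
///
/// A sketch of the intended flow (illustrative; how readers are tracked and the
/// new SST is registered is up to the caller):
///
/// ```ignore
/// let manager = CompactionManager::new(stripe_id, dir);
/// if manager.needs_compaction(ssts.len()) {
///     let (new_sst, old_paths) = manager.compact(&ssts, next_sst_id)?;
///     // Register `new_sst` in place of the inputs, then delete the old files.
///     manager.cleanup_old_ssts(old_paths)?;
/// }
/// ```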
pub struct CompactionManager {
    stripe_id: usize,
    dir: PathBuf,
}

impl CompactionManager {
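    /// Creates a manager for stripe `stripe_id` that writes compacted SSTs into `dir`.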
    pub fn new(stripe_id: usize, dir: PathBuf) -> Self {
        Self { stripe_id, dir }
    }

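    /// Returns `true` once the stripe's SST count reaches the global
    /// [`COMPACTION_THRESHOLD`]; note this does not consult a per-config value.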
    pub fn needs_compaction(&self, sst_count: usize) -> bool {
        sst_count >= COMPACTION_THRESHOLD
    }

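    /// Merges `ssts` into a single new SST named `{stripe:03}-{next_sst_id}.sst`.
    ///
    /// Records are deduplicated by encoded key, keeping the version with the
    /// highest sequence number; tombstones are then dropped. Dropping
    /// tombstones is only safe because this is a full compaction: every SST
    /// that could hold an older version of a deleted key must be in `ssts`.
    ///
    /// Returns a reader over the new SST plus the paths of the inputs, which
    /// the caller should delete with [`Self::cleanup_old_ssts`] once the new
    /// SST has been registered.
    ///
    /// # Errors
    ///
    /// Returns [`Error::InvalidArgument`] if `ssts` is empty, and propagates
    /// any I/O error from scanning, writing, or reopening SSTs.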
    pub fn compact(
        &self,
        ssts: &[SstReader],
        next_sst_id: u64,
    ) -> Result<(SstReader, Vec<PathBuf>)> {
        if ssts.is_empty() {
            return Err(Error::InvalidArgument("Cannot compact zero SSTs".into()));
        }

        // Merge all records, keeping only the newest version (highest seq) of each key.
        let mut records_by_key: BTreeMap<Vec<u8>, Record> = BTreeMap::new();

        for sst in ssts {
            for record in sst.scan()? {
                let encoded_key = record.key.encode().to_vec();

                records_by_key
                    .entry(encoded_key)
                    .and_modify(|existing| {
                        if record.seq > existing.seq {
                            *existing = record.clone();
                        }
                    })
                    .or_insert(record);
            }
        }

        // Drop tombstones; the deletions they record are fully applied by this
        // merge. BTreeMap::into_values already yields records in encoded-key
        // order, so the output needs no further sorting.
        let records_to_write: Vec<Record> = records_by_key
            .into_values()
            .filter(|record| !record.is_tombstone())
            .collect();

        let new_sst_path = self
            .dir
            .join(format!("{:03}-{}.sst", self.stripe_id, next_sst_id));
        let mut writer = SstWriter::new();

        for record in records_to_write {
            writer.add(record);
        }

        writer.finish(&new_sst_path)?;

        let new_reader = SstReader::open(&new_sst_path)?;

        let old_sst_paths: Vec<PathBuf> = ssts
            .iter()
            .map(|sst| sst.path().to_path_buf())
            .collect();

        Ok((new_reader, old_sst_paths))
    }

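    /// Deletes the given SST files, skipping any that no longer exist.
    ///
    /// Intended to run only after the SST produced by [`Self::compact`] has
    /// been registered, so that a crash in between cannot lose data.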
    pub fn cleanup_old_ssts(&self, old_sst_paths: Vec<PathBuf>) -> Result<()> {
        for path in old_sst_paths {
            if path.exists() {
                fs::remove_file(&path).map_err(|e| {
                    Error::Internal(format!("Failed to delete old SST {:?}: {}", path, e))
                })?;
            }
        }
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Key, Value, SeqNo, sst::SstWriter};
    use tempfile::TempDir;
    use std::collections::HashMap;

    /// Builds a put record whose item has a single "value" string attribute.
    fn create_test_record(pk: &[u8], seq: SeqNo, value: &str) -> Record {
        let mut item = HashMap::new();
        item.insert("value".to_string(), Value::string(value));
        Record::put(Key::new(pk.to_vec()), item, seq)
    }

    /// Builds a tombstone record for the given primary key.
    fn create_delete_record(pk: &[u8], seq: SeqNo) -> Record {
        Record::delete(Key::new(pk.to_vec()), seq)
    }

    #[test]
    fn test_compaction_manager_needs_compaction() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        assert!(!manager.needs_compaction(5));
        assert!(!manager.needs_compaction(9));
        assert!(manager.needs_compaction(10));
        assert!(manager.needs_compaction(15));
    }

    #[test]
    fn test_compact_multiple_versions() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // First SST: key1 and key2 at seq 1.
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key2", 1, "v1"));
        writer1.finish(&sst1_path).unwrap();

        // Second SST: key1 updated at seq 2, plus a new key3.
        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_test_record(b"key1", 2, "v2"));
        writer2.add(create_test_record(b"key3", 2, "v1"));
        writer2.finish(&sst2_path).unwrap();

        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, old_paths) = manager.compact(&[sst1, sst2], 3).unwrap();

        // Three distinct keys survive; key1 keeps only its newest version.
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 3);

        let key1_record = records.iter().find(|r| r.key.pk.as_ref() == b"key1").unwrap();
        assert_eq!(key1_record.seq, 2);
        assert_eq!(
            key1_record.value.as_ref().unwrap().get("value").unwrap().as_string(),
            Some("v2")
        );

        assert_eq!(old_paths.len(), 2);
    }

    #[test]
    fn test_compact_filters_tombstones() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key2", 1, "v1"));
        writer1.finish(&sst1_path).unwrap();

        // key1 is deleted at seq 2; compaction should drop both the tombstone
        // and the put it shadows.
        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_delete_record(b"key1", 2));
        writer2.finish(&sst2_path).unwrap();

        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1, sst2], 3).unwrap();

        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].key.pk.as_ref(), b"key2");
    }

    #[test]
    fn test_compact_empty_result() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        // An SST containing only a tombstone compacts down to nothing.
        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_delete_record(b"key1", 1));
        writer1.finish(&sst1_path).unwrap();

        let sst1 = SstReader::open(&sst1_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1], 2).unwrap();

        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 0);
    }

    #[test]
    fn test_cleanup_old_ssts() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        let file1 = dir.path().join("000-1.sst");
        let file2 = dir.path().join("000-2.sst");
        std::fs::write(&file1, b"test").unwrap();
        std::fs::write(&file2, b"test").unwrap();

        assert!(file1.exists());
        assert!(file2.exists());

        manager.cleanup_old_ssts(vec![file1.clone(), file2.clone()]).unwrap();

        assert!(!file1.exists());
        assert!(!file2.exists());
    }

    #[test]
    fn test_compact_preserves_order() {
        let dir = TempDir::new().unwrap();
        let manager = CompactionManager::new(0, dir.path().to_path_buf());

        let sst1_path = dir.path().join("000-1.sst");
        let mut writer1 = SstWriter::new();
        writer1.add(create_test_record(b"key1", 1, "v1"));
        writer1.add(create_test_record(b"key3", 1, "v3"));
        writer1.finish(&sst1_path).unwrap();

        let sst2_path = dir.path().join("000-2.sst");
        let mut writer2 = SstWriter::new();
        writer2.add(create_test_record(b"key2", 2, "v2"));
        writer2.add(create_test_record(b"key4", 2, "v4"));
        writer2.finish(&sst2_path).unwrap();

        let sst1 = SstReader::open(&sst1_path).unwrap();
        let sst2 = SstReader::open(&sst2_path).unwrap();
        let (new_sst, _) = manager.compact(&[sst1, sst2], 3).unwrap();

        // Interleaved keys from the two inputs come out in sorted order.
        let records: Vec<Record> = new_sst.scan().unwrap().collect();
        assert_eq!(records.len(), 4);
        assert_eq!(records[0].key.pk.as_ref(), b"key1");
        assert_eq!(records[1].key.pk.as_ref(), b"key2");
        assert_eq!(records[2].key.pk.as_ref(), b"key3");
        assert_eq!(records[3].key.pk.as_ref(), b"key4");
    }

    #[test]
    fn test_compaction_config_defaults() {
        let config = CompactionConfig::default();
        assert!(config.enabled);
        assert_eq!(config.sst_threshold, DEFAULT_SST_THRESHOLD);
        assert_eq!(config.check_interval_secs, 60);
        assert_eq!(config.max_concurrent_compactions, 4);
    }

    #[test]
    fn test_compaction_config_disabled() {
        let config = CompactionConfig::disabled();
        assert!(!config.enabled);
    }

    #[test]
    fn test_compaction_config_builder() {
        let config = CompactionConfig::new()
            .with_sst_threshold(5)
            .with_check_interval(30)
            .with_max_concurrent(2);

        assert_eq!(config.sst_threshold, 5);
        assert_eq!(config.check_interval_secs, 30);
        assert_eq!(config.max_concurrent_compactions, 2);
    }

    #[test]
    fn test_compaction_stats_atomic_snapshot() {
        let stats = CompactionStatsAtomic::new();

        stats.record_ssts_merged(5);
        stats.record_ssts_created(1);
        stats.record_bytes_read(1024);
        stats.record_bytes_written(512);
        stats.record_tombstones_removed(3);

        let snapshot = stats.snapshot();
        assert_eq!(snapshot.total_ssts_merged, 5);
        assert_eq!(snapshot.total_ssts_created, 1);
        assert_eq!(snapshot.total_bytes_read, 1024);
        assert_eq!(snapshot.total_bytes_written, 512);
        assert_eq!(snapshot.total_tombstones_removed, 3);
    }

    #[test]
    fn test_compaction_guard_decrements_on_drop() {
        let stats = CompactionStatsAtomic::new();

        {
            let _guard = stats.start_compaction();
            assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 1);
            assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 1);
        }

        // Dropping the guard releases the active slot but keeps the total.
        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 0);
        assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_multiple_compaction_guards() {
        let stats = CompactionStatsAtomic::new();

        let _guard1 = stats.start_compaction();
        let _guard2 = stats.start_compaction();
        let _guard3 = stats.start_compaction();

        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 3);
        assert_eq!(stats.total_compactions.load(Ordering::Relaxed), 3);

        drop(_guard2);
        assert_eq!(stats.active_compactions.load(Ordering::Relaxed), 2);
    }
}