1use crate::error::{AmateRSError, ErrorContext, Result};
10use crate::storage::{SSTableConfig, SSTableMetadata, SSTableReader, SSTableWriter};
11use crate::types::{CipherBlob, Key};
12use std::collections::BTreeMap;
13use std::path::{Path, PathBuf};
14
/// Policy used to decide which SSTables get merged together.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CompactionStrategy {
    /// Leveled compaction: data is pushed from level N into level N + 1.
    LevelBased,
    /// Size-tiered compaction.
    SizeTiered,
}
23
24#[derive(Debug, Clone)]
26pub struct CompactionConfig {
27 pub strategy: CompactionStrategy,
29 pub l0_threshold: usize,
31 pub level_multiplier: usize,
33 pub base_level_size: u64,
35 pub max_compaction_bytes: u64,
37}
38
39impl Default for CompactionConfig {
40 fn default() -> Self {
41 Self {
42 strategy: CompactionStrategy::LevelBased,
43 l0_threshold: 4,
44 level_multiplier: 10,
45 base_level_size: 10 * 1024 * 1024, max_compaction_bytes: 100 * 1024 * 1024, }
48 }
49}
50
51#[derive(Debug, Clone)]
53pub struct CompactionTask {
54 pub source_level: usize,
56 pub target_level: usize,
58 pub source_sstables: Vec<SSTableMetadata>,
60 pub target_sstables: Vec<SSTableMetadata>,
62}
63
/// Cumulative counters describing the compaction work done so far.
#[derive(Clone, Debug, Default)]
pub struct CompactionStats {
    /// Number of compactions that completed successfully.
    pub total_compactions: u64,
    /// Sum of the `file_size` of every input SSTable read.
    pub bytes_read: u64,
    /// Sum of the on-disk sizes of every output SSTable written.
    pub bytes_written: u64,
    /// Number of key/value entries read from input SSTables.
    pub keys_processed: u64,
    /// Number of deleted (tombstoned) keys dropped from the output.
    pub tombstones_removed: u64,
}
78
79pub struct CompactionPlanner {
81 config: CompactionConfig,
82}
83
84impl CompactionPlanner {
85 pub fn new(config: CompactionConfig) -> Self {
87 Self { config }
88 }
89
90 pub fn needs_l0_compaction(&self, l0_sstable_count: usize) -> bool {
92 l0_sstable_count >= self.config.l0_threshold
93 }
94
95 pub fn needs_level_compaction(&self, level: usize, level_size: u64) -> bool {
97 if level == 0 {
98 return false; }
100
101 let target_size = self.level_target_size(level);
102 level_size > target_size
103 }
104
105 pub fn level_target_size(&self, level: usize) -> u64 {
107 if level == 0 {
108 return 0; }
110
111 self.config.base_level_size * (self.config.level_multiplier as u64).pow(level as u32 - 1)
112 }
113
114 pub fn plan_compaction(
116 &self,
117 source_level: usize,
118 source_sstables: Vec<SSTableMetadata>,
119 target_sstables: Vec<SSTableMetadata>,
120 ) -> Option<CompactionTask> {
121 if source_sstables.is_empty() {
122 return None;
123 }
124
125 let source_to_compact = if source_level == 0 {
127 source_sstables
128 } else {
129 self.select_sstables_for_compaction(source_sstables)
131 };
132
133 if source_to_compact.is_empty() {
134 return None;
135 }
136
137 let target_to_merge = self.find_overlapping_sstables(&source_to_compact, &target_sstables);
139
140 Some(CompactionTask {
141 source_level,
142 target_level: source_level + 1,
143 source_sstables: source_to_compact,
144 target_sstables: target_to_merge,
145 })
146 }
147
148 fn select_sstables_for_compaction(
150 &self,
151 sstables: Vec<SSTableMetadata>,
152 ) -> Vec<SSTableMetadata> {
153 let mut selected = Vec::new();
160 let mut total_size = 0u64;
161
162 for sstable in sstables {
163 if total_size + sstable.file_size > self.config.max_compaction_bytes {
164 break;
165 }
166
167 total_size += sstable.file_size;
168 selected.push(sstable);
169
170 if selected.len() >= 2 {
172 break;
173 }
174 }
175
176 selected
177 }
178
179 fn find_overlapping_sstables(
181 &self,
182 source_sstables: &[SSTableMetadata],
183 target_sstables: &[SSTableMetadata],
184 ) -> Vec<SSTableMetadata> {
185 if source_sstables.is_empty() {
186 return Vec::new();
187 }
188
189 let min_key = source_sstables
191 .iter()
192 .map(|s| &s.min_key)
193 .min()
194 .expect("source_sstables is non-empty");
195
196 let max_key = source_sstables
197 .iter()
198 .map(|s| &s.max_key)
199 .max()
200 .expect("source_sstables is non-empty");
201
202 target_sstables
204 .iter()
205 .filter(|sstable| {
206 !(&sstable.max_key < min_key || &sstable.min_key > max_key)
208 })
209 .cloned()
210 .collect()
211 }
212}
213
214pub struct CompactionExecutor {
216 config: SSTableConfig,
217 stats: CompactionStats,
218}
219
220impl CompactionExecutor {
221 pub fn new(config: SSTableConfig) -> Self {
223 Self {
224 config,
225 stats: CompactionStats::default(),
226 }
227 }
228
229 pub fn execute_compaction(
231 &mut self,
232 task: CompactionTask,
233 output_dir: &Path,
234 next_sstable_id: &mut u64,
235 ) -> Result<Vec<SSTableMetadata>> {
236 let mut all_entries = BTreeMap::new();
238
239 for sstable in &task.source_sstables {
241 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
242 self.stats.bytes_read += sstable.file_size;
243 }
244
245 for sstable in &task.target_sstables {
247 self.read_sstable_entries(&sstable.path, &mut all_entries)?;
248 self.stats.bytes_read += sstable.file_size;
249 }
250
251 let output_sstables = self.write_compacted_sstables(
253 all_entries,
254 task.target_level,
255 output_dir,
256 next_sstable_id,
257 )?;
258
259 self.stats.total_compactions += 1;
260
261 Ok(output_sstables)
262 }
263
264 fn read_sstable_entries(
266 &mut self,
267 path: &Path,
268 entries: &mut BTreeMap<Key, Option<CipherBlob>>,
269 ) -> Result<()> {
270 let reader = SSTableReader::open(path)?;
271 let sstable_entries = reader.iter()?;
272
273 for (key, value) in sstable_entries {
274 self.stats.keys_processed += 1;
275 entries.insert(key, Some(value));
277 }
278
279 Ok(())
280 }
281
282 fn write_compacted_sstables(
284 &mut self,
285 entries: BTreeMap<Key, Option<CipherBlob>>,
286 target_level: usize,
287 output_dir: &Path,
288 next_id: &mut u64,
289 ) -> Result<Vec<SSTableMetadata>> {
290 let mut output_sstables = Vec::new();
291 let mut current_writer: Option<SSTableWriter> = None;
292 let mut current_path: Option<PathBuf> = None;
293 let mut current_size = 0usize;
294 let mut current_min_key: Option<Key> = None;
295 let mut current_max_key: Option<Key> = None;
296 let mut current_entries = 0usize;
297
298 const MAX_SSTABLE_SIZE: usize = 2 * 1024 * 1024; for (key, value_opt) in entries {
301 let value = match value_opt {
303 Some(v) => v,
304 None => {
305 self.stats.tombstones_removed += 1;
306 continue;
307 }
308 };
309
310 if current_writer.is_none() || current_size >= MAX_SSTABLE_SIZE {
312 if let Some(writer) = current_writer.take() {
314 writer.finish()?;
315
316 if let (Some(path), Some(min_key), Some(max_key)) = (
317 current_path.take(),
318 current_min_key.take(),
319 current_max_key.take(),
320 ) {
321 let file_size = std::fs::metadata(&path)
322 .map_err(|e| {
323 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
324 "Failed to get SSTable size: {}",
325 e
326 )))
327 })?
328 .len();
329
330 self.stats.bytes_written += file_size;
331
332 output_sstables.push(SSTableMetadata {
333 path,
334 min_key,
335 max_key,
336 num_entries: current_entries,
337 file_size,
338 level: target_level,
339 });
340 }
341 }
342
343 let id = *next_id;
345 *next_id += 1;
346 let path = output_dir.join(format!("L{}_{:08}.sst", target_level, id));
347 let writer = SSTableWriter::new(&path, self.config.clone())?;
348
349 current_writer = Some(writer);
350 current_path = Some(path);
351 current_size = 0;
352 current_min_key = None;
353 current_max_key = None;
354 current_entries = 0;
355 }
356
357 if let Some(ref mut writer) = current_writer {
359 let entry_size = 16 + key.as_bytes().len() + value.as_bytes().len();
360 writer.add(key.clone(), value)?;
361 current_size += entry_size;
362 current_entries += 1;
363
364 if current_min_key.is_none() {
365 current_min_key = Some(key.clone());
366 }
367 current_max_key = Some(key);
368 }
369 }
370
371 if let Some(writer) = current_writer {
373 writer.finish()?;
374
375 if let (Some(path), Some(min_key), Some(max_key)) =
376 (current_path, current_min_key, current_max_key)
377 {
378 let file_size = std::fs::metadata(&path)
379 .map_err(|e| {
380 AmateRSError::StorageIntegrity(ErrorContext::new(format!(
381 "Failed to get SSTable size: {}",
382 e
383 )))
384 })?
385 .len();
386
387 self.stats.bytes_written += file_size;
388
389 output_sstables.push(SSTableMetadata {
390 path,
391 min_key,
392 max_key,
393 num_entries: current_entries,
394 file_size,
395 level: target_level,
396 });
397 }
398 }
399
400 Ok(output_sstables)
401 }
402
403 pub fn stats(&self) -> &CompactionStats {
405 &self.stats
406 }
407}
408
#[cfg(test)]
mod tests {
    use super::*;

    const MIB: u64 = 1024 * 1024;

    /// Planner built from the default configuration.
    fn default_planner() -> CompactionPlanner {
        CompactionPlanner::new(CompactionConfig::default())
    }

    /// Metadata for a small fake SSTable covering `[min, max]`.
    fn sstable(path: &str, min: &'static str, max: &'static str, level: usize) -> SSTableMetadata {
        SSTableMetadata {
            path: PathBuf::from(path),
            min_key: Key::from_str(min),
            max_key: Key::from_str(max),
            num_entries: 10,
            file_size: 1000,
            level,
        }
    }

    #[test]
    fn test_compaction_planner_l0_threshold() {
        let planner = default_planner();

        assert!(!planner.needs_l0_compaction(3));
        assert!(planner.needs_l0_compaction(4));
        assert!(planner.needs_l0_compaction(5));
    }

    #[test]
    fn test_compaction_planner_level_sizes() {
        let planner = CompactionPlanner::new(CompactionConfig {
            base_level_size: 10 * MIB,
            level_multiplier: 10,
            ..Default::default()
        });

        assert_eq!(planner.level_target_size(1), 10 * MIB);
        assert_eq!(planner.level_target_size(2), 100 * MIB);
        assert_eq!(planner.level_target_size(3), 1000 * MIB);
    }

    #[test]
    fn test_compaction_planner_needs_compaction() {
        let planner = default_planner();

        // Level 0 is count-driven, never size-driven.
        assert!(!planner.needs_level_compaction(0, 100 * MIB));

        // Level 1 targets 10 MiB by default.
        assert!(!planner.needs_level_compaction(1, 5 * MIB));
        assert!(planner.needs_level_compaction(1, 15 * MIB));
    }

    #[test]
    fn test_find_overlapping_sstables() {
        let planner = default_planner();

        let source = vec![sstable("s1.sst", "key_005", "key_015", 0)];
        let target = vec![
            sstable("t1.sst", "key_000", "key_010", 1),
            sstable("t2.sst", "key_020", "key_030", 1),
        ];

        let overlapping = planner.find_overlapping_sstables(&source, &target);

        assert_eq!(overlapping.len(), 1);
        assert_eq!(overlapping[0].path, PathBuf::from("t1.sst"));
    }
}