// lsm_tree/compaction/leveled/mod.rs
#[cfg(test)]
6mod test;
7
8use super::{Choice, CompactionStrategy, Input as CompactionInput};
9use crate::{
10 compaction::state::{hidden_set::HiddenSet, CompactionState},
11 config::Config,
12 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
13 table::{util::aggregate_run_key_range, Table},
14 version::{Run, Version},
15 HashSet, TableId,
16};
17
18fn pick_minimal_compaction(
20 curr_run: &Run<Table>,
21 next_run: Option<&Run<Table>>,
22 hidden_set: &HiddenSet,
23 _overshoot: u64,
24 table_base_size: u64,
25) -> Option<(HashSet<TableId>, bool)> {
26 if let Some(window) = curr_run.shrinking_windows().find(|window| {
28 if hidden_set.is_blocked(window.iter().map(Table::id)) {
29 return false;
32 }
33
34 let Some(next_run) = &next_run else {
35 return true;
37 };
38
39 let key_range = aggregate_run_key_range(window);
40
41 next_run.get_overlapping(&key_range).is_empty()
42 }) {
43 let ids = window.iter().map(Table::id).collect();
44 return Some((ids, true));
45 }
46
47 if let Some(next_run) = &next_run {
49 next_run
50 .growing_windows()
51 .take_while(|window| {
52 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
57 next_level_size <= (50 * table_base_size)
58 })
59 .filter_map(|window| {
60 if hidden_set.is_blocked(window.iter().map(Table::id)) {
61 return None;
64 }
65
66 let key_range = aggregate_run_key_range(window);
67
68 let curr_level_pull_in = curr_run.get_contained(&key_range);
70
71 let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
72
73 if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
78 return None;
81 }
82
83 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
84
85 #[expect(clippy::cast_precision_loss)]
88 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
89
90 Some((window, curr_level_pull_in, write_amp))
91 })
92 .min_by(|a, b| a.2.partial_cmp(&b.2).unwrap_or(std::cmp::Ordering::Equal))
94 .map(|(window, curr_level_pull_in, _)| {
95 let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
96 ids.extend(curr_level_pull_in.iter().map(Table::id));
97 (ids, false)
98 })
99 } else {
100 None
101 }
102}
103
// Name under which this strategy reports itself (returned by `get_name`).
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
106
/// Classic leveled compaction strategy: keeps each level beyond L0 within a
/// size budget and merges overflowing tables into the next level.
#[derive(Clone)]
pub struct Strategy {
    /// Number of L0 tables that triggers an L0 -> L1 compaction.
    l0_threshold: u8,

    /// Target size (in bytes) of a single output table.
    target_size: u64,

    /// Size ratios between adjacent levels; entry `i` scales canonical level
    /// `i + 1` up to level `i + 2`. The last entry is reused for deeper
    /// levels, and 10.0 is used if the vector is empty (see `level_target_size`).
    level_ratio_policy: Vec<f32>,
}
128
129impl Default for Strategy {
130 fn default() -> Self {
131 Self {
132 l0_threshold: 4,
133 target_size:64 * 1_024 * 1_024,
134 level_ratio_policy: vec![10.0],
135 }
136 }
137}
138
139impl Strategy {
140 #[must_use]
146 pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
147 self.level_ratio_policy = policy;
148 self
149 }
150
151 #[must_use]
160 pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
161 self.l0_threshold = threshold;
162 self
163 }
164
165 #[must_use]
171 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
172 self.target_size = bytes;
173 self
174 }
175
176 fn level_base_size(&self) -> u64 {
178 self.target_size * u64::from(self.l0_threshold)
179 }
180
181 fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
191 assert!(
192 canonical_level_idx >= 1,
193 "level_target_size does not apply to L0",
194 );
195
196 if canonical_level_idx == 1 {
197 self.level_base_size()
199 } else {
200 let mut size = self.level_base_size() as f32;
201
202 for idx in 0..=(canonical_level_idx - 2) {
204 let ratio = self
205 .level_ratio_policy
206 .get(usize::from(idx))
207 .copied()
208 .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
209
210 size *= ratio;
211 }
212
213 size as u64
214 }
215 }
216}
217
impl CompactionStrategy for Strategy {
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Serializes the strategy's configuration as key/value pairs
    /// (little-endian encoded numbers; the ratio policy is a length byte
    /// followed by f32 entries).
    fn get_config(&self) -> Vec<crate::KvPair> {
        vec![
            (
                crate::UserKey::from("leveled_l0_threshold"),
                crate::UserValue::from(self.l0_threshold.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_target_size"),
                crate::UserValue::from(self.target_size.to_le_bytes()),
            ),
            (
                crate::UserKey::from("leveled_level_ratio_policy"),
                crate::UserValue::from({
                    use byteorder::{LittleEndian, WriteBytesExt};

                    let mut v = vec![];

                    // NOTE(review): `len() as u8` truncates policies longer than
                    // 255 entries — presumably never the case in practice; confirm.
                    v.write_u8(self.level_ratio_policy.len() as u8)
                        .expect("cannot fail");

                    for &f in &self.level_ratio_policy {
                        v.write_f32::<LittleEndian>(f).expect("cannot fail");
                    }

                    v
                }),
            ),
        ]
    }

    /// Chooses the next compaction to run, if any.
    ///
    /// Outline:
    /// 1. Locate the first non-empty level beyond L0 ("canonical L1") and
    ///    derive `level_shift`, which maps physical level indices onto
    ///    canonical ones for `level_target_size`.
    /// 2. Trivial-move fast path: a disjoint single-run L0 that does not
    ///    overlap the target level is moved without rewriting.
    /// 3. Score every level (size vs. its target budget); the level with the
    ///    highest score >= 1.0 is compacted — L0 into canonical L1, deeper
    ///    levels via `pick_minimal_compaction` into the level below.
    #[expect(clippy::too_many_lines)]
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        // The scores array and the `scores[6]` reset below hard-code this.
        assert!(version.level_count() == 7, "should have exactly 7 levels");

        // First non-empty level after L0; falls back to the last level (6)
        // when everything beyond L0 is empty.
        #[expect(clippy::map_unwrap_or)]
        let mut canonical_l1_idx = version
            .iter_levels()
            .enumerate()
            .skip(1)
            .find(|(_, lvl)| !lvl.is_empty())
            .map(|(idx, _)| idx)
            .unwrap_or_else(|| version.level_count() - 1);

        // Offset between physical and canonical level numbering.
        let mut level_shift = canonical_l1_idx - 1;

        // If every populated level already exceeds its budget, open up a new
        // (shallower) canonical L1 so data has somewhere to go.
        if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
            let need_new_l1 = version
                .iter_levels()
                .enumerate()
                .skip(1)
                .filter(|(_, lvl)| !lvl.is_empty())
                .all(|(idx, level)| {
                    // Size of the level, ignoring tables hidden by
                    // in-flight compactions.
                    let level_size = level
                        .iter()
                        .flat_map(|x| x.iter())
                        .filter(|x| !state.hidden_set().is_hidden(x.id()))
                        .map(Table::file_size)
                        .sum::<u64>();

                    let target_size = self.level_target_size((idx - level_shift) as u8);

                    level_size > target_size
                });

            if need_new_l1 {
                // canonical_l1_idx > 1 here, so level_shift >= 1 and
                // neither subtraction can underflow.
                canonical_l1_idx -= 1;
                level_shift -= 1;
            }
        }

        // Trivial-move fast path: move a single disjoint L0 run straight into
        // canonical L1 when nothing there overlaps it.
        'trivial: {
            let first_level = version.l0();

            if first_level.run_count() == 1 {
                if version.level_is_busy(0, state.hidden_set())
                    || version.level_is_busy(canonical_l1_idx, state.hidden_set())
                {
                    break 'trivial;
                }

                let Some(target_level) = &version.level(canonical_l1_idx) else {
                    break 'trivial;
                };

                if target_level.run_count() != 1 {
                    break 'trivial;
                }

                let key_range = first_level.aggregate_key_range();

                // Any single overlapping table disqualifies the move.
                let get_overlapping = target_level
                    .iter()
                    .flat_map(|run| run.get_overlapping(&key_range))
                    .map(Table::id)
                    .next();

                if get_overlapping.is_none() && first_level.is_disjoint() {
                    return Choice::Move(CompactionInput {
                        table_ids: first_level.list_ids(),
                        dest_level: canonical_l1_idx as u8,
                        canonical_level: 1,
                        target_size: self.target_size,
                    });
                }
            }
        }

        // Per-level (score, overshoot_bytes); a score >= 1.0 means
        // "over budget".
        let mut scores = [(0.0, 0u64); 7];

        {
            let first_level = version.l0();

            // L0 is scored by table count against the threshold, not by size.
            if first_level.table_count() >= usize::from(self.l0_threshold) {
                let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
                scores[0] = (ratio, 0);
            }

            for (idx, level) in version.iter_levels().enumerate().skip(1) {
                if level.is_empty() {
                    continue;
                }

                // Visible (non-hidden) size of the level.
                let level_size = level
                    .iter()
                    .flat_map(|x| x.iter())
                    .filter(|x| !state.hidden_set().is_hidden(x.id()))
                    .map(Table::file_size)
                    .sum::<u64>();

                let target_size = self.level_target_size((idx - level_shift) as u8);

                #[expect(clippy::indexing_slicing)]
                if level_size > target_size {
                    scores[idx] = (
                        level_size as f64 / target_size as f64,
                        level_size - target_size,
                    );

                    // If the next level is empty, strongly prefer compacting
                    // into it (sentinel score; 999 is a placeholder overshoot).
                    if version
                        .level(idx + 1)
                        .is_some_and(|next_level| next_level.is_empty())
                    {
                        scores[idx] = (99.99, 999);
                    }
                }
            }

            {
                // The last level has nowhere to compact into — never score it.
                scores[6] = (0.0, 0);
            }
        }

        let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
            .into_iter()
            .enumerate()
            .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
                score_a
                    .partial_cmp(score_b)
                    .unwrap_or(std::cmp::Ordering::Equal)
            })
            .expect("should have highest score somewhere");

        // Nothing is over budget.
        if score < 1.0 {
            return Choice::DoNothing;
        }

        // L0 won: merge (or move) all of L0 plus overlapping target tables
        // into canonical L1.
        if level_idx_with_highest_score == 0 {
            let Some(first_level) = version.level(0) else {
                return Choice::DoNothing;
            };

            if version.level_is_busy(0, state.hidden_set())
                || version.level_is_busy(canonical_l1_idx, state.hidden_set())
            {
                return Choice::DoNothing;
            }

            let Some(target_level) = &version.level(canonical_l1_idx) else {
                return Choice::DoNothing;
            };

            let mut table_ids = first_level.list_ids();

            let key_range = first_level.aggregate_key_range();

            let target_level_overlapping_table_ids: Vec<_> = target_level
                .iter()
                .flat_map(|run| run.get_overlapping(&key_range))
                .map(Table::id)
                .collect();

            table_ids.extend(&target_level_overlapping_table_ids);

            let choice = CompactionInput {
                table_ids,
                dest_level: canonical_l1_idx as u8,
                canonical_level: 1,
                target_size: self.target_size,
            };

            // No overlap and L0 is disjoint -> pure move, no rewrite needed.
            if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
                return Choice::Move(choice);
            }
            return Choice::Merge(choice);
        }

        // Deeper level won: compact a minimal table set into the level below.
        #[expect(clippy::cast_possible_truncation)]
        let curr_level_index = level_idx_with_highest_score as u8;

        let next_level_index = curr_level_index + 1;

        let Some(level) = version.level(level_idx_with_highest_score) else {
            return Choice::DoNothing;
        };

        let Some(next_level) = version.level(next_level_index as usize) else {
            return Choice::DoNothing;
        };

        debug_assert!(level.is_disjoint(), "level should be disjoint");
        debug_assert!(next_level.is_disjoint(), "next level should be disjoint");

        let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
            level.first_run().expect("should have exactly one run"),
            next_level.first_run().map(std::ops::Deref::deref),
            state.hidden_set(),
            overshoot_bytes,
            self.target_size,
        ) else {
            return Choice::DoNothing;
        };

        let choice = CompactionInput {
            table_ids,
            dest_level: next_level_index,
            canonical_level: next_level_index - (level_shift as u8),
            target_size: self.target_size,
        };

        if can_trivial_move && level.is_disjoint() {
            return Choice::Move(choice);
        }
        Choice::Merge(choice)
    }
}
508}