lsm_tree/compaction/leveled/
mod.rs1#[cfg(test)]
6mod test;
7
8use super::{Choice, CompactionStrategy, Input as CompactionInput};
9use crate::{
10 compaction::state::{hidden_set::HiddenSet, CompactionState},
11 config::Config,
12 slice_windows::{GrowingWindowsExt, ShrinkingWindowsExt},
13 table::{util::aggregate_run_key_range, Table},
14 version::{Run, Version},
15 HashSet, TableId,
16};
17
18fn pick_minimal_compaction(
20 curr_run: &Run<Table>,
21 next_run: Option<&Run<Table>>,
22 hidden_set: &HiddenSet,
23 overshoot: u64,
24 table_base_size: u64,
25) -> Option<(HashSet<TableId>, bool)> {
26 if let Some(window) = curr_run.shrinking_windows().find(|window| {
28 if hidden_set.is_blocked(window.iter().map(Table::id)) {
29 return false;
32 }
33
34 let Some(next_run) = &next_run else {
35 return true;
37 };
38
39 let key_range = aggregate_run_key_range(window);
40
41 next_run.get_overlapping(&key_range).is_empty()
42 }) {
43 let ids = window.iter().map(Table::id).collect();
44 return Some((ids, true));
45 }
46
47 if let Some(next_run) = &next_run {
49 next_run
50 .growing_windows()
51 .take_while(|window| {
52 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
57 next_level_size <= (50 * table_base_size)
58 })
59 .filter_map(|window| {
60 if hidden_set.is_blocked(window.iter().map(Table::id)) {
61 return None;
64 }
65
66 let key_range = aggregate_run_key_range(window);
67
68 let curr_level_pull_in = curr_run.get_contained(&key_range);
70
71 let curr_level_size = curr_level_pull_in.iter().map(Table::file_size).sum::<u64>();
72
73 if curr_level_size == 0 {
74 return None;
75 }
76
77 if hidden_set.is_blocked(curr_level_pull_in.iter().map(Table::id)) {
84 return None;
87 }
88
89 let next_level_size = window.iter().map(Table::file_size).sum::<u64>();
90
91 let compaction_bytes = curr_level_size + next_level_size;
92
93 #[expect(clippy::cast_precision_loss)]
94 let write_amp = (next_level_size as f32) / (curr_level_size as f32);
95
96 Some((window, curr_level_pull_in, write_amp, compaction_bytes))
97 })
98 .min_by_key(|(_, _, _waf, bytes)| *bytes)
100 .map(|(window, curr_level_pull_in, _, _)| {
101 let mut ids: HashSet<_> = window.iter().map(Table::id).collect();
102 ids.extend(curr_level_pull_in.iter().map(Table::id));
103 (ids, false)
104 })
105 } else {
106 None
107 }
108}
109
/// Name under which the leveled compaction strategy identifies itself.
#[doc(hidden)]
pub const NAME: &str = "LeveledCompaction";
112
/// Configuration of the leveled compaction strategy.
#[derive(Clone)]
pub struct Strategy {
    /// Number of tables in L0 at which L0 becomes a compaction candidate.
    l0_threshold: u8,

    /// Target size of a single table, in bytes.
    target_size: u64,

    /// Size ratios between consecutive levels.
    ///
    /// Levels beyond the end of the list reuse the last entry.
    level_ratio_policy: Vec<f32>,
}
134
135impl Default for Strategy {
136 fn default() -> Self {
137 Self {
138 l0_threshold: 4,
139 target_size:64 * 1_024 * 1_024,
140 level_ratio_policy: vec![10.0],
141 }
142 }
143}
144
145impl Strategy {
146 #[must_use]
152 pub fn with_level_ratio_policy(mut self, policy: Vec<f32>) -> Self {
153 self.level_ratio_policy = policy;
154 self
155 }
156
157 #[must_use]
166 pub fn with_l0_threshold(mut self, threshold: u8) -> Self {
167 self.l0_threshold = threshold;
168 self
169 }
170
171 #[must_use]
177 pub fn with_table_target_size(mut self, bytes: u64) -> Self {
178 self.target_size = bytes;
179 self
180 }
181
182 fn level_base_size(&self) -> u64 {
184 self.target_size * u64::from(self.l0_threshold)
185 }
186
187 fn level_target_size(&self, canonical_level_idx: u8) -> u64 {
197 assert!(
198 canonical_level_idx >= 1,
199 "level_target_size does not apply to L0",
200 );
201
202 if canonical_level_idx == 1 {
203 self.level_base_size()
205 } else {
206 #[expect(
207 clippy::cast_precision_loss,
208 reason = "precision loss is acceptable for level size calculations"
209 )]
210 let mut size = self.level_base_size() as f32;
211
212 for idx in 0..=(canonical_level_idx - 2) {
214 let ratio = self
215 .level_ratio_policy
216 .get(usize::from(idx))
217 .copied()
218 .unwrap_or_else(|| self.level_ratio_policy.last().copied().unwrap_or(10.0));
219
220 size *= ratio;
221 }
222
223 #[expect(
224 clippy::cast_possible_truncation,
225 clippy::cast_sign_loss,
226 reason = "size is always positive and will never even come close to u64::MAX"
227 )]
228 {
229 size as u64
230 }
231 }
232 }
233}
234
235impl CompactionStrategy for Strategy {
236 fn get_name(&self) -> &'static str {
237 NAME
238 }
239
240 fn get_config(&self) -> Vec<crate::KvPair> {
241 vec![
242 (
243 crate::UserKey::from("leveled_l0_threshold"),
244 crate::UserValue::from(self.l0_threshold.to_le_bytes()),
245 ),
246 (
247 crate::UserKey::from("leveled_target_size"),
248 crate::UserValue::from(self.target_size.to_le_bytes()),
249 ),
250 (
251 crate::UserKey::from("leveled_level_ratio_policy"),
252 crate::UserValue::from({
253 use byteorder::{LittleEndian, WriteBytesExt};
254
255 let mut v = vec![];
256
257 #[expect(
258 clippy::expect_used,
259 clippy::cast_possible_truncation,
260 reason = "writing into Vec should not fail; policies have length of 255 max"
261 )]
262 v.write_u8(self.level_ratio_policy.len() as u8)
263 .expect("cannot fail");
264
265 for &f in &self.level_ratio_policy {
266 #[expect(clippy::expect_used, reason = "writing into Vec should not fail")]
267 v.write_f32::<LittleEndian>(f).expect("cannot fail");
268 }
269
270 v
271 }),
272 ),
273 ]
274 }
275
276 #[expect(clippy::too_many_lines)]
277 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
278 assert!(version.level_count() == 7, "should have exactly 7 levels");
279
280 'trivial_lmax: {
282 let l0 = version.level(0).expect("first level should exist");
283
284 if !l0.is_empty() && l0.is_disjoint() {
285 let lmax_index = version.level_count() - 1;
286
287 if (1..lmax_index)
288 .any(|idx| !version.level(idx).expect("level should exist").is_empty())
289 {
290 break 'trivial_lmax;
292 }
293
294 let lmax = version.level(lmax_index).expect("last level should exist");
295
296 if !lmax
297 .aggregate_key_range()
298 .overlaps_with_key_range(&l0.aggregate_key_range())
299 {
300 return Choice::Move(CompactionInput {
301 table_ids: l0.list_ids(),
302 dest_level: lmax_index as u8,
303 canonical_level: 1,
304 target_size: self.target_size,
305 });
306 }
307 }
308 }
309
310 #[expect(clippy::map_unwrap_or)]
312 let first_non_empty_level = version
313 .iter_levels()
314 .enumerate()
315 .skip(1)
316 .find(|(_, lvl)| !lvl.is_empty())
317 .map(|(idx, _)| idx)
318 .unwrap_or_else(|| version.level_count() - 1);
319
320 let mut canonical_l1_idx = first_non_empty_level;
321
322 let mut level_shift = canonical_l1_idx - 1;
324
325 if canonical_l1_idx > 1 && version.iter_levels().skip(1).any(|lvl| !lvl.is_empty()) {
326 let need_new_l1 = version
327 .iter_levels()
328 .enumerate()
329 .skip(1)
330 .filter(|(_, lvl)| !lvl.is_empty())
331 .all(|(idx, level)| {
332 let level_size = level
333 .iter()
334 .flat_map(|x| x.iter())
335 .filter(|x| !state.hidden_set().is_hidden(x.id()))
338 .map(Table::file_size)
339 .sum::<u64>();
340
341 #[expect(
342 clippy::cast_possible_truncation,
343 reason = "level index is bounded by level count (7, technically 255)"
344 )]
345 let target_size = self.level_target_size((idx - level_shift) as u8);
346
347 level_size > target_size
348 });
349
350 if need_new_l1 {
352 canonical_l1_idx -= 1;
353 level_shift -= 1;
354 }
355 }
356
357 'trivial: {
359 let first_level = version.l0();
360 let target_level_idx = first_non_empty_level.min(canonical_l1_idx);
361
362 if first_level.run_count() == 1 {
363 if version.level_is_busy(0, state.hidden_set())
364 || version.level_is_busy(target_level_idx, state.hidden_set())
365 {
366 break 'trivial;
367 }
368
369 let Some(target_level) = &version.level(target_level_idx) else {
370 break 'trivial;
371 };
372
373 if target_level.run_count() != 1 {
374 break 'trivial;
375 }
376
377 let key_range = first_level.aggregate_key_range();
378
379 let get_overlapping = target_level
381 .iter()
382 .flat_map(|run| run.get_overlapping(&key_range))
383 .map(Table::id)
384 .next();
385
386 if get_overlapping.is_none() && first_level.is_disjoint() {
387 #[expect(
388 clippy::cast_possible_truncation,
389 reason = "level index is bounded by level count (7)"
390 )]
391 return Choice::Move(CompactionInput {
392 table_ids: first_level.list_ids(),
393 dest_level: target_level_idx as u8,
394 canonical_level: 1,
395 target_size: self.target_size,
396 });
397 }
398 }
399 }
400
401 let mut scores = [(0.0, 0u64); 7];
403
404 {
405 let first_level = version.l0();
411
412 if first_level.table_count() >= usize::from(self.l0_threshold) {
413 #[expect(
414 clippy::cast_precision_loss,
415 reason = "precision loss is acceptable for scoring calculations"
416 )]
417 let ratio = (first_level.table_count() as f64) / f64::from(self.l0_threshold);
418 scores[0] = (ratio, 0);
419 }
420
421 for (idx, level) in version.iter_levels().enumerate().skip(1) {
423 if level.is_empty() {
424 continue;
425 }
426
427 let level_size = level
428 .iter()
429 .flat_map(|x| x.iter())
430 .filter(|x| !state.hidden_set().is_hidden(x.id()))
433 .map(Table::file_size)
434 .sum::<u64>();
435
436 #[expect(
437 clippy::cast_possible_truncation,
438 reason = "level index is bounded by level count (7, technically 255)"
439 )]
440 let target_size = self.level_target_size((idx - level_shift) as u8);
441
442 #[expect(clippy::indexing_slicing)]
444 if level_size > target_size {
445 #[expect(
446 clippy::cast_precision_loss,
447 reason = "precision loss is acceptable for scoring calculations"
448 )]
449 let score = level_size as f64 / target_size as f64;
450 scores[idx] = (score, level_size - target_size);
451
452 if version
454 .level(idx + 1)
455 .is_some_and(|next_level| next_level.is_empty())
456 {
457 scores[idx] = (99.99, 999);
458 }
459 }
460 }
461
462 {
464 scores[6] = (0.0, 0);
465 }
466 }
467
468 #[expect(clippy::expect_used, reason = "highest score is expected to exist")]
470 let (level_idx_with_highest_score, (score, overshoot_bytes)) = scores
471 .into_iter()
472 .enumerate()
473 .max_by(|(_, (score_a, _)), (_, (score_b, _))| {
474 score_a
475 .partial_cmp(score_b)
476 .unwrap_or(std::cmp::Ordering::Equal)
477 })
478 .expect("should have highest score somewhere");
479
480 if score < 1.0 {
481 return Choice::DoNothing;
482 }
483
484 if level_idx_with_highest_score == 0 {
486 let Some(first_level) = version.level(0) else {
487 return Choice::DoNothing;
488 };
489
490 if version.level_is_busy(0, state.hidden_set())
491 || version.level_is_busy(canonical_l1_idx, state.hidden_set())
492 {
493 return Choice::DoNothing;
494 }
495
496 let Some(target_level) = &version.level(canonical_l1_idx) else {
497 return Choice::DoNothing;
498 };
499
500 let mut table_ids = first_level.list_ids();
501
502 let key_range = first_level.aggregate_key_range();
503
504 let target_level_overlapping_table_ids: Vec<_> = target_level
506 .iter()
507 .flat_map(|run| run.get_overlapping(&key_range))
508 .map(Table::id)
509 .collect();
510
511 table_ids.extend(&target_level_overlapping_table_ids);
512
513 #[expect(
514 clippy::cast_possible_truncation,
515 reason = "level index is bounded by level count (7, technically 255)"
516 )]
517 let choice = CompactionInput {
518 table_ids,
519 dest_level: canonical_l1_idx as u8,
520 canonical_level: 1,
521 target_size: self.target_size,
522 };
523
524 if target_level_overlapping_table_ids.is_empty() && first_level.is_disjoint() {
525 return Choice::Move(choice);
526 }
527 return Choice::Merge(choice);
528 }
529
530 #[expect(clippy::cast_possible_truncation)]
534 let curr_level_index = level_idx_with_highest_score as u8;
535
536 let next_level_index = curr_level_index + 1;
537
538 let Some(level) = version.level(level_idx_with_highest_score) else {
539 return Choice::DoNothing;
540 };
541
542 let Some(next_level) = version.level(next_level_index as usize) else {
543 return Choice::DoNothing;
544 };
545
546 debug_assert!(level.is_disjoint(), "level should be disjoint");
547 debug_assert!(next_level.is_disjoint(), "next level should be disjoint");
548
549 #[expect(
550 clippy::expect_used,
551 reason = "first run should exist because score is >0.0"
552 )]
553 let Some((table_ids, can_trivial_move)) = pick_minimal_compaction(
554 level.first_run().expect("should have exactly one run"),
555 next_level.first_run().map(std::ops::Deref::deref),
556 state.hidden_set(),
557 overshoot_bytes,
558 self.target_size,
559 ) else {
560 return Choice::DoNothing;
561 };
562
563 #[expect(
564 clippy::cast_possible_truncation,
565 reason = "level shift is bounded by level count (7, technically 255)"
566 )]
567 let choice = CompactionInput {
568 table_ids,
569 dest_level: next_level_index,
570 canonical_level: next_level_index - (level_shift as u8),
571 target_size: self.target_size,
572 };
573
574 if can_trivial_move && level.is_disjoint() {
575 return Choice::Move(choice);
576 }
577 Choice::Merge(choice)
578 }
579}