1use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{level_manifest::LevelManifest, Config, HashSet, Segment};
7
8fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, base_size: u32) -> usize {
9 (ratio as usize).pow(u32::from(level_idx + 1)) * (base_size as usize)
10}
11
12#[derive(Clone)]
18pub struct Strategy {
19 pub base_size: u32,
21
22 #[allow(clippy::doc_markdown)]
29 pub level_ratio: u8,
30}
31
32impl Strategy {
33 #[must_use]
35 pub fn new(base_size: u32, level_ratio: u8) -> Self {
36 Self {
37 base_size,
38 level_ratio,
39 }
40 }
41}
42
43impl Default for Strategy {
44 fn default() -> Self {
45 Self {
46 base_size: 64 * 1_024 * 1_024,
47 level_ratio: 4,
48 }
49 }
50}
51
52impl CompactionStrategy for Strategy {
53 fn get_name(&self) -> &'static str {
54 "TieredStrategy"
55 }
56
57 fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
58 let resolved_view = levels.resolved_view();
59
60 for (curr_level_index, level) in resolved_view
61 .iter()
62 .enumerate()
63 .take(resolved_view.len() - 1)
64 .rev()
65 {
66 #[allow(clippy::cast_possible_truncation)]
68 let curr_level_index = curr_level_index as u8;
69
70 let next_level_index = curr_level_index + 1;
71
72 if level.is_empty() {
73 continue;
74 }
75
76 let level_size: u64 = level
77 .segments
78 .iter()
79 .filter(|x| !levels.hidden_set().is_hidden(x.id()))
82 .map(|x| x.metadata.file_size)
83 .sum();
84
85 let desired_bytes =
86 desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.base_size)
87 as u64;
88
89 if level_size >= desired_bytes {
90 let mut overshoot = desired_bytes;
93
94 let mut segments_to_compact = vec![];
95
96 for segment in level.iter().rev().take(self.level_ratio.into()).cloned() {
97 if overshoot == 0 {
98 break;
99 }
100
101 overshoot = overshoot.saturating_sub(segment.metadata.file_size);
102 segments_to_compact.push(segment);
103 }
104
105 let mut segment_ids: HashSet<_> =
106 segments_to_compact.iter().map(Segment::id).collect();
107
108 if next_level_index == 6 {
115 if levels.busy_levels().contains(&next_level_index) {
117 continue;
118 }
119
120 segment_ids.extend(
121 levels
122 .levels
123 .last()
124 .expect("last level should always exist")
125 .list_ids(),
126 );
127 }
128
129 return Choice::Merge(CompactionInput {
130 segment_ids,
131 dest_level: next_level_index,
132 target_size: u64::MAX,
133 });
134 }
135 }
136
137 super::maintenance::Strategy.choose(levels, config)
151 }
152}
153
154#[cfg(test)]
155mod tests {
156 use super::Strategy;
157 use crate::{
158 bloom::BloomFilter,
159 cache::Cache,
160 compaction::{Choice, CompactionStrategy, Input as CompactionInput},
161 config::Config,
162 descriptor_table::FileDescriptorTable,
163 file::LEVELS_MANIFEST_FILE,
164 level_manifest::LevelManifest,
165 segment::{
166 block::offset::BlockOffset,
167 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
168 file_offsets::FileOffsets,
169 meta::{Metadata, SegmentId},
170 Segment, SegmentInner,
171 },
172 HashSet, KeyRange, SeqNo,
173 };
174 use std::sync::{atomic::AtomicBool, Arc};
175 use test_log::test;
176
177 #[allow(clippy::expect_used)]
178 fn fixture_segment(id: SegmentId, size_mib: u64, max_seqno: SeqNo) -> Segment {
179 let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
180
181 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
182 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
183
184 SegmentInner {
185 tree_id: 0,
186 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
187 block_index,
188
189 offsets: FileOffsets {
190 bloom_ptr: BlockOffset(0),
191 range_filter_ptr: BlockOffset(0),
192 index_block_ptr: BlockOffset(0),
193 metadata_ptr: BlockOffset(0),
194 range_tombstones_ptr: BlockOffset(0),
195 tli_ptr: BlockOffset(0),
196 pfx_ptr: BlockOffset(0),
197 },
198
199 metadata: Metadata {
200 data_block_count: 0,
201 index_block_count: 0,
202 data_block_size: 4_096,
203 index_block_size: 4_096,
204 created_at: 0,
205 id,
206 file_size: size_mib * 1_024 * 1_024,
207 compression: crate::segment::meta::CompressionType::None,
208 table_type: crate::segment::meta::TableType::Block,
209 item_count: 0,
210 key_count: 0,
211 key_range: KeyRange::new((vec![].into(), vec![].into())),
212 tombstone_count: 0,
213 range_tombstone_count: 0,
214 uncompressed_size: size_mib * 1_024 * 1_024,
215 seqnos: (0, max_seqno),
216 },
217 cache,
218
219 bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
220
221 path: "a".into(),
222 is_deleted: AtomicBool::default(),
223 }
224 .into()
225 }
226
227 #[test]
228 fn tiered_empty_levels() -> crate::Result<()> {
229 let tempdir = tempfile::tempdir()?;
230
231 let compactor = Strategy {
232 base_size: 8 * 1_024 * 1_024,
233 level_ratio: 8,
234 };
235
236 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
237
238 assert_eq!(
239 compactor.choose(&levels, &Config::default()),
240 Choice::DoNothing
241 );
242
243 Ok(())
244 }
245
246 #[test]
247 fn tiered_default_l0() -> crate::Result<()> {
248 let tempdir = tempfile::tempdir()?;
249
250 let compactor = Strategy {
251 base_size: 8 * 1_024 * 1_024,
252 level_ratio: 4,
253 };
254 let config = Config::default();
255
256 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
257
258 levels.add(fixture_segment(1, 8, 5));
259 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
260
261 levels.add(fixture_segment(2, 8, 6));
262 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
263
264 levels.add(fixture_segment(3, 8, 7));
265 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
266
267 levels.add(fixture_segment(4, 8, 8));
268
269 assert_eq!(
270 compactor.choose(&levels, &config),
271 Choice::Merge(CompactionInput {
272 dest_level: 1,
273 segment_ids: set![1, 2, 3, 4],
274 target_size: u64::MAX,
275 })
276 );
277
278 Ok(())
279 }
280
281 #[test]
282 fn tiered_ordering() -> crate::Result<()> {
283 let tempdir = tempfile::tempdir()?;
284
285 let compactor = Strategy {
286 base_size: 8 * 1_024 * 1_024,
287 level_ratio: 2,
288 };
289 let config = Config::default();
290
291 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
292
293 levels.add(fixture_segment(1, 8, 0));
294 levels.add(fixture_segment(2, 8, 1));
295 levels.add(fixture_segment(3, 8, 2));
296 levels.add(fixture_segment(4, 8, 3));
297
298 assert_eq!(
299 compactor.choose(&levels, &config),
300 Choice::Merge(CompactionInput {
301 dest_level: 1,
302 segment_ids: set![1, 2],
303 target_size: u64::MAX,
304 })
305 );
306
307 Ok(())
308 }
309
310 #[test]
311 fn tiered_more_than_min() -> crate::Result<()> {
312 let tempdir = tempfile::tempdir()?;
313
314 let compactor = Strategy {
315 base_size: 8 * 1_024 * 1_024,
316 level_ratio: 4,
317 };
318 let config = Config::default();
319
320 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
321 levels.add(fixture_segment(1, 8, 5));
322 levels.add(fixture_segment(2, 8, 6));
323 levels.add(fixture_segment(3, 8, 7));
324 levels.add(fixture_segment(4, 8, 8));
325
326 levels.insert_into_level(1, fixture_segment(5, 8 * 4, 9));
327 levels.insert_into_level(1, fixture_segment(6, 8 * 4, 10));
328 levels.insert_into_level(1, fixture_segment(7, 8 * 4, 11));
329 levels.insert_into_level(1, fixture_segment(8, 8 * 4, 12));
330
331 assert_eq!(
332 compactor.choose(&levels, &config),
333 Choice::Merge(CompactionInput {
334 dest_level: 2,
335 segment_ids: set![5, 6, 7, 8],
336 target_size: u64::MAX,
337 })
338 );
339
340 Ok(())
341 }
342
343 #[test]
344 fn tiered_many_segments() -> crate::Result<()> {
345 let tempdir = tempfile::tempdir()?;
346
347 let compactor = Strategy {
348 base_size: 8 * 1_024 * 1_024,
349 level_ratio: 2,
350 };
351 let config = Config::default();
352
353 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
354 levels.add(fixture_segment(1, 8, 5));
355 levels.add(fixture_segment(2, 8, 6));
356 levels.add(fixture_segment(3, 8, 7));
357 levels.add(fixture_segment(4, 8, 8));
358
359 assert_eq!(
360 compactor.choose(&levels, &config),
361 Choice::Merge(CompactionInput {
362 dest_level: 1,
363 segment_ids: set![1, 2],
364 target_size: u64::MAX,
365 })
366 );
367
368 Ok(())
369 }
370
371 #[test]
372 fn tiered_deeper_level() -> crate::Result<()> {
373 let tempdir = tempfile::tempdir()?;
374
375 let compactor = Strategy {
376 base_size: 8 * 1_024 * 1_024,
377 level_ratio: 2,
378 };
379 let config = Config::default();
380
381 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
382 levels.add(fixture_segment(1, 8, 5));
383
384 levels.insert_into_level(1, fixture_segment(2, 8 * 2, 6));
385 levels.insert_into_level(1, fixture_segment(3, 8 * 2, 7));
386
387 assert_eq!(
388 compactor.choose(&levels, &config),
389 Choice::Merge(CompactionInput {
390 dest_level: 2,
391 segment_ids: set![2, 3],
392 target_size: u64::MAX,
393 })
394 );
395
396 let tempdir = tempfile::tempdir()?;
397 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
398
399 levels.insert_into_level(2, fixture_segment(2, 8 * 4, 5));
400 levels.insert_into_level(2, fixture_segment(3, 8 * 4, 6));
401
402 assert_eq!(
403 compactor.choose(&levels, &config),
404 Choice::Merge(CompactionInput {
405 dest_level: 3,
406 segment_ids: set![2, 3],
407 target_size: u64::MAX,
408 })
409 );
410
411 Ok(())
412 }
413
414 #[test]
415 fn tiered_last_level() -> crate::Result<()> {
416 let tempdir = tempfile::tempdir()?;
417
418 let compactor = Strategy {
419 base_size: 8 * 1_024 * 1_024,
420 level_ratio: 2,
421 };
422 let config = Config::default();
423
424 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
425 levels.insert_into_level(3, fixture_segment(2, 8, 5));
426 levels.insert_into_level(3, fixture_segment(3, 8, 5));
427
428 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
429
430 Ok(())
431 }
432}