1use super::{Choice, CompactionStrategy, Input as CompactionInput};
6use crate::{level_manifest::LevelManifest, Config, Segment};
7
8fn desired_level_size_in_bytes(level_idx: u8, ratio: u8, base_size: u32) -> usize {
9 (ratio as usize).pow(u32::from(level_idx + 1)) * (base_size as usize)
10}
11
12#[derive(Clone)]
18pub struct Strategy {
19 pub base_size: u32,
21
22 #[allow(clippy::doc_markdown)]
29 pub level_ratio: u8,
30}
31
32impl Strategy {
33 #[must_use]
35 pub fn new(base_size: u32, level_ratio: u8) -> Self {
36 Self {
37 base_size,
38 level_ratio,
39 }
40 }
41}
42
43impl Default for Strategy {
44 fn default() -> Self {
45 Self {
46 base_size: 64 * 1_024 * 1_024,
47 level_ratio: 4,
48 }
49 }
50}
51
52impl CompactionStrategy for Strategy {
53 fn get_name(&self) -> &'static str {
54 "TieredStrategy"
55 }
56
57 fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
58 let resolved_view = levels.resolved_view();
59
60 for (curr_level_index, level) in resolved_view
61 .iter()
62 .enumerate()
63 .take(resolved_view.len() - 1)
64 .rev()
65 {
66 #[allow(clippy::cast_possible_truncation)]
68 let curr_level_index = curr_level_index as u8;
69
70 let next_level_index = curr_level_index + 1;
71
72 if level.is_empty() {
73 continue;
74 }
75
76 let level_size: u64 = level
77 .segments
78 .iter()
79 .filter(|x| !levels.hidden_set().is_hidden(x.id()))
82 .map(|x| x.metadata.file_size)
83 .sum();
84
85 let desired_bytes =
86 desired_level_size_in_bytes(curr_level_index, self.level_ratio, self.base_size)
87 as u64;
88
89 if level_size >= desired_bytes {
90 let mut overshoot = desired_bytes;
93
94 let mut segments_to_compact = vec![];
95
96 for segment in level.iter().rev().take(self.level_ratio.into()).cloned() {
97 if overshoot == 0 {
98 break;
99 }
100
101 overshoot = overshoot.saturating_sub(segment.metadata.file_size);
102 segments_to_compact.push(segment);
103 }
104
105 let segment_ids = segments_to_compact.iter().map(Segment::id).collect();
106
107 return Choice::Merge(CompactionInput {
108 segment_ids,
109 dest_level: next_level_index,
110 target_size: u64::MAX,
111 });
112 }
113 }
114
115 super::maintenance::Strategy.choose(levels, config)
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::Strategy;
132 use crate::{
133 block_cache::BlockCache,
134 bloom::BloomFilter,
135 compaction::{Choice, CompactionStrategy, Input as CompactionInput},
136 config::Config,
137 descriptor_table::FileDescriptorTable,
138 file::LEVELS_MANIFEST_FILE,
139 key_range::KeyRange,
140 level_manifest::LevelManifest,
141 segment::{
142 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
143 file_offsets::FileOffsets,
144 meta::{Metadata, SegmentId},
145 value_block::BlockOffset,
146 Segment, SegmentInner,
147 },
148 HashSet, SeqNo,
149 };
150 use std::sync::Arc;
151 use test_log::test;
152
153 #[allow(clippy::expect_used)]
154 fn fixture_segment(id: SegmentId, size_mib: u64, max_seqno: SeqNo) -> Segment {
155 let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));
156
157 let block_index = TwoLevelBlockIndex::new((0, id).into(), block_cache.clone());
158 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
159
160 SegmentInner {
161 tree_id: 0,
162 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
163 block_index,
164
165 offsets: FileOffsets {
166 bloom_ptr: BlockOffset(0),
167 range_filter_ptr: BlockOffset(0),
168 index_block_ptr: BlockOffset(0),
169 metadata_ptr: BlockOffset(0),
170 range_tombstones_ptr: BlockOffset(0),
171 tli_ptr: BlockOffset(0),
172 pfx_ptr: BlockOffset(0),
173 },
174
175 metadata: Metadata {
176 data_block_count: 0,
177 index_block_count: 0,
178 data_block_size: 4_096,
179 index_block_size: 4_096,
180 created_at: 0,
181 id,
182 file_size: size_mib * 1_024 * 1_024,
183 compression: crate::segment::meta::CompressionType::None,
184 table_type: crate::segment::meta::TableType::Block,
185 item_count: 0,
186 key_count: 0,
187 key_range: KeyRange::new((vec![].into(), vec![].into())),
188 tombstone_count: 0,
189 range_tombstone_count: 0,
190 uncompressed_size: size_mib * 1_024 * 1_024,
191 seqnos: (0, max_seqno),
192 },
193 block_cache,
194
195 bloom_filter: Some(BloomFilter::with_fp_rate(1, 0.1)),
196 }
197 .into()
198 }
199
200 #[test]
201 fn tiered_empty_levels() -> crate::Result<()> {
202 let tempdir = tempfile::tempdir()?;
203
204 let compactor = Strategy {
205 base_size: 8 * 1_024 * 1_024,
206 level_ratio: 8,
207 };
208
209 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
210
211 assert_eq!(
212 compactor.choose(&levels, &Config::default()),
213 Choice::DoNothing
214 );
215
216 Ok(())
217 }
218
219 #[test]
220 fn tiered_default_l0() -> crate::Result<()> {
221 let tempdir = tempfile::tempdir()?;
222
223 let compactor = Strategy {
224 base_size: 8 * 1_024 * 1_024,
225 level_ratio: 4,
226 };
227 let config = Config::default();
228
229 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
230
231 levels.add(fixture_segment(1, 8, 5));
232 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
233
234 levels.add(fixture_segment(2, 8, 6));
235 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
236
237 levels.add(fixture_segment(3, 8, 7));
238 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
239
240 levels.add(fixture_segment(4, 8, 8));
241
242 assert_eq!(
243 compactor.choose(&levels, &config),
244 Choice::Merge(CompactionInput {
245 dest_level: 1,
246 segment_ids: set![1, 2, 3, 4],
247 target_size: u64::MAX,
248 })
249 );
250
251 Ok(())
252 }
253
254 #[test]
255 fn tiered_ordering() -> crate::Result<()> {
256 let tempdir = tempfile::tempdir()?;
257
258 let compactor = Strategy {
259 base_size: 8 * 1_024 * 1_024,
260 level_ratio: 2,
261 };
262 let config = Config::default();
263
264 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
265
266 levels.add(fixture_segment(1, 8, 0));
267 levels.add(fixture_segment(2, 8, 1));
268 levels.add(fixture_segment(3, 8, 2));
269 levels.add(fixture_segment(4, 8, 3));
270
271 assert_eq!(
272 compactor.choose(&levels, &config),
273 Choice::Merge(CompactionInput {
274 dest_level: 1,
275 segment_ids: set![1, 2],
276 target_size: u64::MAX,
277 })
278 );
279
280 Ok(())
281 }
282
283 #[test]
284 fn tiered_more_than_min() -> crate::Result<()> {
285 let tempdir = tempfile::tempdir()?;
286
287 let compactor = Strategy {
288 base_size: 8 * 1_024 * 1_024,
289 level_ratio: 4,
290 };
291 let config = Config::default();
292
293 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
294 levels.add(fixture_segment(1, 8, 5));
295 levels.add(fixture_segment(2, 8, 6));
296 levels.add(fixture_segment(3, 8, 7));
297 levels.add(fixture_segment(4, 8, 8));
298
299 levels.insert_into_level(1, fixture_segment(5, 8 * 4, 9));
300 levels.insert_into_level(1, fixture_segment(6, 8 * 4, 10));
301 levels.insert_into_level(1, fixture_segment(7, 8 * 4, 11));
302 levels.insert_into_level(1, fixture_segment(8, 8 * 4, 12));
303
304 assert_eq!(
305 compactor.choose(&levels, &config),
306 Choice::Merge(CompactionInput {
307 dest_level: 2,
308 segment_ids: set![5, 6, 7, 8],
309 target_size: u64::MAX,
310 })
311 );
312
313 Ok(())
314 }
315
316 #[test]
317 fn tiered_many_segments() -> crate::Result<()> {
318 let tempdir = tempfile::tempdir()?;
319
320 let compactor = Strategy {
321 base_size: 8 * 1_024 * 1_024,
322 level_ratio: 2,
323 };
324 let config = Config::default();
325
326 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
327 levels.add(fixture_segment(1, 8, 5));
328 levels.add(fixture_segment(2, 8, 6));
329 levels.add(fixture_segment(3, 8, 7));
330 levels.add(fixture_segment(4, 8, 8));
331
332 assert_eq!(
333 compactor.choose(&levels, &config),
334 Choice::Merge(CompactionInput {
335 dest_level: 1,
336 segment_ids: set![1, 2],
337 target_size: u64::MAX,
338 })
339 );
340
341 Ok(())
342 }
343
344 #[test]
345 fn tiered_deeper_level() -> crate::Result<()> {
346 let tempdir = tempfile::tempdir()?;
347
348 let compactor = Strategy {
349 base_size: 8 * 1_024 * 1_024,
350 level_ratio: 2,
351 };
352 let config = Config::default();
353
354 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
355 levels.add(fixture_segment(1, 8, 5));
356
357 levels.insert_into_level(1, fixture_segment(2, 8 * 2, 6));
358 levels.insert_into_level(1, fixture_segment(3, 8 * 2, 7));
359
360 assert_eq!(
361 compactor.choose(&levels, &config),
362 Choice::Merge(CompactionInput {
363 dest_level: 2,
364 segment_ids: set![2, 3],
365 target_size: u64::MAX,
366 })
367 );
368
369 let tempdir = tempfile::tempdir()?;
370 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
371
372 levels.insert_into_level(2, fixture_segment(2, 8 * 4, 5));
373 levels.insert_into_level(2, fixture_segment(3, 8 * 4, 6));
374
375 assert_eq!(
376 compactor.choose(&levels, &config),
377 Choice::Merge(CompactionInput {
378 dest_level: 3,
379 segment_ids: set![2, 3],
380 target_size: u64::MAX,
381 })
382 );
383
384 Ok(())
385 }
386
387 #[test]
388 fn tiered_last_level() -> crate::Result<()> {
389 let tempdir = tempfile::tempdir()?;
390
391 let compactor = Strategy {
392 base_size: 8 * 1_024 * 1_024,
393 level_ratio: 2,
394 };
395 let config = Config::default();
396
397 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
398 levels.insert_into_level(3, fixture_segment(2, 8, 5));
399 levels.insert_into_level(3, fixture_segment(3, 8, 5));
400
401 assert_eq!(compactor.choose(&levels, &config), Choice::DoNothing);
402
403 Ok(())
404 }
405}