lsm_tree/compaction/fifo.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, HashSet};
7
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest segment(s)
/// when the threshold is reached.
///
/// ###### Current limitations
///
/// The strategy currently only ever drops segments or does nothing:
///
/// - it does not merge segments when level 0 accumulates many of them
/// - the (lazy) TTL can be configured, but `ttl_seconds` is not evaluated yet
///   (see the `TODO` on `choose`)
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Data set size limit in bytes
    ///
    /// Compared against the size of the first level; segments are dropped
    /// once that size exceeds this value.
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    ///
    /// NOTE: Stored only for now, the strategy does not read this value yet.
    pub ttl_seconds: Option<u64>,
}
33
34impl Strategy {
35 /// Configures a new `Fifo` compaction strategy
36 #[must_use]
37 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
38 Self { limit, ttl_seconds }
39 }
40}
41
42impl CompactionStrategy for Strategy {
43 fn get_name(&self) -> &'static str {
44 "FifoCompaction"
45 }
46
47 // TODO: TTL
48 fn choose(&self, levels: &LevelManifest, _config: &Config) -> Choice {
49 // NOTE: We always have at least one level
50 #[allow(clippy::expect_used)]
51 let first_level = levels.as_slice().first().expect("should have first level");
52
53 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
54
55 let l0_size = first_level.size();
56
57 if l0_size > self.limit {
58 let overshoot = l0_size - self.limit;
59
60 let mut oldest_segments = HashSet::default();
61 let mut collected_bytes = 0;
62
63 for segment in first_level.iter().flat_map(|run| run.iter().rev()) {
64 if collected_bytes >= overshoot {
65 break;
66 }
67
68 oldest_segments.insert(segment.id());
69 collected_bytes += segment.file_size();
70 }
71
72 Choice::Drop(oldest_segments)
73 } else {
74 Choice::DoNothing
75 }
76 }
77}
78
79// TODO: restore tests
80/*
81#[cfg(test)]
82mod tests {
83 use super::Strategy;
84 use crate::{
85 cache::Cache,
86 compaction::{Choice, CompactionStrategy},
87 config::Config,
88 descriptor_table::FileDescriptorTable,
89 file::LEVELS_MANIFEST_FILE,
90 level_manifest::LevelManifest,
91 segment::{
92 block::offset::BlockOffset,
93 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
94 file_offsets::FileOffsets,
95 meta::{Metadata, SegmentId},
96 SegmentInner,
97 },
98 super_segment::Segment,
99 time::unix_timestamp,
100 HashSet, KeyRange,
101 };
102 use std::sync::{atomic::AtomicBool, Arc};
103 use test_log::test;
104
105 #[allow(clippy::expect_used)]
106 #[allow(clippy::cast_possible_truncation)]
107 fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
108 todo!()
109
110 /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
111
112 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
113 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
114
115 SegmentInner {
116 tree_id: 0,
117 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
118 block_index,
119
120 offsets: FileOffsets {
121 bloom_ptr: BlockOffset(0),
122 range_filter_ptr: BlockOffset(0),
123 index_block_ptr: BlockOffset(0),
124 metadata_ptr: BlockOffset(0),
125 range_tombstones_ptr: BlockOffset(0),
126 tli_ptr: BlockOffset(0),
127 pfx_ptr: BlockOffset(0),
128 },
129
130 metadata: Metadata {
131 data_block_count: 0,
132 index_block_count: 0,
133 data_block_size: 4_096,
134 index_block_size: 4_096,
135 created_at,
136 id,
137 file_size: 1,
138 compression: crate::segment::meta::CompressionType::None,
139 table_type: crate::segment::meta::TableType::Block,
140 item_count: 0,
141 key_count: 0,
142 key_range: KeyRange::new((vec![].into(), vec![].into())),
143 tombstone_count: 0,
144 range_tombstone_count: 0,
145 uncompressed_size: 0,
146 seqnos: (0, created_at as u64),
147 },
148 cache,
149
150 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
151
152 path: "a".into(),
153 is_deleted: AtomicBool::default(),
154 }
155 .into() */
156 }
157
158 #[test]
159 fn fifo_ttl() -> crate::Result<()> {
160 let tempdir = tempfile::tempdir()?;
161 let compactor = Strategy::new(u64::MAX, Some(5_000));
162
163 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
164
165 levels.add(fixture_segment(1, 1));
166 levels.add(fixture_segment(2, unix_timestamp().as_micros()));
167
168 assert_eq!(
169 compactor.choose(&levels, &Config::default()),
170 Choice::Drop(set![1])
171 );
172
173 Ok(())
174 }
175
176 #[test]
177 fn fifo_empty_levels() -> crate::Result<()> {
178 let tempdir = tempfile::tempdir()?;
179 let compactor = Strategy::new(1, None);
180
181 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
182
183 assert_eq!(
184 compactor.choose(&levels, &Config::default()),
185 Choice::DoNothing
186 );
187
188 Ok(())
189 }
190
191 #[test]
192 fn fifo_below_limit() -> crate::Result<()> {
193 let tempdir = tempfile::tempdir()?;
194 let compactor = Strategy::new(4, None);
195
196 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
197
198 levels.add(fixture_segment(1, 1));
199 assert_eq!(
200 compactor.choose(&levels, &Config::default()),
201 Choice::DoNothing
202 );
203
204 levels.add(fixture_segment(2, 2));
205 assert_eq!(
206 compactor.choose(&levels, &Config::default()),
207 Choice::DoNothing
208 );
209
210 levels.add(fixture_segment(3, 3));
211 assert_eq!(
212 compactor.choose(&levels, &Config::default()),
213 Choice::DoNothing
214 );
215
216 levels.add(fixture_segment(4, 4));
217 assert_eq!(
218 compactor.choose(&levels, &Config::default()),
219 Choice::DoNothing
220 );
221
222 Ok(())
223 }
224
225 #[test]
226 fn fifo_more_than_limit() -> crate::Result<()> {
227 let tempdir = tempfile::tempdir()?;
228 let compactor = Strategy::new(2, None);
229
230 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
231 levels.add(fixture_segment(1, 1));
232 levels.add(fixture_segment(2, 2));
233 levels.add(fixture_segment(3, 3));
234 levels.add(fixture_segment(4, 4));
235
236 assert_eq!(
237 compactor.choose(&levels, &Config::default()),
238 Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
239 );
240
241 Ok(())
242 }
243}
244 */