lsm_tree/compaction/fifo.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7 compaction::state::CompactionState, config::Config, version::Version, HashSet, KvPair,
8};
9
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest table(s)
/// when the threshold is reached. The accounted size is the size of level 0
/// plus the on-disk size of all blob files.
///
/// This strategy never merges or rewrites tables; it only ever drops whole
/// tables. Level 0 is expected to stay disjoint.
///
/// A TTL (`ttl_seconds`) can be configured, but TTL-based dropping is not
/// implemented yet: the value is currently only reported as part of the
/// strategy's configuration.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone)]
pub struct Strategy {
    /// Data set size limit in bytes (level 0 plus blob files)
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    ///
    /// NOTE: not enforced yet; see the type-level documentation.
    pub ttl_seconds: Option<u64>,
}
35
36impl Strategy {
37 /// Configures a new `Fifo` compaction strategy
38 #[must_use]
39 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
40 Self { limit, ttl_seconds }
41 }
42}
43
44impl CompactionStrategy for Strategy {
45 fn get_name(&self) -> &'static str {
46 "FifoCompaction"
47 }
48
49 fn get_config(&self) -> Vec<KvPair> {
50 vec![
51 (
52 crate::UserKey::from("fifo_limit"),
53 crate::UserValue::from(self.limit.to_le_bytes()),
54 ),
55 (
56 crate::UserKey::from("fifo_ttl"),
57 crate::UserValue::from(if self.ttl_seconds.is_some() {
58 [1u8]
59 } else {
60 [0u8]
61 }),
62 ),
63 (
64 crate::UserKey::from("fifo_ttl_seconds"),
65 crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
66 ),
67 ]
68 }
69
70 // TODO: 3.0.0 TTL
71 fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
72 // NOTE: We always have at least one level
73 #[allow(clippy::expect_used)]
74 let first_level = version.l0();
75
76 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
77
78 assert!(
79 !version.level_is_busy(0, state.hidden_set()),
80 "FIFO compaction never compacts",
81 );
82
83 let db_size = first_level.size() + version.blob_files.on_disk_size();
84 // eprintln!("db_size={db_size}");
85
86 if db_size > self.limit {
87 let overshoot = db_size - self.limit;
88
89 let mut oldest_tables = HashSet::default();
90 let mut collected_bytes = 0;
91
92 for table in first_level.iter().flat_map(|run| run.iter().rev()) {
93 if collected_bytes >= overshoot {
94 break;
95 }
96
97 oldest_tables.insert(table.id());
98
99 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
100
101 collected_bytes += table.file_size() + linked_blob_file_bytes;
102 }
103
104 eprintln!("DROP {oldest_tables:?}");
105
106 Choice::Drop(oldest_tables)
107 } else {
108 Choice::DoNothing
109 }
110 }
111}
112
113// #[cfg(test)]
114// mod tests {
115// use test_log::test;
116
117// #[test]
118// fn fifo_empty_levels() -> crate::Result<()> {
119// Ok(())
120// }
121
122// #[test]
123// fn fifo_below_limit() -> crate::Result<()> {
124// Ok(())
125// }
126
127// #[test]
128// fn fifo_more_than_limit() -> crate::Result<()> {
129// Ok(())
130// }
131
132// #[test]
133// fn fifo_more_than_limit_blobs() -> crate::Result<()> {
134// Ok(())
135// }
136// }
137
138// TODO: restore tests
139/*
140#[cfg(test)]
141mod tests {
142 use super::Strategy;
143 use crate::{
144 cache::Cache,
145 compaction::{Choice, CompactionStrategy},
146 config::Config,
147 descriptor_table::FileDescriptorTable,
148 file::LEVELS_MANIFEST_FILE,
149 level_manifest::LevelManifest,
150 segment::{
151 block::offset::BlockOffset,
152 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
153 file_offsets::FileOffsets,
154 meta::{Metadata, SegmentId},
155 SegmentInner,
156 },
157 super_segment::Segment,
158 time::unix_timestamp,
159 HashSet, KeyRange,
160 };
161 use std::sync::{atomic::AtomicBool, Arc};
162 use test_log::test;
163
164 #[allow(clippy::expect_used)]
165 #[allow(clippy::cast_possible_truncation)]
166 fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
167 todo!()
168
169 /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
170
171 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
172 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
173
174 SegmentInner {
175 tree_id: 0,
176 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
177 block_index,
178
179 offsets: FileOffsets {
180 bloom_ptr: BlockOffset(0),
181 range_filter_ptr: BlockOffset(0),
182 index_block_ptr: BlockOffset(0),
183 metadata_ptr: BlockOffset(0),
184 range_tombstones_ptr: BlockOffset(0),
185 tli_ptr: BlockOffset(0),
186 pfx_ptr: BlockOffset(0),
187 },
188
189 metadata: Metadata {
190 data_block_count: 0,
191 index_block_count: 0,
192 data_block_size: 4_096,
193 index_block_size: 4_096,
194 created_at,
195 id,
196 file_size: 1,
197 compression: crate::segment::meta::CompressionType::None,
198 table_type: crate::segment::meta::TableType::Block,
199 item_count: 0,
200 key_count: 0,
201 key_range: KeyRange::new((vec![].into(), vec![].into())),
202 tombstone_count: 0,
203 range_tombstone_count: 0,
204 uncompressed_size: 0,
205 seqnos: (0, created_at as u64),
206 },
207 cache,
208
209 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
210
211 path: "a".into(),
212 is_deleted: AtomicBool::default(),
213 }
214 .into() */
215 }
216
217 #[test]
218 fn fifo_ttl() -> crate::Result<()> {
219 let tempdir = tempfile::tempdir()?;
220 let compactor = Strategy::new(u64::MAX, Some(5_000));
221
222 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
223
224 levels.add(fixture_segment(1, 1));
225 levels.add(fixture_segment(2, unix_timestamp().as_micros()));
226
227 assert_eq!(
228 compactor.choose(&levels, &Config::default()),
229 Choice::Drop(set![1])
230 );
231
232 Ok(())
233 }
234
235 #[test]
236 fn fifo_empty_levels() -> crate::Result<()> {
237 let tempdir = tempfile::tempdir()?;
238 let compactor = Strategy::new(1, None);
239
240 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
241
242 assert_eq!(
243 compactor.choose(&levels, &Config::default()),
244 Choice::DoNothing
245 );
246
247 Ok(())
248 }
249
250 #[test]
251 fn fifo_below_limit() -> crate::Result<()> {
252 let tempdir = tempfile::tempdir()?;
253 let compactor = Strategy::new(4, None);
254
255 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
256
257 levels.add(fixture_segment(1, 1));
258 assert_eq!(
259 compactor.choose(&levels, &Config::default()),
260 Choice::DoNothing
261 );
262
263 levels.add(fixture_segment(2, 2));
264 assert_eq!(
265 compactor.choose(&levels, &Config::default()),
266 Choice::DoNothing
267 );
268
269 levels.add(fixture_segment(3, 3));
270 assert_eq!(
271 compactor.choose(&levels, &Config::default()),
272 Choice::DoNothing
273 );
274
275 levels.add(fixture_segment(4, 4));
276 assert_eq!(
277 compactor.choose(&levels, &Config::default()),
278 Choice::DoNothing
279 );
280
281 Ok(())
282 }
283
284 #[test]
285 fn fifo_more_than_limit() -> crate::Result<()> {
286 let tempdir = tempfile::tempdir()?;
287 let compactor = Strategy::new(2, None);
288
289 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
290 levels.add(fixture_segment(1, 1));
291 levels.add(fixture_segment(2, 2));
292 levels.add(fixture_segment(3, 3));
293 levels.add(fixture_segment(4, 4));
294
295 assert_eq!(
296 compactor.choose(&levels, &Config::default()),
297 Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
298 );
299
300 Ok(())
301 }
302}
303 */