lsm_tree/compaction/fifo.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7 compaction::state::CompactionState, config::Config, version::Version, HashSet, KvPair,
8};
9
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest table(s)
/// when the threshold is reached. The size compared against `limit` covers
/// level 0 as well as all blob files.
///
/// This strategy never merges or rewrites tables; it only drops whole tables.
///
/// A (lazy) TTL can be configured through `ttl_seconds`, but it is not
/// enforced yet: table selection is currently purely size-based.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Data set size limit in bytes
    pub limit: u64,

    /// TTL in seconds; a value of `0` or `None` disables it.
    ///
    /// Not evaluated during compaction yet.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Configures a new `Fifo` compaction strategy.
    ///
    /// `limit` is the data set size limit in bytes; `ttl_seconds` is stored
    /// unchanged (see the field documentation).
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
43
44impl CompactionStrategy for Strategy {
45 fn get_name(&self) -> &'static str {
46 "FifoCompaction"
47 }
48
49 fn get_config(&self) -> Vec<KvPair> {
50 vec![
51 (
52 crate::UserKey::from("fifo_limit"),
53 crate::UserValue::from(self.limit.to_le_bytes()),
54 ),
55 (
56 crate::UserKey::from("fifo_ttl"),
57 crate::UserValue::from(if self.ttl_seconds.is_some() {
58 [1u8]
59 } else {
60 [0u8]
61 }),
62 ),
63 (
64 crate::UserKey::from("fifo_ttl_seconds"),
65 crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
66 ),
67 ]
68 }
69
70 // TODO: 3.0.0 TTL
71 fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
72 let first_level = version.l0();
73
74 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
75
76 assert!(
77 !version.level_is_busy(0, state.hidden_set()),
78 "FIFO compaction never compacts",
79 );
80
81 let db_size = first_level.size() + version.blob_files.on_disk_size();
82 // eprintln!("db_size={db_size}");
83
84 if db_size > self.limit {
85 let overshoot = db_size - self.limit;
86
87 let mut oldest_tables = HashSet::default();
88 let mut collected_bytes = 0;
89
90 for table in first_level.iter().flat_map(|run| run.iter().rev()) {
91 if collected_bytes >= overshoot {
92 break;
93 }
94
95 oldest_tables.insert(table.id());
96
97 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
98
99 collected_bytes += table.file_size() + linked_blob_file_bytes;
100 }
101
102 eprintln!("DROP {oldest_tables:?}");
103
104 Choice::Drop(oldest_tables)
105 } else {
106 Choice::DoNothing
107 }
108 }
109}
110
111// #[cfg(test)]
112// mod tests {
113// use test_log::test;
114
115// #[test]
116// fn fifo_empty_levels() -> crate::Result<()> {
117// Ok(())
118// }
119
120// #[test]
121// fn fifo_below_limit() -> crate::Result<()> {
122// Ok(())
123// }
124
125// #[test]
126// fn fifo_more_than_limit() -> crate::Result<()> {
127// Ok(())
128// }
129
130// #[test]
131// fn fifo_more_than_limit_blobs() -> crate::Result<()> {
132// Ok(())
133// }
134// }
135
136// TODO: restore tests
137/*
138#[cfg(test)]
139mod tests {
140 use super::Strategy;
141 use crate::{
142 cache::Cache,
143 compaction::{Choice, CompactionStrategy},
144 config::Config,
145 descriptor_table::FileDescriptorTable,
146 file::LEVELS_MANIFEST_FILE,
147 level_manifest::LevelManifest,
148 segment::{
149 block::offset::BlockOffset,
150 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
151 file_offsets::FileOffsets,
152 meta::{Metadata, SegmentId},
153 SegmentInner,
154 },
155 super_segment::Segment,
156 time::unix_timestamp,
157 HashSet, KeyRange,
158 };
159 use std::sync::{atomic::AtomicBool, Arc};
160 use test_log::test;
161
162 #[allow(clippy::expect_used)]
163 #[allow(clippy::cast_possible_truncation)]
164 fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
165 todo!()
166
167 /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
168
169 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
170 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
171
172 SegmentInner {
173 tree_id: 0,
174 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
175 block_index,
176
177 offsets: FileOffsets {
178 bloom_ptr: BlockOffset(0),
179 range_filter_ptr: BlockOffset(0),
180 index_block_ptr: BlockOffset(0),
181 metadata_ptr: BlockOffset(0),
182 range_tombstones_ptr: BlockOffset(0),
183 tli_ptr: BlockOffset(0),
184 pfx_ptr: BlockOffset(0),
185 },
186
187 metadata: Metadata {
188 data_block_count: 0,
189 index_block_count: 0,
190 data_block_size: 4_096,
191 index_block_size: 4_096,
192 created_at,
193 id,
194 file_size: 1,
195 compression: crate::segment::meta::CompressionType::None,
196 table_type: crate::segment::meta::TableType::Block,
197 item_count: 0,
198 key_count: 0,
199 key_range: KeyRange::new((vec![].into(), vec![].into())),
200 tombstone_count: 0,
201 range_tombstone_count: 0,
202 uncompressed_size: 0,
203 seqnos: (0, created_at as u64),
204 },
205 cache,
206
207 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
208
209 path: "a".into(),
210 is_deleted: AtomicBool::default(),
211 }
212 .into() */
213 }
214
215 #[test]
216 fn fifo_ttl() -> crate::Result<()> {
217 let tempdir = tempfile::tempdir()?;
218 let compactor = Strategy::new(u64::MAX, Some(5_000));
219
220 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
221
222 levels.add(fixture_segment(1, 1));
223 levels.add(fixture_segment(2, unix_timestamp().as_micros()));
224
225 assert_eq!(
226 compactor.choose(&levels, &Config::default()),
227 Choice::Drop(set![1])
228 );
229
230 Ok(())
231 }
232
233 #[test]
234 fn fifo_empty_levels() -> crate::Result<()> {
235 let tempdir = tempfile::tempdir()?;
236 let compactor = Strategy::new(1, None);
237
238 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
239
240 assert_eq!(
241 compactor.choose(&levels, &Config::default()),
242 Choice::DoNothing
243 );
244
245 Ok(())
246 }
247
248 #[test]
249 fn fifo_below_limit() -> crate::Result<()> {
250 let tempdir = tempfile::tempdir()?;
251 let compactor = Strategy::new(4, None);
252
253 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
254
255 levels.add(fixture_segment(1, 1));
256 assert_eq!(
257 compactor.choose(&levels, &Config::default()),
258 Choice::DoNothing
259 );
260
261 levels.add(fixture_segment(2, 2));
262 assert_eq!(
263 compactor.choose(&levels, &Config::default()),
264 Choice::DoNothing
265 );
266
267 levels.add(fixture_segment(3, 3));
268 assert_eq!(
269 compactor.choose(&levels, &Config::default()),
270 Choice::DoNothing
271 );
272
273 levels.add(fixture_segment(4, 4));
274 assert_eq!(
275 compactor.choose(&levels, &Config::default()),
276 Choice::DoNothing
277 );
278
279 Ok(())
280 }
281
282 #[test]
283 fn fifo_more_than_limit() -> crate::Result<()> {
284 let tempdir = tempfile::tempdir()?;
285 let compactor = Strategy::new(2, None);
286
287 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
288 levels.add(fixture_segment(1, 1));
289 levels.add(fixture_segment(2, 2));
290 levels.add(fixture_segment(3, 3));
291 levels.add(fixture_segment(4, 4));
292
293 assert_eq!(
294 compactor.choose(&levels, &Config::default()),
295 Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
296 );
297
298 Ok(())
299 }
300}
301 */