lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7    compaction::state::CompactionState, config::Config, version::Version, HashSet, KvPair,
8};
9
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes by deleting the oldest
/// table(s) once the threshold is exceeded. The size compared against `limit`
/// is the size of level 0 plus the on-disk size of all blob files.
///
/// This strategy never merges tables: level 0 is expected to stay disjoint,
/// and choosing a compaction panics if it is not.
///
/// A TTL can be configured via `ttl_seconds`, but it is not enforced yet: the
/// value is only recorded in the strategy's reported configuration.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Data set size limit in bytes
    pub limit: u64,

    /// TTL in seconds; intended to be disabled if `0` or `None`
    ///
    /// NOTE: not enforced yet when choosing tables to drop.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Configures a new `Fifo` compaction strategy
    ///
    /// `limit` is the size limit in bytes; `ttl_seconds` is stored as-is
    /// (it is not enforced yet, see the field docs).
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
43
44impl CompactionStrategy for Strategy {
45    fn get_name(&self) -> &'static str {
46        "FifoCompaction"
47    }
48
49    fn get_config(&self) -> Vec<KvPair> {
50        vec![
51            (
52                crate::UserKey::from("fifo_limit"),
53                crate::UserValue::from(self.limit.to_le_bytes()),
54            ),
55            (
56                crate::UserKey::from("fifo_ttl"),
57                crate::UserValue::from(if self.ttl_seconds.is_some() {
58                    [1u8]
59                } else {
60                    [0u8]
61                }),
62            ),
63            (
64                crate::UserKey::from("fifo_ttl_seconds"),
65                crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
66            ),
67        ]
68    }
69
70    // TODO: 3.0.0 TTL
71    fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
72        // NOTE: We always have at least one level
73        #[allow(clippy::expect_used)]
74        let first_level = version.l0();
75
76        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
77
78        assert!(
79            !version.level_is_busy(0, state.hidden_set()),
80            "FIFO compaction never compacts",
81        );
82
83        let db_size = first_level.size() + version.blob_files.on_disk_size();
84        // eprintln!("db_size={db_size}");
85
86        if db_size > self.limit {
87            let overshoot = db_size - self.limit;
88
89            let mut oldest_tables = HashSet::default();
90            let mut collected_bytes = 0;
91
92            for table in first_level.iter().flat_map(|run| run.iter().rev()) {
93                if collected_bytes >= overshoot {
94                    break;
95                }
96
97                oldest_tables.insert(table.id());
98
99                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
100
101                collected_bytes += table.file_size() + linked_blob_file_bytes;
102            }
103
104            eprintln!("DROP {oldest_tables:?}");
105
106            Choice::Drop(oldest_tables)
107        } else {
108            Choice::DoNothing
109        }
110    }
111}
112
113// #[cfg(test)]
114// mod tests {
115//     use test_log::test;
116
117//     #[test]
118//     fn fifo_empty_levels() -> crate::Result<()> {
119//         Ok(())
120//     }
121
122//     #[test]
123//     fn fifo_below_limit() -> crate::Result<()> {
124//         Ok(())
125//     }
126
127//     #[test]
128//     fn fifo_more_than_limit() -> crate::Result<()> {
129//         Ok(())
130//     }
131
132//     #[test]
133//     fn fifo_more_than_limit_blobs() -> crate::Result<()> {
134//         Ok(())
135//     }
136// }
137
138// TODO: restore tests
139/*
140#[cfg(test)]
141mod tests {
142    use super::Strategy;
143    use crate::{
144        cache::Cache,
145        compaction::{Choice, CompactionStrategy},
146        config::Config,
147        descriptor_table::FileDescriptorTable,
148        file::LEVELS_MANIFEST_FILE,
149        level_manifest::LevelManifest,
150        segment::{
151            block::offset::BlockOffset,
152            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
153            file_offsets::FileOffsets,
154            meta::{Metadata, SegmentId},
155            SegmentInner,
156        },
157        super_segment::Segment,
158        time::unix_timestamp,
159        HashSet, KeyRange,
160    };
161    use std::sync::{atomic::AtomicBool, Arc};
162    use test_log::test;
163
164    #[allow(clippy::expect_used)]
165    #[allow(clippy::cast_possible_truncation)]
166    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
167        todo!()
168
169        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
170
171        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
172        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
173
174        SegmentInner {
175            tree_id: 0,
176            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
177            block_index,
178
179            offsets: FileOffsets {
180                bloom_ptr: BlockOffset(0),
181                range_filter_ptr: BlockOffset(0),
182                index_block_ptr: BlockOffset(0),
183                metadata_ptr: BlockOffset(0),
184                range_tombstones_ptr: BlockOffset(0),
185                tli_ptr: BlockOffset(0),
186                pfx_ptr: BlockOffset(0),
187            },
188
189            metadata: Metadata {
190                data_block_count: 0,
191                index_block_count: 0,
192                data_block_size: 4_096,
193                index_block_size: 4_096,
194                created_at,
195                id,
196                file_size: 1,
197                compression: crate::segment::meta::CompressionType::None,
198                table_type: crate::segment::meta::TableType::Block,
199                item_count: 0,
200                key_count: 0,
201                key_range: KeyRange::new((vec![].into(), vec![].into())),
202                tombstone_count: 0,
203                range_tombstone_count: 0,
204                uncompressed_size: 0,
205                seqnos: (0, created_at as u64),
206            },
207            cache,
208
209            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
210
211            path: "a".into(),
212            is_deleted: AtomicBool::default(),
213        }
214        .into() */
215    }
216
217    #[test]
218    fn fifo_ttl() -> crate::Result<()> {
219        let tempdir = tempfile::tempdir()?;
220        let compactor = Strategy::new(u64::MAX, Some(5_000));
221
222        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
223
224        levels.add(fixture_segment(1, 1));
225        levels.add(fixture_segment(2, unix_timestamp().as_micros()));
226
227        assert_eq!(
228            compactor.choose(&levels, &Config::default()),
229            Choice::Drop(set![1])
230        );
231
232        Ok(())
233    }
234
235    #[test]
236    fn fifo_empty_levels() -> crate::Result<()> {
237        let tempdir = tempfile::tempdir()?;
238        let compactor = Strategy::new(1, None);
239
240        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
241
242        assert_eq!(
243            compactor.choose(&levels, &Config::default()),
244            Choice::DoNothing
245        );
246
247        Ok(())
248    }
249
250    #[test]
251    fn fifo_below_limit() -> crate::Result<()> {
252        let tempdir = tempfile::tempdir()?;
253        let compactor = Strategy::new(4, None);
254
255        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
256
257        levels.add(fixture_segment(1, 1));
258        assert_eq!(
259            compactor.choose(&levels, &Config::default()),
260            Choice::DoNothing
261        );
262
263        levels.add(fixture_segment(2, 2));
264        assert_eq!(
265            compactor.choose(&levels, &Config::default()),
266            Choice::DoNothing
267        );
268
269        levels.add(fixture_segment(3, 3));
270        assert_eq!(
271            compactor.choose(&levels, &Config::default()),
272            Choice::DoNothing
273        );
274
275        levels.add(fixture_segment(4, 4));
276        assert_eq!(
277            compactor.choose(&levels, &Config::default()),
278            Choice::DoNothing
279        );
280
281        Ok(())
282    }
283
284    #[test]
285    fn fifo_more_than_limit() -> crate::Result<()> {
286        let tempdir = tempfile::tempdir()?;
287        let compactor = Strategy::new(2, None);
288
289        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
290        levels.add(fixture_segment(1, 1));
291        levels.add(fixture_segment(2, 2));
292        levels.add(fixture_segment(3, 3));
293        levels.add(fixture_segment(4, 4));
294
295        assert_eq!(
296            compactor.choose(&levels, &Config::default()),
297            Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
298        );
299
300        Ok(())
301    }
302}
303 */