lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7    compaction::state::CompactionState, config::Config, version::Version, HashSet, KvPair,
8};
9
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest table(s)
/// when the threshold is reached.
///
/// The size compared against `limit` is the size of level 0 plus the on-disk
/// size of all blob files. Every dropped table accounts for its own file size
/// plus the blob bytes it references.
///
/// This strategy never merges or rewrites tables; it only ever drops whole
/// tables (or does nothing).
///
/// A (lazy) TTL can be configured via `ttl_seconds`, but it is not enforced yet:
/// table selection is currently based on size alone.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone)]
pub struct Strategy {
    /// Data set size limit in bytes
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    ///
    /// NOTE: not enforced yet, tables are only dropped based on `limit`.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Configures a new `Fifo` compaction strategy
    ///
    /// - `limit`: approximate maximum data set size in bytes
    /// - `ttl_seconds`: optional table TTL in seconds (`None` or `Some(0)` disables it)
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
43
44impl CompactionStrategy for Strategy {
45    fn get_name(&self) -> &'static str {
46        "FifoCompaction"
47    }
48
49    fn get_config(&self) -> Vec<KvPair> {
50        vec![
51            (
52                crate::UserKey::from("fifo_limit"),
53                crate::UserValue::from(self.limit.to_le_bytes()),
54            ),
55            (
56                crate::UserKey::from("fifo_ttl"),
57                crate::UserValue::from(if self.ttl_seconds.is_some() {
58                    [1u8]
59                } else {
60                    [0u8]
61                }),
62            ),
63            (
64                crate::UserKey::from("fifo_ttl_seconds"),
65                crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
66            ),
67        ]
68    }
69
70    // TODO: 3.0.0 TTL
71    fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
72        let first_level = version.l0();
73
74        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
75
76        assert!(
77            !version.level_is_busy(0, state.hidden_set()),
78            "FIFO compaction never compacts",
79        );
80
81        let db_size = first_level.size() + version.blob_files.on_disk_size();
82        // eprintln!("db_size={db_size}");
83
84        if db_size > self.limit {
85            let overshoot = db_size - self.limit;
86
87            let mut oldest_tables = HashSet::default();
88            let mut collected_bytes = 0;
89
90            for table in first_level.iter().flat_map(|run| run.iter().rev()) {
91                if collected_bytes >= overshoot {
92                    break;
93                }
94
95                oldest_tables.insert(table.id());
96
97                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
98
99                collected_bytes += table.file_size() + linked_blob_file_bytes;
100            }
101
102            eprintln!("DROP {oldest_tables:?}");
103
104            Choice::Drop(oldest_tables)
105        } else {
106            Choice::DoNothing
107        }
108    }
109}
110
111// #[cfg(test)]
112// mod tests {
113//     use test_log::test;
114
115//     #[test]
116//     fn fifo_empty_levels() -> crate::Result<()> {
117//         Ok(())
118//     }
119
120//     #[test]
121//     fn fifo_below_limit() -> crate::Result<()> {
122//         Ok(())
123//     }
124
125//     #[test]
126//     fn fifo_more_than_limit() -> crate::Result<()> {
127//         Ok(())
128//     }
129
130//     #[test]
131//     fn fifo_more_than_limit_blobs() -> crate::Result<()> {
132//         Ok(())
133//     }
134// }
135
136// TODO: restore tests
137/*
138#[cfg(test)]
139mod tests {
140    use super::Strategy;
141    use crate::{
142        cache::Cache,
143        compaction::{Choice, CompactionStrategy},
144        config::Config,
145        descriptor_table::FileDescriptorTable,
146        file::LEVELS_MANIFEST_FILE,
147        level_manifest::LevelManifest,
148        segment::{
149            block::offset::BlockOffset,
150            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
151            file_offsets::FileOffsets,
152            meta::{Metadata, SegmentId},
153            SegmentInner,
154        },
155        super_segment::Segment,
156        time::unix_timestamp,
157        HashSet, KeyRange,
158    };
159    use std::sync::{atomic::AtomicBool, Arc};
160    use test_log::test;
161
162    #[allow(clippy::expect_used)]
163    #[allow(clippy::cast_possible_truncation)]
164    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
165        todo!()
166
167        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
168
169        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
170        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
171
172        SegmentInner {
173            tree_id: 0,
174            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
175            block_index,
176
177            offsets: FileOffsets {
178                bloom_ptr: BlockOffset(0),
179                range_filter_ptr: BlockOffset(0),
180                index_block_ptr: BlockOffset(0),
181                metadata_ptr: BlockOffset(0),
182                range_tombstones_ptr: BlockOffset(0),
183                tli_ptr: BlockOffset(0),
184                pfx_ptr: BlockOffset(0),
185            },
186
187            metadata: Metadata {
188                data_block_count: 0,
189                index_block_count: 0,
190                data_block_size: 4_096,
191                index_block_size: 4_096,
192                created_at,
193                id,
194                file_size: 1,
195                compression: crate::segment::meta::CompressionType::None,
196                table_type: crate::segment::meta::TableType::Block,
197                item_count: 0,
198                key_count: 0,
199                key_range: KeyRange::new((vec![].into(), vec![].into())),
200                tombstone_count: 0,
201                range_tombstone_count: 0,
202                uncompressed_size: 0,
203                seqnos: (0, created_at as u64),
204            },
205            cache,
206
207            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
208
209            path: "a".into(),
210            is_deleted: AtomicBool::default(),
211        }
212        .into() */
213    }
214
215    #[test]
216    fn fifo_ttl() -> crate::Result<()> {
217        let tempdir = tempfile::tempdir()?;
218        let compactor = Strategy::new(u64::MAX, Some(5_000));
219
220        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
221
222        levels.add(fixture_segment(1, 1));
223        levels.add(fixture_segment(2, unix_timestamp().as_micros()));
224
225        assert_eq!(
226            compactor.choose(&levels, &Config::default()),
227            Choice::Drop(set![1])
228        );
229
230        Ok(())
231    }
232
233    #[test]
234    fn fifo_empty_levels() -> crate::Result<()> {
235        let tempdir = tempfile::tempdir()?;
236        let compactor = Strategy::new(1, None);
237
238        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
239
240        assert_eq!(
241            compactor.choose(&levels, &Config::default()),
242            Choice::DoNothing
243        );
244
245        Ok(())
246    }
247
248    #[test]
249    fn fifo_below_limit() -> crate::Result<()> {
250        let tempdir = tempfile::tempdir()?;
251        let compactor = Strategy::new(4, None);
252
253        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
254
255        levels.add(fixture_segment(1, 1));
256        assert_eq!(
257            compactor.choose(&levels, &Config::default()),
258            Choice::DoNothing
259        );
260
261        levels.add(fixture_segment(2, 2));
262        assert_eq!(
263            compactor.choose(&levels, &Config::default()),
264            Choice::DoNothing
265        );
266
267        levels.add(fixture_segment(3, 3));
268        assert_eq!(
269            compactor.choose(&levels, &Config::default()),
270            Choice::DoNothing
271        );
272
273        levels.add(fixture_segment(4, 4));
274        assert_eq!(
275            compactor.choose(&levels, &Config::default()),
276            Choice::DoNothing
277        );
278
279        Ok(())
280    }
281
282    #[test]
283    fn fifo_more_than_limit() -> crate::Result<()> {
284        let tempdir = tempfile::tempdir()?;
285        let compactor = Strategy::new(2, None);
286
287        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
288        levels.add(fixture_segment(1, 1));
289        levels.add(fixture_segment(2, 2));
290        levels.add(fixture_segment(3, 3));
291        levels.add(fixture_segment(4, 4));
292
293        assert_eq!(
294            compactor.choose(&levels, &Config::default()),
295            Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
296        );
297
298        Ok(())
299    }
300}
301 */