lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, HashSet};
7
/// FIFO-style compaction
///
/// Limits the data set to roughly `limit` bytes, deleting the oldest segment(s)
/// when the threshold is reached. The size is measured on level 0, because this
/// strategy never moves segments out of it. It only ever drops segments.
///
/// A (lazy) TTL can be configured through `ttl_seconds`. NOTE: the TTL is
/// currently stored but **not yet enforced** when choosing what to compact; only
/// the size limit is applied. This strategy also does not merge segments.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Data set size limit in bytes
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    ///
    /// NOTE: Not applied by the compaction strategy yet.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Configures a new `Fifo` compaction strategy
    ///
    /// * `limit` - size limit in bytes; once exceeded, segments get dropped
    /// * `ttl_seconds` - segment TTL (disabled if `0` or `None`; not enforced yet)
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
41
42impl CompactionStrategy for Strategy {
43    fn get_name(&self) -> &'static str {
44        "FifoCompaction"
45    }
46
47    // TODO: TTL
48    fn choose(&self, levels: &LevelManifest, _config: &Config) -> Choice {
49        // NOTE: We always have at least one level
50        #[allow(clippy::expect_used)]
51        let first_level = levels.as_slice().first().expect("should have first level");
52
53        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
54
55        let l0_size = first_level.size();
56
57        if l0_size > self.limit {
58            let overshoot = l0_size - self.limit;
59
60            let mut oldest_segments = HashSet::default();
61            let mut collected_bytes = 0;
62
63            for segment in first_level.iter().flat_map(|run| run.iter().rev()) {
64                if collected_bytes >= overshoot {
65                    break;
66                }
67
68                oldest_segments.insert(segment.id());
69                collected_bytes += segment.file_size();
70            }
71
72            Choice::Drop(oldest_segments)
73        } else {
74            Choice::DoNothing
75        }
76    }
77}
78
79// TODO: restore tests
80/*
81#[cfg(test)]
82mod tests {
83    use super::Strategy;
84    use crate::{
85        cache::Cache,
86        compaction::{Choice, CompactionStrategy},
87        config::Config,
88        descriptor_table::FileDescriptorTable,
89        file::LEVELS_MANIFEST_FILE,
90        level_manifest::LevelManifest,
91        segment::{
92            block::offset::BlockOffset,
93            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
94            file_offsets::FileOffsets,
95            meta::{Metadata, SegmentId},
96            SegmentInner,
97        },
98        super_segment::Segment,
99        time::unix_timestamp,
100        HashSet, KeyRange,
101    };
102    use std::sync::{atomic::AtomicBool, Arc};
103    use test_log::test;
104
105    #[allow(clippy::expect_used)]
106    #[allow(clippy::cast_possible_truncation)]
107    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
108        todo!()
109
110        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
111
112        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
113        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
114
115        SegmentInner {
116            tree_id: 0,
117            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
118            block_index,
119
120            offsets: FileOffsets {
121                bloom_ptr: BlockOffset(0),
122                range_filter_ptr: BlockOffset(0),
123                index_block_ptr: BlockOffset(0),
124                metadata_ptr: BlockOffset(0),
125                range_tombstones_ptr: BlockOffset(0),
126                tli_ptr: BlockOffset(0),
127                pfx_ptr: BlockOffset(0),
128            },
129
130            metadata: Metadata {
131                data_block_count: 0,
132                index_block_count: 0,
133                data_block_size: 4_096,
134                index_block_size: 4_096,
135                created_at,
136                id,
137                file_size: 1,
138                compression: crate::segment::meta::CompressionType::None,
139                table_type: crate::segment::meta::TableType::Block,
140                item_count: 0,
141                key_count: 0,
142                key_range: KeyRange::new((vec![].into(), vec![].into())),
143                tombstone_count: 0,
144                range_tombstone_count: 0,
145                uncompressed_size: 0,
146                seqnos: (0, created_at as u64),
147            },
148            cache,
149
150            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
151
152            path: "a".into(),
153            is_deleted: AtomicBool::default(),
154        }
155        .into() */
156    }
157
158    #[test]
159    fn fifo_ttl() -> crate::Result<()> {
160        let tempdir = tempfile::tempdir()?;
161        let compactor = Strategy::new(u64::MAX, Some(5_000));
162
163        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
164
165        levels.add(fixture_segment(1, 1));
166        levels.add(fixture_segment(2, unix_timestamp().as_micros()));
167
168        assert_eq!(
169            compactor.choose(&levels, &Config::default()),
170            Choice::Drop(set![1])
171        );
172
173        Ok(())
174    }
175
176    #[test]
177    fn fifo_empty_levels() -> crate::Result<()> {
178        let tempdir = tempfile::tempdir()?;
179        let compactor = Strategy::new(1, None);
180
181        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
182
183        assert_eq!(
184            compactor.choose(&levels, &Config::default()),
185            Choice::DoNothing
186        );
187
188        Ok(())
189    }
190
191    #[test]
192    fn fifo_below_limit() -> crate::Result<()> {
193        let tempdir = tempfile::tempdir()?;
194        let compactor = Strategy::new(4, None);
195
196        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
197
198        levels.add(fixture_segment(1, 1));
199        assert_eq!(
200            compactor.choose(&levels, &Config::default()),
201            Choice::DoNothing
202        );
203
204        levels.add(fixture_segment(2, 2));
205        assert_eq!(
206            compactor.choose(&levels, &Config::default()),
207            Choice::DoNothing
208        );
209
210        levels.add(fixture_segment(3, 3));
211        assert_eq!(
212            compactor.choose(&levels, &Config::default()),
213            Choice::DoNothing
214        );
215
216        levels.add(fixture_segment(4, 4));
217        assert_eq!(
218            compactor.choose(&levels, &Config::default()),
219            Choice::DoNothing
220        );
221
222        Ok(())
223    }
224
225    #[test]
226    fn fifo_more_than_limit() -> crate::Result<()> {
227        let tempdir = tempfile::tempdir()?;
228        let compactor = Strategy::new(2, None);
229
230        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
231        levels.add(fixture_segment(1, 1));
232        levels.add(fixture_segment(2, 2));
233        levels.add(fixture_segment(3, 3));
234        levels.add(fixture_segment(4, 4));
235
236        assert_eq!(
237            compactor.choose(&levels, &Config::default()),
238            Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
239        );
240
241        Ok(())
242    }
243}
244 */