lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{compaction::state::CompactionState, config::Config, version::Version, HashSet};
7
/// FIFO-style compaction
///
/// Keeps the size of level 0 at roughly `limit` bytes by dropping its oldest
/// segment(s) once that threshold is exceeded. Segments are never rewritten or
/// merged: the only action this strategy takes is dropping whole segments.
///
/// A (lazy) TTL can be configured via `ttl_seconds`, but it is not enforced yet.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone)]
pub struct Strategy {
    /// Size limit for level 0, in bytes
    pub limit: u64,

    /// TTL in seconds; `None` or `0` is meant to disable it
    ///
    /// NOTE: not enforced by the current compaction logic yet.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Configures a new `Fifo` compaction strategy.
    ///
    /// `limit` is the level-0 size limit in bytes; `ttl_seconds` is the optional
    /// TTL in seconds (currently not enforced).
    #[must_use]
    pub const fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
41
42impl CompactionStrategy for Strategy {
43    fn get_name(&self) -> &'static str {
44        "FifoCompaction"
45    }
46
47    // TODO: TTL
48    fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
49        // NOTE: We always have at least one level
50        #[allow(clippy::expect_used)]
51        let first_level = version.l0();
52
53        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
54
55        assert!(
56            !version.level_is_busy(0, state.hidden_set()),
57            "FIFO compaction never compacts",
58        );
59
60        let l0_size = first_level.size();
61
62        if l0_size > self.limit {
63            let overshoot = l0_size - self.limit;
64
65            let mut oldest_segments = HashSet::default();
66            let mut collected_bytes = 0;
67
68            for segment in first_level.iter().flat_map(|run| run.iter().rev()) {
69                if collected_bytes >= overshoot {
70                    break;
71                }
72
73                oldest_segments.insert(segment.id());
74                collected_bytes += segment.file_size();
75            }
76
77            Choice::Drop(oldest_segments)
78        } else {
79            Choice::DoNothing
80        }
81    }
82}
83
84// TODO: restore tests
85/*
86#[cfg(test)]
87mod tests {
88    use super::Strategy;
89    use crate::{
90        cache::Cache,
91        compaction::{Choice, CompactionStrategy},
92        config::Config,
93        descriptor_table::FileDescriptorTable,
94        file::LEVELS_MANIFEST_FILE,
95        level_manifest::LevelManifest,
96        segment::{
97            block::offset::BlockOffset,
98            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
99            file_offsets::FileOffsets,
100            meta::{Metadata, SegmentId},
101            SegmentInner,
102        },
103        super_segment::Segment,
104        time::unix_timestamp,
105        HashSet, KeyRange,
106    };
107    use std::sync::{atomic::AtomicBool, Arc};
108    use test_log::test;
109
110    #[allow(clippy::expect_used)]
111    #[allow(clippy::cast_possible_truncation)]
112    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
113        todo!()
114
115        /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
116
117        let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
118        let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
119
120        SegmentInner {
121            tree_id: 0,
122            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
123            block_index,
124
125            offsets: FileOffsets {
126                bloom_ptr: BlockOffset(0),
127                range_filter_ptr: BlockOffset(0),
128                index_block_ptr: BlockOffset(0),
129                metadata_ptr: BlockOffset(0),
130                range_tombstones_ptr: BlockOffset(0),
131                tli_ptr: BlockOffset(0),
132                pfx_ptr: BlockOffset(0),
133            },
134
135            metadata: Metadata {
136                data_block_count: 0,
137                index_block_count: 0,
138                data_block_size: 4_096,
139                index_block_size: 4_096,
140                created_at,
141                id,
142                file_size: 1,
143                compression: crate::segment::meta::CompressionType::None,
144                table_type: crate::segment::meta::TableType::Block,
145                item_count: 0,
146                key_count: 0,
147                key_range: KeyRange::new((vec![].into(), vec![].into())),
148                tombstone_count: 0,
149                range_tombstone_count: 0,
150                uncompressed_size: 0,
151                seqnos: (0, created_at as u64),
152            },
153            cache,
154
155            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
156
157            path: "a".into(),
158            is_deleted: AtomicBool::default(),
159        }
160        .into() */
161    }
162
163    #[test]
164    fn fifo_ttl() -> crate::Result<()> {
165        let tempdir = tempfile::tempdir()?;
166        let compactor = Strategy::new(u64::MAX, Some(5_000));
167
168        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
169
170        levels.add(fixture_segment(1, 1));
171        levels.add(fixture_segment(2, unix_timestamp().as_micros()));
172
173        assert_eq!(
174            compactor.choose(&levels, &Config::default()),
175            Choice::Drop(set![1])
176        );
177
178        Ok(())
179    }
180
181    #[test]
182    fn fifo_empty_levels() -> crate::Result<()> {
183        let tempdir = tempfile::tempdir()?;
184        let compactor = Strategy::new(1, None);
185
186        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
187
188        assert_eq!(
189            compactor.choose(&levels, &Config::default()),
190            Choice::DoNothing
191        );
192
193        Ok(())
194    }
195
196    #[test]
197    fn fifo_below_limit() -> crate::Result<()> {
198        let tempdir = tempfile::tempdir()?;
199        let compactor = Strategy::new(4, None);
200
201        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
202
203        levels.add(fixture_segment(1, 1));
204        assert_eq!(
205            compactor.choose(&levels, &Config::default()),
206            Choice::DoNothing
207        );
208
209        levels.add(fixture_segment(2, 2));
210        assert_eq!(
211            compactor.choose(&levels, &Config::default()),
212            Choice::DoNothing
213        );
214
215        levels.add(fixture_segment(3, 3));
216        assert_eq!(
217            compactor.choose(&levels, &Config::default()),
218            Choice::DoNothing
219        );
220
221        levels.add(fixture_segment(4, 4));
222        assert_eq!(
223            compactor.choose(&levels, &Config::default()),
224            Choice::DoNothing
225        );
226
227        Ok(())
228    }
229
230    #[test]
231    fn fifo_more_than_limit() -> crate::Result<()> {
232        let tempdir = tempfile::tempdir()?;
233        let compactor = Strategy::new(2, None);
234
235        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
236        levels.add(fixture_segment(1, 1));
237        levels.add(fixture_segment(2, 2));
238        levels.add(fixture_segment(3, 3));
239        levels.add(fixture_segment(4, 4));
240
241        assert_eq!(
242            compactor.choose(&levels, &Config::default()),
243            Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
244        );
245
246        Ok(())
247    }
248}
249 */