lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, time::unix_timestamp, HashSet};
7
/// FIFO compaction strategy
///
/// Caps the tree at roughly `limit` bytes: once that threshold is exceeded,
/// the oldest segment(s) get dropped.
///
/// Will also merge level-0 segments if too many of them pile up, which could
/// otherwise cause write stalls.
///
/// Optionally, segments can also expire (lazily) through a TTL.
///
/// ###### Caution
///
/// Only suitable for workloads where:
///
/// 1) only recent data needs to be kept (unimportant logs, ...)
/// 2) the keyspace grows monotonically (e.g. time series)
/// 3) data is only ever inserted, never updated
#[derive(Clone)]
pub struct Strategy {
    /// Upper bound for the data set size, in bytes
    pub limit: u64,

    /// Time-to-live in seconds; `None` or `0` disables expiry
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Builds a FIFO compaction strategy from a size `limit` (in bytes) and an
    /// optional TTL (in seconds).
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self {
            ttl_seconds,
            limit,
        }
    }
}
41
42impl CompactionStrategy for Strategy {
43    fn get_name(&self) -> &'static str {
44        "FifoStrategy"
45    }
46
47    fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
48        let resolved_view = levels.resolved_view();
49
50        // NOTE: First level always exists, trivial
51        #[allow(clippy::expect_used)]
52        let first_level = resolved_view.first().expect("L0 should always exist");
53
54        let mut segment_ids_to_delete = HashSet::with_hasher(xxhash_rust::xxh3::Xxh3Builder::new());
55
56        if let Some(ttl_seconds) = self.ttl_seconds {
57            if ttl_seconds > 0 {
58                let now = unix_timestamp().as_micros();
59
60                for segment in resolved_view.iter().flat_map(|lvl| &lvl.segments) {
61                    let lifetime_us = now - segment.metadata.created_at;
62                    let lifetime_sec = lifetime_us / 1000 / 1000;
63
64                    if lifetime_sec > ttl_seconds.into() {
65                        log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
66                        segment_ids_to_delete.insert(segment.id());
67                    }
68                }
69            }
70        }
71
72        let db_size = levels.size();
73
74        if db_size > self.limit {
75            let mut bytes_to_delete = db_size - self.limit;
76
77            // NOTE: Sort the level by oldest to newest
78            // levels are sorted from newest to oldest, so we can just reverse
79            let mut first_level = first_level.clone();
80            first_level.sort_by_seqno();
81            first_level.segments.reverse();
82
83            for segment in first_level.iter() {
84                if bytes_to_delete == 0 {
85                    break;
86                }
87
88                bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);
89
90                segment_ids_to_delete.insert(segment.id());
91
92                log::debug!(
93                    "dropping segment to reach configured size limit: {:?}",
94                    segment.id(),
95                );
96            }
97        }
98
99        if segment_ids_to_delete.is_empty() {
100            // NOTE: Only try to merge segments if they are not disjoint
101            // to improve read performance
102            // But ideally FIFO is only used for monotonic workloads
103            // so there's nothing we need to do
104            if first_level.is_disjoint {
105                Choice::DoNothing
106            } else {
107                super::maintenance::Strategy.choose(levels, config)
108            }
109        } else {
110            let ids = segment_ids_to_delete.into_iter().collect();
111            Choice::Drop(ids)
112        }
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{
        block_cache::BlockCache,
        compaction::{Choice, CompactionStrategy},
        config::Config,
        descriptor_table::FileDescriptorTable,
        file::LEVELS_MANIFEST_FILE,
        key_range::KeyRange,
        level_manifest::LevelManifest,
        segment::{
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            value_block::BlockOffset,
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        HashSet,
    };
    use std::sync::Arc;
    use test_log::test;

    /// Builds a stub segment with the given `id`.
    ///
    /// `created_at` doubles as the creation timestamp and as the upper bound of
    /// the segment's seqno range; every stub reports a `file_size` of 1.
    #[allow(clippy::expect_used)]
    #[allow(clippy::cast_possible_truncation)]
    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = Arc::new(BlockIndexImpl::TwoLevel(TwoLevelBlockIndex::new(
            (0, id).into(),
            Arc::clone(&block_cache),
        )));

        let offsets = FileOffsets {
            bloom_ptr: BlockOffset(0),
            range_filter_ptr: BlockOffset(0),
            index_block_ptr: BlockOffset(0),
            metadata_ptr: BlockOffset(0),
            range_tombstones_ptr: BlockOffset(0),
            tli_ptr: BlockOffset(0),
            pfx_ptr: BlockOffset(0),
        };

        let metadata = Metadata {
            data_block_count: 0,
            index_block_count: 0,
            data_block_size: 4_096,
            index_block_size: 4_096,
            created_at,
            id,
            file_size: 1,
            compression: crate::segment::meta::CompressionType::None,
            table_type: crate::segment::meta::TableType::Block,
            item_count: 0,
            key_count: 0,
            key_range: KeyRange::new((vec![].into(), vec![].into())),
            tombstone_count: 0,
            range_tombstone_count: 0,
            uncompressed_size: 0,
            seqnos: (0, created_at as u64),
        };

        SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,
            offsets,
            metadata,
            block_cache,
            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
        }
        .into()
    }

    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::new(u64::MAX, Some(5_000));

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        // Segment 1 dates from 1µs after the epoch, segment 2 from right now
        levels.add(fixture_segment(1, 1));
        levels.add(fixture_segment(2, unix_timestamp().as_micros()));

        let choice = compactor.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::Drop(set![1]));

        Ok(())
    }

    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::new(1, None);

        let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        let choice = compactor.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::new(4, None);

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        // Fixture segments have `file_size: 1`, so four of them should stay
        // within the 4-byte limit; check after every insertion
        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            levels.add(fixture_segment(id, created_at));

            let choice = compactor.choose(&levels, &Config::default());
            assert_eq!(choice, Choice::DoNothing);
        }

        Ok(())
    }

    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let tempdir = tempfile::tempdir()?;
        let compactor = Strategy::new(2, None);

        let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;

        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            levels.add(fixture_segment(id, created_at));
        }

        // The two oldest segments have to go to get back under the limit
        let expected: HashSet<_> = [1, 2].into_iter().collect();
        let choice = compactor.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::Drop(expected));

        Ok(())
    }
}