lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, time::unix_timestamp, HashSet};
7
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest segment(s)
/// of the first level when the threshold is exceeded.
///
/// If nothing needs to be dropped and the first level is not disjoint, the decision
/// is delegated to the maintenance strategy, which may merge segments to keep
/// level 0 from growing too much (which could cause write stalls).
///
/// Additionally, a (lazy) TTL can be configured to drop old segments. "Lazy" means
/// the TTL is only evaluated when the strategy is asked to choose a compaction;
/// a segment's age is derived from its creation timestamp.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Strategy {
    /// Data set size limit in bytes
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    pub ttl_seconds: Option<u64>,
}
33
34impl Strategy {
35    /// Configures a new `Fifo` compaction strategy
36    #[must_use]
37    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
38        Self { limit, ttl_seconds }
39    }
40}
41
42impl CompactionStrategy for Strategy {
43    fn get_name(&self) -> &'static str {
44        "FifoStrategy"
45    }
46
47    fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
48        let resolved_view = levels.resolved_view();
49
50        // NOTE: First level always exists, trivial
51        #[allow(clippy::expect_used)]
52        let first_level = resolved_view.first().expect("L0 should always exist");
53
54        let mut segment_ids_to_delete = HashSet::with_hasher(xxhash_rust::xxh3::Xxh3Builder::new());
55
56        if let Some(ttl_seconds) = self.ttl_seconds {
57            if ttl_seconds > 0 {
58                let now = unix_timestamp().as_micros();
59
60                for segment in resolved_view.iter().flat_map(|lvl| &lvl.segments) {
61                    let lifetime_us = now - segment.metadata.created_at;
62                    let lifetime_sec = lifetime_us / 1000 / 1000;
63
64                    if lifetime_sec > ttl_seconds.into() {
65                        log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
66                        segment_ids_to_delete.insert(segment.id());
67                    }
68                }
69            }
70        }
71
72        let db_size = levels.size();
73
74        if db_size > self.limit {
75            let mut bytes_to_delete = db_size - self.limit;
76
77            // NOTE: Sort the level by oldest to newest
78            // levels are sorted from newest to oldest, so we can just reverse
79            let mut first_level = first_level.clone();
80            first_level.sort_by_seqno();
81            first_level.segments.reverse();
82
83            for segment in first_level.iter() {
84                if bytes_to_delete == 0 {
85                    break;
86                }
87
88                bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);
89
90                segment_ids_to_delete.insert(segment.id());
91
92                log::debug!(
93                    "dropping segment to reach configured size limit: {:?}",
94                    segment.id(),
95                );
96            }
97        }
98
99        if segment_ids_to_delete.is_empty() {
100            // NOTE: Only try to merge segments if they are not disjoint
101            // to improve read performance
102            // But ideally FIFO is only used for monotonic workloads
103            // so there's nothing we need to do
104            if first_level.is_disjoint {
105                Choice::DoNothing
106            } else {
107                super::maintenance::Strategy.choose(levels, config)
108            }
109        } else {
110            let ids = segment_ids_to_delete.into_iter().collect();
111            Choice::Drop(ids)
112        }
113    }
114}
115
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{
        cache::Cache,
        compaction::{Choice, CompactionStrategy},
        config::Config,
        descriptor_table::FileDescriptorTable,
        file::LEVELS_MANIFEST_FILE,
        level_manifest::LevelManifest,
        segment::{
            block::offset::BlockOffset,
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        HashSet, KeyRange,
    };
    use std::sync::{atomic::AtomicBool, Arc};
    use test_log::test;

    /// Builds a dummy segment with the given ID and creation timestamp.
    ///
    /// Every fixture segment has a file size of 1 byte, and `created_at` doubles as
    /// the upper sequence number of the segment.
    #[allow(clippy::expect_used)]
    #[allow(clippy::cast_possible_truncation)]
    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
        let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));

        let two_level_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
        let block_index = Arc::new(BlockIndexImpl::TwoLevel(two_level_index));

        // All pointers are zeroed; the tests only look at the metadata below
        let offsets = FileOffsets {
            bloom_ptr: BlockOffset(0),
            range_filter_ptr: BlockOffset(0),
            index_block_ptr: BlockOffset(0),
            metadata_ptr: BlockOffset(0),
            range_tombstones_ptr: BlockOffset(0),
            tli_ptr: BlockOffset(0),
            pfx_ptr: BlockOffset(0),
        };

        let metadata = Metadata {
            data_block_count: 0,
            index_block_count: 0,
            data_block_size: 4_096,
            index_block_size: 4_096,
            created_at,
            id,
            file_size: 1,
            compression: crate::segment::meta::CompressionType::None,
            table_type: crate::segment::meta::TableType::Block,
            item_count: 0,
            key_count: 0,
            key_range: KeyRange::new((vec![].into(), vec![].into())),
            tombstone_count: 0,
            range_tombstone_count: 0,
            uncompressed_size: 0,
            seqnos: (0, created_at as u64),
        };

        let inner = SegmentInner {
            tree_id: 0,
            descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
            block_index,

            offsets,

            metadata,
            cache,

            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),

            path: "a".into(),
            is_deleted: AtomicBool::default(),
        };

        inner.into()
    }

    /// A segment older than the TTL is dropped, a fresh one is kept.
    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = Strategy::new(u64::MAX, Some(5_000));

        let mut manifest = LevelManifest::create_new(4, dir.path().join(LEVELS_MANIFEST_FILE))?;

        manifest.add(fixture_segment(1, 1));
        manifest.add(fixture_segment(2, unix_timestamp().as_micros()));

        let choice = strategy.choose(&manifest, &Config::default());
        assert_eq!(choice, Choice::Drop(set![1]));

        Ok(())
    }

    /// Without any segments there is nothing to do, even with a tiny limit.
    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = Strategy::new(1, None);

        let manifest = LevelManifest::create_new(4, dir.path().join(LEVELS_MANIFEST_FILE))?;

        let choice = strategy.choose(&manifest, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// As long as the total size does not exceed the limit, nothing is dropped.
    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = Strategy::new(4, None);

        let mut manifest = LevelManifest::create_new(4, dir.path().join(LEVELS_MANIFEST_FILE))?;

        // Grow the tree one 1-byte segment at a time, up to exactly the limit
        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            manifest.add(fixture_segment(id, created_at));

            let choice = strategy.choose(&manifest, &Config::default());
            assert_eq!(choice, Choice::DoNothing);
        }

        Ok(())
    }

    /// Exceeding the limit by two bytes drops the two oldest 1-byte segments.
    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let strategy = Strategy::new(2, None);

        let mut manifest = LevelManifest::create_new(4, dir.path().join(LEVELS_MANIFEST_FILE))?;
        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            manifest.add(fixture_segment(id, created_at));
        }

        let expected = [1, 2].into_iter().collect::<HashSet<_>>();
        let choice = strategy.choose(&manifest, &Config::default());
        assert_eq!(choice, Choice::Drop(expected));

        Ok(())
    }
}