lsm_tree/compaction/fifo.rs
1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{compaction::state::CompactionState, config::Config, version::Version, HashSet};
7
/// FIFO-style compaction
///
/// Limits the data in level 0 to roughly `limit` bytes, deleting the oldest
/// segment(s) once that threshold is exceeded.
///
/// A (lazy) TTL can be configured through `ttl_seconds` to drop old segments.
///
/// NOTE: TTL-based dropping and merging of level-0 segments (to avoid write
/// stalls when level 0 accumulates too many segments) are not currently
/// performed by this strategy's `choose`; only the size limit is enforced.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Clone)]
pub struct Strategy {
    /// Size limit in bytes; when level 0 grows beyond this, the oldest
    /// segments are dropped until the overshoot is covered.
    pub limit: u64,

    /// TTL in seconds; intended to be disabled if `0` or `None`.
    ///
    /// NOTE: not yet enforced when choosing compactions.
    pub ttl_seconds: Option<u64>,
}
33
34impl Strategy {
35 /// Configures a new `Fifo` compaction strategy
36 #[must_use]
37 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
38 Self { limit, ttl_seconds }
39 }
40}
41
42impl CompactionStrategy for Strategy {
43 fn get_name(&self) -> &'static str {
44 "FifoCompaction"
45 }
46
47 // TODO: TTL
48 fn choose(&self, version: &Version, _config: &Config, state: &CompactionState) -> Choice {
49 // NOTE: We always have at least one level
50 #[allow(clippy::expect_used)]
51 let first_level = version.l0();
52
53 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
54
55 assert!(
56 !version.level_is_busy(0, state.hidden_set()),
57 "FIFO compaction never compacts",
58 );
59
60 let l0_size = first_level.size();
61
62 if l0_size > self.limit {
63 let overshoot = l0_size - self.limit;
64
65 let mut oldest_segments = HashSet::default();
66 let mut collected_bytes = 0;
67
68 for segment in first_level.iter().flat_map(|run| run.iter().rev()) {
69 if collected_bytes >= overshoot {
70 break;
71 }
72
73 oldest_segments.insert(segment.id());
74 collected_bytes += segment.file_size();
75 }
76
77 Choice::Drop(oldest_segments)
78 } else {
79 Choice::DoNothing
80 }
81 }
82}
83
84// TODO: restore tests
85/*
86#[cfg(test)]
87mod tests {
88 use super::Strategy;
89 use crate::{
90 cache::Cache,
91 compaction::{Choice, CompactionStrategy},
92 config::Config,
93 descriptor_table::FileDescriptorTable,
94 file::LEVELS_MANIFEST_FILE,
95 level_manifest::LevelManifest,
96 segment::{
97 block::offset::BlockOffset,
98 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
99 file_offsets::FileOffsets,
100 meta::{Metadata, SegmentId},
101 SegmentInner,
102 },
103 super_segment::Segment,
104 time::unix_timestamp,
105 HashSet, KeyRange,
106 };
107 use std::sync::{atomic::AtomicBool, Arc};
108 use test_log::test;
109
110 #[allow(clippy::expect_used)]
111 #[allow(clippy::cast_possible_truncation)]
112 fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
113 todo!()
114
115 /* let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
116
117 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
118 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
119
120 SegmentInner {
121 tree_id: 0,
122 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
123 block_index,
124
125 offsets: FileOffsets {
126 bloom_ptr: BlockOffset(0),
127 range_filter_ptr: BlockOffset(0),
128 index_block_ptr: BlockOffset(0),
129 metadata_ptr: BlockOffset(0),
130 range_tombstones_ptr: BlockOffset(0),
131 tli_ptr: BlockOffset(0),
132 pfx_ptr: BlockOffset(0),
133 },
134
135 metadata: Metadata {
136 data_block_count: 0,
137 index_block_count: 0,
138 data_block_size: 4_096,
139 index_block_size: 4_096,
140 created_at,
141 id,
142 file_size: 1,
143 compression: crate::segment::meta::CompressionType::None,
144 table_type: crate::segment::meta::TableType::Block,
145 item_count: 0,
146 key_count: 0,
147 key_range: KeyRange::new((vec![].into(), vec![].into())),
148 tombstone_count: 0,
149 range_tombstone_count: 0,
150 uncompressed_size: 0,
151 seqnos: (0, created_at as u64),
152 },
153 cache,
154
155 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
156
157 path: "a".into(),
158 is_deleted: AtomicBool::default(),
159 }
160 .into() */
161 }
162
163 #[test]
164 fn fifo_ttl() -> crate::Result<()> {
165 let tempdir = tempfile::tempdir()?;
166 let compactor = Strategy::new(u64::MAX, Some(5_000));
167
168 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
169
170 levels.add(fixture_segment(1, 1));
171 levels.add(fixture_segment(2, unix_timestamp().as_micros()));
172
173 assert_eq!(
174 compactor.choose(&levels, &Config::default()),
175 Choice::Drop(set![1])
176 );
177
178 Ok(())
179 }
180
181 #[test]
182 fn fifo_empty_levels() -> crate::Result<()> {
183 let tempdir = tempfile::tempdir()?;
184 let compactor = Strategy::new(1, None);
185
186 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
187
188 assert_eq!(
189 compactor.choose(&levels, &Config::default()),
190 Choice::DoNothing
191 );
192
193 Ok(())
194 }
195
196 #[test]
197 fn fifo_below_limit() -> crate::Result<()> {
198 let tempdir = tempfile::tempdir()?;
199 let compactor = Strategy::new(4, None);
200
201 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
202
203 levels.add(fixture_segment(1, 1));
204 assert_eq!(
205 compactor.choose(&levels, &Config::default()),
206 Choice::DoNothing
207 );
208
209 levels.add(fixture_segment(2, 2));
210 assert_eq!(
211 compactor.choose(&levels, &Config::default()),
212 Choice::DoNothing
213 );
214
215 levels.add(fixture_segment(3, 3));
216 assert_eq!(
217 compactor.choose(&levels, &Config::default()),
218 Choice::DoNothing
219 );
220
221 levels.add(fixture_segment(4, 4));
222 assert_eq!(
223 compactor.choose(&levels, &Config::default()),
224 Choice::DoNothing
225 );
226
227 Ok(())
228 }
229
230 #[test]
231 fn fifo_more_than_limit() -> crate::Result<()> {
232 let tempdir = tempfile::tempdir()?;
233 let compactor = Strategy::new(2, None);
234
235 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
236 levels.add(fixture_segment(1, 1));
237 levels.add(fixture_segment(2, 2));
238 levels.add(fixture_segment(3, 3));
239 levels.add(fixture_segment(4, 4));
240
241 assert_eq!(
242 compactor.choose(&levels, &Config::default()),
243 Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
244 );
245
246 Ok(())
247 }
248}
249 */