1use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, time::unix_timestamp, HashSet};
7
/// FIFO-style compaction strategy.
///
/// When the total size reported by the level manifest exceeds `limit`,
/// segments of the first level are selected to be dropped until the excess
/// is covered. Optionally, segments whose age exceeds `ttl_seconds` are
/// dropped as well.
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Upper bound for the total data size; it is compared against the size
    /// reported by the level manifest, and the excess is reduced by segment
    /// file sizes.
    pub limit: u64,

    /// Maximum segment age in seconds.
    ///
    /// The TTL check is skipped when this is `None` or `Some(0)`.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Creates a new FIFO compaction strategy.
    ///
    /// `limit` is the size threshold above which segments get dropped, and
    /// `ttl_seconds` is the optional maximum segment age (`None` or `Some(0)`
    /// disables the TTL check).
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
41
42impl CompactionStrategy for Strategy {
43 fn get_name(&self) -> &'static str {
44 "FifoStrategy"
45 }
46
47 fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
48 let resolved_view = levels.resolved_view();
49
50 #[allow(clippy::expect_used)]
52 let first_level = resolved_view.first().expect("L0 should always exist");
53
54 let mut segment_ids_to_delete = HashSet::with_hasher(xxhash_rust::xxh3::Xxh3Builder::new());
55
56 if let Some(ttl_seconds) = self.ttl_seconds {
57 if ttl_seconds > 0 {
58 let now = unix_timestamp().as_micros();
59
60 for segment in resolved_view.iter().flat_map(|lvl| &lvl.segments) {
61 let lifetime_us = now - segment.metadata.created_at;
62 let lifetime_sec = lifetime_us / 1000 / 1000;
63
64 if lifetime_sec > ttl_seconds.into() {
65 log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
66 segment_ids_to_delete.insert(segment.id());
67 }
68 }
69 }
70 }
71
72 let db_size = levels.size();
73
74 if db_size > self.limit {
75 let mut bytes_to_delete = db_size - self.limit;
76
77 let mut first_level = first_level.clone();
80 first_level.sort_by_seqno();
81 first_level.segments.reverse();
82
83 for segment in first_level.iter() {
84 if bytes_to_delete == 0 {
85 break;
86 }
87
88 bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);
89
90 segment_ids_to_delete.insert(segment.id());
91
92 log::debug!(
93 "dropping segment to reach configured size limit: {:?}",
94 segment.id(),
95 );
96 }
97 }
98
99 if segment_ids_to_delete.is_empty() {
100 if first_level.is_disjoint {
105 Choice::DoNothing
106 } else {
107 super::maintenance::Strategy.choose(levels, config)
108 }
109 } else {
110 let ids = segment_ids_to_delete.into_iter().collect();
111 Choice::Drop(ids)
112 }
113 }
114}
115
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{
        block_cache::BlockCache,
        compaction::{Choice, CompactionStrategy},
        config::Config,
        descriptor_table::FileDescriptorTable,
        file::LEVELS_MANIFEST_FILE,
        key_range::KeyRange,
        level_manifest::LevelManifest,
        segment::{
            block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
            file_offsets::FileOffsets,
            meta::{Metadata, SegmentId},
            value_block::BlockOffset,
            Segment, SegmentInner,
        },
        time::unix_timestamp,
        HashSet,
    };
    use std::sync::Arc;
    use test_log::test;

    /// Builds a dummy segment with the given ID and creation timestamp.
    ///
    /// The segment has a file size of 1 and its highest seqno is derived
    /// from `created_at`; all file offsets are zero.
    #[allow(clippy::expect_used)]
    #[allow(clippy::cast_possible_truncation)]
    fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
        let block_cache = Arc::new(BlockCache::with_capacity_bytes(10 * 1_024 * 1_024));

        let block_index = Arc::new(BlockIndexImpl::TwoLevel(TwoLevelBlockIndex::new(
            (0, id).into(),
            Arc::clone(&block_cache),
        )));

        let descriptor_table = Arc::new(FileDescriptorTable::new(512, 1));

        let offsets = FileOffsets {
            bloom_ptr: BlockOffset(0),
            range_filter_ptr: BlockOffset(0),
            index_block_ptr: BlockOffset(0),
            metadata_ptr: BlockOffset(0),
            range_tombstones_ptr: BlockOffset(0),
            tli_ptr: BlockOffset(0),
            pfx_ptr: BlockOffset(0),
        };

        let metadata = Metadata {
            data_block_count: 0,
            index_block_count: 0,
            data_block_size: 4_096,
            index_block_size: 4_096,
            created_at,
            id,
            file_size: 1,
            compression: crate::segment::meta::CompressionType::None,
            table_type: crate::segment::meta::TableType::Block,
            item_count: 0,
            key_count: 0,
            key_range: KeyRange::new((vec![].into(), vec![].into())),
            tombstone_count: 0,
            range_tombstone_count: 0,
            uncompressed_size: 0,
            seqnos: (0, created_at as u64),
        };

        SegmentInner {
            tree_id: 0,
            descriptor_table,
            block_index,
            offsets,
            metadata,
            block_cache,
            bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
        }
        .into()
    }

    /// A segment far older than the TTL is dropped; a fresh one is kept.
    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy::new(u64::MAX, Some(5_000));

        let mut levels = LevelManifest::create_new(4, folder.path().join(LEVELS_MANIFEST_FILE))?;

        let fresh_timestamp = unix_timestamp().as_micros();
        levels.add(fixture_segment(1, 1));
        levels.add(fixture_segment(2, fresh_timestamp));

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::Drop(set![1]));

        Ok(())
    }

    /// With no segments at all there is nothing to do.
    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy::new(1, None);

        let levels = LevelManifest::create_new(4, folder.path().join(LEVELS_MANIFEST_FILE))?;

        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::DoNothing);

        Ok(())
    }

    /// As long as the total size stays within the limit, nothing is dropped.
    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy::new(4, None);

        let mut levels = LevelManifest::create_new(4, folder.path().join(LEVELS_MANIFEST_FILE))?;

        // Add the segments one at a time and re-check the decision after each.
        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            levels.add(fixture_segment(id, created_at));

            let choice = strategy.choose(&levels, &Config::default());
            assert_eq!(choice, Choice::DoNothing);
        }

        Ok(())
    }

    /// When the limit is exceeded by two units, segments 1 and 2 are dropped.
    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let strategy = Strategy::new(2, None);

        let mut levels = LevelManifest::create_new(4, folder.path().join(LEVELS_MANIFEST_FILE))?;
        for (id, created_at) in [(1, 1), (2, 2), (3, 3), (4, 4)] {
            levels.add(fixture_segment(id, created_at));
        }

        let expected = [1, 2].into_iter().collect::<HashSet<_>>();
        let choice = strategy.choose(&levels, &Config::default());
        assert_eq!(choice, Choice::Drop(expected));

        Ok(())
    }
}