1use super::{Choice, CompactionStrategy};
6use crate::{config::Config, level_manifest::LevelManifest, time::unix_timestamp, HashSet};
7
8#[derive(Clone)]
26pub struct Strategy {
27 pub limit: u64,
29
30 pub ttl_seconds: Option<u64>,
32}
33
34impl Strategy {
35 #[must_use]
37 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
38 Self { limit, ttl_seconds }
39 }
40}
41
42impl CompactionStrategy for Strategy {
43 fn get_name(&self) -> &'static str {
44 "FifoStrategy"
45 }
46
47 fn choose(&self, levels: &LevelManifest, config: &Config) -> Choice {
48 let resolved_view = levels.resolved_view();
49
50 #[allow(clippy::expect_used)]
52 let first_level = resolved_view.first().expect("L0 should always exist");
53
54 let mut segment_ids_to_delete = HashSet::with_hasher(xxhash_rust::xxh3::Xxh3Builder::new());
55
56 if let Some(ttl_seconds) = self.ttl_seconds {
57 if ttl_seconds > 0 {
58 let now = unix_timestamp().as_micros();
59
60 for segment in resolved_view.iter().flat_map(|lvl| &lvl.segments) {
61 let lifetime_us = now - segment.metadata.created_at;
62 let lifetime_sec = lifetime_us / 1000 / 1000;
63
64 if lifetime_sec > ttl_seconds.into() {
65 log::warn!("segment is older than configured TTL: {:?}", segment.id(),);
66 segment_ids_to_delete.insert(segment.id());
67 }
68 }
69 }
70 }
71
72 let db_size = levels.size();
73
74 if db_size > self.limit {
75 let mut bytes_to_delete = db_size - self.limit;
76
77 let mut first_level = first_level.clone();
80 first_level.sort_by_seqno();
81 first_level.segments.reverse();
82
83 for segment in first_level.iter() {
84 if bytes_to_delete == 0 {
85 break;
86 }
87
88 bytes_to_delete = bytes_to_delete.saturating_sub(segment.metadata.file_size);
89
90 segment_ids_to_delete.insert(segment.id());
91
92 log::debug!(
93 "dropping segment to reach configured size limit: {:?}",
94 segment.id(),
95 );
96 }
97 }
98
99 if segment_ids_to_delete.is_empty() {
100 if first_level.is_disjoint {
105 Choice::DoNothing
106 } else {
107 super::maintenance::Strategy.choose(levels, config)
108 }
109 } else {
110 let ids = segment_ids_to_delete.into_iter().collect();
111 Choice::Drop(ids)
112 }
113 }
114}
115
116#[cfg(test)]
117mod tests {
118 use super::Strategy;
119 use crate::{
120 cache::Cache,
121 compaction::{Choice, CompactionStrategy},
122 config::Config,
123 descriptor_table::FileDescriptorTable,
124 file::LEVELS_MANIFEST_FILE,
125 level_manifest::LevelManifest,
126 segment::{
127 block::offset::BlockOffset,
128 block_index::{two_level_index::TwoLevelBlockIndex, BlockIndexImpl},
129 file_offsets::FileOffsets,
130 meta::{Metadata, SegmentId},
131 Segment, SegmentInner,
132 },
133 time::unix_timestamp,
134 HashSet, KeyRange,
135 };
136 use std::sync::{atomic::AtomicBool, Arc};
137 use test_log::test;
138
139 #[allow(clippy::expect_used)]
140 #[allow(clippy::cast_possible_truncation)]
141 fn fixture_segment(id: SegmentId, created_at: u128) -> Segment {
142 let cache = Arc::new(Cache::with_capacity_bytes(10 * 1_024 * 1_024));
143
144 let block_index = TwoLevelBlockIndex::new((0, id).into(), cache.clone());
145 let block_index = Arc::new(BlockIndexImpl::TwoLevel(block_index));
146
147 SegmentInner {
148 tree_id: 0,
149 descriptor_table: Arc::new(FileDescriptorTable::new(512, 1)),
150 block_index,
151
152 offsets: FileOffsets {
153 bloom_ptr: BlockOffset(0),
154 range_filter_ptr: BlockOffset(0),
155 index_block_ptr: BlockOffset(0),
156 metadata_ptr: BlockOffset(0),
157 range_tombstones_ptr: BlockOffset(0),
158 tli_ptr: BlockOffset(0),
159 pfx_ptr: BlockOffset(0),
160 },
161
162 metadata: Metadata {
163 data_block_count: 0,
164 index_block_count: 0,
165 data_block_size: 4_096,
166 index_block_size: 4_096,
167 created_at,
168 id,
169 file_size: 1,
170 compression: crate::segment::meta::CompressionType::None,
171 table_type: crate::segment::meta::TableType::Block,
172 item_count: 0,
173 key_count: 0,
174 key_range: KeyRange::new((vec![].into(), vec![].into())),
175 tombstone_count: 0,
176 range_tombstone_count: 0,
177 uncompressed_size: 0,
178 seqnos: (0, created_at as u64),
179 },
180 cache,
181
182 bloom_filter: Some(crate::bloom::BloomFilter::with_fp_rate(1, 0.1)),
183
184 path: "a".into(),
185 is_deleted: AtomicBool::default(),
186 }
187 .into()
188 }
189
190 #[test]
191 fn fifo_ttl() -> crate::Result<()> {
192 let tempdir = tempfile::tempdir()?;
193 let compactor = Strategy::new(u64::MAX, Some(5_000));
194
195 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
196
197 levels.add(fixture_segment(1, 1));
198 levels.add(fixture_segment(2, unix_timestamp().as_micros()));
199
200 assert_eq!(
201 compactor.choose(&levels, &Config::default()),
202 Choice::Drop(set![1])
203 );
204
205 Ok(())
206 }
207
208 #[test]
209 fn fifo_empty_levels() -> crate::Result<()> {
210 let tempdir = tempfile::tempdir()?;
211 let compactor = Strategy::new(1, None);
212
213 let levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
214
215 assert_eq!(
216 compactor.choose(&levels, &Config::default()),
217 Choice::DoNothing
218 );
219
220 Ok(())
221 }
222
223 #[test]
224 fn fifo_below_limit() -> crate::Result<()> {
225 let tempdir = tempfile::tempdir()?;
226 let compactor = Strategy::new(4, None);
227
228 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
229
230 levels.add(fixture_segment(1, 1));
231 assert_eq!(
232 compactor.choose(&levels, &Config::default()),
233 Choice::DoNothing
234 );
235
236 levels.add(fixture_segment(2, 2));
237 assert_eq!(
238 compactor.choose(&levels, &Config::default()),
239 Choice::DoNothing
240 );
241
242 levels.add(fixture_segment(3, 3));
243 assert_eq!(
244 compactor.choose(&levels, &Config::default()),
245 Choice::DoNothing
246 );
247
248 levels.add(fixture_segment(4, 4));
249 assert_eq!(
250 compactor.choose(&levels, &Config::default()),
251 Choice::DoNothing
252 );
253
254 Ok(())
255 }
256
257 #[test]
258 fn fifo_more_than_limit() -> crate::Result<()> {
259 let tempdir = tempfile::tempdir()?;
260 let compactor = Strategy::new(2, None);
261
262 let mut levels = LevelManifest::create_new(4, tempdir.path().join(LEVELS_MANIFEST_FILE))?;
263 levels.add(fixture_segment(1, 1));
264 levels.add(fixture_segment(2, 2));
265 levels.add(fixture_segment(3, 3));
266 levels.add(fixture_segment(4, 4));
267
268 assert_eq!(
269 compactor.choose(&levels, &Config::default()),
270 Choice::Drop([1, 2].into_iter().collect::<HashSet<_>>())
271 );
272
273 Ok(())
274 }
275}