lsm_tree/compaction/
fifo.rs1use super::{Choice, CompactionStrategy};
6use crate::{
7 compaction::state::CompactionState, config::Config, time::unix_timestamp, version::Version,
8 HashSet, KvPair,
9};
10
/// Identifier string of the FIFO compaction strategy.
///
/// This is the value handed out by the strategy's `get_name` implementation.
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
13
/// FIFO compaction strategy parameters.
///
/// The strategy never merges tables; it only selects tables to drop:
/// tables older than the TTL first, then the oldest remaining tables
/// until the tracked size no longer exceeds `limit`.
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Size limit in bytes that the data set is trimmed down to.
    pub limit: u64,

    /// Time-to-live in seconds.
    ///
    /// `None` and `Some(0)` both disable TTL-based dropping.
    pub ttl_seconds: Option<u64>,
}

impl Strategy {
    /// Creates a FIFO strategy with the given size `limit` (in bytes) and
    /// an optional TTL (in seconds).
    #[must_use]
    pub const fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { limit, ttl_seconds }
    }
}
47
48impl CompactionStrategy for Strategy {
49 fn get_name(&self) -> &'static str {
50 NAME
51 }
52
53 fn get_config(&self) -> Vec<KvPair> {
54 vec![
55 (
56 crate::UserKey::from("fifo_limit"),
57 crate::UserValue::from(self.limit.to_le_bytes()),
58 ),
59 (
60 crate::UserKey::from("fifo_ttl"),
61 crate::UserValue::from(if self.ttl_seconds.is_some() {
62 [1u8]
63 } else {
64 [0u8]
65 }),
66 ),
67 (
68 crate::UserKey::from("fifo_ttl_seconds"),
69 crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
70 ),
71 ]
72 }
73
74 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
75 let first_level = version.l0();
76
77 if first_level.is_empty() {
79 return Choice::DoNothing;
80 }
81
82 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
83
84 assert!(
85 !version.level_is_busy(0, state.hidden_set()),
86 "FIFO compaction never compacts",
87 );
88
89 let db_size = first_level.size() + version.blob_files.on_disk_size();
91
92 let mut ids_to_drop: HashSet<_> = HashSet::default();
93
94 let ttl_cutoff = match self.ttl_seconds {
97 Some(s) if s > 0 => Some(
98 unix_timestamp()
99 .as_nanos()
100 .saturating_sub(u128::from(s) * 1_000_000_000u128),
101 ),
102 _ => None,
103 };
104
105 let mut ttl_dropped_bytes = 0u64;
106 let mut alive = Vec::new();
107
108 for table in first_level.iter().flat_map(|run| run.iter()) {
109 let expired =
110 ttl_cutoff.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff);
111
112 if expired {
113 ids_to_drop.insert(table.id());
114 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
115 ttl_dropped_bytes =
116 ttl_dropped_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
117 } else {
118 alive.push(table);
119 }
120 }
121
122 let size_after_ttl = db_size.saturating_sub(ttl_dropped_bytes);
124
125 if size_after_ttl > self.limit {
127 let overshoot = size_after_ttl - self.limit;
128
129 let mut collected_bytes = 0u64;
130
131 alive.sort_by_key(|t| t.metadata.created_at);
133
134 for table in alive {
135 if collected_bytes >= overshoot {
136 break;
137 }
138
139 ids_to_drop.insert(table.id());
140
141 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
142 collected_bytes =
143 collected_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
144 }
145 }
146
147 if ids_to_drop.is_empty() {
148 Choice::DoNothing
149 } else {
150 Choice::Drop(ids_to_drop)
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter};
    use std::{path::Path, sync::Arc, time::Duration};

    /// Guard around the crate's test clock override.
    ///
    /// The override is cleared when the guard is dropped, so it is also
    /// cleared when a test bails out early through `?` or a failed assertion.
    struct ClockOverride;

    impl ClockOverride {
        /// Installs an override at `secs` seconds and returns the guard.
        #[must_use]
        fn at_secs(secs: u64) -> Self {
            let guard = Self;
            guard.set_secs(secs);
            guard
        }

        /// Moves the overridden clock to `secs` seconds.
        fn set_secs(&self, secs: u64) {
            crate::time::set_unix_timestamp_for_test(Some(Duration::from_secs(secs)));
        }
    }

    impl Drop for ClockOverride {
        fn drop(&mut self) {
            crate::time::set_unix_timestamp_for_test(None);
        }
    }

    /// Builds the tree configuration shared by all tests, rooted at `path`.
    fn config_at(path: &Path) -> Config {
        Config::new(
            path,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
    }

    /// Compacting an empty tree leaves it empty.
    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path()).open()?;

        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 0)?;

        assert_eq!(0, tree.table_count());
        Ok(())
    }

    /// With an effectively unlimited size and no TTL, no table is dropped.
    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path()).open()?;

        for i in 0..4u8 {
            tree.insert([b'k', i].as_slice(), "v", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        let fifo = Arc::new(Strategy::new(u64::MAX, None));
        tree.compact(fifo, 4)?;

        assert_eq!(before, tree.table_count());
        Ok(())
    }

    /// With a 1-byte limit, at least one table is dropped.
    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path()).open()?;

        for i in 0..4u8 {
            tree.insert([b'k', i].as_slice(), "v", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 4)?;

        assert!(tree.table_count() < before);
        Ok(())
    }

    /// Same as `fifo_more_than_limit`, but with key-value separation enabled.
    #[test]
    fn fifo_more_than_limit_blobs() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path())
            .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
            .open()?;

        for i in 0..3u8 {
            tree.insert([b'k', i].as_slice(), "$", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 3)?;

        assert!(tree.table_count() < before);
        Ok(())
    }

    /// With a 10 s TTL and the clock at 1011 s, exactly one of the two
    /// tables is dropped.
    ///
    /// NOTE(review): presumably the tables are stamped from the mocked clock
    /// at flush time (1000 s and 1005 s), so only the first one is past the
    /// 1001 s cutoff - confirm against the flush path.
    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path()).open()?;

        let clock = ClockOverride::at_secs(1_000);
        tree.insert("a", "1", 0);
        tree.flush_active_memtable(0)?;

        clock.set_secs(1_005);
        tree.insert("b", "2", 1);
        tree.flush_active_memtable(1)?;

        clock.set_secs(1_011);

        assert_eq!(2, tree.table_count());

        let fifo = Arc::new(Strategy::new(u64::MAX, Some(10)));
        tree.compact(fifo, 2)?;

        assert_eq!(1, tree.table_count());
        Ok(())
    }

    /// With a 1 s TTL, a 1-byte limit and the clock far in the future,
    /// every table is dropped (key-value separation enabled).
    #[test]
    fn fifo_ttl_then_limit_additional_drops_blob_unit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = config_at(dir.path())
            .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
            .open()?;

        tree.insert("a", "$", 0);
        tree.flush_active_memtable(0)?;
        tree.insert("b", "$", 1);
        tree.flush_active_memtable(1)?;

        let _clock = ClockOverride::at_secs(10_000_000);

        let fifo = Arc::new(Strategy::new(1, Some(1)));
        tree.compact(fifo, 2)?;

        assert_eq!(0, tree.table_count());
        Ok(())
    }
}