lsm_tree/compaction/
fifo.rs1use super::{Choice, CompactionStrategy};
6use crate::{
7 compaction::state::CompactionState, config::Config, time::unix_timestamp, version::Version,
8 HashSet, KvPair,
9};
10
/// Name reported by the FIFO strategy's `get_name` implementation.
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
13
/// FIFO compaction strategy.
///
/// Tables are never merged; the strategy only selects whole tables to drop,
/// either because they are older than the configured TTL or because the
/// tracked size exceeds `limit`.
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Size limit in bytes.
    ///
    /// Compared against the size of level 0 plus the on-disk size of the
    /// blob files; when exceeded, the oldest tables are dropped first.
    pub limit: u64,

    /// Optional time-to-live in seconds.
    ///
    /// Tables created at or before `now - ttl_seconds` are dropped.
    /// `None` (or `Some(0)`) disables TTL-based dropping.
    pub ttl_seconds: Option<u64>,
}
39
40impl Strategy {
41 #[must_use]
43 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
44 Self { limit, ttl_seconds }
45 }
46}
47
48impl CompactionStrategy for Strategy {
49 fn get_name(&self) -> &'static str {
50 NAME
51 }
52
53 fn get_config(&self) -> Vec<KvPair> {
54 vec![
55 (
56 crate::UserKey::from("fifo_limit"),
57 crate::UserValue::from(self.limit.to_le_bytes()),
58 ),
59 (
60 crate::UserKey::from("fifo_ttl"),
61 crate::UserValue::from(if self.ttl_seconds.is_some() {
62 [1u8]
63 } else {
64 [0u8]
65 }),
66 ),
67 (
68 crate::UserKey::from("fifo_ttl_seconds"),
69 crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
70 ),
71 ]
72 }
73
74 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
75 let first_level = version.l0();
76
77 if first_level.is_empty() {
79 return Choice::DoNothing;
80 }
81
82 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
83
84 assert!(
85 !version.level_is_busy(0, state.hidden_set()),
86 "FIFO compaction never compacts",
87 );
88
89 let db_size = first_level.size() + version.blob_files.on_disk_size();
91
92 let mut ids_to_drop: HashSet<_> = HashSet::default();
93
94 let ttl_cutoff = match self.ttl_seconds {
97 Some(s) if s > 0 => Some(
98 unix_timestamp()
99 .as_nanos()
100 .saturating_sub(u128::from(s) * 1_000_000_000u128),
101 ),
102 _ => None,
103 };
104
105 let mut ttl_dropped_bytes = 0u64;
106 let mut alive = Vec::new();
107
108 for table in first_level.iter().flat_map(|run| run.iter()) {
109 let expired =
110 ttl_cutoff.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff);
111
112 if expired {
113 ids_to_drop.insert(table.id());
114 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
115 ttl_dropped_bytes =
116 ttl_dropped_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
117 } else {
118 alive.push(table);
119 }
120 }
121
122 let size_after_ttl = db_size.saturating_sub(ttl_dropped_bytes);
124
125 if size_after_ttl > self.limit {
127 let overshoot = size_after_ttl - self.limit;
128
129 let mut collected_bytes = 0u64;
130
131 alive.sort_by_key(|t| t.metadata.created_at);
133
134 for table in alive {
135 if collected_bytes >= overshoot {
136 break;
137 }
138
139 ids_to_drop.insert(table.id());
140
141 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
142 collected_bytes =
143 collected_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
144 }
145 }
146
147 if ids_to_drop.is_empty() {
148 Choice::DoNothing
149 } else {
150 Choice::Drop(ids_to_drop)
151 }
152 }
153}
154
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter};
    use std::sync::Arc;

    /// Overrides the crate's test clock with `secs` seconds.
    fn freeze_clock_at(secs: u64) {
        crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(secs)));
    }

    /// Clears the test clock override by passing `None`.
    fn release_clock() {
        crate::time::set_unix_timestamp_for_test(None);
    }

    /// Compacting an empty tree leaves it empty.
    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let tree = Config::new(folder.path(), SequenceNumberCounter::default()).open()?;

        tree.compact(Arc::new(Strategy::new(1, None)), 0)?;

        assert_eq!(0, tree.table_count());
        Ok(())
    }

    /// With an effectively unlimited size limit, no table is dropped.
    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let tree = Config::new(folder.path(), SequenceNumberCounter::default()).open()?;

        for idx in 0u8..4 {
            let seqno = u64::from(idx);
            tree.insert([b'k', idx].as_slice(), "v", seqno);
            tree.flush_active_memtable(seqno)?;
        }

        let tables_before = tree.table_count();

        tree.compact(Arc::new(Strategy::new(u64::MAX, None)), 4)?;

        assert_eq!(tables_before, tree.table_count());
        Ok(())
    }

    /// With a one-byte limit, at least one table is dropped.
    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let tree = Config::new(folder.path(), SequenceNumberCounter::default()).open()?;

        for idx in 0u8..4 {
            let seqno = u64::from(idx);
            tree.insert([b'k', idx].as_slice(), "v", seqno);
            tree.flush_active_memtable(seqno)?;
        }

        let tables_before = tree.table_count();

        tree.compact(Arc::new(Strategy::new(1, None)), 4)?;

        assert!(tree.table_count() < tables_before);
        Ok(())
    }

    /// Same as `fifo_more_than_limit`, but with KV separation enabled.
    #[test]
    fn fifo_more_than_limit_blobs() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let kv_separation = KvSeparationOptions::default().separation_threshold(1);
        let tree = Config::new(folder.path(), SequenceNumberCounter::default())
            .with_kv_separation(Some(kv_separation))
            .open()?;

        for idx in 0u8..3 {
            let seqno = u64::from(idx);
            tree.insert([b'k', idx].as_slice(), "$", seqno);
            tree.flush_active_memtable(seqno)?;
        }

        let tables_before = tree.table_count();

        tree.compact(Arc::new(Strategy::new(1, None)), 3)?;

        assert!(tree.table_count() < tables_before);
        Ok(())
    }

    /// Only the table older than the TTL is dropped.
    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let tree = Config::new(folder.path(), SequenceNumberCounter::default()).open()?;

        // First table is flushed with the clock at t = 1000 s.
        freeze_clock_at(1_000);
        tree.insert("a", "1", 0);
        tree.flush_active_memtable(0)?;

        // Second table is flushed with the clock at t = 1005 s.
        freeze_clock_at(1_005);
        tree.insert("b", "2", 1);
        tree.flush_active_memtable(1)?;

        // At t = 1011 s with a 10 s TTL, one of the two tables is expected to go.
        freeze_clock_at(1_011);

        assert_eq!(2, tree.table_count());

        tree.compact(Arc::new(Strategy::new(u64::MAX, Some(10))), 2)?;

        assert_eq!(1, tree.table_count());

        release_clock();
        Ok(())
    }

    /// A short TTL combined with a one-byte limit leaves no tables behind.
    #[test]
    fn fifo_ttl_then_limit_additional_drops_blob_unit() -> crate::Result<()> {
        let folder = tempfile::tempdir()?;
        let kv_separation = KvSeparationOptions::default().separation_threshold(1);
        let tree = Config::new(folder.path(), SequenceNumberCounter::default())
            .with_kv_separation(Some(kv_separation))
            .open()?;

        tree.insert("a", "$", 0);
        tree.flush_active_memtable(0)?;
        tree.insert("b", "$", 1);
        tree.flush_active_memtable(1)?;

        // The clock is only overridden after both tables were written.
        freeze_clock_at(10_000_000);

        tree.compact(Arc::new(Strategy::new(1, Some(1))), 2)?;

        assert_eq!(0, tree.table_count());

        release_clock();
        Ok(())
    }
}