lsm_tree/compaction/
fifo.rs

1// Copyright (c) 2024-present, fjall-rs
2// This source code is licensed under both the Apache 2.0 and MIT License
3// (found in the LICENSE-* files in the repository)
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7    compaction::state::CompactionState, config::Config, time::unix_timestamp, version::Version,
8    HashSet, KvPair,
9};
10
/// Name under which the FIFO compaction strategy identifies itself
/// (the value returned by `Strategy::get_name`).
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
13
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes, deleting the oldest table(s)
/// when the threshold is reached.
///
/// This strategy never merges or rewrites tables: it either drops whole tables
/// or does nothing.
///
/// Additionally, a (lazy) TTL can be configured to drop old tables. Expiry is
/// only evaluated when the strategy is asked for a choice, so a table may
/// outlive its TTL until then.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) Your keyspace grows monotonically (e.g. time series)
/// 3) You only insert new data (no updates)
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Strategy {
    /// Data set size limit in bytes
    ///
    /// Both table bytes and blob file bytes count towards it.
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    pub ttl_seconds: Option<u64>,
}
39
40impl Strategy {
41    /// Configures a new `Fifo` compaction strategy
42    #[must_use]
43    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
44        Self { limit, ttl_seconds }
45    }
46}
47
48impl CompactionStrategy for Strategy {
49    fn get_name(&self) -> &'static str {
50        NAME
51    }
52
53    fn get_config(&self) -> Vec<KvPair> {
54        vec![
55            (
56                crate::UserKey::from("fifo_limit"),
57                crate::UserValue::from(self.limit.to_le_bytes()),
58            ),
59            (
60                crate::UserKey::from("fifo_ttl"),
61                crate::UserValue::from(if self.ttl_seconds.is_some() {
62                    [1u8]
63                } else {
64                    [0u8]
65                }),
66            ),
67            (
68                crate::UserKey::from("fifo_ttl_seconds"),
69                crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
70            ),
71        ]
72    }
73
74    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
75        let first_level = version.l0();
76
77        // Early return avoids unnecessary work and keeps FIFO a no-op when there is nothing to do.
78        if first_level.is_empty() {
79            return Choice::DoNothing;
80        }
81
82        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
83
84        assert!(
85            !version.level_is_busy(0, state.hidden_set()),
86            "FIFO compaction never compacts",
87        );
88
89        // Account for both table file bytes and value-log (blob) bytes to enforce the true space limit.
90        let db_size = first_level.size() + version.blob_files.on_disk_size();
91
92        let mut ids_to_drop: HashSet<_> = HashSet::default();
93
94        // Compute TTL cutoff once and perform a single pass to mark expired tables and
95        // accumulate their sizes. Also collect non-expired tables for possible size-based drops.
96        let ttl_cutoff = match self.ttl_seconds {
97            Some(s) if s > 0 => Some(
98                unix_timestamp()
99                    .as_nanos()
100                    .saturating_sub(u128::from(s) * 1_000_000_000u128),
101            ),
102            _ => None,
103        };
104
105        let mut ttl_dropped_bytes = 0u64;
106        let mut alive = Vec::new();
107
108        for table in first_level.iter().flat_map(|run| run.iter()) {
109            let expired =
110                ttl_cutoff.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff);
111
112            if expired {
113                ids_to_drop.insert(table.id());
114                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
115                ttl_dropped_bytes =
116                    ttl_dropped_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
117            } else {
118                alive.push(table);
119            }
120        }
121
122        // Subtract TTL-selected bytes to see if we're still over the limit.
123        let size_after_ttl = db_size.saturating_sub(ttl_dropped_bytes);
124
125        // If we still exceed the limit, drop additional oldest tables until within the limit.
126        if size_after_ttl > self.limit {
127            let overshoot = size_after_ttl - self.limit;
128
129            let mut collected_bytes = 0u64;
130
131            // Oldest-first list by creation time from the non-expired set.
132            alive.sort_by_key(|t| t.metadata.created_at);
133
134            for table in alive {
135                if collected_bytes >= overshoot {
136                    break;
137                }
138
139                ids_to_drop.insert(table.id());
140
141                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
142                collected_bytes =
143                    collected_bytes.saturating_add(table.file_size() + linked_blob_file_bytes);
144            }
145        }
146
147        if ids_to_drop.is_empty() {
148            Choice::DoNothing
149        } else {
150            Choice::Drop(ids_to_drop)
151        }
152    }
153}
154
#[cfg(test)]
mod tests {
    use super::Strategy;
    use crate::{AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter};
    use std::{sync::Arc, time::Duration};

    /// Clears the test clock override when dropped.
    ///
    /// Holding one of these for the duration of a test makes sure the override
    /// is also removed if the test returns early through `?` or panics in an assertion.
    struct ResetClockOnDrop;

    impl Drop for ResetClockOnDrop {
        fn drop(&mut self) {
            crate::time::set_unix_timestamp_for_test(None);
        }
    }

    /// Pins the test clock to `secs` seconds since the Unix epoch.
    fn freeze_clock_at(secs: u64) {
        crate::time::set_unix_timestamp_for_test(Some(Duration::from_secs(secs)));
    }

    /// Compacting an empty tree is a no-op.
    #[test]
    fn fifo_empty_levels() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;

        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 0)?;

        assert_eq!(0, tree.table_count());
        Ok(())
    }

    /// Nothing is dropped while the tree stays below the size limit.
    #[test]
    fn fifo_below_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;

        for i in 0..4u8 {
            tree.insert([b'k', i].as_slice(), "v", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        let fifo = Arc::new(Strategy::new(u64::MAX, None));
        tree.compact(fifo, 4)?;

        assert_eq!(before, tree.table_count());
        Ok(())
    }

    /// Tables are dropped once the tree exceeds the size limit.
    #[test]
    fn fifo_more_than_limit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;

        for i in 0..4u8 {
            tree.insert([b'k', i].as_slice(), "v", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        // Very small limit forces dropping oldest tables
        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 4)?;

        assert!(tree.table_count() < before);
        Ok(())
    }

    /// Same as `fifo_more_than_limit`, but with key-value separation enabled.
    #[test]
    fn fifo_more_than_limit_blobs() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default())
            .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
            .open()?;

        for i in 0..3u8 {
            tree.insert([b'k', i].as_slice(), "$", u64::from(i));
            tree.flush_active_memtable(u64::from(i))?;
        }

        let before = tree.table_count();
        let fifo = Arc::new(Strategy::new(1, None));
        tree.compact(fifo, 3)?;

        assert!(tree.table_count() < before);
        Ok(())
    }

    /// Only the table that has outlived the TTL is dropped.
    #[test]
    fn fifo_ttl() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default()).open()?;

        // Clears the clock override on every exit path of this test.
        let _reset = ResetClockOnDrop;

        // Freeze time and create first (older) table at t=1000s
        freeze_clock_at(1_000);
        tree.insert("a", "1", 0);
        tree.flush_active_memtable(0)?;

        // Advance time and create second (newer) table at t=1005s
        freeze_clock_at(1_005);
        tree.insert("b", "2", 1);
        tree.flush_active_memtable(1)?;

        // Now set current time to t=1011s; with TTL=10s, cutoff=1001s => drop first only
        freeze_clock_at(1_011);

        assert_eq!(2, tree.table_count());

        let fifo = Arc::new(Strategy::new(u64::MAX, Some(10)));
        tree.compact(fifo, 2)?;

        assert_eq!(1, tree.table_count());

        Ok(())
    }

    /// One table is dropped by TTL, the other one by the size limit.
    #[test]
    fn fifo_ttl_then_limit_additional_drops_blob_unit() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let tree = Config::new(dir.path(), SequenceNumberCounter::default())
            .with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
            .open()?;

        // Clears the clock override on every exit path of this test.
        let _reset = ResetClockOnDrop;

        // The clock has to be frozen *before* the tables are written: a table stamped
        // with the real wall clock would be newer than any small frozen timestamp and
        // would therefore never count as expired.

        // Older table at t=1000s
        freeze_clock_at(1_000);
        tree.insert("a", "$", 0);
        tree.flush_active_memtable(0)?;

        // Newer table at t=1005s
        freeze_clock_at(1_005);
        tree.insert("b", "$", 1);
        tree.flush_active_memtable(1)?;

        // Now t=1011s; with TTL=10s, cutoff=1001s => only "a" is expired
        freeze_clock_at(1_011);

        assert_eq!(2, tree.table_count());

        // The tiny limit is still exceeded by "b" alone, so the size-based pass
        // has to drop it in addition to the TTL-expired "a".
        let fifo = Arc::new(Strategy::new(1, Some(10)));
        tree.compact(fifo, 2)?;

        assert_eq!(0, tree.table_count());

        Ok(())
    }
}