Skip to main content

lsm_tree/compaction/
fifo.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2024-present, fjall-rs
3// Copyright (c) 2026-present, Structured World Foundation
4
5use super::{Choice, CompactionStrategy};
6use crate::{
7    HashSet, KvPair, compaction::state::CompactionState, config::Config, time::unix_timestamp,
8    version::Version,
9};
10#[cfg(not(feature = "std"))]
11use alloc::vec::Vec;
12
/// Identifier returned by the FIFO strategy's `get_name`.
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
15
/// FIFO-style compaction
///
/// Limits the tree size to roughly `limit` bytes by dropping the oldest table(s)
/// once the threshold is exceeded. The size that is measured is the size of the
/// level 0 tables plus the on-disk size of the blob (value-log) files.
///
/// Additionally, a (lazy) TTL can be configured to drop old tables: expiry is
/// checked when the strategy is asked to choose work, not continuously.
///
/// This strategy never merges or rewrites tables; it only drops whole tables.
///
/// ###### Caution
///
/// Only use it for specific workloads where:
///
/// 1) You only want to store recent data (unimportant logs, ...)
/// 2) The key order of inserts is strictly monotonically increasing or decreasing
/// 3) You only insert new data (no updates/deletes)
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Data set size limit in bytes (level 0 tables plus blob files)
    pub limit: u64,

    /// TTL in seconds, will be disabled if 0 or None
    pub ttl_seconds: Option<u64>,
}
41
42impl Strategy {
43    /// Configures a new `Fifo` compaction strategy
44    #[must_use]
45    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
46        Self { limit, ttl_seconds }
47    }
48}
49
50impl CompactionStrategy for Strategy {
51    fn get_name(&self) -> &'static str {
52        NAME
53    }
54
55    fn get_config(&self) -> Vec<KvPair> {
56        vec![
57            (
58                crate::UserKey::from("fifo_limit"),
59                crate::UserValue::from(self.limit.to_le_bytes()),
60            ),
61            (
62                crate::UserKey::from("fifo_ttl"),
63                crate::UserValue::from(if self.ttl_seconds.is_some() {
64                    [1u8]
65                } else {
66                    [0u8]
67                }),
68            ),
69            (
70                crate::UserKey::from("fifo_ttl_seconds"),
71                crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
72            ),
73        ]
74    }
75
76    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
77        let first_level = version.l0();
78
79        // Early return avoids unnecessary work and keeps FIFO a no-op when there is nothing to do.
80        if first_level.is_empty() {
81            return Choice::DoNothing;
82        }
83
84        assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
85
86        assert!(
87            !version.level_is_busy(0, state.hidden_set()),
88            "FIFO compaction never compacts",
89        );
90
91        // Account for both table file bytes and value-log (blob) bytes to enforce the true space limit.
92        let db_size = first_level.size() + version.blob_files.on_disk_size();
93
94        let mut ids_to_drop: HashSet<_> = HashSet::default();
95
96        // Compute TTL cutoff once and perform a single pass to mark expired tables and
97        // accumulate their sizes. Also collect non-expired tables for possible size-based drops.
98        let ttl_cutoff = match self.ttl_seconds {
99            Some(s) if s > 0 => Some(
100                // Clamp-to-zero: a TTL longer than the wall clock leaves no
101                // expiry cutoff rather than wrapping.
102                unix_timestamp()
103                    .as_nanos()
104                    .saturating_sub(u128::from(s) * 1_000_000_000u128),
105            ),
106            _ => None,
107        };
108
109        let mut ttl_dropped_bytes = 0u64;
110        let mut alive = Vec::new();
111
112        for table in first_level.iter().flat_map(|run| run.iter()) {
113            let expired =
114                ttl_cutoff.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff);
115
116            if expired {
117                ids_to_drop.insert(table.id());
118                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
119                // Accumulated dropped-byte total, bounded by the on-disk size;
120                // cannot overflow u64.
121                ttl_dropped_bytes += table.file_size() + linked_blob_file_bytes;
122            } else {
123                alive.push(table);
124            }
125        }
126
127        // Subtract TTL-selected bytes to see if we're still over the limit.
128        let size_after_ttl = db_size.saturating_sub(ttl_dropped_bytes);
129
130        // If we still exceed the limit, drop additional oldest tables until within the limit.
131        if size_after_ttl > self.limit {
132            let overshoot = size_after_ttl - self.limit;
133
134            let mut collected_bytes = 0u64;
135
136            // Oldest-first list by creation time from the non-expired set.
137            alive.sort_by_key(|t| t.metadata.created_at);
138
139            for table in alive {
140                if collected_bytes >= overshoot {
141                    break;
142                }
143
144                ids_to_drop.insert(table.id());
145
146                let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
147                // Accumulated collected-byte total, bounded by the on-disk size;
148                // cannot overflow u64.
149                collected_bytes += table.file_size() + linked_blob_file_bytes;
150            }
151        }
152
153        if ids_to_drop.is_empty() {
154            Choice::DoNothing
155        } else {
156            Choice::Drop(ids_to_drop)
157        }
158    }
159}
160
161#[cfg(test)]
162mod tests;