lsm_tree/compaction/
fifo.rs1use super::{Choice, CompactionStrategy};
6use crate::{
7 HashSet, KvPair, compaction::state::CompactionState, config::Config, time::unix_timestamp,
8 version::Version,
9};
10#[cfg(not(feature = "std"))]
11use alloc::vec::Vec;
12
/// Identifier string returned by the FIFO strategy's `get_name`.
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
15
/// FIFO compaction strategy.
///
/// Never rewrites data: its `choose` implementation only ever selects whole
/// tables of the first level to be dropped, either because they are older
/// than the configured TTL or because the tree exceeds its size limit.
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Size limit in bytes.
    ///
    /// Compared against the size of the first level plus the on-disk size of
    /// all blob files; when exceeded, the oldest tables are dropped first.
    pub limit: u64,

    /// Optional time-to-live in seconds.
    ///
    /// Tables created at or before `now - ttl_seconds` are dropped.
    /// `None` and `Some(0)` both disable TTL-based dropping.
    pub ttl_seconds: Option<u64>,
}
41
42impl Strategy {
43 #[must_use]
45 pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
46 Self { limit, ttl_seconds }
47 }
48}
49
50impl CompactionStrategy for Strategy {
51 fn get_name(&self) -> &'static str {
52 NAME
53 }
54
55 fn get_config(&self) -> Vec<KvPair> {
56 vec![
57 (
58 crate::UserKey::from("fifo_limit"),
59 crate::UserValue::from(self.limit.to_le_bytes()),
60 ),
61 (
62 crate::UserKey::from("fifo_ttl"),
63 crate::UserValue::from(if self.ttl_seconds.is_some() {
64 [1u8]
65 } else {
66 [0u8]
67 }),
68 ),
69 (
70 crate::UserKey::from("fifo_ttl_seconds"),
71 crate::UserValue::from(self.ttl_seconds.map(u64::to_le_bytes).unwrap_or_default()),
72 ),
73 ]
74 }
75
76 fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
77 let first_level = version.l0();
78
79 if first_level.is_empty() {
81 return Choice::DoNothing;
82 }
83
84 assert!(first_level.is_disjoint(), "L0 needs to be disjoint");
85
86 assert!(
87 !version.level_is_busy(0, state.hidden_set()),
88 "FIFO compaction never compacts",
89 );
90
91 let db_size = first_level.size() + version.blob_files.on_disk_size();
93
94 let mut ids_to_drop: HashSet<_> = HashSet::default();
95
96 let ttl_cutoff = match self.ttl_seconds {
99 Some(s) if s > 0 => Some(
100 unix_timestamp()
103 .as_nanos()
104 .saturating_sub(u128::from(s) * 1_000_000_000u128),
105 ),
106 _ => None,
107 };
108
109 let mut ttl_dropped_bytes = 0u64;
110 let mut alive = Vec::new();
111
112 for table in first_level.iter().flat_map(|run| run.iter()) {
113 let expired =
114 ttl_cutoff.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff);
115
116 if expired {
117 ids_to_drop.insert(table.id());
118 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
119 ttl_dropped_bytes += table.file_size() + linked_blob_file_bytes;
122 } else {
123 alive.push(table);
124 }
125 }
126
127 let size_after_ttl = db_size.saturating_sub(ttl_dropped_bytes);
129
130 if size_after_ttl > self.limit {
132 let overshoot = size_after_ttl - self.limit;
133
134 let mut collected_bytes = 0u64;
135
136 alive.sort_by_key(|t| t.metadata.created_at);
138
139 for table in alive {
140 if collected_bytes >= overshoot {
141 break;
142 }
143
144 ids_to_drop.insert(table.id());
145
146 let linked_blob_file_bytes = table.referenced_blob_bytes().unwrap_or_default();
147 collected_bytes += table.file_size() + linked_blob_file_bytes;
150 }
151 }
152
153 if ids_to_drop.is_empty() {
154 Choice::DoNothing
155 } else {
156 Choice::Drop(ids_to_drop)
157 }
158 }
159}
160
161#[cfg(test)]
162mod tests;