use super::{Choice, CompactionStrategy};
use crate::{
HashSet, KvPair, compaction::state::CompactionState, config::Config, time::unix_timestamp,
version::Version,
};
/// Name of the FIFO compaction strategy, as returned by `Strategy::get_name`.
#[doc(hidden)]
pub const NAME: &str = "FifoCompaction";
/// FIFO compaction strategy.
///
/// Never merges tables; it only selects level-0 tables to be dropped, either
/// because they are older than the configured TTL or because the tree has
/// grown past `limit`.
#[derive(Clone, Debug)]
pub struct Strategy {
    /// Size limit for the tree (level-0 tables plus blob files).
    ///
    /// Once the size left after TTL-based drops exceeds this value, the oldest
    /// tables are dropped until the excess is covered.
    pub limit: u64,

    /// Optional time-to-live in seconds.
    ///
    /// Tables created at least this many seconds ago are dropped.
    /// `None` and `Some(0)` both disable TTL-based dropping.
    pub ttl_seconds: Option<u64>,
}
impl Strategy {
    /// Builds a FIFO strategy from a size limit and an optional TTL (in seconds).
    ///
    /// Both arguments are stored unchanged in the fields of the same name.
    #[must_use]
    pub fn new(limit: u64, ttl_seconds: Option<u64>) -> Self {
        Self { ttl_seconds, limit }
    }
}
impl CompactionStrategy for Strategy {
    /// Returns the identifier of this strategy ([`NAME`]).
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Serializes the strategy settings as key-value pairs.
    ///
    /// Three entries are emitted, in this order:
    /// - `fifo_limit`: `limit` as 8 little-endian bytes,
    /// - `fifo_ttl`: one byte, `1` if a TTL is configured and `0` otherwise,
    /// - `fifo_ttl_seconds`: the TTL as 8 little-endian bytes (all zeroes when
    ///   no TTL is configured).
    fn get_config(&self) -> Vec<KvPair> {
        let limit_bytes = self.limit.to_le_bytes();
        let ttl_flag = [u8::from(self.ttl_seconds.is_some())];
        let ttl_bytes = self.ttl_seconds.map_or([0u8; 8], u64::to_le_bytes);

        vec![
            (
                crate::UserKey::from("fifo_limit"),
                crate::UserValue::from(limit_bytes),
            ),
            (
                crate::UserKey::from("fifo_ttl"),
                crate::UserValue::from(ttl_flag),
            ),
            (
                crate::UserKey::from("fifo_ttl_seconds"),
                crate::UserValue::from(ttl_bytes),
            ),
        ]
    }

    /// Selects the level-0 tables that should be dropped, if any.
    ///
    /// Tables whose creation timestamp is at or before the TTL cutoff are
    /// selected first. If the size that remains (level-0 tables plus blob
    /// files, minus what the TTL pass selected) still exceeds `limit`, the
    /// surviving tables are selected oldest-first until the excess is covered.
    ///
    /// Returns [`Choice::DoNothing`] when level 0 is empty or nothing was
    /// selected, otherwise [`Choice::Drop`] with the selected table IDs.
    ///
    /// # Panics
    ///
    /// Panics if level 0 is not disjoint, or if level 0 is reported busy for
    /// the current hidden set.
    fn choose(&self, version: &Version, _: &Config, state: &CompactionState) -> Choice {
        let l0 = version.l0();

        if l0.is_empty() {
            return Choice::DoNothing;
        }

        assert!(l0.is_disjoint(), "L0 needs to be disjoint");
        assert!(
            !version.level_is_busy(0, state.hidden_set()),
            "FIFO compaction never compacts",
        );

        // The limit applies to table files and blob files together.
        let total_bytes = l0.size() + version.blob_files.on_disk_size();

        // A TTL of `None` or `Some(0)` means "no TTL": no cutoff is computed.
        let cutoff_nanos = self.ttl_seconds.filter(|&secs| secs > 0).map(|secs| {
            unix_timestamp()
                .as_nanos()
                .saturating_sub(u128::from(secs) * 1_000_000_000u128)
        });

        // Split level 0 into tables past the TTL cutoff and tables still alive.
        let (expired, mut survivors): (Vec<_>, Vec<_>) = l0
            .iter()
            .flat_map(|run| run.iter())
            .partition(|table| {
                cutoff_nanos.is_some_and(|cutoff| u128::from(table.metadata.created_at) <= cutoff)
            });

        let mut doomed: HashSet<_> = HashSet::default();
        let mut expired_bytes = 0u64;

        for table in expired {
            doomed.insert(table.id());

            // Blob bytes fall back to 0 when the lookup yields no value.
            let blob_bytes = table.referenced_blob_bytes().unwrap_or_default();
            expired_bytes = expired_bytes.saturating_add(table.file_size() + blob_bytes);
        }

        let remaining_bytes = total_bytes.saturating_sub(expired_bytes);

        // Still over the limit after the TTL pass: drop the oldest survivors
        // until the selected bytes cover the excess.
        if let Some(excess) = remaining_bytes.checked_sub(self.limit).filter(|&e| e > 0) {
            survivors.sort_by_key(|table| table.metadata.created_at);

            let mut reclaimed = 0u64;
            let mut oldest_first = survivors.into_iter();

            while reclaimed < excess {
                let Some(table) = oldest_first.next() else {
                    break;
                };

                doomed.insert(table.id());

                let blob_bytes = table.referenced_blob_bytes().unwrap_or_default();
                reclaimed = reclaimed.saturating_add(table.file_size() + blob_bytes);
            }
        }

        if doomed.is_empty() {
            return Choice::DoNothing;
        }

        Choice::Drop(doomed)
    }
}
// Tests for the FIFO strategy, driven through `tree.compact` on trees opened in temp dirs.
#[cfg(test)]
mod tests {
use super::Strategy;
use crate::{AbstractTree, Config, KvSeparationOptions, SequenceNumberCounter};
use std::sync::Arc;
// Compacting a freshly opened tree that has no tables leaves the table count at 0.
#[test]
fn fifo_empty_levels() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
let fifo = Arc::new(Strategy::new(1, None));
tree.compact(fifo, 0)?;
assert_eq!(0, tree.table_count());
Ok(())
}
// With the limit at `u64::MAX` and no TTL, compaction must not change the table count.
#[test]
fn fifo_below_limit() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
// One insert followed by one flush per iteration.
for i in 0..4u8 {
tree.insert([b'k', i].as_slice(), "v", u64::from(i));
tree.flush_active_memtable(u64::from(i))?;
}
let before = tree.table_count();
let fifo = Arc::new(Strategy::new(u64::MAX, None));
tree.compact(fifo, 4)?;
assert_eq!(before, tree.table_count());
Ok(())
}
// With a limit of 1 and no TTL, compaction must reduce the table count.
#[test]
fn fifo_more_than_limit() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
for i in 0..4u8 {
tree.insert([b'k', i].as_slice(), "v", u64::from(i));
tree.flush_active_memtable(u64::from(i))?;
}
let before = tree.table_count();
let fifo = Arc::new(Strategy::new(1, None));
tree.compact(fifo, 4)?;
assert!(tree.table_count() < before);
Ok(())
}
// Same as `fifo_more_than_limit`, but on a tree opened with KV separation enabled.
#[test]
fn fifo_more_than_limit_blobs() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
// NOTE(review): a separation threshold of 1 presumably sends the one-byte values to blob files - confirm.
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
.open()?;
for i in 0..3u8 {
tree.insert([b'k', i].as_slice(), "$", u64::from(i));
tree.flush_active_memtable(u64::from(i))?;
}
let before = tree.table_count();
let fifo = Arc::new(Strategy::new(1, None));
tree.compact(fifo, 3)?;
assert!(tree.table_count() < before);
Ok(())
}
// TTL-only dropping: with a 10 s TTL and the clock at 1011 s, exactly one of two tables remains.
#[test]
fn fifo_ttl() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.open()?;
// NOTE(review): presumably each flush stamps the table with the overridden clock
// (1000 s for "a", 1005 s for "b") - confirm against the table writer.
crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(1_000)));
tree.insert("a", "1", 0);
tree.flush_active_memtable(0)?;
crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(1_005)));
tree.insert("b", "2", 1);
tree.flush_active_memtable(1)?;
// Clock at 1011 s with a 10 s TTL puts the cutoff at 1001 s.
crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(1_011)));
assert_eq!(2, tree.table_count());
// The limit is `u64::MAX`, so only the TTL can select tables here.
let fifo = Arc::new(Strategy::new(u64::MAX, Some(10)));
tree.compact(fifo, 2)?;
assert_eq!(1, tree.table_count());
// Remove the clock override again.
// NOTE(review): this line is skipped if an earlier `?` or assertion fails.
crate::time::set_unix_timestamp_for_test(None);
Ok(())
}
// TTL of 1 s combined with a limit of 1 on a KV-separated tree: no table may remain.
#[test]
fn fifo_ttl_then_limit_additional_drops_blob_unit() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.with_kv_separation(Some(KvSeparationOptions::default().separation_threshold(1)))
.open()?;
tree.insert("a", "$", 0);
tree.flush_active_memtable(0)?;
tree.insert("b", "$", 1);
tree.flush_active_memtable(1)?;
// NOTE(review): both tables are flushed before this override is installed, so whether
// the TTL pass selects them depends on the timestamp the flush recorded. If that was the
// real clock, 10_000_000 s lies in the past and only the size limit drops tables - confirm
// this test exercises the TTL path as its name suggests.
crate::time::set_unix_timestamp_for_test(Some(std::time::Duration::from_secs(10_000_000)));
let fifo = Arc::new(Strategy::new(1, Some(1)));
tree.compact(fifo, 2)?;
assert_eq!(0, tree.table_count());
// Remove the clock override again.
crate::time::set_unix_timestamp_for_test(None);
Ok(())
}
}