use crate::kernel::lsm::table::loader::TableLoader;
use itertools::Itertools;
use std::sync::Arc;
use tokio::sync::mpsc::UnboundedReceiver;
use tracing::error;
#[derive(Debug)]
pub enum CleanTag {
Clean(u64),
Add { version: u64, gens: Vec<i64> },
}
pub(crate) struct Cleaner {
ss_table_loader: Arc<TableLoader>,
tag_rx: UnboundedReceiver<CleanTag>,
del_gens: Vec<(u64, Vec<i64>)>,
}
impl Cleaner {
pub(crate) fn new(
ss_table_loader: &Arc<TableLoader>,
tag_rx: UnboundedReceiver<CleanTag>,
) -> Self {
Self {
ss_table_loader: Arc::clone(ss_table_loader),
tag_rx,
del_gens: Vec::new(),
}
}
pub(crate) async fn listen(&mut self) {
loop {
match self.tag_rx.recv().await {
Some(CleanTag::Clean(ver_num)) => self.clean(ver_num),
Some(CleanTag::Add { version, gens }) => {
self.del_gens.push((version, gens));
}
None => {
let all_ver_num = self
.del_gens
.iter()
.map(|(ver_num, _)| ver_num)
.cloned()
.collect_vec();
for ver_num in all_ver_num {
self.clean(ver_num)
}
return;
}
}
}
}
fn clean(&mut self, ver_num: u64) {
if let Some(index) = Self::find_index_with_ver_num(&self.del_gens, ver_num) {
let (_, mut vec_gen) = self.del_gens.remove(index);
if index == 0 {
let ss_table_loader = &self.ss_table_loader;
for gen in vec_gen {
if let Err(err) = ss_table_loader.clean(gen) {
error!(
"[Cleaner][clean][SSTable: {}]: Remove Error!: {:?}",
gen, err
);
};
}
} else {
if let Some((_, pre_vec_gen)) = self.del_gens.get_mut(index - 1) {
pre_vec_gen.append(&mut vec_gen);
}
}
}
}
fn find_index_with_ver_num(del_gen: &[(u64, Vec<i64>)], ver_num: u64) -> Option<usize> {
del_gen
.iter()
.enumerate()
.find(|(_, (vn, _))| vn == &ver_num)
.map(|(index, _)| index)
}
}