use super::types::{TypedColumn, VacuumConfig, VacuumStats};
use super::ColumnStore;
use roaring::RoaringBitmap;
use std::collections::HashMap;
/// Copies every element of `data` whose index is not in `deleted` into a new vector.
///
/// Returns the surviving elements, in their original relative order, together
/// with the number of bytes reclaimed: `element_bytes` for each element that was
/// dropped. Indices in `deleted` that fall outside `data` are ignored and do not
/// contribute to the byte count.
///
/// The set's hasher is generic, so the helper accepts any
/// `std::collections::HashSet<usize, _>` regardless of the hasher it was built with.
fn compact_vec<T: Copy, S: std::hash::BuildHasher>(
    data: &[T],
    deleted: &std::collections::HashSet<usize, S>,
    element_bytes: u64,
) -> (Vec<T>, u64) {
    // Capacity is only an estimate: out-of-range entries in `deleted` make it too
    // small, in which case `extend` simply grows the vector.
    let mut kept = Vec::with_capacity(data.len().saturating_sub(deleted.len()));
    kept.extend(
        data.iter()
            .enumerate()
            .filter(|(idx, _)| !deleted.contains(idx))
            .map(|(_, value)| *value),
    );

    // `kept` is a subset of `data`, so this subtraction cannot underflow.
    let dropped = u64::try_from(data.len() - kept.len()).unwrap_or(u64::MAX);
    (kept, dropped.saturating_mul(element_bytes))
}
impl ColumnStore {
    /// Physically removes every row recorded in `deleted_rows` and renumbers the
    /// surviving rows so they are contiguous again.
    ///
    /// When there are no tombstones the store is left untouched. Otherwise all
    /// columns are compacted, the primary-key index and the row-expiry table are
    /// rewritten in terms of the new row indices, both deletion trackers are
    /// cleared and `row_count` is updated.
    ///
    /// `_config` is currently unused.
    ///
    /// The returned stats always have `completed` set and `duration_ms` filled in;
    /// `tombstones_removed` and `bytes_reclaimed` are only populated when work was
    /// actually done.
    pub fn vacuum(&mut self, _config: VacuumConfig) -> VacuumStats {
        let start = std::time::Instant::now();
        let tombstones_found = self.deleted_rows.len();
        let mut stats = VacuumStats {
            tombstones_found,
            ..Default::default()
        };

        if tombstones_found > 0 {
            // The map must be built before anything is cleared: it is derived
            // from the pre-compaction `row_count` and `deleted_rows`.
            let (old_to_new, new_row_count) = self.build_compaction_map();
            self.compact_all_columns(&mut stats);
            self.remap_primary_index(&old_to_new);
            self.remap_row_expiry(&old_to_new);

            stats.tombstones_removed = tombstones_found;
            self.deleted_rows.clear();
            self.deletion_bitmap.clear();
            self.row_count = new_row_count;
        }

        stats.completed = true;
        // `as_millis` yields a u128; saturate rather than silently truncate.
        stats.duration_ms = u64::try_from(start.elapsed().as_millis()).unwrap_or(u64::MAX);
        stats
    }

    /// Builds the mapping from each live row's current index to its index after
    /// compaction, and returns it together with the number of live rows.
    ///
    /// Live rows keep their relative order: new indices are assigned
    /// consecutively from zero while scanning `0..row_count`.
    fn build_compaction_map(&self) -> (HashMap<usize, usize>, usize) {
        // Upper bound on the number of live rows; exact when every tombstone
        // refers to a row below `row_count`.
        let expected_live = self.row_count.saturating_sub(self.deleted_rows.len());
        let mut old_to_new: HashMap<usize, usize> = HashMap::with_capacity(expected_live);

        old_to_new.extend(
            (0..self.row_count)
                .filter(|old_idx| !self.deleted_rows.contains(old_idx))
                .enumerate()
                .map(|(new_idx, old_idx)| (old_idx, new_idx)),
        );

        let live_rows = old_to_new.len();
        (old_to_new, live_rows)
    }

    /// Replaces every column with a compacted copy that omits the deleted rows,
    /// adding the bytes reported for each column to `stats.bytes_reclaimed`.
    fn compact_all_columns(&mut self, stats: &mut VacuumStats) {
        for column in self.columns.values_mut() {
            let (compacted, reclaimed) = Self::compact_column(column, &self.deleted_rows);
            stats.bytes_reclaimed += reclaimed;
            *column = compacted;
        }
    }

    /// Rewrites `primary_index` and its reverse table `row_idx_to_pk` using the
    /// post-compaction row indices.
    ///
    /// Keys whose row is absent from `old_to_new` are dropped. Does nothing when
    /// the store has no primary-key column.
    fn remap_primary_index(&mut self, old_to_new: &HashMap<usize, usize>) {
        if self.primary_key_column.is_none() {
            return;
        }

        // No more entries can survive than there are keys or live rows.
        let capacity = self.primary_index.len().min(old_to_new.len());
        let mut pk_to_row: HashMap<i64, usize> = HashMap::with_capacity(capacity);
        let mut row_to_pk: HashMap<usize, i64> = HashMap::with_capacity(capacity);

        for (pk, old_idx) in &self.primary_index {
            if let Some(&new_idx) = old_to_new.get(old_idx) {
                pk_to_row.insert(*pk, new_idx);
                row_to_pk.insert(new_idx, *pk);
            }
        }

        self.primary_index = pk_to_row;
        self.row_idx_to_pk = row_to_pk;
    }

    /// Rewrites `row_expiry` so it is keyed by post-compaction row indices,
    /// dropping entries whose row is absent from `old_to_new`.
    fn remap_row_expiry(&mut self, old_to_new: &HashMap<usize, usize>) {
        self.row_expiry = self
            .row_expiry
            .iter()
            .filter_map(|(old_idx, &expiry)| {
                old_to_new.get(old_idx).map(|&new_idx| (new_idx, expiry))
            })
            .collect();
    }

    /// Returns a copy of `column` without the rows listed in `deleted`, plus the
    /// number of bytes accounted as reclaimed.
    ///
    /// The per-element size used for accounting is 8 for `Int` and `Float`, 4 for
    /// `String` and 1 for `Bool`.
    fn compact_column(
        column: &TypedColumn,
        deleted: &rustc_hash::FxHashSet<usize>,
    ) -> (TypedColumn, u64) {
        match column {
            TypedColumn::Int(values) => {
                let (kept, reclaimed) = compact_vec(values, deleted, 8);
                (TypedColumn::Int(kept), reclaimed)
            }
            TypedColumn::Float(values) => {
                let (kept, reclaimed) = compact_vec(values, deleted, 8);
                (TypedColumn::Float(kept), reclaimed)
            }
            TypedColumn::String(values) => {
                // NOTE(review): 4 bytes per element presumably reflects a
                // fixed-width handle stored in the column rather than the string
                // contents — confirm against `TypedColumn`.
                let (kept, reclaimed) = compact_vec(values, deleted, 4);
                (TypedColumn::String(kept), reclaimed)
            }
            TypedColumn::Bool(values) => {
                let (kept, reclaimed) = compact_vec(values, deleted, 1);
                (TypedColumn::Bool(kept), reclaimed)
            }
        }
    }

    /// Returns `true` when the fraction of tombstoned rows
    /// (`deleted_rows.len() / row_count`) is at least `threshold`.
    ///
    /// Always returns `false` when `row_count` is zero.
    #[must_use]
    pub fn should_vacuum(&self, threshold: f64) -> bool {
        if self.row_count == 0 {
            return false;
        }
        let deleted_fraction = self.deleted_rows.len() as f64 / self.row_count as f64;
        deleted_fraction >= threshold
    }

    /// Reports whether `row_idx` is marked as deleted.
    ///
    /// The deletion bitmap is consulted when the index fits in a `u32`; larger
    /// indices fall back to the `deleted_rows` set.
    #[must_use]
    #[inline]
    pub fn is_row_deleted_bitmap(&self, row_idx: usize) -> bool {
        match u32::try_from(row_idx) {
            Ok(idx) => self.deletion_bitmap.contains(idx),
            Err(_) => self.deleted_rows.contains(&row_idx),
        }
    }

    /// Iterates, in ascending order, over the indices in `0..row_count` that are
    /// not marked as deleted according to [`Self::is_row_deleted_bitmap`].
    pub fn live_row_indices(&self) -> impl Iterator<Item = usize> + '_ {
        (0..self.row_count).filter(|&idx| !self.is_row_deleted_bitmap(idx))
    }

    /// Borrows the deletion bitmap.
    #[must_use]
    pub fn deletion_bitmap(&self) -> &RoaringBitmap {
        &self.deletion_bitmap
    }

    /// Returns the number of entries in the deletion bitmap.
    #[must_use]
    pub fn deleted_count_bitmap(&self) -> u64 {
        self.deletion_bitmap.len()
    }
}