use roaring::RoaringBitmap;
use crate::error::ColumnarError;
/// Set of row indices that have been marked as deleted.
///
/// Row indices are `u32` values stored in a [`RoaringBitmap`]; a row is
/// considered deleted exactly when its index is present in the bitmap.
#[derive(Debug, Clone)]
pub struct DeleteBitmap {
// Indices of the rows currently marked as deleted.
inner: RoaringBitmap,
}
impl DeleteBitmap {
    /// Creates an empty bitmap in which no row is marked as deleted.
    pub fn new() -> Self {
        Self {
            inner: RoaringBitmap::new(),
        }
    }

    /// Marks `row_idx` as deleted.
    ///
    /// Returns `true` if the row was not marked before this call and `false`
    /// if it was already marked.
    pub fn mark_deleted(&mut self, row_idx: u32) -> bool {
        self.inner.insert(row_idx)
    }

    /// Marks every row listed in `row_indices` as deleted.
    ///
    /// Indices that repeat or are already marked are accepted and have no
    /// additional effect.
    pub fn mark_deleted_batch(&mut self, row_indices: &[u32]) {
        self.inner.extend(row_indices.iter().copied());
    }

    /// Returns `true` if `row_idx` has been marked as deleted.
    pub fn is_deleted(&self, row_idx: u32) -> bool {
        self.inner.contains(row_idx)
    }

    /// Returns the number of distinct rows marked as deleted.
    pub fn deleted_count(&self) -> u64 {
        self.inner.len()
    }

    /// Returns `true` if no row is marked as deleted.
    pub fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }

    /// Returns the fraction of `total_rows` that is marked as deleted.
    ///
    /// Returns `0.0` when `total_rows` is zero. The result is not clamped: it
    /// exceeds `1.0` if more rows are marked than `total_rows`.
    pub fn delete_ratio(&self, total_rows: u64) -> f64 {
        if total_rows == 0 {
            return 0.0;
        }
        self.inner.len() as f64 / total_rows as f64
    }

    /// Returns `true` if the delete ratio for `total_rows` is strictly greater
    /// than `threshold`.
    pub fn should_compact(&self, total_rows: u64, threshold: f64) -> bool {
        self.delete_ratio(total_rows) > threshold
    }

    /// Returns `true` if every row in the block of `block_len` rows starting
    /// at `block_start` is marked as deleted.
    ///
    /// An empty block (`block_len == 0`) is reported as fully deleted. A block
    /// that would extend past `u32::MAX` is reported as not fully deleted,
    /// because rows beyond `u32::MAX` cannot be stored in the bitmap.
    pub fn is_block_fully_deleted(&self, block_start: u32, block_len: u32) -> bool {
        if block_len == 0 {
            return true;
        }
        // Use an inclusive upper bound so that a block ending exactly at
        // `u32::MAX` is representable and `block_start + block_len` is never
        // evaluated in `u32`, where it could overflow.
        match block_start.checked_add(block_len - 1) {
            Some(block_last) => self.inner.contains_range(block_start..=block_last),
            None => false,
        }
    }

    /// Clears the validity flag of every deleted row covered by `valid`.
    ///
    /// `valid[i]` corresponds to row `global_offset + i`. Flags of deleted
    /// rows are set to `false`; all other flags are left as they are (this
    /// method never sets a flag to `true`). Slots whose row index would exceed
    /// `u32::MAX` are left untouched, since such rows cannot be in the bitmap.
    pub fn apply_to_validity(&self, valid: &mut [bool], global_offset: u32) {
        // Zipping with a bounded row-index range stops at `u32::MAX` instead
        // of overflowing `global_offset + i`.
        for (flag, row_idx) in valid.iter_mut().zip(global_offset..=u32::MAX) {
            if self.inner.contains(row_idx) {
                *flag = false;
            }
        }
    }

    /// Serializes the bitmap into a byte vector using
    /// `RoaringBitmap::serialize_into`.
    ///
    /// # Errors
    ///
    /// Returns [`ColumnarError::Serialization`] if the underlying bitmap fails
    /// to serialize.
    pub fn to_bytes(&self) -> Result<Vec<u8>, ColumnarError> {
        let mut buf = Vec::with_capacity(self.inner.serialized_size());
        self.inner
            .serialize_into(&mut buf)
            .map_err(|e| ColumnarError::Serialization(format!("delete bitmap: {e}")))?;
        Ok(buf)
    }

    /// Reconstructs a bitmap from bytes previously produced by
    /// [`DeleteBitmap::to_bytes`].
    ///
    /// # Errors
    ///
    /// Returns [`ColumnarError::Serialization`] if `data` cannot be
    /// deserialized as a roaring bitmap.
    pub fn from_bytes(data: &[u8]) -> Result<Self, ColumnarError> {
        let inner = RoaringBitmap::deserialize_from(data)
            .map_err(|e| ColumnarError::Serialization(format!("delete bitmap: {e}")))?;
        Ok(Self { inner })
    }

    /// Returns an iterator over the indices of all deleted rows, in the order
    /// produced by `RoaringBitmap::iter` (ascending by value).
    pub fn iter(&self) -> impl Iterator<Item = u32> + '_ {
        self.inner.iter()
    }

    /// Adds every row marked as deleted in `other` to this bitmap (set union).
    ///
    /// `other` is left unchanged.
    pub fn merge(&mut self, other: &DeleteBitmap) {
        self.inner |= &other.inner;
    }
}
impl Default for DeleteBitmap {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a bitmap with exactly `rows` marked as deleted via the batch API.
    fn with_deleted(rows: &[u32]) -> DeleteBitmap {
        let mut bitmap = DeleteBitmap::new();
        bitmap.mark_deleted_batch(rows);
        bitmap
    }

    #[test]
    fn mark_and_check() {
        let mut bitmap = DeleteBitmap::new();
        assert!(!bitmap.is_deleted(5));

        // The first mark reports the row as newly deleted.
        assert!(bitmap.mark_deleted(5));
        assert!(bitmap.is_deleted(5));

        // Marking the same row again reports that nothing changed.
        assert!(!bitmap.mark_deleted(5));
        assert_eq!(bitmap.deleted_count(), 1);
    }

    #[test]
    fn batch_delete() {
        let bitmap = with_deleted(&[1, 3, 5, 7, 9]);

        assert_eq!(bitmap.deleted_count(), 5);
        assert!(bitmap.is_deleted(3));
        assert!(!bitmap.is_deleted(4));
    }

    #[test]
    fn serialization_roundtrip() {
        let original = with_deleted(&[0, 10, 100, 1000, 10000]);

        let bytes = original.to_bytes().unwrap();
        let restored = DeleteBitmap::from_bytes(&bytes).expect("deserialize");

        assert_eq!(restored.deleted_count(), 5);
        for row in [0u32, 10, 10000] {
            assert!(restored.is_deleted(row));
        }
        assert!(!restored.is_deleted(50));
    }

    #[test]
    fn delete_ratio_and_compaction() {
        let mut bitmap = DeleteBitmap::new();
        (0..30).for_each(|row| {
            bitmap.mark_deleted(row);
        });

        // 30 of 100 rows deleted.
        let ratio = bitmap.delete_ratio(100);
        assert!((ratio - 0.3).abs() < 0.001);

        // The ratio is above a 0.2 threshold but not above 0.5.
        assert!(bitmap.should_compact(100, 0.2));
        assert!(!bitmap.should_compact(100, 0.5));
    }

    #[test]
    fn block_fully_deleted() {
        let mut bitmap = DeleteBitmap::new();
        (10..20).for_each(|row| {
            bitmap.mark_deleted(row);
        });

        // Exactly the deleted rows 10..20.
        assert!(bitmap.is_block_fully_deleted(10, 10));
        // Row 20 is not deleted.
        assert!(!bitmap.is_block_fully_deleted(10, 11));
        // Rows 5..10 are not deleted.
        assert!(!bitmap.is_block_fully_deleted(5, 10));
    }

    #[test]
    fn apply_to_validity() {
        let bitmap = with_deleted(&[2, 4]);

        let mut flags = vec![true; 5];
        bitmap.apply_to_validity(&mut flags, 0);

        // Only positions 2 and 4 are cleared.
        assert_eq!(flags, vec![true, true, false, true, false]);
    }

    #[test]
    fn apply_to_validity_with_offset() {
        let mut bitmap = DeleteBitmap::new();
        bitmap.mark_deleted(1025);

        let mut flags = vec![true; 1024];
        bitmap.apply_to_validity(&mut flags, 1024);

        // Global row 1025 maps to local slot 1 when the offset is 1024.
        assert!(flags[0]);
        assert!(!flags[1]);
        assert!(flags[2]);
    }

    #[test]
    fn merge_bitmaps() {
        let mut left = with_deleted(&[1, 2, 3]);
        let right = with_deleted(&[3, 4, 5]);

        left.merge(&right);

        // Union of {1, 2, 3} and {3, 4, 5}; the shared row 3 is counted once.
        assert_eq!(left.deleted_count(), 5);
    }

    #[test]
    fn empty_bitmap() {
        let bitmap = DeleteBitmap::new();
        assert!(bitmap.is_empty());
        assert_eq!(bitmap.deleted_count(), 0);
        assert!(!bitmap.is_deleted(0));

        // An empty bitmap is still empty after a serialization round trip.
        let bytes = bitmap.to_bytes().unwrap();
        let restored = DeleteBitmap::from_bytes(&bytes).expect("deserialize");
        assert!(restored.is_empty());
    }
}