use ailake_catalog::provider::DeletionVector;
use ailake_core::{AilakeError, AilakeResult};
use ailake_store::Store;
use roaring::RoaringBitmap;
use std::sync::Arc;
pub async fn load_deletion_vector(
store: &Arc<dyn Store>,
dv: &DeletionVector,
) -> AilakeResult<RoaringBitmap> {
let bytes = store
.get_range(&dv.path, dv.offset..dv.offset + dv.length)
.await?;
RoaringBitmap::deserialize_from(bytes.as_ref()).map_err(|e| {
AilakeError::Io(std::io::Error::other(format!(
"ailake: failed to deserialize Deletion Vector bitmap from '{}' \
(offset={}, length={}): {e}",
dv.path, dv.offset, dv.length
)))
})
}
#[inline]
pub fn has_deletions(bitmap: &RoaringBitmap, row_ids: &[u64]) -> bool {
row_ids.iter().any(|&id| bitmap.contains(id as u32))
}
#[cfg(test)]
mod tests {
use super::*;
use ailake_store::LocalStore;
use bytes::Bytes;
use roaring::RoaringBitmap;
fn make_bitmap_bytes(deleted: &[u32]) -> Vec<u8> {
let mut bm = RoaringBitmap::new();
for &r in deleted {
bm.insert(r);
}
let mut buf = Vec::new();
bm.serialize_into(&mut buf).unwrap();
buf
}
#[tokio::test]
async fn load_dv_roundtrip() {
let dir = tempfile::tempdir().unwrap();
let bitmap_bytes = make_bitmap_bytes(&[0, 5, 42, 1000]);
let offset: u64 = 16; let mut file_bytes = vec![0u8; offset as usize]; file_bytes.extend_from_slice(&bitmap_bytes);
let dvd_path = "data/dv-0001.dvd";
let store: Arc<dyn Store> = Arc::new(LocalStore::new(dir.path()));
store.put(dvd_path, Bytes::from(file_bytes)).await.unwrap();
let dv = DeletionVector {
path: dvd_path.to_string(),
offset,
length: bitmap_bytes.len() as u64,
cardinality: 4,
};
let bm = load_deletion_vector(&store, &dv).await.unwrap();
assert!(bm.contains(0));
assert!(bm.contains(5));
assert!(bm.contains(42));
assert!(bm.contains(1000));
assert!(!bm.contains(1)); assert_eq!(bm.len(), 4);
}
#[test]
fn has_deletions_detects_overlap() {
let mut bm = RoaringBitmap::new();
bm.insert(10);
bm.insert(20);
assert!(has_deletions(&bm, &[5, 10, 15])); assert!(!has_deletions(&bm, &[1, 2, 3])); }
}