use std::borrow::Borrow;
use std::io::Write;
use bytes::Bytes;
use roaring::RoaringTreemap;
use crate::actions::deletion_vector::{
create_dv_crc32, DeletionVectorDescriptor, DeletionVectorPath, DeletionVectorStorageType,
};
use crate::{DeltaResult, Error};
pub trait DeletionVector: Sized {
type IndexIterator: Iterator<Item = u64>;
fn into_iter(self) -> Self::IndexIterator;
fn cardinality(&self) -> u64;
fn serialize(self) -> DeltaResult<Bytes> {
let treemap: RoaringTreemap = self.into_iter().collect();
let mut serialized = Vec::new();
treemap
.serialize_into(&mut serialized)
.map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {e}")))?;
Ok(Bytes::from(serialized))
}
}
/// Position, size and row count of one deletion vector, as reported by
/// `StreamingDeletionVectorWriter::write_deletion_vector`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeletionVectorWriteResult {
/// Byte offset in the output at which this deletion vector's 4-byte size prefix starts.
pub offset: i32,
/// Length in bytes of the magic number plus the serialized bitmap; the size prefix and
/// the trailing checksum are not included.
pub size_in_bytes: i32,
/// Number of deleted rows, as reported by the deletion vector's `cardinality()`.
pub cardinality: i64,
}
impl DeletionVectorWriteResult {
    /// Converts this write result into a [`DeletionVectorDescriptor`] for the file at `path`.
    ///
    /// The descriptor is always [`DeletionVectorStorageType::PersistedRelative`], stores the
    /// encoded relative form of `path`, and carries the offset, size and cardinality over
    /// unchanged.
    pub fn to_descriptor(self, path: &DeletionVectorPath) -> DeletionVectorDescriptor {
        let Self {
            offset,
            size_in_bytes,
            cardinality,
        } = self;
        let path_or_inline_dv = path.encoded_relative_path();

        DeletionVectorDescriptor {
            storage_type: DeletionVectorStorageType::PersistedRelative,
            path_or_inline_dv,
            offset: Some(offset),
            size_in_bytes,
            cardinality,
        }
    }
}
/// Deletion vector that keeps its deleted row indexes in a [`RoaringTreemap`].
#[derive(Debug, Clone)]
pub struct KernelDeletionVector {
/// Row indexes that have been marked as deleted.
dv: RoaringTreemap,
}
impl Default for KernelDeletionVector {
fn default() -> Self {
Self::new()
}
}
impl KernelDeletionVector {
    /// Creates a deletion vector with no rows marked as deleted.
    pub fn new() -> Self {
        let dv = RoaringTreemap::new();
        Self { dv }
    }

    /// Marks every row index produced by `iter` as deleted.
    ///
    /// Items may be owned `u64` values or anything that borrows as a `u64` (such as
    /// `&u64`), so both arrays of values and borrowed collections are accepted.
    pub fn add_deleted_row_indexes<I, T>(&mut self, iter: I)
    where
        I: IntoIterator<Item = T>,
        T: Borrow<u64>,
    {
        self.dv
            .extend(iter.into_iter().map(|index| *index.borrow()));
    }

    /// Returns the number of row indexes held in the underlying treemap.
    pub fn cardinality(&self) -> u64 {
        self.dv.len()
    }
}
impl DeletionVector for KernelDeletionVector {
type IndexIterator = roaring::treemap::IntoIter;
fn into_iter(self) -> Self::IndexIterator {
self.dv.into_iter()
}
fn serialize(self) -> DeltaResult<Bytes> {
let mut serialized = Vec::new();
self.dv
.serialize_into(&mut serialized)
.map_err(|e| Error::generic(format!("Failed to serialize deletion vector: {e}")))?;
Ok(Bytes::from(serialized))
}
fn cardinality(&self) -> u64 {
self.dv.len()
}
}
/// Writes deletion vectors one after another to a borrowed [`Write`] destination.
///
/// The writer keeps a running byte count so that each deletion vector can be reported
/// together with the offset at which it starts.
pub struct StreamingDeletionVectorWriter<'a, W: Write> {
/// Destination that receives the encoded bytes.
writer: &'a mut W,
/// Bytes emitted so far by successful writes; `0` means nothing, not even the leading
/// version byte, has been written yet.
current_offset: usize,
}
impl<'a, W: Write> StreamingDeletionVectorWriter<'a, W> {
pub fn new(writer: &'a mut W) -> Self {
Self {
writer,
current_offset: 0,
}
}
pub fn write_deletion_vector(
&mut self,
deletion_vector: impl DeletionVector,
) -> DeltaResult<DeletionVectorWriteResult> {
if self.current_offset == 0 {
self.writer
.write_all(&[1u8])
.map_err(|e| Error::generic(format!("Failed to write version byte: {e}")))?;
self.current_offset = 1;
}
let cardinality = deletion_vector.cardinality();
let serialized = deletion_vector.serialize()?;
let dv_size = serialized.len() + 4;
if dv_size > i32::MAX as usize {
return Err(Error::generic(
"Deletion vector size exceeds maximum allowed size",
));
}
let dv_offset: i32 = self
.current_offset
.try_into()
.map_err(|_| Error::generic("Deletion vector offset doesn't fit in i32"))?;
let size_bytes = (dv_size as u32).to_be_bytes();
self.writer
.write_all(&size_bytes)
.map_err(|e| Error::generic(format!("Failed to write size: {e}")))?;
let magic: u32 = 1681511377;
self.writer
.write_all(&magic.to_le_bytes())
.map_err(|e| Error::generic(format!("Failed to write magic: {e}")))?;
self.writer
.write_all(&serialized)
.map_err(|e| Error::generic(format!("Failed to write deletion vector data: {e}")))?;
let crc_instance = create_dv_crc32();
let mut digest = crc_instance.digest();
digest.update(&magic.to_le_bytes());
digest.update(&serialized);
let checksum = digest.finalize();
self.writer
.write_all(&checksum.to_be_bytes())
.map_err(|e| Error::generic(format!("Failed to write CRC32 checksum: {e}")))?;
let bytes_written = 4 + dv_size + 4; self.current_offset += bytes_written;
Ok(DeletionVectorWriteResult {
offset: dv_offset,
size_in_bytes: dv_size as i32,
cardinality: cardinality as i64,
})
}
pub fn finalize(self) -> DeltaResult<()> {
self.writer
.flush()
.map_err(|e| Error::generic(format!("Failed to flush writer: {e}")))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Cursor;
#[test]
fn test_kernel_deletion_vector_new() {
    // A freshly constructed deletion vector holds no row indexes.
    let empty = KernelDeletionVector::new();
    let count = empty.cardinality();
    assert_eq!(count, 0);
}
#[test]
fn test_kernel_deletion_vector_add_indexes() {
    let expected = RoaringTreemap::from_iter([1u64, 5, 10]);

    let mut deletion_vector = KernelDeletionVector::new();
    deletion_vector.add_deleted_row_indexes([1u64, 5, 10]);
    assert_eq!(deletion_vector.cardinality(), 3);

    // Iterating the deletion vector must yield exactly the indexes that were added.
    let actual: RoaringTreemap = deletion_vector.into_iter().collect();
    assert_eq!(actual, expected);
}
#[test]
fn test_streaming_writer_single_dv() {
    let mut dv = KernelDeletionVector::new();
    dv.add_deleted_row_indexes([0u64, 9]);

    let mut output: Vec<u8> = Vec::new();
    // The writer borrows `output`; keep it in its own scope so the buffer can be
    // inspected afterwards.
    let result = {
        let mut writer = StreamingDeletionVectorWriter::new(&mut output);
        let result = writer.write_deletion_vector(dv).unwrap();
        writer.finalize().unwrap();
        result
    };

    // The first deletion vector starts right after the single version byte.
    assert_eq!(result.offset, 1);
    assert_eq!(result.cardinality, 2);
    assert!(result.size_in_bytes > 0);
    // The output is non-empty and begins with the version byte.
    assert_eq!(output.first(), Some(&1u8));
}
#[test]
fn test_streaming_writer_multiple_dvs() {
    // Bytes surrounding each record's payload: 4-byte size prefix + 4-byte checksum.
    const FRAMING: i32 = 4 + 4;

    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = StreamingDeletionVectorWriter::new(&mut buffer);

    let mut dv1 = KernelDeletionVector::new();
    dv1.add_deleted_row_indexes([0u64, 9]);
    let mut dv2 = KernelDeletionVector::new();
    dv2.add_deleted_row_indexes([5u64, 15, 25]);

    let desc1 = writer.write_deletion_vector(dv1).unwrap();
    let desc2 = writer.write_deletion_vector(dv2).unwrap();
    writer.finalize().unwrap();

    // The first record follows the version byte; the second must start exactly where
    // the first one ends, and the output must end exactly where the second one ends.
    assert_eq!(desc1.offset, 1);
    assert_eq!(desc2.offset, desc1.offset + FRAMING + desc1.size_in_bytes);
    assert_eq!(
        buffer.len() as i32,
        desc2.offset + FRAMING + desc2.size_in_bytes
    );
    assert_eq!(desc1.cardinality, 2);
    assert_eq!(desc2.cardinality, 3);
}
#[test]
fn test_streaming_writer_empty_dv() {
    use crate::engine::sync::SyncEngine;
    use crate::Engine;
    use std::fs::File;
    use tempfile::tempdir;
    use url::Url;

    let temp_dir = tempdir().unwrap();
    let table_url = Url::from_directory_path(temp_dir.path()).unwrap();
    let dv_path = DeletionVectorPath::new(table_url.clone(), String::from("test"));
    let file_path = dv_path.absolute_path().unwrap().to_file_path().unwrap();
    if let Some(parent_dir) = file_path.parent() {
        std::fs::create_dir_all(parent_dir).unwrap();
    }

    // Write an empty deletion vector; the file is closed when this scope ends, before
    // it is read back below.
    let write_result = {
        let mut file = File::create(&file_path).unwrap();
        let mut writer = StreamingDeletionVectorWriter::new(&mut file);
        let result = writer
            .write_deletion_vector(KernelDeletionVector::new())
            .unwrap();
        writer.finalize().unwrap();
        result
    };

    assert_eq!(write_result.offset, 1);
    assert_eq!(write_result.cardinality, 0);
    assert!(write_result.size_in_bytes > 0);

    // Read the deletion vector back through its descriptor.
    let engine = SyncEngine::new();
    let storage = engine.storage_handler();
    let descriptor = write_result.to_descriptor(&dv_path);
    let treemap = descriptor.read(storage, &table_url).unwrap();
    assert_eq!(treemap.len(), 0);
    assert!(treemap.is_empty());
}
#[test]
fn test_streaming_writer_roundtrip() {
    /// Reads the next four bytes from `reader`.
    fn read_word(reader: &mut impl std::io::Read) -> [u8; 4] {
        let mut word = [0u8; 4];
        std::io::Read::read_exact(reader, &mut word).unwrap();
        word
    }

    let test_indexes = vec![3u64, 4, 7, 11, 18, 29];
    let mut dv = KernelDeletionVector::new();
    dv.add_deleted_row_indexes(&test_indexes);

    let mut buffer: Vec<u8> = Vec::new();
    let mut writer = StreamingDeletionVectorWriter::new(&mut buffer);
    let descriptor = writer.write_deletion_vector(dv).unwrap();
    writer.finalize().unwrap();

    // Decode the record by hand, starting at the reported offset.
    let mut cursor = Cursor::new(buffer);
    cursor.set_position(descriptor.offset as u64);

    // Size prefix: big-endian, must agree with the write result.
    let size = u32::from_be_bytes(read_word(&mut cursor));
    assert_eq!(size, descriptor.size_in_bytes as u32);

    // Magic number: little-endian.
    let magic_word = read_word(&mut cursor);
    assert_eq!(u32::from_le_bytes(magic_word), 1681511377);

    // Bitmap payload: whatever the size covers beyond the magic number.
    let mut bitmap_bytes = vec![0u8; (size - 4) as usize];
    std::io::Read::read_exact(&mut cursor, &mut bitmap_bytes).unwrap();

    // Checksum trailer: big-endian CRC32 over magic number + payload.
    let stored_checksum = u32::from_be_bytes(read_word(&mut cursor));
    let crc = create_dv_crc32();
    let mut digest = crc.digest();
    digest.update(&magic_word);
    digest.update(&bitmap_bytes);
    let expected_checksum = digest.finalize();
    assert_eq!(
        stored_checksum, expected_checksum,
        "CRC32 checksum mismatch"
    );

    // The payload must deserialize to exactly the indexes that were written.
    let treemap = RoaringTreemap::deserialize_from(&bitmap_bytes[..]).unwrap();
    assert_eq!(treemap.len(), test_indexes.len() as u64);
    assert!(test_indexes.iter().all(|&idx| treemap.contains(idx)));
}
#[test]
fn test_deletion_vector_trait() {
    // Minimal implementor that relies on the trait's provided `serialize`.
    struct VecBackedDv(Vec<u64>);

    impl DeletionVector for VecBackedDv {
        type IndexIterator = std::vec::IntoIter<u64>;

        fn into_iter(self) -> Self::IndexIterator {
            self.0.into_iter()
        }

        fn cardinality(&self) -> u64 {
            self.0.len() as u64
        }
    }

    let mut sink: Vec<u8> = Vec::new();
    let mut writer = StreamingDeletionVectorWriter::new(&mut sink);
    let result = writer
        .write_deletion_vector(VecBackedDv(vec![1, 2, 3]))
        .unwrap();
    assert_eq!(result.cardinality, 3);
}
#[test]
fn test_array_based_deletion_vector() {
    use crate::engine::sync::SyncEngine;
    use crate::Engine;
    use std::fs::File;
    use tempfile::tempdir;
    use url::Url;

    // Custom implementor backed by a plain vector of row indexes; it uses the trait's
    // provided `serialize`.
    struct ArrayDeletionVector {
        deleted_rows: Vec<u64>,
    }

    impl DeletionVector for ArrayDeletionVector {
        type IndexIterator = std::vec::IntoIter<u64>;

        fn into_iter(self) -> Self::IndexIterator {
            self.deleted_rows.into_iter()
        }

        fn cardinality(&self) -> u64 {
            self.deleted_rows.len() as u64
        }
    }

    let temp_dir = tempdir().unwrap();
    let table_url = Url::from_directory_path(temp_dir.path()).unwrap();
    let dv_path = DeletionVectorPath::new(table_url.clone(), String::from("test"));
    let file_path = dv_path.absolute_path().unwrap().to_file_path().unwrap();
    if let Some(parent_dir) = file_path.parent() {
        std::fs::create_dir_all(parent_dir).unwrap();
    }

    let deleted_indexes = vec![5u64, 12, 23, 45, 67, 89, 100];

    // The file is closed when this scope ends, before it is read back below.
    let write_result = {
        let mut file = File::create(&file_path).unwrap();
        let mut writer = StreamingDeletionVectorWriter::new(&mut file);
        let array_dv = ArrayDeletionVector {
            deleted_rows: deleted_indexes.clone(),
        };
        let result = writer.write_deletion_vector(array_dv).unwrap();
        writer.finalize().unwrap();
        result
    };

    assert_eq!(write_result.cardinality, deleted_indexes.len() as i64);
    assert_eq!(write_result.offset, 1);
    assert!(write_result.size_in_bytes > 0);

    // Read the deletion vector back through its descriptor and compare the indexes.
    let engine = SyncEngine::new();
    let storage = engine.storage_handler();
    let descriptor = write_result.to_descriptor(&dv_path);
    let treemap = descriptor.read(storage, &table_url).unwrap();
    let read_indexes = Vec::from_iter(treemap);
    assert_eq!(read_indexes, deleted_indexes);
}
#[test]
fn test_to_descriptor_preserves_absolute_path() {
    use url::Url;

    let table_path = Url::parse("file:///tmp/test_table/").unwrap();
    let dv_path = DeletionVectorPath::new(table_path.clone(), String::from("deletion_vectors"));
    let expected = dv_path.absolute_path().unwrap();

    let descriptor = DeletionVectorWriteResult {
        offset: 1,
        size_in_bytes: 100,
        cardinality: 42,
    }
    .to_descriptor(&dv_path);

    // Resolving the descriptor against the table root must give back the original path.
    let actual = descriptor.absolute_path(&table_path).unwrap();
    assert_eq!(actual, Some(expected));
}
#[test]
fn test_to_descriptor_preserves_absolute_path_empty_prefix() {
    use url::Url;

    let table_path = Url::parse("file:///tmp/test_table/").unwrap();
    // Same check as the prefixed variant, but with an empty prefix string.
    let dv_path = DeletionVectorPath::new(table_path.clone(), String::new());
    let expected = dv_path.absolute_path().unwrap();

    let descriptor = DeletionVectorWriteResult {
        offset: 10,
        size_in_bytes: 50,
        cardinality: 5,
    }
    .to_descriptor(&dv_path);

    let actual = descriptor.absolute_path(&table_path).unwrap();
    assert_eq!(actual, Some(expected));
}
#[test]
fn test_to_descriptor_fields() {
    use url::Url;

    let table_path = Url::parse("s3://my-bucket/delta_table/").unwrap();
    let dv_path = DeletionVectorPath::new(table_path.clone(), String::from("dv"));

    let descriptor = DeletionVectorWriteResult {
        offset: 42,
        size_in_bytes: 256,
        cardinality: 100,
    }
    .to_descriptor(&dv_path);

    // Every numeric field is carried over and the storage type is relative.
    let DeletionVectorDescriptor {
        storage_type,
        offset,
        size_in_bytes,
        cardinality,
        ..
    } = descriptor;
    assert_eq!(offset, Some(42));
    assert_eq!(size_in_bytes, 256);
    assert_eq!(cardinality, 100);
    assert_eq!(storage_type, DeletionVectorStorageType::PersistedRelative);
}
#[test]
fn test_multiple_deletion_vectors_roundtrip_with_descriptor() {
    use crate::engine::sync::SyncEngine;
    use crate::Engine;
    use std::fs::File;
    use tempfile::tempdir;
    use url::Url;

    let temp_dir = tempdir().unwrap();
    let table_url = Url::from_directory_path(temp_dir.path()).unwrap();
    let dv_path = DeletionVectorPath::new(table_url.clone(), String::from("abc"));
    let file_path = dv_path.absolute_path().unwrap().to_file_path().unwrap();
    if let Some(parent_dir) = file_path.parent() {
        std::fs::create_dir_all(parent_dir).unwrap();
    }

    let test_data = vec![
        vec![0u64, 5, 10, 15],
        vec![1u64, 2, 3, 100, 200],
        vec![50u64, 51, 52, 53, 54, 55],
    ];

    // Write all deletion vectors into one file; the file is closed when this scope
    // ends, before anything is read back.
    let write_results = {
        let mut file = File::create(&file_path).unwrap();
        let mut writer = StreamingDeletionVectorWriter::new(&mut file);
        let results = test_data
            .iter()
            .map(|indexes| {
                let mut dv = KernelDeletionVector::new();
                dv.add_deleted_row_indexes(indexes);
                writer.write_deletion_vector(dv).unwrap()
            })
            .collect::<Vec<_>>();
        writer.finalize().unwrap();
        results
    };

    // Each descriptor must read back exactly the indexes written for it.
    let engine = SyncEngine::new();
    let storage = engine.storage_handler();
    for (write_result, expected_indexes) in write_results.into_iter().zip(&test_data) {
        let descriptor = write_result.to_descriptor(&dv_path);
        let treemap = descriptor.read(storage.clone(), &table_url).unwrap();
        assert_eq!(
            treemap,
            expected_indexes.iter().collect::<RoaringTreemap>(),
            "read {treemap:?} != expected {expected_indexes:?}"
        );
    }
}
}