use std::io::{Cursor, Read};
use std::str::FromStr;
use std::sync::Arc;
use bytes::Bytes;
use crc::{Crc, CRC_32_ISO_HDLC};
use delta_kernel::schema::derive_macro_utils::ToDataType;
use delta_kernel_derive::ToSchema;
use roaring::RoaringTreemap;
use url::Url;
use crate::schema::DataType;
use crate::utils::require;
use crate::{DeltaResult, Error, StorageHandler};
/// Header value (0x6439D3D1) that `DeletionVectorDescriptor::read` expects, decoded as a
/// little-endian `u32`, in front of a serialized bitmap. It is the only serialization that `read`
/// goes on to deserialize.
const ROARING_BITMAP_PORTABLE_MAGIC: u32 = 1681511377;
/// Header value (0x6439D3D0) that `DeletionVectorDescriptor::read` recognizes in inline deletion
/// vectors but rejects with a "not yet supported" error.
const ROARING_BITMAP_NATIVE_MAGIC: u32 = 1681511376;
/// How the deletion vector referenced by a [`DeletionVectorDescriptor`] is stored.
///
/// Every variant has a one-character string form (`"u"`, `"i"`, `"p"`). The same codes are used
/// by the `FromStr` and `Display` implementations and, in test builds, by serde.
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
#[cfg_attr(test, derive(serde::Serialize, serde::Deserialize))]
pub enum DeletionVectorStorageType {
/// `"u"`: the vector lives in a file whose location is derived from a prefix plus a z85-encoded
/// UUID and is resolved against a parent URL.
#[cfg_attr(test, serde(rename = "u"))]
PersistedRelative,
/// `"i"`: the vector data itself is z85-encoded inside the descriptor; there is no file path.
#[cfg_attr(test, serde(rename = "i"))]
Inline,
/// `"p"`: the vector lives in a file whose full URL is stored in the descriptor.
#[cfg_attr(test, serde(rename = "p"))]
PersistedAbsolute,
}
impl FromStr for DeletionVectorStorageType {
type Err = Error;
fn from_str(s: &str) -> DeltaResult<Self> {
match s {
"u" => Ok(Self::PersistedRelative),
"i" => Ok(Self::Inline),
"p" => Ok(Self::PersistedAbsolute),
_ => Err(Error::internal_error(format!(
"Unsupported deletion vector format option: {s}"
))),
}
}
}
impl std::fmt::Display for DeletionVectorStorageType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DeletionVectorStorageType::PersistedRelative => write!(f, "u"),
DeletionVectorStorageType::Inline => write!(f, "i"),
DeletionVectorStorageType::PersistedAbsolute => write!(f, "p"),
}
}
}
/// Maps this type to `DataType::STRING`.
///
/// NOTE(review): presumably consumed by the `ToSchema` derive on `DeletionVectorDescriptor`
/// (the trait is imported from `derive_macro_utils`) — confirm.
impl ToDataType for DeletionVectorStorageType {
fn to_data_type() -> DataType {
DataType::STRING
}
}
/// Location of a deletion vector file, described by a table URL, an optional directory prefix and
/// the UUID that names the file.
pub struct DeletionVectorPath {
// URL that the relative file path is joined onto.
table_path: Url,
// UUID embedded in the file name (`deletion_vector_<uuid>.bin`).
uuid: uuid::Uuid,
// Directory prefix placed in front of the file name; an empty string means no prefix.
prefix: String,
}
impl DeletionVectorPath {
pub(crate) fn new(table_path: Url, prefix: String) -> Self {
Self {
table_path,
uuid: uuid::Uuid::new_v4(),
prefix,
}
}
#[cfg(test)]
pub(crate) fn new_with_uuid(table_path: Url, prefix: String, uuid: uuid::Uuid) -> Self {
Self {
table_path,
uuid,
prefix,
}
}
fn relative_path(prefix: &str, uuid: &uuid::Uuid) -> String {
if !prefix.is_empty() {
format!("{prefix}/deletion_vector_{uuid}.bin")
} else {
format!("deletion_vector_{uuid}.bin")
}
}
pub fn absolute_path(&self) -> DeltaResult<Url> {
let dv_suffix = Self::relative_path(&self.prefix, &self.uuid);
self.table_path
.join(&dv_suffix)
.map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))
}
pub(crate) fn encoded_relative_path(&self) -> String {
format!("{}{}", self.prefix, z85::encode(self.uuid.as_bytes()))
}
}
/// Describes one deletion vector: where it is stored and how to locate and validate its bytes.
#[derive(Debug, Clone, PartialEq, Eq, ToSchema)]
#[cfg_attr(
test,
derive(serde::Serialize, serde::Deserialize),
serde(rename_all = "camelCase")
)]
pub struct DeletionVectorDescriptor {
/// Selects how `path_or_inline_dv` is interpreted.
pub storage_type: DeletionVectorStorageType,
/// Depends on `storage_type`:
/// - `PersistedRelative`: an optional prefix followed by a 20-character z85-encoded UUID.
/// - `PersistedAbsolute`: the URL of the deletion vector file.
/// - `Inline`: the z85-encoded vector data (magic number followed by the serialized bitmap).
pub path_or_inline_dv: String,
/// Byte position inside the deletion vector file at which this vector's size field starts.
/// `read` falls back to `1` when this is `None`.
pub offset: Option<i32>,
/// Size in bytes of the stored vector (magic number plus bitmap). For file-backed vectors
/// `read` checks it against the size recorded in the file.
pub size_in_bytes: i32,
/// Not read by any code in this file — presumably the number of rows the vector marks;
/// confirm against the protocol.
pub cardinality: i64,
}
impl DeletionVectorDescriptor {
pub fn unique_id(&self) -> String {
Self::unique_id_from_parts(
&self.storage_type.to_string(),
&self.path_or_inline_dv,
self.offset,
)
}
pub(crate) fn unique_id_from_parts(
storage_type: &str,
path_or_inline_dv: &str,
offset: Option<i32>,
) -> String {
match offset {
Some(offset) => format!("{storage_type}{path_or_inline_dv}@{offset}"),
None => format!("{storage_type}{path_or_inline_dv}"),
}
}
pub fn absolute_path(&self, parent: &Url) -> DeltaResult<Option<Url>> {
match self.storage_type {
DeletionVectorStorageType::PersistedRelative => {
let path_len = self.path_or_inline_dv.len();
require!(
path_len >= 20,
Error::DeletionVector(format!("Invalid length {path_len}, must be >= 20"))
);
let prefix_len = path_len - 20;
let decoded = z85::decode(&self.path_or_inline_dv[prefix_len..])
.map_err(|_| Error::deletion_vector("Failed to decode DV uuid"))?;
let uuid = uuid::Uuid::from_slice(&decoded)
.map_err(|err| Error::DeletionVector(err.to_string()))?;
let dv_suffix =
DeletionVectorPath::relative_path(&self.path_or_inline_dv[..prefix_len], &uuid);
let dv_path = parent
.join(&dv_suffix)
.map_err(|_| Error::DeletionVector(format!("invalid path: {dv_suffix}")))?;
Ok(Some(dv_path))
}
DeletionVectorStorageType::PersistedAbsolute => {
Ok(Some(Url::parse(&self.path_or_inline_dv).map_err(|_| {
Error::DeletionVector(format!("invalid path: {}", self.path_or_inline_dv))
})?))
}
DeletionVectorStorageType::Inline => Ok(None),
}
}
pub fn read(
&self,
storage: Arc<dyn StorageHandler>,
parent: &Url,
) -> DeltaResult<RoaringTreemap> {
match self.absolute_path(parent)? {
None => {
let byte_slice = z85::decode(&self.path_or_inline_dv)
.map_err(|_| Error::deletion_vector("Failed to decode DV"))?;
let magic = slice_to_u32(&byte_slice[0..4], Endian::Little)?;
match magic {
ROARING_BITMAP_PORTABLE_MAGIC => {
RoaringTreemap::deserialize_from(&byte_slice[4..])
.map_err(|err| Error::DeletionVector(err.to_string()))
}
ROARING_BITMAP_NATIVE_MAGIC => Err(Error::deletion_vector(
"Native serialization in inline bitmaps is not yet supported",
)),
_ => Err(Error::DeletionVector(format!("Invalid magic {magic}"))),
}
}
Some(path) => {
let size_in_bytes: u32 =
self.size_in_bytes
.try_into()
.or(Err(Error::DeletionVector(format!(
"size_in_bytes doesn't fit in usize for {path}"
))))?;
let dv_data = storage
.read_files(vec![(path.clone(), None)])?
.next()
.ok_or(Error::missing_data(format!(
"No deletion vector data for {path}"
)))??;
let dv_data_len = dv_data.len();
let mut cursor = Cursor::new(dv_data);
let mut version_buf = [0; 1];
cursor.read(&mut version_buf).map_err(|err| {
Error::DeletionVector(format!("Failed to read version from {path}: {err}"))
})?;
let version = u8::from_be_bytes(version_buf);
require!(
version == 1,
Error::DeletionVector(format!("Invalid version {version} for {path}"))
);
let this_dv_start: usize =
self.offset
.unwrap_or(1)
.try_into()
.or(Err(Error::DeletionVector(format!(
"Offset {:?} doesn't fit in usize for {path}",
self.offset
))))?;
let magic_start = this_dv_start + 4;
let bitmap_start = this_dv_start + 8;
let crc_start = this_dv_start + 4 + (size_in_bytes as usize);
require!(
this_dv_start < dv_data_len,
Error::DeletionVector(format!(
"This DV start is out of bounds for {path} (Offset: {this_dv_start} >= Size: {dv_data_len})"
))
);
cursor.set_position(this_dv_start as u64);
let dv_size = read_u32(&mut cursor, Endian::Big)?;
require!(
dv_size == size_in_bytes,
Error::DeletionVector(format!(
"DV size mismatch for {path}. Log indicates {size_in_bytes}, file says: {dv_size}"
))
);
let magic = read_u32(&mut cursor, Endian::Little)?;
require!(
magic == ROARING_BITMAP_PORTABLE_MAGIC,
Error::DeletionVector(format!("Invalid magic {magic} for {path}"))
);
let bytes = cursor.into_inner();
require!(
bytes.len() >= crc_start + 4,
Error::DeletionVector(format!(
"Can't read deletion vector for {path} as there are not enough bytes. Expected {}, but got {}",
crc_start + 4,
bytes.len()
))
);
let mut crc_cursor: Cursor<Bytes> =
Cursor::new(bytes.slice(crc_start..crc_start + 4));
let crc = read_u32(&mut crc_cursor, Endian::Big)?;
let crc32 = create_dv_crc32();
let expected_crc = crc32.checksum(&bytes.slice(magic_start..crc_start));
require!(
crc == expected_crc,
Error::DeletionVector(format!(
"CRC32 checksum mismatch for {path}. Got: {crc}, expected: {expected_crc}"
))
);
let dv_bytes = bytes.slice(bitmap_start..crc_start);
let cursor = Cursor::new(dv_bytes);
RoaringTreemap::deserialize_from(cursor).map_err(|err| {
Error::DeletionVector(format!(
"Failed to deserialize deletion vector for {path}: {err}"
))
})
}
}
}
pub fn row_indexes(
&self,
storage: Arc<dyn StorageHandler>,
parent: &Url,
) -> DeltaResult<Vec<u64>> {
Ok(self.read(storage, parent)?.into_iter().collect())
}
}
/// Byte order used by `read_u32` and `slice_to_u32` when decoding a `u32`.
enum Endian {
/// Most significant byte first (decoded with `u32::from_be_bytes`).
Big,
/// Least significant byte first (decoded with `u32::from_le_bytes`).
Little,
}
/// Builds the CRC-32 calculator (algorithm `CRC_32_ISO_HDLC`) that
/// `DeletionVectorDescriptor::read` uses to verify the checksum stored in a deletion vector file.
pub(crate) fn create_dv_crc32() -> Crc<u32> {
Crc::<u32>::new(&CRC_32_ISO_HDLC)
}
/// Reads the next four bytes from `cursor` and decodes them as a `u32` in the given byte order.
///
/// # Errors
///
/// Returns `Error::DeletionVector` when fewer than four bytes remain. `read_exact` is used so
/// that a short read is reported instead of silently decoding a zero-padded buffer.
fn read_u32(cursor: &mut Cursor<Bytes>, endian: Endian) -> DeltaResult<u32> {
    let mut buf = [0u8; 4];
    cursor
        .read_exact(&mut buf)
        .map_err(|err| Error::DeletionVector(err.to_string()))?;
    Ok(match endian {
        Endian::Big => u32::from_be_bytes(buf),
        Endian::Little => u32::from_le_bytes(buf),
    })
}
fn slice_to_u32(buf: &[u8], endian: Endian) -> DeltaResult<u32> {
let array = buf
.try_into()
.map_err(|_| Error::generic("Must have a 4 byte slice to decode to u32"))?;
match endian {
Endian::Big => Ok(u32::from_be_bytes(array)),
Endian::Little => Ok(u32::from_le_bytes(array)),
}
}
/// Converts a treemap into a boolean vector in which every index present in `treemap` is `false`
/// and every other index is `true`.
///
/// The vector has one entry per index from `0` through the largest value in `treemap`; an empty
/// treemap produces an empty vector.
pub(crate) fn deletion_treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
treemap_to_bools_with(treemap, false)
}
/// Converts a treemap into a boolean vector in which every index present in `treemap` is `true`
/// and every other index is `false`.
///
/// The vector has one entry per index from `0` through the largest value in `treemap`; an empty
/// treemap produces an empty vector.
pub(crate) fn selection_treemap_to_bools(treemap: RoaringTreemap) -> Vec<bool> {
treemap_to_bools_with(treemap, true)
}
/// Expands `treemap` into a boolean vector of length `max + 1`, where `max` is the largest value
/// in the treemap.
///
/// Indexes contained in `treemap` are set to `set_bit`; all other indexes hold `!set_bit`. An
/// empty treemap yields an empty vector.
fn treemap_to_bools_with(treemap: RoaringTreemap, set_bit: bool) -> Vec<bool> {
    let Some(max) = treemap.max() else {
        return vec![];
    };

    let mut flags = vec![!set_bit; max as usize + 1];
    // Each inner bitmap holds the low 32 bits of its values; the key it is stored under supplies
    // the high 32 bits.
    for (high_bits, bitmap) in treemap.bitmaps() {
        let base = u64::from(high_bits) << 32;
        for low_bits in bitmap.iter() {
            let position = (base | u64::from(low_bits)) as usize;
            flags[position] = set_bit;
        }
    }
    flags
}
/// Splits `vector` at `split_index`, returning the tail and leaving the head in place.
///
/// * If `vector` is `Some` and `split_index < vector.len()`, the elements from `split_index`
///   onward are removed from `vector` and returned.
/// * Otherwise, if `vector` is `Some` and `extend` is `Some(value)`, `vector` is padded with
///   `value` until its length is `split_index`, and `None` is returned.
/// * In every other case (`vector` is `None`, or it is too short and `extend` is `None`) nothing
///   is modified and `None` is returned.
pub fn split_vector(
    vector: Option<&mut Vec<bool>>,
    split_index: usize,
    extend: Option<bool>,
) -> Option<Vec<bool>> {
    let vector = vector?;
    if split_index < vector.len() {
        return Some(vector.split_off(split_index));
    }
    if let Some(fill) = extend {
        // Here `split_index >= vector.len()`, so `resize` can only grow the vector.
        vector.resize(split_index, fill);
    }
    None
}
// Unit tests for deletion vector descriptors, paths, storage type parsing and the
// treemap-to-boolean conversions.
#[cfg(test)]
mod tests {
use std::path::PathBuf;
use roaring::RoaringTreemap;
use super::{DeletionVectorDescriptor, *};
use crate::engine::sync::SyncEngine;
use crate::Engine;
// Relative descriptor: the 2-character prefix "ab" followed by a 20-character z85-encoded UUID.
fn dv_relative() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: "ab^-aqEH.-t@S}K{vb[*k^".to_string(),
offset: Some(4),
size_in_bytes: 40,
cardinality: 6,
}
}
// Absolute descriptor: `path_or_inline_dv` is a complete URL.
fn dv_absolute() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedAbsolute,
path_or_inline_dv:
"s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin".to_string(),
offset: Some(4),
size_in_bytes: 40,
cardinality: 6,
}
}
// Inline descriptor: `path_or_inline_dv` carries the z85-encoded vector itself.
fn dv_inline() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::Inline,
path_or_inline_dv: "^Bg9^0rr910000000000iXQKl0rr91000f55c8Xg0@@D72lkbi5=-{L"
.to_string(),
offset: None,
size_in_bytes: 44,
cardinality: 6,
}
}
// Relative descriptor without a prefix; used with the files under
// ./tests/data/table-with-dv-small/.
fn dv_example() -> DeletionVectorDescriptor {
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
offset: Some(1),
size_in_bytes: 36,
cardinality: 2,
}
}
// `absolute_path` yields a joined URL for relative descriptors, the stored URL for absolute
// descriptors, and `None` for inline descriptors.
#[test]
fn test_deletion_vector_absolute_path() {
let parent = Url::parse("s3://mytable/").unwrap();
let relative = dv_relative();
let expected =
Url::parse("s3://mytable/ab/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin")
.unwrap();
assert_eq!(expected, relative.absolute_path(&parent).unwrap().unwrap());
let absolute = dv_absolute();
let expected =
Url::parse("s3://mytable/deletion_vector_d2c639aa-8816-431a-aaf6-d3fe2512ff61.bin")
.unwrap();
assert_eq!(expected, absolute.absolute_path(&parent).unwrap().unwrap());
let inline = dv_inline();
assert_eq!(None, inline.absolute_path(&parent).unwrap());
// Same check against a file URL built from the on-disk test table directory.
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let parent = url::Url::from_directory_path(path).unwrap();
let dv_url = parent
.join("deletion_vector_61d16c75-6994-46b7-a15b-8b538852e50e.bin")
.unwrap();
let example = dv_example();
assert_eq!(dv_url, example.absolute_path(&parent).unwrap().unwrap());
}
// Pins the numeric values of the two magic number constants.
#[test]
fn test_magic_number_constants() {
assert_eq!(ROARING_BITMAP_PORTABLE_MAGIC, 1681511377);
assert_eq!(ROARING_BITMAP_NATIVE_MAGIC, 1681511376);
}
// Reading an inline vector needs no file access; the parent URL is only a placeholder.
#[test]
fn test_inline_read() {
let inline = dv_inline();
let sync_engine = SyncEngine::new();
let storage = sync_engine.storage_handler();
let parent = Url::parse("http://not.used").unwrap();
let tree_map = inline.read(storage, &parent).unwrap();
assert_eq!(tree_map.len(), 6);
for i in [3, 4, 7, 11, 18, 29] {
assert!(tree_map.contains(i));
}
for i in [1, 2, 8, 17, 55, 200] {
assert!(!tree_map.contains(i));
}
}
// An inline payload that starts with the native magic number must be rejected with the
// "not yet supported" error.
#[test]
fn test_inline_native_serialization_error() {
let sync_engine = SyncEngine::new();
let storage = sync_engine.storage_handler();
let parent = Url::parse("http://not.used").unwrap();
// Payload: native magic number (little-endian) followed by four arbitrary bytes.
let mut bytes = Vec::new();
bytes.extend_from_slice(&1681511376u32.to_le_bytes());
bytes.extend_from_slice(&[1u8, 2, 3, 4]);
let encoded = z85::encode(&bytes);
let inline = DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::Inline,
path_or_inline_dv: encoded,
offset: None,
size_in_bytes: bytes.len() as i32,
cardinality: 0,
};
let err = inline.read(storage, &parent).unwrap_err();
let msg = err.to_string();
assert!(msg.contains("Native serialization in inline bitmaps is not yet supported"));
}
// Reads a file-backed vector from the on-disk test table and checks its contents.
#[test]
fn test_deletion_vector_read() {
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let parent = url::Url::from_directory_path(path).unwrap();
let sync_engine = SyncEngine::new();
let storage = sync_engine.storage_handler();
let example = dv_example();
let tree_map = example.read(storage.clone(), &parent).unwrap();
let expected: Vec<u64> = vec![0, 9];
let found = tree_map.iter().collect::<Vec<_>>();
assert_eq!(found, expected)
}
// Ignored by default: it builds boolean vectors with more than four billion entries.
// Values above u32::MAX exercise the combination of high and low 32-bit halves.
#[test]
#[ignore]
fn test_dv_to_bools() {
let mut rb = RoaringTreemap::new();
rb.insert(0);
rb.insert(2);
rb.insert(7);
rb.insert(30854);
rb.insert(4294967297);
rb.insert(4294967300);
let bools = super::deletion_treemap_to_bools(rb);
let mut expected = vec![true; 4294967301];
expected[0] = false;
expected[2] = false;
expected[7] = false;
expected[30854] = false;
expected[4294967297] = false;
expected[4294967300] = false;
assert_eq!(bools, expected);
}
// Selection-vector counterpart of `test_dv_to_bools`: listed indexes are `true`, the rest
// `false`.
#[test]
fn test_sv_to_bools() {
let mut rb = RoaringTreemap::new();
rb.insert(0);
rb.insert(2);
rb.insert(7);
rb.insert(30854);
rb.insert(4294967297);
rb.insert(4294967300);
let bools = super::selection_treemap_to_bools(rb);
let mut expected = vec![false; 4294967301];
expected[0] = true;
expected[2] = true;
expected[7] = true;
expected[30854] = true;
expected[4294967297] = true;
expected[4294967300] = true;
assert_eq!(bools, expected);
}
// `row_indexes` returns the values of the inline vector in ascending order.
#[test]
fn test_dv_row_indexes() {
let example = dv_inline();
let sync_engine = SyncEngine::new();
let storage = sync_engine.storage_handler();
let parent = Url::parse("http://not.used").unwrap();
let row_idx = example.row_indexes(storage, &parent).unwrap();
assert_eq!(row_idx.len(), 6);
assert_eq!(&row_idx, &[3, 4, 7, 11, 18, 29]);
}
// Each of the three one-character codes parses to its variant.
#[test]
fn test_deletion_vector_storage_type_from_str_valid() {
assert_eq!(
"u".parse::<DeletionVectorStorageType>().unwrap(),
DeletionVectorStorageType::PersistedRelative
);
assert_eq!(
"i".parse::<DeletionVectorStorageType>().unwrap(),
DeletionVectorStorageType::Inline
);
assert_eq!(
"p".parse::<DeletionVectorStorageType>().unwrap(),
DeletionVectorStorageType::PersistedAbsolute
);
}
// Parsing is case-sensitive and rejects empty input, unknown codes and variant names.
#[test]
fn test_deletion_vector_storage_type_from_str_invalid() {
assert!("x".parse::<DeletionVectorStorageType>().is_err());
assert!("U".parse::<DeletionVectorStorageType>().is_err());
assert!("I".parse::<DeletionVectorStorageType>().is_err());
assert!("P".parse::<DeletionVectorStorageType>().is_err());
assert!("".parse::<DeletionVectorStorageType>().is_err());
assert!("invalid".parse::<DeletionVectorStorageType>().is_err());
assert!("PersistedRelative"
.parse::<DeletionVectorStorageType>()
.is_err());
assert!("Inline".parse::<DeletionVectorStorageType>().is_err());
assert!("PersistedAbsolute"
.parse::<DeletionVectorStorageType>()
.is_err());
}
// The parse error names the rejected input and carries the fixed explanatory text.
#[test]
fn test_deletion_vector_storage_type_from_str_error_message() {
let result = "invalid".parse::<DeletionVectorStorageType>();
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(error_msg.contains("invalid"));
assert!(error_msg.contains("Unsupported deletion vector format option"));
}
// `Display` followed by `FromStr` returns the original variant.
#[test]
fn test_deletion_vector_storage_type_roundtrip() {
let variants = [
DeletionVectorStorageType::PersistedRelative,
DeletionVectorStorageType::Inline,
DeletionVectorStorageType::PersistedAbsolute,
];
for variant in variants {
let string_repr = variant.to_string();
let parsed = string_repr.parse::<DeletionVectorStorageType>().unwrap();
assert_eq!(variant, parsed);
}
}
// Two paths created with `new` get different UUIDs, so both their absolute and encoded forms
// differ.
#[test]
fn test_deletion_vector_path_uniqueness() {
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("deletion_vectors");
let dv_path1 = DeletionVectorPath::new(table_path.clone(), prefix.clone());
let dv_path2 = DeletionVectorPath::new(table_path.clone(), prefix.clone());
let abs_path1 = dv_path1.absolute_path().unwrap();
let abs_path2 = dv_path2.absolute_path().unwrap();
assert_ne!(abs_path1, abs_path2);
assert_ne!(
dv_path1.encoded_relative_path(),
dv_path2.encoded_relative_path()
);
}
// A non-empty prefix becomes a directory component in front of the file name.
#[test]
fn test_deletion_vector_path_absolute_path_with_prefix() {
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("dv");
let known_uuid = uuid::Uuid::parse_str("abcdef01-2345-6789-abcd-ef0123456789").unwrap();
let dv_path = DeletionVectorPath::new_with_uuid(table_path.clone(), prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();
let expected =
"file:///tmp/test_table/dv/deletion_vector_abcdef01-2345-6789-abcd-ef0123456789.bin";
assert_eq!(abs_path.as_str(), expected);
}
// With a fixed UUID both the absolute URL and the encoded relative path are deterministic.
#[test]
fn test_deletion_vector_path_absolute_path_with_known_uuid() {
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("dv");
let known_uuid = uuid::Uuid::parse_str("550e8400-e29b-41d4-a716-446655440000").unwrap();
let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();
let expected_path =
"file:///tmp/test_table/dv/deletion_vector_550e8400-e29b-41d4-a716-446655440000.bin";
assert_eq!(abs_path.as_str(), expected_path);
let encoded = dv_path.encoded_relative_path();
assert_eq!(encoded, "dvrsTVZ&*Sl-RXRWjryu/!");
}
// With an empty prefix the file sits directly under the table URL and the encoded path is just
// the 20-character z85 UUID.
#[test]
fn test_deletion_vector_path_absolute_path_with_known_uuid_empty_prefix() {
let table_path = Url::parse("file:///tmp/test_table/").unwrap();
let prefix = String::from("");
let known_uuid = uuid::Uuid::parse_str("123e4567-e89b-12d3-a456-426614174000").unwrap();
let dv_path = DeletionVectorPath::new_with_uuid(table_path, prefix, known_uuid);
let abs_path = dv_path.absolute_path().unwrap();
let expected_path =
"file:///tmp/test_table/deletion_vector_123e4567-e89b-12d3-a456-426614174000.bin";
assert_eq!(abs_path.as_str(), expected_path);
let encoded = dv_path.encoded_relative_path();
assert_eq!(encoded, "5<w-%>:JjlQ/G/]6C<1m");
}
}