use url::Url;
use super::scan_file::CdfScanFileType;
use crate::actions::deletion_vector::{deletion_treemap_to_bools, selection_treemap_to_bools};
use crate::table_changes::scan_file::CdfScanFile;
use crate::{DeltaResult, Engine, Error};
/// A [`CdfScanFile`] paired with the row-level selection vector computed for it by
/// [`resolve_scan_file_dv`].
pub(crate) struct ResolvedCdfScanFile {
/// The scan file this entry describes. Its `scan_type` is the change type assigned during
/// resolution, which may differ from the type of the scan file that was passed in.
pub(crate) scan_file: CdfScanFile,
/// Per-row boolean flags for the file, or `None` when resolution produced an empty vector.
/// NOTE(review): `true` appears to mark a row that belongs to this change; how rows past the
/// end of the vector are treated is not visible in this file — confirm against the reader.
pub(crate) selection_vector: Option<Vec<bool>>,
}
/// Resolves the deletion vectors of `scan_file` into zero, one, or two
/// [`ResolvedCdfScanFile`]s, each carrying the selection vector for its change type.
///
/// The file's own deletion vector (`dv_info`) is always loaded. When the scan file also
/// carries a `remove_dv` (the deletion vector of a remove action paired with an add), the two
/// are compared:
///  - rows present in the remove DV but not in the add DV were deleted before and are not
///    deleted now, so they are reported as an `Add` entry;
///  - rows present in the add DV but not in the remove DV are newly deleted, so they are
///    reported as a `Remove` entry;
///  - an empty difference produces no entry for that side.
///
/// Without a `remove_dv`, a single entry is produced that keeps the scan file's own type and
/// uses its own deletion vector.
///
/// When both are produced, the `Remove` entry is yielded before the `Add` entry.
///
/// # Errors
///
/// Returns an error if either deletion vector fails to load, or if a scan file of type
/// `Remove` or `Cdc` carries a `remove_dv`.
pub(crate) fn resolve_scan_file_dv(
    engine: &dyn Engine,
    table_root: &Url,
    scan_file: CdfScanFile,
) -> DeltaResult<impl Iterator<Item = ResolvedCdfScanFile>> {
    /// Pairs a scan file with its selection vector, mapping an empty vector to `None`.
    fn into_resolved(scan_file: CdfScanFile, sv: Vec<bool>) -> ResolvedCdfScanFile {
        let selection_vector = if sv.is_empty() { None } else { Some(sv) };
        ResolvedCdfScanFile {
            scan_file,
            selection_vector,
        }
    }

    // Load the file's own DV first, then the paired remove DV, so that a load failure on the
    // former is the one reported when both would fail.
    let file_dv = scan_file.dv_info.get_treemap(engine, table_root)?;
    let paired_remove_dv = match scan_file.remove_dv.as_ref() {
        Some(dv_info) => Some(dv_info.get_treemap(engine, table_root)?),
        None => None,
    };

    let (added_rows, removed_rows) = match (&scan_file.scan_type, paired_remove_dv) {
        (CdfScanFileType::Remove, Some(_)) => {
            return Err(Error::generic(
                "CdfScanFile with type remove cannot have a remove deletion vector",
            ));
        }
        (CdfScanFileType::Cdc, Some(_)) => {
            return Err(Error::generic(
                "CdfScanFile with type cdc cannot have a remove deletion vector",
            ));
        }
        (CdfScanFileType::Add, Some(previous_dv)) => {
            // A missing DV on either side means "no rows deleted", i.e. an empty treemap.
            let current_dv = file_dv.unwrap_or_default();
            let previous_dv = previous_dv.unwrap_or_default();
            let restored = &previous_dv - &current_dv;
            let newly_deleted = current_dv - previous_dv;
            (
                (!restored.is_empty()).then_some(restored),
                (!newly_deleted.is_empty()).then_some(newly_deleted),
            )
        }
        (CdfScanFileType::Add | CdfScanFileType::Cdc, None) => {
            (Some(file_dv.unwrap_or_default()), None)
        }
        (CdfScanFileType::Remove, None) => (None, Some(file_dv.unwrap_or_default())),
    };

    // With a paired remove DV the treemaps above hold the rows to select; otherwise they hold
    // the file's deletion vector, so a different conversion is used.
    let to_bools = if scan_file.remove_dv.is_none() {
        deletion_treemap_to_bools
    } else {
        selection_treemap_to_bools
    };

    // The remove entry needs its own copy of the scan file with the type forced to `Remove`;
    // the add entry then takes ownership of the original, keeping its type.
    let remove_entry = removed_rows.map(|rows| {
        let as_remove = CdfScanFile {
            scan_type: CdfScanFileType::Remove,
            ..scan_file.clone()
        };
        into_resolved(as_remove, to_bools(rows))
    });
    let add_entry = added_rows.map(|rows| into_resolved(scan_file, to_bools(rows)));

    Ok([remove_entry, add_entry].into_iter().flatten())
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::io::Write;
    use std::path::PathBuf;

    use bytes::BufMut;
    use itertools::Itertools;
    use roaring::RoaringTreemap;

    use super::resolve_scan_file_dv;
    use crate::actions::deletion_vector::{DeletionVectorDescriptor, DeletionVectorStorageType};
    use crate::engine::sync::SyncEngine;
    use crate::scan::state::DvInfo;
    use crate::table_changes::scan_file::{CdfScanFile, CdfScanFileType};
    use crate::Error;

    /// The `(scan_type, selection_vector)` pairs extracted from a resolution result.
    type ResolvedPairs = Vec<(CdfScanFileType, Option<Vec<bool>>)>;

    /// Serializes `map` (a 4-byte little-endian magic number followed by the treemap bytes)
    /// and wraps the z85-encoded result in an inline deletion vector descriptor.
    fn treemap_to_dv_descriptor(map: RoaringTreemap) -> DeletionVectorDescriptor {
        // NOTE(review): presumably the deletion vector format marker — confirm against the spec.
        const MAGIC: u32 = 1681511377;

        let mut writer = Vec::<u8>::new().writer();
        writer.write_all(&MAGIC.to_le_bytes()).unwrap();
        map.serialize_into(&mut writer).unwrap();
        let serialized = writer.into_inner();

        DeletionVectorDescriptor {
            storage_type: DeletionVectorStorageType::Inline,
            path_or_inline_dv: z85::encode(&serialized),
            offset: None,
            size_in_bytes: serialized.len().try_into().unwrap(),
            cardinality: map.len().try_into().unwrap(),
        }
    }

    /// Builds a [`DvInfo`] backed by an inline deletion vector containing `rows`.
    fn inline_dv(rows: RoaringTreemap) -> DvInfo {
        DvInfo::from(treemap_to_dv_descriptor(rows))
    }

    /// Descriptor for the deletion vector stored in the `table-with-dv-small` test table.
    /// The tests below expect it to mark rows 0 and 9 of a 10-row file.
    fn on_disk_dv() -> DeletionVectorDescriptor {
        DeletionVectorDescriptor {
            storage_type: DeletionVectorStorageType::PersistedRelative,
            path_or_inline_dv: "vBn[lx{q8@P<9BNH/isA".to_string(),
            offset: Some(1),
            size_in_bytes: 36,
            cardinality: 2,
        }
    }

    /// Builds a scan file with fixed placeholder metadata and the given type and DVs.
    fn get_scan_file(
        scan_type: CdfScanFileType,
        dv_info: DvInfo,
        remove_dv: Option<DvInfo>,
    ) -> CdfScanFile {
        CdfScanFile {
            scan_type,
            path: "fake_path".to_string(),
            dv_info,
            remove_dv,
            partition_values: HashMap::new(),
            commit_version: 42,
            commit_timestamp: 1234,
            size: None,
        }
    }

    /// URL of the on-disk test table that relative deletion vector paths resolve against.
    fn test_table_root() -> url::Url {
        let path =
            std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
        url::Url::from_directory_path(path).unwrap()
    }

    /// A vector of `len` flags in which exactly the positions in `selected` are `true`.
    fn selection(len: usize, selected: &[usize]) -> Vec<bool> {
        let mut sv = vec![false; len];
        for &row in selected {
            sv[row] = true;
        }
        sv
    }

    /// Resolves `scan_file` against the test table, panicking if resolution fails.
    fn resolve_pairs(scan_file: CdfScanFile) -> ResolvedPairs {
        let engine = SyncEngine::new();
        let table_root = test_table_root();
        resolve_scan_file_dv(&engine, &table_root, scan_file)
            .unwrap()
            .map(|file| (file.scan_file.scan_type, file.selection_vector))
            .collect_vec()
    }

    /// Resolves `scan_file` against the test table, panicking if resolution succeeds.
    fn resolve_err(scan_file: CdfScanFile) -> Error {
        let engine = SyncEngine::new();
        let table_root = test_table_root();
        resolve_scan_file_dv(&engine, &table_root, scan_file)
            .err()
            .unwrap()
    }

    /// The add has the on-disk DV and the paired remove has an empty one, so the rows in the
    /// add's DV are reported as removed.
    #[test]
    fn add_with_dv() {
        let dv_info = DvInfo {
            deletion_vector: Some(on_disk_dv()),
        };
        let scan_file = get_scan_file(CdfScanFileType::Add, dv_info, Some(Default::default()));

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Remove, Some(selection(10, &[0, 9])))]
        );
    }

    /// The paired remove has the on-disk DV and the add has none, so the rows in the remove's
    /// DV are reported as added.
    #[test]
    fn rm_with_dv() {
        let remove_dv = Some(DvInfo {
            deletion_vector: Some(on_disk_dv()),
        });
        let scan_file = get_scan_file(CdfScanFileType::Add, Default::default(), remove_dv);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Add, Some(selection(10, &[0, 9])))]
        );
    }

    /// The add's DV is a strict subset of the remove's DV: the difference (rows 1 and 4) is
    /// restored and nothing is removed.
    #[test]
    fn restore_subset() {
        let remove_dv = Some(inline_dv(RoaringTreemap::from([0, 1, 4, 5])));
        let dv_info = inline_dv(RoaringTreemap::from([0, 5]));
        let scan_file = get_scan_file(CdfScanFileType::Add, dv_info, remove_dv);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Add, Some(selection(5, &[1, 4])))]
        );
    }

    /// The remove's DV is a strict subset of the add's DV: the difference (rows 1 and 4) is
    /// removed and nothing is added.
    #[test]
    fn delete_subset() {
        let remove_dv = Some(inline_dv(RoaringTreemap::from([0, 5])));
        let dv_info = inline_dv(RoaringTreemap::from([0, 1, 4, 5]));
        let scan_file = get_scan_file(CdfScanFileType::Add, dv_info, remove_dv);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Remove, Some(selection(5, &[1, 4])))]
        );
    }

    /// Each DV has a row the other lacks: both a remove and an add are produced, remove first.
    #[test]
    fn adds_and_removes() {
        let remove_dv = Some(inline_dv(RoaringTreemap::from([0, 2])));
        let dv_info = inline_dv(RoaringTreemap::from([0, 1]));
        let scan_file = get_scan_file(CdfScanFileType::Add, dv_info, remove_dv);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![
                (CdfScanFileType::Remove, Some(selection(2, &[1]))),
                (CdfScanFileType::Add, Some(selection(3, &[2]))),
            ]
        );
    }

    /// Only `Add` scan files may carry a remove DV; `Cdc` and `Remove` must be rejected.
    #[test]
    fn cdc_and_remove_with_remove_dv_fails() {
        let remove_dv = Some(inline_dv(RoaringTreemap::from([0, 2])));
        let mut scan_file = get_scan_file(CdfScanFileType::Cdc, Default::default(), remove_dv);

        let expected_err =
            Error::generic("CdfScanFile with type cdc cannot have a remove deletion vector");
        assert_eq!(
            resolve_err(scan_file.clone()).to_string(),
            expected_err.to_string()
        );

        scan_file.scan_type = CdfScanFileType::Remove;
        let expected_err =
            Error::generic("CdfScanFile with type remove cannot have a remove deletion vector");
        assert_eq!(resolve_err(scan_file).to_string(), expected_err.to_string());
    }

    /// A `Cdc` file without any DV resolves to itself with no selection vector.
    #[test]
    fn cdc_file_resolution() {
        let scan_file = get_scan_file(CdfScanFileType::Cdc, Default::default(), None);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Cdc, None)]
        );
    }

    /// A `Remove` file without any DV resolves to itself with no selection vector.
    #[test]
    fn remove_file_resolution() {
        let scan_file = get_scan_file(CdfScanFileType::Remove, Default::default(), None);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Remove, None)]
        );
    }

    /// An `Add` file without any DV resolves to itself with no selection vector.
    #[test]
    fn add_file_no_dv_resolution() {
        let scan_file = get_scan_file(CdfScanFileType::Add, Default::default(), None);

        assert_eq!(
            resolve_pairs(scan_file),
            vec![(CdfScanFileType::Add, None)]
        );
    }
}