use std::collections::HashMap;
use std::sync::LazyLock;
use crate::actions::deletion_vector::deletion_treemap_to_bools;
use crate::utils::require;
use crate::{
actions::{deletion_vector::DeletionVectorDescriptor, visitors::visit_deletion_vector_at},
engine_data::{GetData, RowVisitor, TypedGetData as _},
schema::{ColumnName, ColumnNamesAndTypes, DataType, SchemaRef},
table_features::ColumnMappingMode,
DeltaResult, Engine, EngineData, Error,
};
use roaring::RoaringTreemap;
use serde::{Deserialize, Serialize};
use tracing::warn;
use super::log_replay::SCAN_ROW_SCHEMA;
/// Scan-level state describing the table being read, as opposed to the per-file values that are
/// handed to a [`ScanCallback`].
///
/// Derives `Serialize`/`Deserialize`, so it can be persisted or sent between processes
/// (NOTE(review): the intended transport is not visible in this module — confirm with callers).
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct GlobalScanState {
    /// Root location of the table as a string (presumably a URL — confirm at construction site).
    pub table_root: String,
    /// Names of the table's partition columns.
    pub partition_columns: Vec<String>,
    /// Logical schema of the scan.
    pub logical_schema: SchemaRef,
    /// Physical schema of the scan; relates to `logical_schema` via `column_mapping_mode`.
    pub physical_schema: SchemaRef,
    /// Column mapping mode in effect for the table.
    pub column_mapping_mode: ColumnMappingMode,
}
/// Deletion-vector information for one scanned file.
///
/// `deletion_vector` is `None` when the file has no deletion vector; `Default` yields that state.
#[derive(Default, Debug, Clone, PartialEq, Eq)]
pub struct DvInfo {
    /// Descriptor used to locate and read the deletion vector, if the file has one.
    pub(crate) deletion_vector: Option<DeletionVectorDescriptor>,
}
impl From<DeletionVectorDescriptor> for DvInfo {
fn from(deletion_vector: DeletionVectorDescriptor) -> Self {
let deletion_vector = Some(deletion_vector);
DvInfo { deletion_vector }
}
}
/// The subset of a file's JSON statistics that this module parses.
///
/// Field names are matched against camelCase JSON keys (so `num_records` is read from
/// `"numRecords"`). Other keys in the JSON are ignored, since `deny_unknown_fields` is not set.
#[derive(Debug, Clone, PartialEq, Eq, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Stats {
    /// Value of the `numRecords` statistic.
    pub num_records: u64,
}
impl DvInfo {
    /// Returns `true` when a deletion-vector descriptor is attached.
    pub fn has_vector(&self) -> bool {
        matches!(self.deletion_vector, Some(_))
    }

    /// Loads the deletion vector, if any, as a [`RoaringTreemap`] (presumably of deleted row
    /// indexes — confirm against `DeletionVectorDescriptor::read`).
    ///
    /// Returns `Ok(None)` without touching the engine when there is no descriptor.
    ///
    /// # Errors
    /// Propagates any error from reading the vector through the engine's file system client.
    pub(crate) fn get_treemap(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<RoaringTreemap>> {
        let Some(descriptor) = self.deletion_vector.as_ref() else {
            return Ok(None);
        };
        let treemap = descriptor.read(engine.get_file_system_client(), table_root)?;
        Ok(Some(treemap))
    }

    /// Loads the deletion vector, if any, and converts it to a `Vec<bool>` with
    /// `deletion_treemap_to_bools`.
    ///
    /// Returns `Ok(None)` when there is no descriptor.
    ///
    /// # Errors
    /// Propagates any error from reading the deletion vector.
    pub fn get_selection_vector(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<Vec<bool>>> {
        self.get_treemap(engine, table_root)
            .map(|maybe_treemap| maybe_treemap.map(deletion_treemap_to_bools))
    }

    /// Loads the deletion vector, if any, as a list of row indexes via
    /// `DeletionVectorDescriptor::row_indexes`.
    ///
    /// Returns `Ok(None)` without touching the engine when there is no descriptor.
    ///
    /// # Errors
    /// Propagates any error from reading the deletion vector.
    pub fn get_row_indexes(
        &self,
        engine: &dyn Engine,
        table_root: &url::Url,
    ) -> DeltaResult<Option<Vec<u64>>> {
        match &self.deletion_vector {
            None => Ok(None),
            Some(descriptor) => descriptor
                .row_indexes(engine.get_file_system_client(), table_root)
                .map(Some),
        }
    }
}
/// Function invoked by [`visit_scan_files`] for each selected row that has a non-null path.
///
/// Arguments, in order:
/// - `context`: caller-owned state threaded through every call.
/// - `path`: value of the `scanFile.path` column.
/// - `size`: value of the `scanFile.size` column.
/// - `stats`: parsed `scanFile.stats`; `None` when the column is null or the JSON fails to parse
///   (parse failures are logged with `warn!`).
/// - `dv_info`: the row's deletion-vector descriptor, if any.
/// - `partition_values`: the `scanFile.fileConstantValues.partitionValues` map (presumably
///   partition column name to serialized value — confirm).
pub type ScanCallback<T> = fn(
    context: &mut T,
    path: &str,
    size: i64,
    stats: Option<Stats>,
    dv_info: DvInfo,
    partition_values: HashMap<String, String>,
);
/// Runs `callback` over every selected row of `data` that has a file path, and returns the final
/// `context`.
///
/// `selection_vector` must have an entry for every row in `data`; rows whose entry is `false` are
/// skipped. `context` is passed mutably to each callback invocation in row order.
///
/// # Errors
/// Propagates any error raised while visiting the rows of `data`.
pub fn visit_scan_files<T>(
    data: &dyn EngineData,
    selection_vector: &[bool],
    context: T,
    callback: ScanCallback<T>,
) -> DeltaResult<T> {
    let mut scan_file_visitor = ScanFileVisitor {
        selection_vector,
        context,
        callback,
    };
    scan_file_visitor.visit_rows_of(data)?;
    // Take the accumulated context back out of the visitor.
    let ScanFileVisitor { context, .. } = scan_file_visitor;
    Ok(context)
}
/// [`RowVisitor`] that feeds selected scan rows to a [`ScanCallback`]; built by
/// [`visit_scan_files`].
struct ScanFileVisitor<'a, T> {
    /// Function invoked once per selected row that has a path.
    callback: ScanCallback<T>,
    /// Per-row selection flags; a `false` entry skips that row.
    selection_vector: &'a [bool],
    /// Caller state passed to each callback and returned by [`visit_scan_files`].
    context: T,
}
impl<T> RowVisitor for ScanFileVisitor<'_, T> {
    /// Returns the leaf columns of `SCAN_ROW_SCHEMA`; their order fixes the order of `getters`
    /// received by [`Self::visit`]. Computed once and cached in a process-wide `LazyLock`.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        static NAMES_AND_TYPES: LazyLock<ColumnNamesAndTypes> =
            LazyLock::new(|| SCAN_ROW_SCHEMA.leaves(None));
        NAMES_AND_TYPES.as_ref()
    }

    /// Invokes the callback for each selected row whose `scanFile.path` is non-null.
    ///
    /// `getters` must be the 10 leaf columns of `SCAN_ROW_SCHEMA`. Index 0 is the path, 1 the
    /// size and 3 the stats JSON. The deletion-vector columns start at the index of
    /// `deletionVector`, and index 9 is the partition-values map.
    ///
    /// # Errors
    /// Returns an error if:
    /// - `getters` does not have exactly 10 entries;
    /// - the selection vector has fewer entries than `row_count`;
    /// - reading any column value fails.
    ///
    /// Unparseable stats JSON is logged and passed to the callback as `None` instead.
    fn visit<'a>(&mut self, row_count: usize, getters: &[&'a dyn GetData<'a>]) -> DeltaResult<()> {
        require!(
            getters.len() == 10,
            Error::InternalError(format!(
                "Wrong number of ScanFileVisitor getters: {}",
                getters.len()
            ))
        );
        // The selection vector comes from the caller of `visit_scan_files`. Validate it up front
        // instead of panicking on an out-of-bounds index after some callbacks have already run.
        let selection = self.selection_vector;
        require!(
            selection.len() >= row_count,
            Error::InternalError(format!(
                "ScanFileVisitor selection vector has {} entries but the batch has {} rows",
                selection.len(),
                row_count
            ))
        );
        // The schema is fixed, so locate the deletion-vector getters once, not once per row.
        let dv_index = SCAN_ROW_SCHEMA
            .index_of("deletionVector")
            .ok_or_else(|| Error::missing_column("deletionVector"))?;
        let dv_getters = &getters[dv_index..];

        for (row_index, &selected) in selection[..row_count].iter().enumerate() {
            if !selected {
                continue;
            }
            // Rows with a null path are skipped; only rows carrying a file reach the callback.
            let Some(path) = getters[0].get_opt(row_index, "scanFile.path")? else {
                continue;
            };
            let size = getters[1].get(row_index, "scanFile.size")?;
            let stats: Option<String> = getters[3].get_opt(row_index, "scanFile.stats")?;
            // Bad stats are not fatal: log them and pass `None` to the callback.
            let stats: Option<Stats> =
                stats.and_then(|json| match serde_json::from_str(json.as_str()) {
                    Ok(stats) => Some(stats),
                    Err(e) => {
                        warn!("Invalid stats string in Add file {json}: {e}");
                        None
                    }
                });
            let deletion_vector = visit_deletion_vector_at(row_index, dv_getters)?;
            let dv_info = DvInfo { deletion_vector };
            let partition_values =
                getters[9].get(row_index, "scanFile.fileConstantValues.partitionValues")?;
            (self.callback)(
                &mut self.context,
                path,
                size,
                stats,
                dv_info,
                partition_values,
            );
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::{DvInfo, Stats};
    use crate::scan::test_utils::{add_batch_simple, run_with_validate_callback};

    /// Context threaded through the scan callback; `id` lets the callback check that the value
    /// supplied by the test arrives unchanged.
    #[derive(Clone)]
    struct TestContext {
        id: usize,
    }

    /// Scan callback that checks every argument for the selected row of `add_batch_simple()`.
    fn validate_visit(
        context: &mut TestContext,
        path: &str,
        size: i64,
        stats: Option<Stats>,
        dv_info: DvInfo,
        part_vals: HashMap<String, String>,
    ) {
        const EXPECTED_PATH: &str =
            "part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet";
        assert_eq!(path, EXPECTED_PATH);
        assert_eq!(size, 635);
        assert_eq!(stats.map(|parsed| parsed.num_records), Some(10));
        assert_eq!(
            part_vals.get("date").map(String::as_str),
            Some("2017-12-10")
        );
        assert!(!part_vals.contains_key("non-existent"));
        assert!(dv_info.has_vector());
        let descriptor = dv_info.deletion_vector.unwrap();
        assert_eq!(descriptor.unique_id(), "uvBn[lx{q8@P<9BNH/isA@1");
        assert_eq!(context.id, 2);
    }

    #[test]
    fn test_simple_visit_scan_data() {
        // Selection vector: keep row 0, skip row 1.
        run_with_validate_callback(
            vec![add_batch_simple()],
            &[true, false],
            TestContext { id: 2 },
            validate_visit,
        );
    }
}