use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::LazyLock;
use arrow::datatypes::Int32Type;
use arrow_array::cast::AsArray;
use arrow_array::types::Int64Type;
use arrow_array::{Array, MapArray, RecordBatch, StringArray, StructArray};
use arrow_schema::DataType as ArrowDataType;
use chrono::{DateTime, Utc};
use delta_kernel::engine::arrow_expression::evaluate_expression::to_json;
use delta_kernel::expressions::{Scalar, StructData};
use delta_kernel::scan::scan_row_schema;
use delta_kernel::schema::DataType;
use object_store::ObjectMeta;
use object_store::path::Path;
use percent_encoding::percent_decode_str;
#[cfg(feature = "datafusion")]
pub(crate) use self::scan_row::parse_stats_column_with_schema;
use crate::kernel::scalars::ScalarExt;
use crate::kernel::{Add, DeletionVectorDescriptor, Remove};
use crate::{DeltaResult, DeltaTableError};
pub(crate) use self::scan_row::{ScanRowOutStream, scan_row_in_eval};
pub use self::tombstones::TombstoneView;
mod scan_row;
mod tombstones;
// Top-level field names in the kernel scan-row schema (see `scan_row_schema()`).
const FIELD_NAME_PATH: &str = "path";
const FIELD_NAME_SIZE: &str = "size";
const FIELD_NAME_MODIFICATION_TIME: &str = "modificationTime";
const FIELD_NAME_FILE_CONSTANT_VALUES: &str = "fileConstantValues";
// Raw (string-keyed) partition-values map nested under `fileConstantValues`.
const FIELD_NAME_RAW_PARTITION_VALUES: &str = "partitionValues";
// Columns carrying pre-parsed statistics / partition values in the batch.
const FIELD_NAME_STATS_PARSED: &str = "stats_parsed";
const FIELD_NAME_PARTITION_VALUES_PARSED: &str = "partitionValues_parsed";
const FIELD_NAME_DELETION_VECTOR: &str = "deletionVector";
// Child fields of the `stats_parsed` struct column.
const STATS_FIELD_NUM_RECORDS: &str = "numRecords";
const STATS_FIELD_MIN_VALUES: &str = "minValues";
const STATS_FIELD_MAX_VALUES: &str = "maxValues";
const STATS_FIELD_NULL_COUNT: &str = "nullCount";
// Child fields of the `deletionVector` struct column.
const DV_FIELD_STORAGE_TYPE: &str = "storageType";
const DV_FIELD_PATH_OR_INLINE_DV: &str = "pathOrInlineDv";
const DV_FIELD_SIZE_IN_BYTES: &str = "sizeInBytes";
const DV_FIELD_CARDINALITY: &str = "cardinality";
const DV_FIELD_OFFSET: &str = "offset";
/// Lazily computed column indices of the fixed top-level fields
/// (`path`, `size`, `modificationTime`) in the kernel scan-row schema,
/// keyed by field name. Panics on first access if the schema is missing
/// any of them, which would indicate a kernel/schema mismatch.
static FIELD_INDICES: LazyLock<HashMap<&'static str, usize>> = LazyLock::new(|| {
    let schema = scan_row_schema();
    [FIELD_NAME_PATH, FIELD_NAME_SIZE, FIELD_NAME_MODIFICATION_TIME]
        .into_iter()
        .map(|name| (name, schema.index_of(name).unwrap()))
        .collect()
});
/// Lazily computed child-field indices inside the `deletionVector` struct
/// column of the scan-row schema, keyed by field name. Panics on first
/// access if the deletion-vector field is missing, is not a struct, or
/// lacks one of the expected children.
static DV_FIELD_INDICES: LazyLock<HashMap<&'static str, usize>> = LazyLock::new(|| {
    let schema = scan_row_schema();
    let dv_field = schema.field(FIELD_NAME_DELETION_VECTOR).unwrap();
    let DataType::Struct(dv_type) = dv_field.data_type() else {
        panic!("Expected DataType::Struct for deletion vector field");
    };
    [
        DV_FIELD_STORAGE_TYPE,
        DV_FIELD_PATH_OR_INLINE_DV,
        DV_FIELD_SIZE_IN_BYTES,
        DV_FIELD_CARDINALITY,
    ]
    .into_iter()
    .map(|name| (name, dv_type.index_of(name).unwrap()))
    .collect()
});
/// Zero-copy view over one logical file (one row) of a scan-row
/// [`RecordBatch`] produced by the kernel log replay.
///
/// Cloning is cheap: `RecordBatch` is internally reference-counted.
#[derive(Clone)]
pub struct LogicalFileView {
    // Batch in the kernel scan-row schema holding this file's row.
    files: RecordBatch,
    // Row position of this file within `files`.
    index: usize,
}
impl LogicalFileView {
    /// Creates a view over row `index` of `files`.
    ///
    /// `files` must be a batch in the kernel scan-row schema (see
    /// [`scan_row_schema`]); neither the schema nor the index is validated
    /// here — accessors panic on mismatch.
    pub(crate) fn new(files: RecordBatch, index: usize) -> Self {
        Self { files, index }
    }

    /// Percent-decoded file path. Borrows from the batch when no decoding is
    /// required; invalid UTF-8 after decoding is replaced lossily.
    pub fn path(&self) -> Cow<'_, str> {
        let raw = get_string_value(
            self.files
                .column(*FIELD_INDICES.get(FIELD_NAME_PATH).unwrap()),
            self.index,
        )
        .unwrap();
        percent_decode_str(raw).decode_utf8_lossy()
    }

    /// File path exactly as stored in the log (still percent-encoded).
    pub(crate) fn path_raw(&self) -> &str {
        get_string_value(
            self.files
                .column(*FIELD_INDICES.get(FIELD_NAME_PATH).unwrap()),
            self.index,
        )
        .unwrap()
    }

    /// Decoded path as an `object_store` [`Path`]. Tries the strict parser
    /// first and falls back to the non-validating constructor on failure.
    pub(crate) fn object_store_path(&self) -> Path {
        let path = self.path();
        match Path::parse(path.as_ref()) {
            Ok(path) => path,
            Err(_) => Path::from(path.as_ref()),
        }
    }

    /// Value of the `size` column for this row (file size per the Delta
    /// protocol). Panics if the value is null.
    pub fn size(&self) -> i64 {
        self.files
            .column(*FIELD_INDICES.get(FIELD_NAME_SIZE).unwrap())
            .as_primitive::<Int64Type>()
            .value(self.index)
    }

    /// Value of the `modificationTime` column, interpreted as milliseconds
    /// since the Unix epoch (see [`Self::modification_datetime`]).
    pub fn modification_time(&self) -> i64 {
        self.files
            .column(*FIELD_INDICES.get(FIELD_NAME_MODIFICATION_TIME).unwrap())
            .as_primitive::<Int64Type>()
            .value(self.index)
    }

    /// `modification_time` as a UTC datetime; errors when the millisecond
    /// value falls outside chrono's representable range.
    pub fn modification_datetime(&self) -> DeltaResult<chrono::DateTime<Utc>> {
        DateTime::from_timestamp_millis(self.modification_time()).ok_or(
            DeltaTableError::MetadataError(format!(
                "invalid modification_time: {:?}",
                self.modification_time()
            )),
        )
    }

    /// File statistics re-serialized to a JSON string.
    ///
    /// Serializes the single row of `stats_parsed`; returns `None` when
    /// parsed stats are absent, serialization fails, or the result is empty.
    pub fn stats(&self) -> Option<String> {
        // Slice to exactly this row so `to_json` emits one JSON document.
        let stats = self.stats_parsed()?.slice(self.index, 1);
        let value = to_json(&stats)
            .ok()
            .map(|arr| arr.as_string::<i32>().value(0).to_string());
        value.and_then(|v| (!v.is_empty()).then_some(v))
    }

    /// Typed partition values for this row as kernel [`StructData`], when
    /// the `partitionValues_parsed` column exists and the row is non-null.
    pub fn partition_values(&self) -> Option<StructData> {
        self.files
            .column_by_name(FIELD_NAME_PARTITION_VALUES_PARSED)
            .and_then(|col| col.as_struct_opt())
            .and_then(|arr| {
                arr.is_valid(self.index)
                    .then(|| match Scalar::from_array(arr, self.index) {
                        Some(Scalar::Struct(s)) => Some(s),
                        _ => None,
                    })
                    .flatten()
            })
    }

    /// Raw string-keyed `partitionValues` map nested inside the
    /// `fileConstantValues` struct column, if present.
    fn raw_partition_values(&self) -> Option<&MapArray> {
        self.files
            .column_by_name(FIELD_NAME_FILE_CONSTANT_VALUES)
            .and_then(|col| col.as_struct_opt())
            .and_then(|file_constants| {
                file_constants.column_by_name(FIELD_NAME_RAW_PARTITION_VALUES)
            })
            .and_then(|col| col.as_map_opt())
    }

    /// Partition values as an owned string map; empty when the column is
    /// missing or the row is null.
    fn partition_values_map(&self) -> HashMap<String, Option<String>> {
        self.raw_partition_values()
            .filter(|partitions| partitions.is_valid(self.index))
            .and_then(|partitions| collect_string_map(&partitions.value(self.index)))
            .unwrap_or_default()
    }

    /// The `stats_parsed` struct column, if the batch carries parsed stats.
    fn stats_parsed(&self) -> Option<&StructArray> {
        self.files
            .column_by_name(FIELD_NAME_STATS_PARSED)
            .and_then(|col| col.as_struct_opt())
    }

    /// `numRecords` from parsed stats, when present, non-null, and
    /// representable as `usize`.
    pub fn num_records(&self) -> Option<usize> {
        self.stats_parsed()
            .and_then(|stats| stats.column_by_name(STATS_FIELD_NUM_RECORDS))
            .and_then(|col| col.as_primitive_opt::<Int64Type>())
            .and_then(|a| {
                a.is_valid(self.index)
                    .then(|| usize::try_from(a.value(self.index)).ok())
                    .flatten()
            })
    }

    /// Per-column null counts from parsed stats as a (possibly nested)
    /// [`Scalar`], when available.
    pub fn null_counts(&self) -> Option<Scalar> {
        self.stats_parsed()
            .and_then(|stats| stats.column_by_name(STATS_FIELD_NULL_COUNT))
            .and_then(|c| Scalar::from_array(c.as_ref(), self.index))
    }

    /// Per-column minimum values from parsed stats as a (possibly nested)
    /// [`Scalar`], when available.
    pub fn min_values(&self) -> Option<Scalar> {
        self.stats_parsed()
            .and_then(|stats| stats.column_by_name(STATS_FIELD_MIN_VALUES))
            .and_then(|c| Scalar::from_array(c.as_ref(), self.index))
    }

    /// Per-column maximum values from parsed stats as a (possibly nested)
    /// [`Scalar`], when available.
    ///
    /// NOTE(review): timestamps that land exactly on a multiple of 1000 are
    /// bumped up via `ceil_datetime` — presumably to keep the reported max an
    /// upper bound for truncated statistics; confirm against stats writers.
    pub fn max_values(&self) -> Option<Scalar> {
        self.stats_parsed()
            .and_then(|stats| stats.column_by_name(STATS_FIELD_MAX_VALUES))
            .and_then(|c| Scalar::from_array(c.as_ref(), self.index))
            .map(|s| round_ms_datetimes(s, &ceil_datetime))
    }

    /// Owned deletion-vector descriptor for this file, if one is present.
    pub fn deletion_vector_descriptor(&self) -> Option<DeletionVectorDescriptor> {
        self.deletion_vector().map(|dv| dv.descriptor())
    }

    /// Borrowed view over this row's deletion-vector struct, or `None` when
    /// the column is absent, entirely null, null for this row, or has a null
    /// `storageType` (treated as "no deletion vector").
    fn deletion_vector(&self) -> Option<DeletionVectorView<'_>> {
        let dv_col = self
            .files
            .column_by_name(FIELD_NAME_DELETION_VECTOR)
            .and_then(|col| col.as_struct_opt())?;
        // Fast path: an all-null column means no file in the batch has a DV.
        if dv_col.null_count() == dv_col.len() {
            return None;
        }
        dv_col
            .is_valid(self.index)
            .then(|| {
                // A struct slot can be "valid" while its storageType child is
                // null; only a non-null storageType denotes a real DV.
                let storage_col =
                    dv_col.column(*DV_FIELD_INDICES.get(DV_FIELD_STORAGE_TYPE).unwrap());
                storage_col
                    .is_valid(self.index)
                    .then_some(DeletionVectorView {
                        data: dv_col,
                        index: self.index,
                    })
            })
            .flatten()
    }

    /// Reconstructs an [`Add`] action for this row. `data_change` is always
    /// `true`; tags, row-tracking fields, and clustering info are not
    /// carried by the scan row and are emitted as `None`.
    pub(crate) fn to_add(&self) -> Add {
        Add {
            path: self.path().to_string(),
            partition_values: self.partition_values_map(),
            size: self.size(),
            modification_time: self.modification_time(),
            data_change: true,
            stats: self.stats(),
            tags: None,
            deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
            base_row_id: None,
            default_row_commit_version: None,
            clustering_provider: None,
        }
    }

    /// Deprecated alias for [`Self::to_add`].
    #[deprecated(
        since = "0.31.0",
        note = "Use Arrow arrays directly instead of converting to Add actions."
    )]
    pub fn add_action(&self) -> Add {
        self.to_add()
    }

    /// Builds a [`Remove`] action for this file with the deletion timestamp
    /// set to the current wall-clock time.
    pub fn remove_action(&self, data_change: bool) -> Remove {
        Remove {
            path: self.path().to_string(),
            data_change,
            deletion_timestamp: Some(Utc::now().timestamp_millis()),
            extended_file_metadata: Some(true),
            size: Some(self.size()),
            partition_values: Some(self.partition_values_map()),
            deletion_vector: self.deletion_vector().map(|dv| dv.descriptor()),
            tags: None,
            base_row_id: None,
            default_row_commit_version: None,
        }
    }
}
fn ceil_datetime(v: i64) -> i64 {
let remainder = v % 1000;
if remainder == 0 {
((v as f64 / 1000.0).floor() as i64 + 1) * 1000
} else {
v
}
}
/// Recursively applies `func` to every `Timestamp` / `TimestampNtz` value
/// inside `value`, descending into struct scalars; all other scalar kinds
/// are returned untouched.
fn round_ms_datetimes<F>(value: Scalar, func: &F) -> Scalar
where
    F: Fn(i64) -> i64,
{
    match value {
        Scalar::Timestamp(ts) => Scalar::Timestamp(func(ts)),
        Scalar::TimestampNtz(ts) => Scalar::TimestampNtz(func(ts)),
        Scalar::Struct(data) => {
            // Rebuild the struct field-by-field, recursing into each value.
            let (fields, scalars): (Vec<_>, Vec<_>) = data
                .fields()
                .iter()
                .zip(data.values().iter())
                .map(|(field, child)| (field.clone(), round_ms_datetimes(child.clone(), func)))
                .unzip();
            Scalar::Struct(StructData::try_new(fields, scalars).unwrap())
        }
        other => other,
    }
}
/// Borrowed view over one row of a deletion-vector struct column.
///
/// Constructed only by `LogicalFileView::deletion_vector`, which guarantees
/// the row and its `storageType` child are non-null.
#[derive(Debug)]
struct DeletionVectorView<'a> {
    // The `deletionVector` struct column of the scan-row batch.
    data: &'a StructArray,
    // Row position of the file this view describes.
    index: usize,
}
impl DeletionVectorView<'_> {
    /// Materializes this view into an owned [`DeletionVectorDescriptor`].
    ///
    /// Panics if `storageType` does not parse into a known storage type;
    /// the constructor already guarantees the value is non-null.
    fn descriptor(&self) -> DeletionVectorDescriptor {
        DeletionVectorDescriptor {
            storage_type: self.storage_type().parse().unwrap(),
            path_or_inline_dv: self.path_or_inline_dv().to_string(),
            size_in_bytes: self.size_in_bytes(),
            cardinality: self.cardinality(),
            offset: self.offset(),
        }
    }

    /// The `storageType` child value for this row (non-null by construction).
    fn storage_type(&self) -> &str {
        let col = self.data.column(DV_FIELD_INDICES[DV_FIELD_STORAGE_TYPE]);
        get_string_value(col, self.index).unwrap()
    }

    /// The `pathOrInlineDv` child value for this row.
    fn path_or_inline_dv(&self) -> &str {
        let col = self.data.column(DV_FIELD_INDICES[DV_FIELD_PATH_OR_INLINE_DV]);
        get_string_value(col, self.index).unwrap()
    }

    /// The `sizeInBytes` child value for this row.
    fn size_in_bytes(&self) -> i32 {
        let col = self.data.column(DV_FIELD_INDICES[DV_FIELD_SIZE_IN_BYTES]);
        col.as_primitive::<Int32Type>().value(self.index)
    }

    /// The `cardinality` child value for this row.
    fn cardinality(&self) -> i64 {
        let col = self.data.column(DV_FIELD_INDICES[DV_FIELD_CARDINALITY]);
        col.as_primitive::<Int64Type>().value(self.index)
    }

    /// The optional `offset` child value; `None` when the field is missing
    /// from the schema or null for this row.
    fn offset(&self) -> Option<i32> {
        let col = self
            .data
            .column_by_name(DV_FIELD_OFFSET)?
            .as_primitive::<Int32Type>();
        (!col.is_null(self.index)).then(|| col.value(self.index))
    }
}
/// Extracts the string at `index` from any Arrow string-typed array
/// (`Utf8`, `LargeUtf8`, or `Utf8View`).
///
/// Returns `None` when the slot is null or the array is not string-typed.
fn get_string_value(data: &dyn Array, index: usize) -> Option<&str> {
    match data.data_type() {
        ArrowDataType::Utf8 => {
            let strings = data.as_string::<i32>();
            (!strings.is_null(index)).then(|| strings.value(index))
        }
        ArrowDataType::LargeUtf8 => {
            let strings = data.as_string::<i64>();
            (!strings.is_null(index)).then(|| strings.value(index))
        }
        ArrowDataType::Utf8View => {
            let strings = data.as_string_view();
            (!strings.is_null(index)).then(|| strings.value(index))
        }
        _ => None,
    }
}
fn collect_string_map(data: &dyn Array) -> Option<HashMap<String, Option<String>>> {
let entries = data.as_any().downcast_ref::<StructArray>()?;
let keys = entries.column(0).as_any().downcast_ref::<StringArray>()?;
let values = entries.column(1).as_any().downcast_ref::<StringArray>()?;
Some(
keys.iter()
.zip(values.iter())
.filter_map(|(key, value)| key.map(|k| (k.to_string(), value.map(str::to_string))))
.collect(),
)
}
impl TryFrom<&LogicalFileView> for ObjectMeta {
type Error = DeltaTableError;
fn try_from(file_stats: &LogicalFileView) -> Result<Self, Self::Error> {
Ok(ObjectMeta {
location: file_stats.object_store_path(),
size: file_stats.size() as u64,
last_modified: file_stats.modification_datetime()?,
version: None,
e_tag: None,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_utils::TestTables;
    use chrono::DateTime;
    use futures::TryStreamExt;

    // End-to-end check of the view accessors and action conversions against
    // the bundled "simple" test table.
    #[tokio::test]
    async fn test_logical_file_view_with_real_data() {
        let log_store = TestTables::Simple
            .table_builder()
            .expect("Failed to create table builder")
            .build_storage()
            .expect("Failed to build storage");
        let snapshot =
            crate::kernel::snapshot::Snapshot::try_new(&log_store, Default::default(), None)
                .await
                .unwrap();
        let files: Vec<_> = snapshot
            .files(&log_store, None)
            .try_collect()
            .await
            .unwrap();
        assert!(!files.is_empty(), "Should have test files");
        let first_batch = &files[0];
        let view = LogicalFileView::new(first_batch.clone(), 0);
        // Basic column accessors.
        assert!(!view.path().is_empty());
        assert!(view.size() > 0);
        assert!(view.modification_time() > 0);
        let datetime = view.modification_datetime().unwrap();
        assert!(datetime.timestamp_millis() > 0);
        // Add-action round trip mirrors the accessor values.
        let add_action = view.to_add();
        assert_eq!(add_action.path, view.path());
        assert_eq!(add_action.size, view.size());
        assert!(add_action.data_change);
        // Remove action carries a freshly generated deletion timestamp.
        let remove_action = view.remove_action(true);
        assert_eq!(remove_action.path, view.path());
        assert!(remove_action.data_change);
        assert!(remove_action.deletion_timestamp.is_some());
        // ObjectMeta conversion preserves size and modification time.
        let object_meta: ObjectMeta = (&view).try_into().unwrap();
        assert_eq!(object_meta.size as i64, view.size());
        assert_eq!(
            object_meta.last_modified.timestamp_millis(),
            view.modification_time()
        );
    }

    // Percent-decoding used by `LogicalFileView::path`.
    #[test]
    fn test_path_url_decoding() {
        use percent_encoding::percent_decode_str;
        use std::borrow::Cow;
        let encoded_path = "path/to/file%20with%20spaces.parquet";
        let decoded: Cow<str> = percent_decode_str(encoded_path).decode_utf8_lossy();
        assert_eq!(decoded, "path/to/file with spaces.parquet");
    }

    // `get_string_value` must handle all three Arrow string encodings and
    // reject non-string arrays.
    #[test]
    fn test_get_string_value_different_types() {
        use arrow_array::{Int32Array, StringArray};
        let utf8_array = StringArray::from(vec![Some("test"), None]);
        assert_eq!(get_string_value(&utf8_array, 0), Some("test"));
        assert_eq!(get_string_value(&utf8_array, 1), None);
        let large_utf8_array = arrow_array::LargeStringArray::from(vec![Some("large_test"), None]);
        assert_eq!(get_string_value(&large_utf8_array, 0), Some("large_test"));
        assert_eq!(get_string_value(&large_utf8_array, 1), None);
        let utf8_view_array = arrow_array::StringViewArray::from(vec![Some("view_test"), None]);
        assert_eq!(get_string_value(&utf8_view_array, 0), Some("view_test"));
        assert_eq!(get_string_value(&utf8_view_array, 1), None);
        let int_array = Int32Array::from(vec![123]);
        assert_eq!(get_string_value(&int_array, 0), None);
    }

    // Pins the asymmetric behavior: only exact multiples of 1000 are bumped.
    #[test]
    fn test_ceil_datetime() {
        assert_eq!(ceil_datetime(1609459200000), 1609459201000);
        assert_eq!(ceil_datetime(1609459200123), 1609459200123);
        assert_eq!(ceil_datetime(0), 1000);
    }

    // `round_ms_datetimes` must transform timestamps (including those nested
    // in structs) and leave every other scalar kind untouched.
    #[test]
    fn test_round_ms_datetimes() {
        use delta_kernel::expressions::{Scalar, StructData};
        use delta_kernel::schema::{DataType, PrimitiveType, StructField};
        let ceil_fn = |v: i64| v + 1000;
        let timestamp = Scalar::Timestamp(1609459200000);
        let rounded = round_ms_datetimes(timestamp, &ceil_fn);
        assert_eq!(rounded, Scalar::Timestamp(1609459201000));
        let timestamp_ntz = Scalar::TimestampNtz(1609459200000);
        let rounded = round_ms_datetimes(timestamp_ntz, &ceil_fn);
        assert_eq!(rounded, Scalar::TimestampNtz(1609459201000));
        let string_scalar = Scalar::String("test".into());
        let rounded = round_ms_datetimes(string_scalar.clone(), &ceil_fn);
        assert_eq!(rounded, string_scalar);
        let fields = vec![StructField::new(
            "ts",
            DataType::Primitive(PrimitiveType::Timestamp),
            true,
        )];
        let values = vec![Scalar::Timestamp(1609459200000)];
        let struct_data = StructData::try_new(fields.clone(), values).unwrap();
        let struct_scalar = Scalar::Struct(struct_data);
        let rounded = round_ms_datetimes(struct_scalar, &ceil_fn);
        if let Scalar::Struct(rounded_struct) = rounded {
            assert_eq!(rounded_struct.values()[0], Scalar::Timestamp(1609459201000));
        } else {
            panic!("Expected struct scalar");
        }
    }

    // `modification_datetime` relies on `DateTime::from_timestamp_millis`
    // rejecting out-of-range values; verify that contract directly.
    #[test]
    fn test_invalid_modification_time() {
        let invalid_timestamp = i64::MAX;
        let result = DateTime::from_timestamp_millis(invalid_timestamp);
        assert!(result.is_none());
        let valid_timestamp = 1609459200000;
        let result = DateTime::from_timestamp_millis(valid_timestamp);
        assert!(result.is_some());
    }
}