use std::collections::HashMap;
use std::sync::LazyLock;
use arrow_schema::{DataType, Field, TimeUnit};
pub(crate) use self::scan_utils::*;
use crate::DeltaResult;
use crate::kernel::{Add, AddCDCFile, Remove, Version};
pub mod scan;
mod scan_utils;
/// Column name `_change_type`; declared as a nullable `Utf8` field in `ADD_PARTITION_SCHEMA`.
pub const CHANGE_TYPE_COL: &str = "_change_type";
/// Column name `_commit_version`; declared as a nullable `UInt64` field in both
/// `CDC_PARTITION_SCHEMA` and `ADD_PARTITION_SCHEMA`.
pub const COMMIT_VERSION_COL: &str = "_commit_version";
/// Column name `_commit_timestamp`; declared as a nullable millisecond-precision timestamp
/// (no timezone) field in both `CDC_PARTITION_SCHEMA` and `ADD_PARTITION_SCHEMA`.
pub const COMMIT_TIMESTAMP_COL: &str = "_commit_timestamp";
/// Lazily built list of the extra fields associated with CDC files: the commit version
/// followed by the commit timestamp, both nullable.
///
/// Unlike `ADD_PARTITION_SCHEMA`, this list has no `_change_type` field.
// NOTE(review): presumably CDC files already carry `_change_type` in their data, which is
// why it is omitted here — confirm against the scan code.
pub(crate) static CDC_PARTITION_SCHEMA: LazyLock<Vec<Field>> = LazyLock::new(|| {
    let commit_version = Field::new(COMMIT_VERSION_COL, DataType::UInt64, true);
    let commit_timestamp = Field::new(
        COMMIT_TIMESTAMP_COL,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    );
    // Order matters to consumers that address fields by position: version, then timestamp.
    vec![commit_version, commit_timestamp]
});
/// Lazily built list of the extra fields associated with add-style files: the change type,
/// the commit version, and the commit timestamp, all nullable and in that order.
pub(crate) static ADD_PARTITION_SCHEMA: LazyLock<Vec<Field>> = LazyLock::new(|| {
    let change_type = Field::new(CHANGE_TYPE_COL, DataType::Utf8, true);
    let commit_version = Field::new(COMMIT_VERSION_COL, DataType::UInt64, true);
    let commit_timestamp = Field::new(
        COMMIT_TIMESTAMP_COL,
        DataType::Timestamp(TimeUnit::Millisecond, None),
        true,
    );
    // Order matters to consumers that address fields by position.
    vec![change_type, commit_version, commit_timestamp]
});
/// A group of file actions of a single kind `F`, tagged with a version and a timestamp.
#[derive(Debug)]
pub(crate) struct CdcDataSpec<F>
where
    F: FileAction,
{
    /// Table version associated with `actions`.
    version: Version,
    /// Timestamp associated with `version`.
    // NOTE(review): presumably milliseconds since the Unix epoch, matching the
    // millisecond unit of the `_commit_timestamp` field — confirm against callers.
    timestamp: i64,
    /// The file actions belonging to this version.
    actions: Vec<F>,
}
impl<F: FileAction> CdcDataSpec<F> {
pub fn new(version: Version, timestamp: i64, actions: Vec<F>) -> Self {
Self {
version,
timestamp,
actions,
}
}
}
/// Common read-only view over log actions that reference a file.
///
/// Implemented in this module for `Add`, `AddCDCFile` and `Remove`.
pub trait FileAction {
    /// Returns a borrowed map from partition column name to its optional string value.
    ///
    /// # Errors
    ///
    /// The `Add` and `AddCDCFile` implementations always succeed. `Remove` stores this
    /// value optionally and can fail with `DeltaTableError::MetadataError` when it is absent.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>>;
    /// Returns an owned copy of the action's file path.
    fn path(&self) -> String;
    /// Returns the action's recorded size as a `usize`.
    ///
    /// # Errors
    ///
    /// The `Add` and `AddCDCFile` implementations always succeed. `Remove` stores this
    /// value optionally and can fail with `DeltaTableError::MetadataError` when it is absent.
    fn size(&self) -> DeltaResult<usize>;
}
impl FileAction for Add {
    /// Borrows the action's partition values; never fails.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        let values: &HashMap<String, Option<String>> = &self.partition_values;
        Ok(values)
    }

    /// Returns an owned copy of the action's path.
    fn path(&self) -> String {
        self.path.to_owned()
    }

    /// Returns the action's size converted to `usize`; never fails.
    fn size(&self) -> DeltaResult<usize> {
        // NOTE(review): `as` is an unchecked conversion; if `size` is a signed integer a
        // negative value would wrap to a very large `usize` — confirm the field type.
        let bytes = self.size as usize;
        Ok(bytes)
    }
}
impl FileAction for AddCDCFile {
    /// Borrows the action's partition values; never fails.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        let values: &HashMap<String, Option<String>> = &self.partition_values;
        Ok(values)
    }

    /// Returns an owned copy of the action's path.
    fn path(&self) -> String {
        self.path.to_owned()
    }

    /// Returns the action's size converted to `usize`; never fails.
    fn size(&self) -> DeltaResult<usize> {
        // NOTE(review): `as` is an unchecked conversion; if `size` is a signed integer a
        // negative value would wrap to a very large `usize` — confirm the field type.
        let bytes = self.size as usize;
        Ok(bytes)
    }
}
impl FileAction for Remove {
    /// Borrows the action's partition values.
    ///
    /// # Errors
    ///
    /// Returns `DeltaTableError::MetadataError` when the action carries no partition
    /// values. This holds even when `extended_file_metadata` claims the field is present:
    /// the flag comes from the log and is not trusted, so a malformed entry yields an
    /// error instead of a panic.
    fn partition_values(&self) -> DeltaResult<&HashMap<String, Option<String>>> {
        self.partition_values.as_ref().ok_or_else(|| {
            crate::DeltaTableError::MetadataError(
                "Remove action is missing required field: 'partition_values'".to_string(),
            )
        })
    }

    /// Returns an owned copy of the action's path.
    fn path(&self) -> String {
        self.path.clone()
    }

    /// Returns the action's size converted to `usize`.
    ///
    /// # Errors
    ///
    /// Returns `DeltaTableError::MetadataError` when the action carries no size. This
    /// holds even when `extended_file_metadata` claims the field is present, so a
    /// malformed entry yields an error instead of a panic.
    fn size(&self) -> DeltaResult<usize> {
        // The `as` cast is kept to match the `Add` and `AddCDCFile` implementations.
        self.size.map(|size| size as usize).ok_or_else(|| {
            crate::DeltaTableError::MetadataError(
                "Remove action is missing required field: 'size'".to_string(),
            )
        })
    }
}