buoyant_kernel 0.21.100

Buoyant Data distribution of delta-kernel
Documentation
use crate::arrow::array::StringArray;
use crate::scan::state_info::StateInfo;
use crate::schema::StructType;
use crate::utils::test_utils::string_array_to_engine_data;
use itertools::Itertools;
use std::sync::Arc;

use crate::log_replay::ActionsBatch;
use crate::log_segment::CheckpointReadInfo;
use crate::{
    actions::get_commit_schema,
    engine::{
        arrow_data::ArrowEngineData,
        sync::{json::SyncJsonHandler, SyncEngine},
    },
    scan::log_replay::scan_action_iter,
    schema::SchemaRef,
    JsonHandler,
};

use super::state::ScanCallback;
use super::PhysicalPredicate;
use crate::scan::transform_spec::TransformSpec;
use crate::table_features::ColumnMappingMode;

/// Builds a batch of sidecar actions, one per entry in `paths`, each reporting the same
/// fixed `sizeInBytes` (9268).
///
/// `output_schema` is forwarded to the JSON parser because null columns affect equality
/// checks against the produced batch.
pub(crate) fn sidecar_batch_with_given_paths(
    paths: Vec<&str>,
    output_schema: SchemaRef,
) -> Box<ArrowEngineData> {
    // Fixed size kept for backward compatibility with existing expectations.
    const DEFAULT_SIDECAR_SIZE: u64 = 9268;
    let sized_paths = paths
        .into_iter()
        .map(|path| (path, DEFAULT_SIDECAR_SIZE))
        .collect::<Vec<_>>();
    sidecar_batch_with_given_paths_and_sizes(sized_paths, output_schema)
}

/// Builds a batch of sidecar actions from `(path, sizeInBytes)` pairs, followed by a single
/// `metaData` action.
///
/// Every sidecar gets the same `modificationTime` and a `tags` map `{"tag_foo": "tag_bar"}`.
/// The rows are parsed against `output_schema`. The caller supplies the schema because
/// null columns affect equality checks.
///
/// Paths are JSON-escaped before being embedded. Paths containing `"`, `\` or control
/// characters therefore still yield well-formed JSON; ordinary paths are embedded unchanged.
///
/// # Panics
/// Panics if the generated JSON cannot be parsed against `output_schema`.
pub(crate) fn sidecar_batch_with_given_paths_and_sizes(
    paths_and_sizes: Vec<(&str, u64)>,
    output_schema: SchemaRef,
) -> Box<ArrowEngineData> {
    /// Escapes `raw` so it can be placed between double quotes inside a JSON document.
    fn escape_json_string(raw: &str) -> String {
        let mut escaped = String::with_capacity(raw.len());
        for c in raw.chars() {
            match c {
                '"' => escaped.push_str("\\\""),
                '\\' => escaped.push_str("\\\\"),
                // JSON does not allow raw control characters (U+0000..=U+001F) in strings.
                c if c < '\u{20}' => escaped.push_str(&format!("\\u{:04x}", u32::from(c))),
                c => escaped.push(c),
            }
        }
        escaped
    }

    let handler = SyncJsonHandler {};

    let mut json_strings: Vec<String> = paths_and_sizes
        .into_iter()
        .map(|(path, size)| {
            // Without escaping, a quote or backslash in `path` would corrupt the JSON line.
            let path = escape_json_string(path);
            format!(
                r#"{{"sidecar":{{"path":"{path}","sizeInBytes":{size},"modificationTime":1714496113961,"tags":{{"tag_foo":"tag_bar"}}}}}}"#
            )
        })
        .collect();
    json_strings.push(r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#.to_string());

    let json_strings_array: StringArray =
        json_strings.iter().map(|s| s.as_str()).collect_vec().into();

    let parsed = handler
        .parse_json(
            string_array_to_engine_data(json_strings_array),
            output_schema,
        )
        .unwrap();

    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

// Generates a batch with an add action.
// The schema is provided as null columns affect equality checks.
pub(crate) fn add_batch_simple(output_schema: SchemaRef) -> Box<ArrowEngineData> {
    let handler = SyncJsonHandler {};
    let json_strings: StringArray = vec![
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
    ]
        .into();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

// Generates a batch with an add action.
// The schema is provided as null columns affect equality checks.
pub(crate) fn add_batch_for_row_id(output_schema: SchemaRef) -> Box<ArrowEngineData> {
    let handler = SyncJsonHandler {};
    let json_strings: StringArray = vec![
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","baseRowId": 42, "tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none", "delta.enableRowTracking": "true", "delta.rowTracking.materializedRowIdColumnName":"row_id_col"},"createdTime":1677811175819}}"#,
    ]
        .into();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

// An add batch with a removed file parsed with the schema provided
pub(crate) fn add_batch_with_remove(output_schema: SchemaRef) -> Box<ArrowEngineData> {
    let handler = SyncJsonHandler {};
    let json_strings: StringArray = vec![
        r#"{"remove":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","deletionTimestamp":1677811194426,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{},"size":635,"tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues":{},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
    ]
        .into();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

// A batch with a Remove action and a partition column (`date`). The Remove has
// `partitionValues: {"date": "2017-12-10"}` but the transform reads from `add.*` columns,
// so the Remove's partition values are not visible to the data skipping filter.
pub(crate) fn add_batch_with_remove_and_partition(
    output_schema: SchemaRef,
) -> Box<ArrowEngineData> {
    let handler = SyncJsonHandler {};
    let json_strings: StringArray = vec![
        r#"{"remove":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","deletionTimestamp":1677811194426,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"date":"2017-12-10"},"size":635}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues":{"date":"2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0}}"}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues":{"date":"2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0}}"}}"#,
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{"delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
    ]
        .into();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

// add batch with a `date` partition col
pub(crate) fn add_batch_with_partition_col() -> Box<ArrowEngineData> {
    let handler = SyncJsonHandler {};
    let json_strings: StringArray = vec![
        r#"{"metaData":{"id":"testId","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"value\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{"delta.enableDeletionVectors":"true","delta.columnMapping.mode":"none"},"createdTime":1677811175819}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c001.snappy.parquet","partitionValues": {"date": "2017-12-11"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":false}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"}}}"#,
        r#"{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}"#,
        r#"{"add":{"path":"part-00000-fae5310a-a37d-4e51-827b-c3d5516560ca-c000.snappy.parquet","partitionValues": {"date": "2017-12-10"},"size":635,"modificationTime":1677811178336,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"value\":0},\"maxValues\":{\"value\":9},\"nullCount\":{\"value\":0},\"tightBounds\":true}","tags":{"INSERTION_TIME":"1677811178336000","MIN_INSERTION_TIME":"1677811178336000","MAX_INSERTION_TIME":"1677811178336000","OPTIMIZE_TARGET_SIZE":"268435456"},"deletionVector":{"storageType":"u","pathOrInlineDv":"vBn[lx{q8@P<9BNH/isA","offset":1,"sizeInBytes":36,"cardinality":2}}}"#,
    ]
        .into();
    let output_schema = get_commit_schema().clone();
    let parsed = handler
        .parse_json(string_array_to_engine_data(json_strings), output_schema)
        .unwrap();
    ArrowEngineData::try_from_engine_data(parsed).unwrap()
}

/// Runs `scan_action_iter` over `batch` and validates what it yields.
///
/// Every scan-metadata item produced must have a selection vector equal to
/// `expected_sel_vec`. Each item's scan files are then visited with `validate_callback`,
/// which receives a clone of `context`. Exactly one item must be produced in total.
///
/// If `logical_schema` is `None`, an empty struct schema is used as both the logical and
/// the physical schema. In that case `transform_spec` should also be `None`.
///
/// # Panics
/// Panics if building or driving the iterator fails, if a selection vector differs from
/// `expected_sel_vec`, if visiting scan files fails, or if the number of yielded items is
/// not exactly one.
#[allow(clippy::vec_box)]
pub(crate) fn run_with_validate_callback<T: Clone>(
    batch: Vec<Box<ArrowEngineData>>,
    logical_schema: Option<SchemaRef>,
    transform_spec: Option<Arc<TransformSpec>>,
    expected_sel_vec: &[bool],
    context: T,
    validate_callback: ScanCallback<T>,
) {
    let logical_schema =
        logical_schema.unwrap_or_else(|| Arc::new(StructType::new_unchecked(vec![])));
    // No predicate, no column mapping and no stats/partition schemas, so the physical
    // schema is simply the logical one.
    let state_info = Arc::new(StateInfo {
        logical_schema: logical_schema.clone(),
        physical_schema: logical_schema,
        physical_predicate: PhysicalPredicate::None,
        transform_spec,
        column_mapping_mode: ColumnMappingMode::None,
        physical_stats_schema: None,
        physical_partition_schema: None,
    });
    let checkpoint_info = CheckpointReadInfo::without_stats_parsed();
    let (iter, _metrics) = scan_action_iter(
        &SyncEngine::new(),
        batch
            .into_iter()
            .map(|batch| Ok(ActionsBatch::new(batch as _, true))),
        state_info,
        checkpoint_info,
        false,
    )
    .expect("failed to build scan action iterator");
    let mut batch_count = 0usize;
    for res in iter {
        let scan_metadata = res.expect("scan action iterator yielded an error");
        assert_eq!(
            scan_metadata.scan_files.selection_vector(),
            expected_sel_vec,
            "unexpected selection vector for scan metadata item {batch_count}"
        );
        scan_metadata
            .visit_scan_files(context.clone(), validate_callback)
            .expect("visiting scan files failed");
        batch_count += 1;
    }
    assert_eq!(
        batch_count, 1,
        "expected exactly one scan metadata item from the scan action iterator"
    );
}