datafusion-substrait 53.1.0

DataFusion Substrait Producer and Consumer
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::collections::HashMap;

use crate::variation_const::{
    DEFAULT_CONTAINER_TYPE_VARIATION_REF, LARGE_CONTAINER_TYPE_VARIATION_REF,
    VIEW_CONTAINER_TYPE_VARIATION_REF,
};

use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::source::DataSourceExec;
use datafusion::error::{DataFusionError, Result};
use datafusion::physical_plan::{ExecutionPlan, displayable};

use datafusion::datasource::physical_plan::ParquetSource;
use substrait::proto::ReadRel;
use substrait::proto::Rel;
use substrait::proto::expression::MaskExpression;
use substrait::proto::expression::mask_expression::{StructItem, StructSelect};
use substrait::proto::read_rel::LocalFiles;
use substrait::proto::read_rel::ReadType;
use substrait::proto::read_rel::local_files::FileOrFiles;
use substrait::proto::read_rel::local_files::file_or_files::ParquetReadOptions;
use substrait::proto::read_rel::local_files::file_or_files::{FileFormat, PathType};
use substrait::proto::rel::RelType;
use substrait::proto::r#type::{
    Binary, Boolean, Fp64, I64, Kind, Nullability, String as SubstraitString, Struct,
};
use substrait::proto::{NamedStruct, Type, extensions};

/// Convert DataFusion ExecutionPlan to Substrait Rel
pub fn to_substrait_rel(
    plan: &dyn ExecutionPlan,
    _extension_info: &mut (
        Vec<extensions::SimpleExtensionDeclaration>,
        HashMap<String, u32>,
    ),
) -> Result<Box<Rel>> {
    if let Some(data_source_exec) = plan.as_any().downcast_ref::<DataSourceExec>()
        && let Some((file_config, _)) =
            data_source_exec.downcast_to_file_source::<ParquetSource>()
    {
        let mut substrait_files = vec![];
        for (partition_index, files) in file_config.file_groups.iter().enumerate() {
            for file in files.iter() {
                substrait_files.push(FileOrFiles {
                    partition_index: partition_index.try_into().unwrap(),
                    start: 0,
                    length: file.object_meta.size,
                    path_type: Some(PathType::UriPath(
                        file.object_meta.location.as_ref().to_string(),
                    )),
                    file_format: Some(FileFormat::Parquet(ParquetReadOptions {})),
                });
            }
        }

        let mut names = vec![];
        let mut types = vec![];

        for field in file_config.file_schema().fields.iter() {
            match to_substrait_type(field.data_type(), field.is_nullable()) {
                Ok(t) => {
                    names.push(field.name().clone());
                    types.push(t);
                }
                Err(e) => return Err(e),
            }
        }

        let type_info = Struct {
            types,
            // FIXME: duckdb doesn't set this field, keep it as default variant 0.
            // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1106-L1127
            type_variation_reference: 0,
            nullability: Nullability::Required.into(),
        };

        let mut select_struct = None;
        if let Some(projection) = file_config.file_source().projection().as_ref() {
            let struct_items = projection
                .column_indices()
                .into_iter()
                .map(|index| StructItem {
                    field: index as i32,
                    // FIXME: duckdb sets this to None, but it's not clear why.
                    // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1191
                    child: None,
                })
                .collect();

            select_struct = Some(StructSelect { struct_items });
        }

        return Ok(Box::new(Rel {
            rel_type: Some(RelType::Read(Box::new(ReadRel {
                common: None,
                base_schema: Some(NamedStruct {
                    names,
                    r#struct: Some(type_info),
                }),
                filter: None,
                best_effort_filter: None,
                projection: Some(MaskExpression {
                    select: select_struct,
                    // FIXME: duckdb set this to true, but it's not clear why.
                    // https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L1186.
                    maintain_singular_struct: true,
                }),
                advanced_extension: None,
                read_type: Some(ReadType::LocalFiles(LocalFiles {
                    items: substrait_files,
                    advanced_extension: None,
                })),
            }))),
        }));
    }
    Err(DataFusionError::Substrait(format!(
        "Unsupported plan in Substrait physical plan producer: {}",
        displayable(plan).one_line()
    )))
}

// see https://github.com/duckdb/substrait/blob/b6f56643cb11d52de0e32c24a01dfd5947df62be/src/to_substrait.cpp#L954-L1094.
fn to_substrait_type(data_type: &DataType, nullable: bool) -> Result<Type> {
    let nullability = if nullable {
        Nullability::Nullable.into()
    } else {
        Nullability::Required.into()
    };

    match data_type {
        DataType::Boolean => Ok(Type {
            kind: Some(Kind::Bool(Boolean {
                type_variation_reference: 0,
                nullability,
            })),
        }),
        DataType::Int64 => Ok(Type {
            kind: Some(Kind::I64(I64 {
                type_variation_reference: 0,
                nullability,
            })),
        }),
        DataType::Float64 => Ok(Type {
            kind: Some(Kind::Fp64(Fp64 {
                type_variation_reference: 0,
                nullability,
            })),
        }),
        DataType::Utf8 => Ok(Type {
            kind: Some(Kind::String(SubstraitString {
                type_variation_reference: DEFAULT_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        DataType::LargeUtf8 => Ok(Type {
            kind: Some(Kind::String(SubstraitString {
                type_variation_reference: LARGE_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        DataType::Utf8View => Ok(Type {
            kind: Some(Kind::String(SubstraitString {
                type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        DataType::Binary => Ok(Type {
            kind: Some(Kind::Binary(Binary {
                type_variation_reference: DEFAULT_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        DataType::LargeBinary => Ok(Type {
            kind: Some(Kind::Binary(Binary {
                type_variation_reference: LARGE_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        DataType::BinaryView => Ok(Type {
            kind: Some(Kind::Binary(Binary {
                type_variation_reference: VIEW_CONTAINER_TYPE_VARIATION_REF,
                nullability,
            })),
        }),
        _ => Err(DataFusionError::Substrait(format!(
            "Logical type {data_type} not implemented as substrait type"
        ))),
    }
}