#[cfg(test)]
mod tests {
    mod schema_compatibility {
        use crate::utils::test::read_json;
        use datafusion::arrow::datatypes::{DataType, Field};
        use datafusion::common::{DFSchema, Result, TableReference};
        use datafusion::datasource::empty::EmptyTable;
        use datafusion::prelude::SessionContext;
        use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
        use insta::assert_snapshot;
        use std::collections::HashMap;
        use std::sync::Arc;

        /// Substrait plan that selects columns `a` and `b` from table `DATA`.
        const SIMPLE_SELECT_PLAN: &str =
            "tests/testdata/test_plans/simple_select.substrait.json";

        /// Variant of [`SIMPLE_SELECT_PLAN`] whose read relation uses a mask
        /// (per the file name); it also resolves to columns `a` and `b`.
        const SIMPLE_SELECT_WITH_MASK_PLAN: &str =
            "tests/testdata/test_plans/simple_select_with_mask.substrait.json";

        /// Builds a [`SessionContext`] containing one table named `table_name`.
        ///
        /// `fields` lists the table's columns as `(name, data_type, nullable)`
        /// triples, in order. Every column is qualified with `table_name`, and
        /// the table is registered as an [`EmptyTable`], so only its schema matters.
        ///
        /// # Errors
        ///
        /// Returns an error if the schema is invalid (e.g. duplicate column
        /// names) or if the table cannot be registered.
        fn generate_context_with_table(
            table_name: &str,
            fields: Vec<(&str, DataType, bool)>,
        ) -> Result<SessionContext> {
            let table_ref = TableReference::bare(table_name);
            let fields: Vec<(Option<TableReference>, Arc<Field>)> = fields
                .into_iter()
                .map(|(field_name, data_type, nullable)| {
                    (
                        Some(table_ref.clone()),
                        Arc::new(Field::new(field_name, data_type, nullable)),
                    )
                })
                .collect();
            let df_schema = DFSchema::new_with_metadata(fields, HashMap::default())?;
            let ctx = SessionContext::new();
            ctx.register_table(
                table_ref,
                Arc::new(EmptyTable::new(Arc::clone(df_schema.inner()))),
            )?;
            Ok(ctx)
        }

        #[tokio::test]
        async fn ensure_schema_match_exact() -> Result<()> {
            let proto_plan = read_json(SIMPLE_SELECT_PLAN);
            // The table exposes exactly the columns the plan reads, so the scan
            // needs no projection of its own.
            let df_schema =
                vec![("a", DataType::Int32, false), ("b", DataType::Int32, true)];
            let ctx = generate_context_with_table("DATA", df_schema)?;
            let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
            assert_snapshot!(
                plan,
                @r"
                Projection: DATA.a, DATA.b
                  TableScan: DATA
                "
            );
            Ok(())
        }

        #[tokio::test]
        async fn ensure_schema_match_subset() -> Result<()> {
            let proto_plan = read_json(SIMPLE_SELECT_PLAN);
            // The table has an extra column (`c`) and a different column order;
            // the scan must project `a` and `b` back out.
            let df_schema = vec![
                ("b", DataType::Int32, true),
                ("a", DataType::Int32, false),
                ("c", DataType::Int32, false),
            ];
            let ctx = generate_context_with_table("DATA", df_schema)?;
            let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
            assert_snapshot!(
                plan,
                @r"
                Projection: DATA.a, DATA.b
                  TableScan: DATA projection=[a, b]
                "
            );
            Ok(())
        }

        #[tokio::test]
        async fn ensure_schema_match_subset_with_mask() -> Result<()> {
            let proto_plan = read_json(SIMPLE_SELECT_WITH_MASK_PLAN);
            // Extra columns (`d`, `c`) are interleaved with the selected ones;
            // the masked read must still resolve to `a` and `b`.
            let df_schema = vec![
                ("d", DataType::Int32, true),
                ("a", DataType::Int32, false),
                ("c", DataType::Int32, false),
                ("b", DataType::Int32, false),
            ];
            let ctx = generate_context_with_table("DATA", df_schema)?;
            let plan = from_substrait_plan(&ctx.state(), &proto_plan).await?;
            assert_snapshot!(
                plan,
                @r"
                Projection: DATA.a, DATA.b
                  TableScan: DATA projection=[a, b]
                "
            );
            Ok(())
        }

        #[tokio::test]
        async fn ensure_schema_match_not_subset() -> Result<()> {
            let proto_plan = read_json(SIMPLE_SELECT_PLAN);
            // Column `b`, which the plan selects, does not exist in the table.
            let df_schema =
                vec![("a", DataType::Int32, false), ("c", DataType::Int32, true)];
            let ctx = generate_context_with_table("DATA", df_schema)?;
            let res = from_substrait_plan(&ctx.state(), &proto_plan).await;
            assert!(
                res.is_err(),
                "expected an error for a table missing column `b`, got {res:?}"
            );
            Ok(())
        }

        #[tokio::test]
        async fn reject_plans_with_incompatible_field_types() -> Result<()> {
            let proto_plan = read_json(SIMPLE_SELECT_PLAN);
            // Same columns and nullability as `ensure_schema_match_exact`, which
            // succeeds; only the type of `a` differs (Date32 instead of Int32).
            // Keeping `b` present ensures the rejection comes from the type
            // mismatch rather than from a missing column.
            let df_schema =
                vec![("a", DataType::Date32, false), ("b", DataType::Int32, true)];
            let ctx = generate_context_with_table("DATA", df_schema)?;
            let res = from_substrait_plan(&ctx.state(), &proto_plan).await;
            assert!(
                res.is_err(),
                "expected an error for incompatible type of column `a`, got {res:?}"
            );
            Ok(())
        }
    }
}