#[cfg(feature = "integration-tests")]
mod tests {
use arrow::datatypes::Field;
use arrow::datatypes::Schema;
use arrow_schema::DataType;
use datafusion_common::DataFusionError;
use datafusion_ffi::execution_plan::FFI_ExecutionPlan;
use datafusion_ffi::execution_plan::ForeignExecutionPlan;
use datafusion_ffi::execution_plan::{ExecutionPlanPrivateData, tests::EmptyExec};
use datafusion_ffi::tests::utils::get_module;
use datafusion_physical_plan::ExecutionPlan;
use std::sync::Arc;
#[test]
fn test_ffi_execution_plan_partition_statistics_cross_library()
-> Result<(), DataFusionError> {
let module = get_module()?;
let bare = (module.create_empty_exec)();
let bare: Arc<dyn ExecutionPlan> = (&bare).try_into()?;
assert!(bare.is::<ForeignExecutionPlan>());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
let bare_stats = bare.partition_statistics(None)?;
assert_eq!(
bare_stats.as_ref(),
&datafusion_common::Statistics::new_unknown(&schema),
);
let expected = datafusion_ffi::tests::make_test_statistics();
let with_stats = (module.create_exec_with_statistics)();
let with_stats: Arc<dyn ExecutionPlan> = (&with_stats).try_into()?;
assert!(with_stats.is::<ForeignExecutionPlan>());
let observed_all = with_stats.partition_statistics(None)?;
assert_eq!(observed_all.as_ref(), &expected);
let observed_part = with_stats.partition_statistics(Some(0))?;
assert_eq!(observed_part.as_ref(), &expected);
Ok(())
}
#[test]
fn test_ffi_execution_plan_new_sets_runtimes_on_children()
-> Result<(), DataFusionError> {
let module = get_module()?;
fn generate_local_plan() -> Arc<dyn ExecutionPlan> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
Arc::new(EmptyExec::new(schema))
}
let child_plan = (module.create_empty_exec)();
let child_plan: Arc<dyn ExecutionPlan> = (&child_plan)
.try_into()
.expect("should be able create plan");
assert!(child_plan.is::<ForeignExecutionPlan>());
let grandchild_plan = generate_local_plan();
let child_plan = child_plan.with_new_children(vec![grandchild_plan])?;
unsafe {
let ffi_child = FFI_ExecutionPlan::new(Arc::clone(&child_plan), None);
let ffi_grandchild =
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
let grandchild_private_data =
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
assert!((*grandchild_private_data).runtime.is_none());
}
let parent_plan = generate_local_plan().with_new_children(vec![child_plan])?;
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
let ffi_parent =
FFI_ExecutionPlan::new(parent_plan, Some(runtime.handle().clone()));
unsafe {
let ffi_child = (ffi_parent.children)(&ffi_parent)
.into_iter()
.next()
.unwrap();
let ffi_grandchild =
(ffi_child.children)(&ffi_child).into_iter().next().unwrap();
assert_eq!(
(ffi_grandchild.library_marker_id)(),
(ffi_parent.library_marker_id)()
);
let grandchild_private_data =
ffi_grandchild.private_data as *const ExecutionPlanPrivateData;
assert!((*grandchild_private_data).runtime.is_some());
}
Ok(())
}
}