use std::ffi::c_void;
use std::pin::Pin;
use std::sync::Arc;
use abi_stable::StableAbi;
use abi_stable::std_types::{RString, RVec};
use datafusion_common::{DataFusionError, Result};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_physical_plan::{
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
};
use tokio::runtime::Handle;
use crate::execution::FFI_TaskContext;
use crate::plan_properties::FFI_PlanProperties;
use crate::record_batch_stream::FFI_RecordBatchStream;
use crate::util::FFIResult;
use crate::{df_result, rresult};
/// A `#[repr(C)]` table of function pointers plus an opaque data pointer that
/// exposes an [`ExecutionPlan`] across an FFI boundary.
///
/// Instances created with [`FFI_ExecutionPlan::new`] own a heap-allocated
/// [`ExecutionPlanPrivateData`] behind `private_data`. That allocation is
/// freed through the `release` callback, which the `Drop` implementation
/// invokes.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_ExecutionPlan {
/// Returns the properties of the wrapped plan in their FFI representation.
pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,
/// Returns one FFI wrapper per child of the wrapped plan.
pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
/// Returns the name of the wrapped plan.
pub name: unsafe extern "C" fn(plan: &Self) -> RString,
/// Executes `partition` of the wrapped plan with the given task context and
/// returns the resulting record batch stream, or an error.
pub execute: unsafe extern "C" fn(
plan: &Self,
partition: usize,
context: FFI_TaskContext,
) -> FFIResult<FFI_RecordBatchStream>,
/// Produces a new, independently owned `FFI_ExecutionPlan` for the same plan.
/// Used by the `Clone` implementation.
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
/// Frees the resources behind `private_data`. Used by the `Drop`
/// implementation.
pub release: unsafe extern "C" fn(arg: &mut Self),
/// Opaque pointer to state owned by the library that created this struct.
/// Within this file it is only dereferenced by the callbacks installed in
/// `new` and after the library marker check in `TryFrom`.
pub private_data: *mut c_void,
/// Returns an identifier of the library that produced this struct. It is
/// compared with the local library's identifier to decide whether the
/// wrapped plan can be used directly instead of going through the callbacks.
pub library_marker_id: extern "C" fn() -> usize,
}
// The raw `private_data` pointer suppresses the auto traits, so `Send` and
// `Sync` are asserted manually here.
// NOTE(review): for instances built by `FFI_ExecutionPlan::new` the pointee is
// an `Arc<dyn ExecutionPlan>` plus an `Option<Handle>`; presumably foreign
// producers must uphold the same thread-safety guarantee — confirm.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
/// State stored behind `FFI_ExecutionPlan::private_data` for plans created
/// with [`FFI_ExecutionPlan::new`].
pub struct ExecutionPlanPrivateData {
/// The wrapped execution plan that the FFI callbacks delegate to.
pub plan: Arc<dyn ExecutionPlan>,
/// Optional Tokio runtime handle. It is forwarded to the FFI wrappers created
/// for child plans and to the record batch streams produced by `execute`.
pub runtime: Option<Handle>,
}
impl FFI_ExecutionPlan {
    /// Borrows the plan stored inside this wrapper's private data.
    ///
    /// This reinterprets `private_data` as an [`ExecutionPlanPrivateData`],
    /// so it must only be reached for wrappers whose private data has that
    /// layout, i.e. ones created by this library through `new`.
    fn inner(&self) -> &Arc<dyn ExecutionPlan> {
        let data_ptr = self.private_data as *const ExecutionPlanPrivateData;
        // SAFETY: callers in this file only get here through the callbacks
        // installed by `new` or after the library marker check, in which case
        // `private_data` points at a live `ExecutionPlanPrivateData`.
        let data: &ExecutionPlanPrivateData = unsafe { &*data_ptr };
        &data.plan
    }
}
unsafe extern "C" fn properties_fn_wrapper(
plan: &FFI_ExecutionPlan,
) -> FFI_PlanProperties {
plan.inner().properties().as_ref().into()
}
/// `children` callback: wraps every child of the underlying plan in its own
/// `FFI_ExecutionPlan`, passing along this plan's runtime handle.
unsafe extern "C" fn children_fn_wrapper(
    plan: &FFI_ExecutionPlan,
) -> RVec<FFI_ExecutionPlan> {
    let data_ptr = plan.private_data as *const ExecutionPlanPrivateData;
    // SAFETY: this callback is only installed by `FFI_ExecutionPlan::new`,
    // which stores an `ExecutionPlanPrivateData` behind `private_data`.
    let data = unsafe { &*data_ptr };

    let child_plans = data.plan.children();
    let mut wrapped_children = Vec::with_capacity(child_plans.len());
    for child in child_plans {
        let ffi_child = FFI_ExecutionPlan::new(Arc::clone(child), data.runtime.clone());
        wrapped_children.push(ffi_child);
    }
    wrapped_children.into()
}
unsafe extern "C" fn execute_fn_wrapper(
plan: &FFI_ExecutionPlan,
partition: usize,
context: FFI_TaskContext,
) -> FFIResult<FFI_RecordBatchStream> {
unsafe {
let ctx = context.into();
let private_data = plan.private_data as *const ExecutionPlanPrivateData;
let plan = &(*private_data).plan;
let runtime = (*private_data).runtime.clone();
rresult!(
plan.execute(partition, ctx)
.map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
)
}
}
/// `name` callback: copies the wrapped plan's name into an FFI-safe string.
unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
    let plan_name: &str = plan.inner().name();
    plan_name.into()
}
/// `release` callback: frees the [`ExecutionPlanPrivateData`] owned by `plan`
/// and resets `private_data` to null.
///
/// Installed by [`FFI_ExecutionPlan::new`] and invoked from `Drop`. Calling it
/// on a plan that was already released still trips the debug assertion in
/// debug builds, but in release builds it is now a no-op instead of
/// rebuilding a `Box` from a null pointer (undefined behaviour).
unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
    debug_assert!(!plan.private_data.is_null());
    if plan.private_data.is_null() {
        return;
    }

    let raw = plan.private_data as *mut ExecutionPlanPrivateData;
    // Clear the pointer before dropping so the struct never holds a dangling
    // pointer while the private data's destructor runs.
    plan.private_data = std::ptr::null_mut();

    // SAFETY: `raw` is non-null and, for plans using this callback, was
    // produced by `Box::into_raw` in `FFI_ExecutionPlan::new`; it has just
    // been detached from `plan`, so it is released exactly once.
    drop(unsafe { Box::from_raw(raw) });
}
/// `clone` callback: builds a fresh `FFI_ExecutionPlan` that shares the same
/// underlying plan (`Arc` clone) and carries a copy of the runtime handle.
unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
    let data_ptr = plan.private_data as *const ExecutionPlanPrivateData;
    // SAFETY: this callback is only installed by `FFI_ExecutionPlan::new`,
    // which stores an `ExecutionPlanPrivateData` behind `private_data`.
    let source = unsafe { &*data_ptr };

    let shared_plan = Arc::clone(&source.plan);
    let handle = source.runtime.clone();
    FFI_ExecutionPlan::new(shared_plan, handle)
}
impl Clone for FFI_ExecutionPlan {
    /// Clones by delegating to the `clone` callback stored in the struct, so
    /// the copy is produced by whichever library created the original.
    fn clone(&self) -> Self {
        // `self.clone` without parentheses is the function-pointer field,
        // not a recursive call to this method.
        let clone_fn = self.clone;
        // SAFETY: the callback is invoked with the very struct it was
        // installed on.
        unsafe { clone_fn(self) }
    }
}
impl FFI_ExecutionPlan {
    /// Wraps `plan` for use across an FFI boundary.
    ///
    /// The plan and the optional `runtime` handle are boxed into an
    /// [`ExecutionPlanPrivateData`] whose ownership moves into the returned
    /// struct's `private_data` pointer; the `release` callback frees it again.
    pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
        let state = ExecutionPlanPrivateData { plan, runtime };
        let state_ptr = Box::into_raw(Box::new(state));

        Self {
            private_data: state_ptr as *mut c_void,
            library_marker_id: crate::get_library_marker_id,
            properties: properties_fn_wrapper,
            children: children_fn_wrapper,
            name: name_fn_wrapper,
            execute: execute_fn_wrapper,
            clone: clone_fn_wrapper,
            release: release_fn_wrapper,
        }
    }
}
impl Drop for FFI_ExecutionPlan {
    /// Hands the struct to its own `release` callback so the library that
    /// allocated the private data is the one that frees it.
    fn drop(&mut self) {
        let release_fn = self.release;
        // SAFETY: the callback is invoked with the very struct it was
        // installed on.
        unsafe { release_fn(self) }
    }
}
/// [`ExecutionPlan`] implementation backed by an [`FFI_ExecutionPlan`] whose
/// library marker differs from the local one (see the
/// `TryFrom<&FFI_ExecutionPlan>` implementation).
///
/// The name, properties and children are fetched through the FFI callbacks
/// once, at construction time, and cached in this struct.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
/// Plan name as returned by the FFI `name` callback.
name: String,
/// Owned clone of the FFI plan; its `execute` callback is used to run the plan.
plan: FFI_ExecutionPlan,
/// Plan properties converted from their FFI representation.
properties: Arc<PlanProperties>,
/// Child plans; initially converted from the FFI `children` callback and
/// replaced wholesale by `with_new_children`.
children: Vec<Arc<dyn ExecutionPlan>>,
}
// `Send` and `Sync` are asserted manually for the foreign plan wrapper.
// NOTE(review): this presumably relies on the same thread-safety assumption as
// the manual `Send`/`Sync` impls for the contained `FFI_ExecutionPlan` —
// confirm that these explicit impls are still required.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
impl DisplayAs for ForeignExecutionPlan {
    /// Renders a one-line description containing the cached plan name and the
    /// number of children. The tree-render format currently emits nothing.
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::TreeRender => write!(f, ""),
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                let child_count = self.children.len();
                write!(
                    f,
                    "FFI_ExecutionPlan: {}, number_of_children={}",
                    self.name, child_count,
                )
            }
        }
    }
}
impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
    type Error = DataFusionError;

    /// Converts an FFI plan back into an `Arc<dyn ExecutionPlan>`.
    ///
    /// When the plan's library marker equals the local one, the wrapped plan
    /// is returned directly. Otherwise the name, properties and children are
    /// read through the FFI callbacks and stored in a
    /// [`ForeignExecutionPlan`] that keeps a clone of `plan` for execution.
    ///
    /// # Errors
    ///
    /// Fails if the FFI properties cannot be converted or if converting any
    /// child fails.
    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
        let is_local = (plan.library_marker_id)() == crate::get_library_marker_id();
        if is_local {
            return Ok(Arc::clone(plan.inner()));
        }

        // SAFETY (applies to each callback invocation below): every callback
        // is invoked with the struct it is stored in.
        let name: String = unsafe { (plan.name)(plan) }.into();
        let properties: PlanProperties = unsafe { (plan.properties)(plan) }.try_into()?;

        let ffi_children = unsafe { (plan.children)(plan) };
        let mut children = Vec::with_capacity(ffi_children.len());
        for ffi_child in ffi_children.iter() {
            children.push(Self::try_from(ffi_child)?);
        }

        let foreign = ForeignExecutionPlan {
            name,
            plan: plan.clone(),
            properties: Arc::new(properties),
            children,
        };
        Ok(Arc::new(foreign))
    }
}
impl ExecutionPlan for ForeignExecutionPlan {
fn name(&self) -> &str {
&self.name
}
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn properties(&self) -> &Arc<PlanProperties> {
&self.properties
}
fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
self.children.iter().collect()
}
fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(Arc::new(ForeignExecutionPlan {
plan: self.plan.clone(),
name: self.name.clone(),
children,
properties: Arc::clone(&self.properties),
}))
}
fn execute(
&self,
partition: usize,
context: Arc<TaskContext>,
) -> Result<SendableRecordBatchStream> {
let context = FFI_TaskContext::from(context);
unsafe {
df_result!((self.plan.execute)(&self.plan, partition, context))
.map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
}
}
}
#[cfg(test)]
pub(crate) mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use datafusion::physical_plan::Partitioning;
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use super::*;
/// Minimal [`ExecutionPlan`] used as a test fixture. It carries properties
/// and an optional list of children; its `execute` and `fmt_as` methods are
/// unimplemented.
#[derive(Debug)]
pub struct EmptyExec {
/// Plan properties built in `EmptyExec::new`.
props: Arc<PlanProperties>,
/// Child plans; empty unless set through `with_new_children`.
children: Vec<Arc<dyn ExecutionPlan>>,
}
impl EmptyExec {
    /// Creates a childless plan over `schema` with three unknown partitions,
    /// incremental emission and bounded input.
    pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
        let eq_properties =
            datafusion::physical_expr::EquivalenceProperties::new(schema);
        let props = PlanProperties::new(
            eq_properties,
            Partitioning::UnknownPartitioning(3),
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        Self {
            props: Arc::new(props),
            children: Vec::new(),
        }
    }
}
// Required by the `ExecutionPlan` trait bound; the fixture never needs to be
// displayed directly, so formatting is left unimplemented.
impl DisplayAs for EmptyExec {
// Panics via `unimplemented!()` if called.
fn fmt_as(
&self,
_t: DisplayFormatType,
_f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
unimplemented!()
}
}
impl ExecutionPlan for EmptyExec {
    /// Fixed name that the FFI round-trip tests compare against.
    fn name(&self) -> &'static str {
        "empty-exec"
    }

    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.props
    }

    /// Borrows the children currently attached to this fixture.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        self.children.iter().collect::<Vec<_>>()
    }

    /// Returns a copy of this fixture that shares its properties and holds
    /// the given children.
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let replacement = EmptyExec {
            children,
            props: Arc::clone(&self.props),
        };
        Ok(Arc::new(replacement))
    }

    /// Execution is not part of what the fixture is used for; panics if called.
    fn execute(
        &self,
        _partition: usize,
        _context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        unimplemented!()
    }
}
/// A plan pushed through the FFI wrapper (forced onto the foreign code path)
/// keeps its name and renders with the `FFI_ExecutionPlan` display prefix.
#[test]
fn test_round_trip_ffi_execution_plan() -> Result<()> {
    let field = Field::new("a", DataType::Float32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let exec = Arc::new(EmptyExec::new(schema));
    let expected_name = exec.name().to_string();

    // Swap the marker so the conversion takes the foreign-library branch.
    let mut ffi_plan = FFI_ExecutionPlan::new(exec, None);
    ffi_plan.library_marker_id = crate::mock_foreign_marker_id;

    let round_tripped = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan)?;
    assert_eq!(expected_name, round_tripped.name());

    let displayable = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
        round_tripped.as_ref(),
    );
    let rendered = displayable.one_line().to_string();
    assert_eq!(
        rendered.trim(),
        "FFI_ExecutionPlan: empty-exec, number_of_children=0"
    );

    Ok(())
}
/// Children are reported correctly both when they are attached after the FFI
/// conversion and when the parent already had a child before being wrapped.
#[test]
fn test_ffi_execution_plan_children() -> Result<()> {
    let field = Field::new("a", DataType::Float32, false);
    let schema = Arc::new(Schema::new(vec![field]));

    // Scenario 1: attach the child to an already converted parent.
    let leaf = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let mut ffi_leaf = FFI_ExecutionPlan::new(leaf, None);
    ffi_leaf.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_leaf: Arc<dyn ExecutionPlan> = (&ffi_leaf).try_into()?;

    let root = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let mut ffi_root = FFI_ExecutionPlan::new(root, None);
    ffi_root.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_root: Arc<dyn ExecutionPlan> = (&ffi_root).try_into()?;

    assert_eq!(foreign_root.children().len(), 0);
    assert_eq!(foreign_leaf.children().len(), 0);

    let foreign_root = foreign_root.with_new_children(vec![foreign_leaf])?;
    assert_eq!(foreign_root.children().len(), 1);

    // Scenario 2: the parent has its child before it is wrapped for FFI.
    let leaf = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let mut ffi_leaf = FFI_ExecutionPlan::new(leaf, None);
    ffi_leaf.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_leaf: Arc<dyn ExecutionPlan> = (&ffi_leaf).try_into()?;

    let root = Arc::new(EmptyExec::new(Arc::clone(&schema)));
    let root = root.with_new_children(vec![foreign_leaf])?;
    let mut ffi_root = FFI_ExecutionPlan::new(root, None);
    ffi_root.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_root: Arc<dyn ExecutionPlan> = (&ffi_root).try_into()?;

    assert_eq!(foreign_root.children().len(), 1);

    Ok(())
}
/// With the local library marker the conversion hands back the original plan
/// type; with a foreign marker it produces a `ForeignExecutionPlan`.
#[test]
fn test_ffi_execution_plan_local_bypass() {
    let field = Field::new("a", DataType::Float32, false);
    let schema = Arc::new(Schema::new(vec![field]));
    let exec = Arc::new(EmptyExec::new(schema));
    let mut ffi_plan = FFI_ExecutionPlan::new(exec, None);

    // Local marker: the wrapped plan is returned as-is.
    let local_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
    let as_empty_exec = local_view.as_any().downcast_ref::<EmptyExec>();
    assert!(as_empty_exec.is_some());

    // Foreign marker: the plan is wrapped in a `ForeignExecutionPlan`.
    ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
    let foreign_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
    let as_foreign = foreign_view.as_any().downcast_ref::<ForeignExecutionPlan>();
    assert!(as_foreign.is_some());
}
}