use std::ffi::c_void;
use std::sync::Arc;
use abi_stable::StableAbi;
use abi_stable::std_types::{ROption, RVec};
use arrow::datatypes::SchemaRef;
use datafusion_common::error::{DataFusionError, Result};
use datafusion_physical_expr::EquivalenceProperties;
use datafusion_physical_expr_common::sort_expr::PhysicalSortExpr;
use datafusion_physical_plan::PlanProperties;
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
use crate::arrow_wrappers::WrappedSchema;
use crate::physical_expr::partitioning::FFI_Partitioning;
use crate::physical_expr::sort::FFI_PhysicalSortExpr;
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_PlanProperties {
pub output_partitioning: unsafe extern "C" fn(plan: &Self) -> FFI_Partitioning,
pub emission_type: unsafe extern "C" fn(plan: &Self) -> FFI_EmissionType,
pub boundedness: unsafe extern "C" fn(plan: &Self) -> FFI_Boundedness,
pub output_ordering:
unsafe extern "C" fn(plan: &Self) -> ROption<RVec<FFI_PhysicalSortExpr>>,
pub schema: unsafe extern "C" fn(plan: &Self) -> WrappedSchema,
pub release: unsafe extern "C" fn(arg: &mut Self),
pub private_data: *mut c_void,
pub library_marker_id: extern "C" fn() -> usize,
}
struct PlanPropertiesPrivateData {
props: PlanProperties,
}
impl FFI_PlanProperties {
fn inner(&self) -> &PlanProperties {
let private_data = self.private_data as *const PlanPropertiesPrivateData;
unsafe { &(*private_data).props }
}
}
unsafe extern "C" fn output_partitioning_fn_wrapper(
properties: &FFI_PlanProperties,
) -> FFI_Partitioning {
properties.inner().output_partitioning().into()
}
unsafe extern "C" fn emission_type_fn_wrapper(
properties: &FFI_PlanProperties,
) -> FFI_EmissionType {
properties.inner().emission_type.into()
}
unsafe extern "C" fn boundedness_fn_wrapper(
properties: &FFI_PlanProperties,
) -> FFI_Boundedness {
properties.inner().boundedness.into()
}
unsafe extern "C" fn output_ordering_fn_wrapper(
properties: &FFI_PlanProperties,
) -> ROption<RVec<FFI_PhysicalSortExpr>> {
let ordering: Option<RVec<FFI_PhysicalSortExpr>> =
properties.inner().output_ordering().map(|lex_ordering| {
let vec_ordering: Vec<PhysicalSortExpr> = lex_ordering.clone().into();
vec_ordering
.iter()
.map(FFI_PhysicalSortExpr::from)
.collect()
});
ordering.into()
}
unsafe extern "C" fn schema_fn_wrapper(properties: &FFI_PlanProperties) -> WrappedSchema {
let schema: SchemaRef = Arc::clone(properties.inner().eq_properties.schema());
schema.into()
}
unsafe extern "C" fn release_fn_wrapper(props: &mut FFI_PlanProperties) {
unsafe {
debug_assert!(!props.private_data.is_null());
let private_data =
Box::from_raw(props.private_data as *mut PlanPropertiesPrivateData);
drop(private_data);
props.private_data = std::ptr::null_mut();
}
}
impl Drop for FFI_PlanProperties {
fn drop(&mut self) {
unsafe { (self.release)(self) }
}
}
impl From<&PlanProperties> for FFI_PlanProperties {
fn from(props: &PlanProperties) -> Self {
let private_data = Box::new(PlanPropertiesPrivateData {
props: props.clone(),
});
FFI_PlanProperties {
output_partitioning: output_partitioning_fn_wrapper,
emission_type: emission_type_fn_wrapper,
boundedness: boundedness_fn_wrapper,
output_ordering: output_ordering_fn_wrapper,
schema: schema_fn_wrapper,
release: release_fn_wrapper,
private_data: Box::into_raw(private_data) as *mut c_void,
library_marker_id: crate::get_library_marker_id,
}
}
}
impl TryFrom<FFI_PlanProperties> for PlanProperties {
type Error = DataFusionError;
fn try_from(ffi_props: FFI_PlanProperties) -> Result<Self, Self::Error> {
if (ffi_props.library_marker_id)() == crate::get_library_marker_id() {
return Ok(ffi_props.inner().clone());
}
let ffi_schema = unsafe { (ffi_props.schema)(&ffi_props) };
let schema = (&ffi_schema.0).try_into()?;
let ffi_orderings: Option<RVec<FFI_PhysicalSortExpr>> =
unsafe { (ffi_props.output_ordering)(&ffi_props) }.into();
let sort_exprs = ffi_orderings
.map(|ordering_vec| {
ordering_vec
.iter()
.map(PhysicalSortExpr::from)
.collect::<Vec<_>>()
})
.unwrap_or_default();
let partitioning = unsafe { (ffi_props.output_partitioning)(&ffi_props) };
let eq_properties = if sort_exprs.is_empty() {
EquivalenceProperties::new(Arc::new(schema))
} else {
EquivalenceProperties::new_with_orderings(Arc::new(schema), [sort_exprs])
};
let emission_type: EmissionType =
unsafe { (ffi_props.emission_type)(&ffi_props).into() };
let boundedness: Boundedness =
unsafe { (ffi_props.boundedness)(&ffi_props).into() };
Ok(PlanProperties::new(
eq_properties,
(&partitioning).into(),
emission_type,
boundedness,
))
}
}
#[repr(C)]
#[derive(Clone, StableAbi)]
pub enum FFI_Boundedness {
Bounded,
Unbounded { requires_infinite_memory: bool },
}
impl From<Boundedness> for FFI_Boundedness {
fn from(value: Boundedness) -> Self {
match value {
Boundedness::Bounded => FFI_Boundedness::Bounded,
Boundedness::Unbounded {
requires_infinite_memory,
} => FFI_Boundedness::Unbounded {
requires_infinite_memory,
},
}
}
}
impl From<FFI_Boundedness> for Boundedness {
fn from(value: FFI_Boundedness) -> Self {
match value {
FFI_Boundedness::Bounded => Boundedness::Bounded,
FFI_Boundedness::Unbounded {
requires_infinite_memory,
} => Boundedness::Unbounded {
requires_infinite_memory,
},
}
}
}
#[repr(C)]
#[derive(Clone, StableAbi)]
pub enum FFI_EmissionType {
Incremental,
Final,
Both,
}
impl From<EmissionType> for FFI_EmissionType {
fn from(value: EmissionType) -> Self {
match value {
EmissionType::Incremental => FFI_EmissionType::Incremental,
EmissionType::Final => FFI_EmissionType::Final,
EmissionType::Both => FFI_EmissionType::Both,
}
}
}
impl From<FFI_EmissionType> for EmissionType {
fn from(value: FFI_EmissionType) -> Self {
match value {
FFI_EmissionType::Incremental => EmissionType::Incremental,
FFI_EmissionType::Final => EmissionType::Final,
FFI_EmissionType::Both => EmissionType::Both,
}
}
}
#[cfg(test)]
mod tests {
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::Partitioning;
use super::*;
fn create_test_props() -> Result<PlanProperties> {
use arrow::datatypes::{DataType, Field, Schema};
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
let mut eqp = EquivalenceProperties::new(Arc::clone(&schema));
let _ = eqp.reorder([PhysicalSortExpr::new_default(
datafusion::physical_plan::expressions::col("a", &schema)?,
)]);
Ok(PlanProperties::new(
eqp,
Partitioning::RoundRobinBatch(3),
EmissionType::Incremental,
Boundedness::Bounded,
))
}
#[test]
fn test_round_trip_ffi_plan_properties() -> Result<()> {
let original_props = create_test_props()?;
let mut local_props_ptr = FFI_PlanProperties::from(&original_props);
local_props_ptr.library_marker_id = crate::mock_foreign_marker_id;
let foreign_props: PlanProperties = local_props_ptr.try_into()?;
assert_eq!(format!("{foreign_props:?}"), format!("{original_props:?}"));
Ok(())
}
#[test]
fn test_ffi_plan_properties_local_bypass() -> Result<()> {
let props = create_test_props()?;
let ffi_plan = FFI_PlanProperties::from(&props);
let foreign_plan: PlanProperties = ffi_plan.try_into()?;
assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}"));
let mut ffi_plan = FFI_PlanProperties::from(&props);
ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
let foreign_plan: PlanProperties = ffi_plan.try_into()?;
assert_eq!(format!("{foreign_plan:?}"), format!("{props:?}"));
Ok(())
}
}