use std::ffi::c_void;
use std::sync::Arc;
use async_trait::async_trait;
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use stabby::string::String as SString;
use tokio::runtime::Handle;
use crate::config::FFI_ConfigOptions;
use crate::execution_plan::FFI_ExecutionPlan;
use crate::util::FFI_Result;
use crate::{df_result, sresult_return};
#[repr(C)]
#[derive(Debug)]
pub struct FFI_PhysicalOptimizerRule {
pub optimize: unsafe extern "C" fn(
&Self,
plan: &FFI_ExecutionPlan,
config: FFI_ConfigOptions,
) -> FFI_Result<FFI_ExecutionPlan>,
pub name: unsafe extern "C" fn(&Self) -> SString,
pub schema_check: unsafe extern "C" fn(&Self) -> bool,
pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
pub release: unsafe extern "C" fn(arg: &mut Self),
pub version: unsafe extern "C" fn() -> u64,
pub private_data: *mut c_void,
pub library_marker_id: extern "C" fn() -> usize,
}
unsafe impl Send for FFI_PhysicalOptimizerRule {}
unsafe impl Sync for FFI_PhysicalOptimizerRule {}
struct RulePrivateData {
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
runtime: Option<Handle>,
}
impl FFI_PhysicalOptimizerRule {
fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> {
let private_data = self.private_data as *const RulePrivateData;
unsafe { &(*private_data).rule }
}
fn runtime(&self) -> Option<Handle> {
let private_data = self.private_data as *const RulePrivateData;
unsafe { (*private_data).runtime.clone() }
}
}
unsafe extern "C" fn optimize_fn_wrapper(
rule: &FFI_PhysicalOptimizerRule,
plan: &FFI_ExecutionPlan,
config: FFI_ConfigOptions,
) -> FFI_Result<FFI_ExecutionPlan> {
let runtime = rule.runtime();
let rule = rule.inner();
let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
let config = sresult_return!(ConfigOptions::try_from(config));
let optimized_plan = sresult_return!(rule.optimize(plan, &config));
FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
}
unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
let rule = rule.inner();
rule.name().into()
}
unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool {
rule.inner().schema_check()
}
unsafe extern "C" fn release_fn_wrapper(rule: &mut FFI_PhysicalOptimizerRule) {
unsafe {
debug_assert!(!rule.private_data.is_null());
let private_data = Box::from_raw(rule.private_data as *mut RulePrivateData);
drop(private_data);
rule.private_data = std::ptr::null_mut();
}
}
unsafe extern "C" fn clone_fn_wrapper(
rule: &FFI_PhysicalOptimizerRule,
) -> FFI_PhysicalOptimizerRule {
let runtime = rule.runtime();
let rule = Arc::clone(rule.inner());
let private_data =
Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void;
FFI_PhysicalOptimizerRule {
optimize: optimize_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data,
library_marker_id: crate::get_library_marker_id,
}
}
impl Drop for FFI_PhysicalOptimizerRule {
fn drop(&mut self) {
unsafe { (self.release)(self) }
}
}
impl FFI_PhysicalOptimizerRule {
pub fn new(
rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
runtime: Option<Handle>,
) -> Self {
if let Some(rule) = (Arc::clone(&rule) as Arc<dyn std::any::Any>)
.downcast_ref::<ForeignPhysicalOptimizerRule>()
{
return rule.rule.clone();
}
let private_data = Box::new(RulePrivateData { rule, runtime });
let private_data = Box::into_raw(private_data) as *mut c_void;
Self {
optimize: optimize_fn_wrapper,
name: name_fn_wrapper,
schema_check: schema_check_fn_wrapper,
clone: clone_fn_wrapper,
release: release_fn_wrapper,
version: super::version,
private_data,
library_marker_id: crate::get_library_marker_id,
}
}
}
#[derive(Debug)]
pub struct ForeignPhysicalOptimizerRule {
name: String,
rule: FFI_PhysicalOptimizerRule,
}
unsafe impl Send for ForeignPhysicalOptimizerRule {}
unsafe impl Sync for ForeignPhysicalOptimizerRule {}
impl From<&FFI_PhysicalOptimizerRule> for Arc<dyn PhysicalOptimizerRule + Send + Sync> {
fn from(rule: &FFI_PhysicalOptimizerRule) -> Self {
if (rule.library_marker_id)() == crate::get_library_marker_id() {
return Arc::clone(rule.inner());
}
let name: String = unsafe { (rule.name)(rule).into() };
Arc::new(ForeignPhysicalOptimizerRule {
name,
rule: rule.clone(),
}) as Arc<dyn PhysicalOptimizerRule + Send + Sync>
}
}
impl Clone for FFI_PhysicalOptimizerRule {
fn clone(&self) -> Self {
unsafe { (self.clone)(self) }
}
}
#[async_trait]
impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
let config_options: FFI_ConfigOptions = config.into();
let plan = FFI_ExecutionPlan::new(plan, None);
let optimized_plan = unsafe {
df_result!((self.rule.optimize)(&self.rule, &plan, config_options))?
};
(&optimized_plan).try_into()
}
fn name(&self) -> &str {
&self.name
}
fn schema_check(&self) -> bool {
unsafe { (self.rule.schema_check)(&self.rule) }
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::config::ConfigOptions;
use datafusion_common::error::Result;
use datafusion_physical_optimizer::PhysicalOptimizerRule;
use datafusion_physical_plan::ExecutionPlan;
use super::*;
use crate::execution_plan::tests::EmptyExec;
#[derive(Debug)]
struct NoOpRule {
schema_check: bool,
}
impl PhysicalOptimizerRule for NoOpRule {
fn optimize(
&self,
plan: Arc<dyn ExecutionPlan>,
_config: &ConfigOptions,
) -> Result<Arc<dyn ExecutionPlan>> {
Ok(plan)
}
fn name(&self) -> &str {
"no_op_rule"
}
fn schema_check(&self) -> bool {
self.schema_check
}
}
fn create_test_plan() -> Arc<dyn ExecutionPlan> {
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
Arc::new(EmptyExec::new(schema))
}
#[test]
fn test_round_trip_ffi_physical_optimizer_rule() -> Result<()> {
for expected_schema_check in [true, false] {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(NoOpRule {
schema_check: expected_schema_check,
});
let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
(&ffi_rule).into();
assert_eq!(foreign_rule.name(), "no_op_rule");
assert_eq!(foreign_rule.schema_check(), expected_schema_check);
}
Ok(())
}
#[test]
fn test_round_trip_optimize() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(NoOpRule { schema_check: true });
let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
(&ffi_rule).into();
let plan = create_test_plan();
let config = ConfigOptions::new();
let optimized = foreign_rule.optimize(plan, &config)?;
assert_eq!(optimized.name(), "empty-exec");
Ok(())
}
#[test]
fn test_local_bypass() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(NoOpRule { schema_check: true });
let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&ffi_rule).into();
let any_ref: &dyn std::any::Any = &*recovered;
assert!(any_ref.downcast_ref::<NoOpRule>().is_some());
let rule2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(NoOpRule { schema_check: true });
let mut ffi_rule2 = FFI_PhysicalOptimizerRule::new(rule2, None);
ffi_rule2.library_marker_id = crate::mock_foreign_marker_id;
let recovered2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
(&ffi_rule2).into();
let any_ref2: &dyn std::any::Any = &*recovered2;
assert!(
any_ref2
.downcast_ref::<ForeignPhysicalOptimizerRule>()
.is_some()
);
Ok(())
}
#[test]
fn test_clone() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(NoOpRule { schema_check: true });
let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
let cloned = ffi_rule.clone();
let name1: String = unsafe { (ffi_rule.name)(&ffi_rule).into() };
let name2: String = unsafe { (cloned.name)(&cloned).into() };
assert_eq!(name1, name2);
Ok(())
}
#[test]
fn test_foreign_rule_rewrap_bypass() -> Result<()> {
let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
Arc::new(NoOpRule { schema_check: true });
let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
ffi_rule.library_marker_id = crate::mock_foreign_marker_id;
let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
(&ffi_rule).into();
let re_wrapped = FFI_PhysicalOptimizerRule::new(foreign_rule, None);
let name: String = unsafe { (re_wrapped.name)(&re_wrapped).into() };
assert_eq!(name, "no_op_rule");
Ok(())
}
}