Skip to main content

datafusion_ffi/
physical_optimizer.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use datafusion_common::config::ConfigOptions;
23use datafusion_common::error::Result;
24use datafusion_physical_optimizer::PhysicalOptimizerRule;
25use datafusion_physical_plan::ExecutionPlan;
26use stabby::string::String as SString;
27use tokio::runtime::Handle;
28
29use crate::config::FFI_ConfigOptions;
30use crate::execution_plan::FFI_ExecutionPlan;
31use crate::util::FFI_Result;
32use crate::{df_result, sresult_return};
33
/// A stable struct for sharing [`PhysicalOptimizerRule`] across FFI boundaries.
///
/// The struct is a `#[repr(C)]` table of function pointers plus an opaque
/// `private_data` pointer. Because of `#[repr(C)]`, the field order is part of
/// the layout shared between libraries and must not be changed.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_PhysicalOptimizerRule {
    /// Run the rule on `plan` with the given `config` and return the rewritten
    /// plan, or an error carried in the [`FFI_Result`].
    pub optimize: unsafe extern "C" fn(
        &Self,
        plan: &FFI_ExecutionPlan,
        config: FFI_ConfigOptions,
    ) -> FFI_Result<FFI_ExecutionPlan>,

    /// Return the name of the wrapped rule.
    pub name: unsafe extern "C" fn(&Self) -> SString,

    /// Return the wrapped rule's
    /// [`PhysicalOptimizerRule::schema_check`] value.
    pub schema_check: unsafe extern "C" fn(&Self) -> bool,

    /// Used to create a clone of the rule. This should
    /// only need to be called by the receiver of the rule.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this rule.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the rule.
    /// A [`ForeignPhysicalOptimizerRule`] should never attempt to access this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. It is compared against `crate::get_library_marker_id`
    /// when converting back into a native rule.
    pub library_marker_id: extern "C" fn() -> usize,
}

// SAFETY: every field except `private_data` is a plain function pointer, which
// is `Send + Sync`. When this crate creates the struct, `private_data` points to
// a `RulePrivateData` whose fields (`Arc<dyn PhysicalOptimizerRule + Send + Sync>`
// and `Option<Handle>`) are themselves thread-safe.
// NOTE(review): structs created by another library rely on that library
// upholding the same guarantee for its private data.
unsafe impl Send for FFI_PhysicalOptimizerRule {}
unsafe impl Sync for FFI_PhysicalOptimizerRule {}
69
/// Provider-side state stored behind [`FFI_PhysicalOptimizerRule::private_data`].
///
/// It is allocated with `Box::into_raw` when the FFI struct is created or
/// cloned, and freed by `release_fn_wrapper`.
struct RulePrivateData {
    /// The local rule that every FFI callback forwards to.
    rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
    /// Runtime handle attached to plans returned from `optimize`, if any.
    runtime: Option<Handle>,
}
74
75impl FFI_PhysicalOptimizerRule {
76    fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> {
77        let private_data = self.private_data as *const RulePrivateData;
78        unsafe { &(*private_data).rule }
79    }
80
81    fn runtime(&self) -> Option<Handle> {
82        let private_data = self.private_data as *const RulePrivateData;
83        unsafe { (*private_data).runtime.clone() }
84    }
85}
86
87unsafe extern "C" fn optimize_fn_wrapper(
88    rule: &FFI_PhysicalOptimizerRule,
89    plan: &FFI_ExecutionPlan,
90    config: FFI_ConfigOptions,
91) -> FFI_Result<FFI_ExecutionPlan> {
92    let runtime = rule.runtime();
93    let rule = rule.inner();
94    let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
95    let config = sresult_return!(ConfigOptions::try_from(config));
96    let optimized_plan = sresult_return!(rule.optimize(plan, &config));
97
98    FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
99}
100
101unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
102    let rule = rule.inner();
103    rule.name().into()
104}
105
106unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool {
107    rule.inner().schema_check()
108}
109
110unsafe extern "C" fn release_fn_wrapper(rule: &mut FFI_PhysicalOptimizerRule) {
111    unsafe {
112        debug_assert!(!rule.private_data.is_null());
113        let private_data = Box::from_raw(rule.private_data as *mut RulePrivateData);
114        drop(private_data);
115        rule.private_data = std::ptr::null_mut();
116    }
117}
118
119unsafe extern "C" fn clone_fn_wrapper(
120    rule: &FFI_PhysicalOptimizerRule,
121) -> FFI_PhysicalOptimizerRule {
122    let runtime = rule.runtime();
123    let rule = Arc::clone(rule.inner());
124
125    let private_data =
126        Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void;
127
128    FFI_PhysicalOptimizerRule {
129        optimize: optimize_fn_wrapper,
130        name: name_fn_wrapper,
131        schema_check: schema_check_fn_wrapper,
132        clone: clone_fn_wrapper,
133        release: release_fn_wrapper,
134        version: super::version,
135        private_data,
136        library_marker_id: crate::get_library_marker_id,
137    }
138}
139
140impl Drop for FFI_PhysicalOptimizerRule {
141    fn drop(&mut self) {
142        unsafe { (self.release)(self) }
143    }
144}
145
146impl FFI_PhysicalOptimizerRule {
147    /// Creates a new [`FFI_PhysicalOptimizerRule`].
148    pub fn new(
149        rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
150        runtime: Option<Handle>,
151    ) -> Self {
152        if let Some(rule) = (Arc::clone(&rule) as Arc<dyn std::any::Any>)
153            .downcast_ref::<ForeignPhysicalOptimizerRule>()
154        {
155            return rule.rule.clone();
156        }
157
158        let private_data = Box::new(RulePrivateData { rule, runtime });
159        let private_data = Box::into_raw(private_data) as *mut c_void;
160
161        Self {
162            optimize: optimize_fn_wrapper,
163            name: name_fn_wrapper,
164            schema_check: schema_check_fn_wrapper,
165            clone: clone_fn_wrapper,
166            release: release_fn_wrapper,
167            version: super::version,
168            private_data,
169            library_marker_id: crate::get_library_marker_id,
170        }
171    }
172}
173
174/// This wrapper struct exists on the receiver side of the FFI interface, so it has
175/// no guarantees about being able to access the data in `private_data`. Any functions
176/// defined on this struct must only use the stable functions provided in
177/// FFI_PhysicalOptimizerRule to interact with the foreign rule.
178#[derive(Debug)]
179pub struct ForeignPhysicalOptimizerRule {
180    name: String,
181    rule: FFI_PhysicalOptimizerRule,
182}
183
184unsafe impl Send for ForeignPhysicalOptimizerRule {}
185unsafe impl Sync for ForeignPhysicalOptimizerRule {}
186
187impl From<&FFI_PhysicalOptimizerRule> for Arc<dyn PhysicalOptimizerRule + Send + Sync> {
188    fn from(rule: &FFI_PhysicalOptimizerRule) -> Self {
189        if (rule.library_marker_id)() == crate::get_library_marker_id() {
190            return Arc::clone(rule.inner());
191        }
192
193        let name: String = unsafe { (rule.name)(rule).into() };
194        Arc::new(ForeignPhysicalOptimizerRule {
195            name,
196            rule: rule.clone(),
197        }) as Arc<dyn PhysicalOptimizerRule + Send + Sync>
198    }
199}
200
201impl Clone for FFI_PhysicalOptimizerRule {
202    fn clone(&self) -> Self {
203        unsafe { (self.clone)(self) }
204    }
205}
206
207#[async_trait]
208impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
209    fn optimize(
210        &self,
211        plan: Arc<dyn ExecutionPlan>,
212        config: &ConfigOptions,
213    ) -> Result<Arc<dyn ExecutionPlan>> {
214        let config_options: FFI_ConfigOptions = config.into();
215        let plan = FFI_ExecutionPlan::new(plan, None);
216
217        let optimized_plan = unsafe {
218            df_result!((self.rule.optimize)(&self.rule, &plan, config_options))?
219        };
220        (&optimized_plan).try_into()
221    }
222
223    fn name(&self) -> &str {
224        &self.name
225    }
226
227    fn schema_check(&self) -> bool {
228        unsafe { (self.rule.schema_check)(&self.rule) }
229    }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::error::Result;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    use super::*;
    use crate::execution_plan::tests::EmptyExec;

    /// Rule that returns its input unchanged and reports a configurable
    /// `schema_check` value.
    #[derive(Debug)]
    struct NoOpRule {
        schema_check: bool,
    }

    impl PhysicalOptimizerRule for NoOpRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(plan)
        }

        fn name(&self) -> &str {
            "no_op_rule"
        }

        fn schema_check(&self) -> bool {
            self.schema_check
        }
    }

    /// Single-column empty plan used as optimizer input.
    fn create_test_plan() -> Arc<dyn ExecutionPlan> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        Arc::new(EmptyExec::new(schema))
    }

    #[test]
    fn test_round_trip_ffi_physical_optimizer_rule() -> Result<()> {
        for expected_schema_check in [true, false] {
            let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(NoOpRule {
                schema_check: expected_schema_check,
            });

            let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
            ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

            let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
                (&ffi_rule).into();

            assert_eq!(foreign_rule.name(), "no_op_rule");
            assert_eq!(foreign_rule.schema_check(), expected_schema_check);
        }

        Ok(())
    }

    #[test]
    fn test_round_trip_optimize() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule).into();

        let plan = create_test_plan();
        let config = ConfigOptions::new();

        let optimized = foreign_rule.optimize(plan, &config)?;
        assert_eq!(optimized.name(), "empty-exec");

        Ok(())
    }

    #[test]
    fn test_local_bypass() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        // Without mock marker, local bypass should return the original rule
        let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&ffi_rule).into();
        let any_ref: &dyn std::any::Any = &*recovered;
        assert!(any_ref.downcast_ref::<NoOpRule>().is_some());

        // With mock marker, should wrap in ForeignPhysicalOptimizerRule
        let rule2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });
        let mut ffi_rule2 = FFI_PhysicalOptimizerRule::new(rule2, None);
        ffi_rule2.library_marker_id = crate::mock_foreign_marker_id;
        let recovered2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule2).into();
        let any_ref2: &dyn std::any::Any = &*recovered2;
        assert!(
            any_ref2
                .downcast_ref::<ForeignPhysicalOptimizerRule>()
                .is_some()
        );

        Ok(())
    }

    #[test]
    fn test_clone() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: false });

        let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        let cloned = ffi_rule.clone();

        // The clone must own its own private data; otherwise dropping both
        // structs would release the same allocation twice.
        assert_ne!(ffi_rule.private_data, cloned.private_data);

        let name1: String = unsafe { (ffi_rule.name)(&ffi_rule).into() };
        let name2: String = unsafe { (cloned.name)(&cloned).into() };
        assert_eq!(name1, name2);

        let check1 = unsafe { (ffi_rule.schema_check)(&ffi_rule) };
        let check2 = unsafe { (cloned.schema_check)(&cloned) };
        assert!(!check1);
        assert_eq!(check1, check2);

        Ok(())
    }

    #[test]
    fn test_foreign_rule_rewrap_bypass() -> Result<()> {
        // When creating an FFI wrapper from a ForeignPhysicalOptimizerRule,
        // it should return the inner FFI rule rather than double-wrapping.
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule).into();

        // Now wrap the foreign rule back into FFI - should not double-wrap
        let re_wrapped = FFI_PhysicalOptimizerRule::new(foreign_rule, None);
        let name: String = unsafe { (re_wrapped.name)(&re_wrapped).into() };
        assert_eq!(name, "no_op_rule");

        // The name alone cannot detect double-wrapping, because a wrapper
        // reports the same name. `re_wrapped` was produced by this library's
        // clone callback, so converting it back takes the local bypass and
        // exposes the rule stored in its private data. That must be the
        // original `NoOpRule`, not a `ForeignPhysicalOptimizerRule`.
        let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&re_wrapped).into();
        let any_ref: &dyn std::any::Any = &*recovered;
        assert!(any_ref.downcast_ref::<NoOpRule>().is_some());
        assert!(
            any_ref
                .downcast_ref::<ForeignPhysicalOptimizerRule>()
                .is_none()
        );

        Ok(())
    }
}