datafusion_ffi/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{RString, RVec};
24use datafusion_common::{DataFusionError, Result};
25use datafusion_execution::{SendableRecordBatchStream, TaskContext};
26use datafusion_physical_plan::{
27    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
28};
29use tokio::runtime::Handle;
30
31use crate::execution::FFI_TaskContext;
32use crate::plan_properties::FFI_PlanProperties;
33use crate::record_batch_stream::FFI_RecordBatchStream;
34use crate::util::FFIResult;
35use crate::{df_result, rresult};
36
/// A stable struct for sharing a [`ExecutionPlan`] across FFI boundaries.
///
/// The struct is `#[repr(C)]` and derives [`StableAbi`], so its field order
/// is part of the layout shared between libraries; changing, reordering, or
/// inserting fields alters that layout.
///
/// Each callback takes the struct itself as its first argument and is called
/// on the same struct it was read from (e.g. `(plan.name)(plan)`). The
/// provider builds instances with [`FFI_ExecutionPlan::new`]; the consumer
/// turns them into an `Arc<dyn ExecutionPlan>` via `TryFrom`.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_ExecutionPlan {
    /// Return the plan properties
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Return a vector of children plans, each wrapped in its own
    /// [`FFI_ExecutionPlan`].
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Return the plan name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Execute the plan and return a record batch stream. Errors
    /// will be returned as a string.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
        context: FFI_TaskContext,
    ) -> FFIResult<FFI_RecordBatchStream>,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// Invoked from this struct's `Drop` impl. This crate's implementation
    /// frees the boxed private data and nulls the pointer, so it should run at
    /// most once per instance.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignExecutionPlan`] should never attempt to access this data.
    /// For plans built by [`FFI_ExecutionPlan::new`], this points to a boxed
    /// [`ExecutionPlanPrivateData`].
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. See [`crate::get_library_marker_id`] and
    /// the crate's `README.md` for more information.
    pub library_marker_id: extern "C" fn() -> usize,
}

// SAFETY: apart from function pointers, the only state is `private_data`. For
// plans built by this crate, it points to an `ExecutionPlanPrivateData`, which
// holds an `Arc<dyn ExecutionPlan>` and an `Option<Handle>`.
// NOTE(review): soundness also depends on foreign providers keeping only
// thread-safe data behind `private_data`.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
77
78pub struct ExecutionPlanPrivateData {
79    pub plan: Arc<dyn ExecutionPlan>,
80    pub runtime: Option<Handle>,
81}
82
83impl FFI_ExecutionPlan {
84    fn inner(&self) -> &Arc<dyn ExecutionPlan> {
85        let private_data = self.private_data as *const ExecutionPlanPrivateData;
86        unsafe { &(*private_data).plan }
87    }
88}
89
90unsafe extern "C" fn properties_fn_wrapper(
91    plan: &FFI_ExecutionPlan,
92) -> FFI_PlanProperties {
93    plan.inner().properties().into()
94}
95
96unsafe extern "C" fn children_fn_wrapper(
97    plan: &FFI_ExecutionPlan,
98) -> RVec<FFI_ExecutionPlan> {
99    unsafe {
100        let private_data = plan.private_data as *const ExecutionPlanPrivateData;
101        let plan = &(*private_data).plan;
102        let runtime = &(*private_data).runtime;
103
104        let children: Vec<_> = plan
105            .children()
106            .into_iter()
107            .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
108            .collect();
109
110        children.into()
111    }
112}
113
114unsafe extern "C" fn execute_fn_wrapper(
115    plan: &FFI_ExecutionPlan,
116    partition: usize,
117    context: FFI_TaskContext,
118) -> FFIResult<FFI_RecordBatchStream> {
119    unsafe {
120        let ctx = context.into();
121        let private_data = plan.private_data as *const ExecutionPlanPrivateData;
122        let plan = &(*private_data).plan;
123        let runtime = (*private_data).runtime.clone();
124
125        rresult!(
126            plan.execute(partition, ctx)
127                .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
128        )
129    }
130}
131
132unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
133    plan.inner().name().into()
134}
135
136unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
137    unsafe {
138        debug_assert!(!plan.private_data.is_null());
139        let private_data =
140            Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
141        drop(private_data);
142        plan.private_data = std::ptr::null_mut();
143    }
144}
145
146unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
147    unsafe {
148        let private_data = plan.private_data as *const ExecutionPlanPrivateData;
149        let plan_data = &(*private_data);
150
151        FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), plan_data.runtime.clone())
152    }
153}
154
155impl Clone for FFI_ExecutionPlan {
156    fn clone(&self) -> Self {
157        unsafe { (self.clone)(self) }
158    }
159}
160
161impl FFI_ExecutionPlan {
162    /// This function is called on the provider's side.
163    pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
164        let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
165
166        Self {
167            properties: properties_fn_wrapper,
168            children: children_fn_wrapper,
169            name: name_fn_wrapper,
170            execute: execute_fn_wrapper,
171            clone: clone_fn_wrapper,
172            release: release_fn_wrapper,
173            private_data: Box::into_raw(private_data) as *mut c_void,
174            library_marker_id: crate::get_library_marker_id,
175        }
176    }
177}
178
179impl Drop for FFI_ExecutionPlan {
180    fn drop(&mut self) {
181        unsafe { (self.release)(self) }
182    }
183}
184
185/// This struct is used to access an execution plan provided by a foreign
186/// library across a FFI boundary.
187///
188/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
189/// no knowledge or access to the private data. All interaction with the plan
190/// must occur through the functions defined in FFI_ExecutionPlan.
191#[derive(Debug)]
192pub struct ForeignExecutionPlan {
193    name: String,
194    plan: FFI_ExecutionPlan,
195    properties: PlanProperties,
196    children: Vec<Arc<dyn ExecutionPlan>>,
197}
198
199unsafe impl Send for ForeignExecutionPlan {}
200unsafe impl Sync for ForeignExecutionPlan {}
201
202impl DisplayAs for ForeignExecutionPlan {
203    fn fmt_as(
204        &self,
205        t: DisplayFormatType,
206        f: &mut std::fmt::Formatter,
207    ) -> std::fmt::Result {
208        match t {
209            DisplayFormatType::Default | DisplayFormatType::Verbose => {
210                write!(
211                    f,
212                    "FFI_ExecutionPlan: {}, number_of_children={}",
213                    self.name,
214                    self.children.len(),
215                )
216            }
217            DisplayFormatType::TreeRender => {
218                // TODO: collect info
219                write!(f, "")
220            }
221        }
222    }
223}
224
225impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
226    type Error = DataFusionError;
227
228    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
229        if (plan.library_marker_id)() == crate::get_library_marker_id() {
230            return Ok(Arc::clone(plan.inner()));
231        }
232
233        unsafe {
234            let name = (plan.name)(plan).into();
235
236            let properties: PlanProperties = (plan.properties)(plan).try_into()?;
237
238            let children_rvec = (plan.children)(plan);
239            let children = children_rvec
240                .iter()
241                .map(<Arc<dyn ExecutionPlan>>::try_from)
242                .collect::<Result<Vec<_>>>()?;
243
244            let plan = ForeignExecutionPlan {
245                name,
246                plan: plan.clone(),
247                properties,
248                children,
249            };
250
251            Ok(Arc::new(plan))
252        }
253    }
254}
255
256impl ExecutionPlan for ForeignExecutionPlan {
257    fn name(&self) -> &str {
258        &self.name
259    }
260
261    fn as_any(&self) -> &dyn std::any::Any {
262        self
263    }
264
265    fn properties(&self) -> &PlanProperties {
266        &self.properties
267    }
268
269    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
270        self.children.iter().collect()
271    }
272
273    fn with_new_children(
274        self: Arc<Self>,
275        children: Vec<Arc<dyn ExecutionPlan>>,
276    ) -> Result<Arc<dyn ExecutionPlan>> {
277        Ok(Arc::new(ForeignExecutionPlan {
278            plan: self.plan.clone(),
279            name: self.name.clone(),
280            children,
281            properties: self.properties.clone(),
282        }))
283    }
284
285    fn execute(
286        &self,
287        partition: usize,
288        context: Arc<TaskContext>,
289    ) -> Result<SendableRecordBatchStream> {
290        let context = FFI_TaskContext::from(context);
291        unsafe {
292            df_result!((self.plan.execute)(&self.plan, partition, context))
293                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
294        }
295    }
296}
297
#[cfg(test)]
pub(crate) mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};

    use super::*;

    /// Minimal test-only [`ExecutionPlan`] with fixed properties and no real
    /// execution. `fmt_as`, `execute`, and `statistics` are `unimplemented!()`
    /// and panic if called. It is `pub`, presumably so other test modules in
    /// the crate can reuse it (NOTE(review): confirm).
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        /// Create a childless plan over `schema` that reports three partitions
        /// with unknown partitioning, incremental emission, and bounded input.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            Self {
                props: PlanProperties::new(
                    datafusion::physical_expr::EquivalenceProperties::new(schema),
                    Partitioning::UnknownPartitioning(3),
                    EmissionType::Incremental,
                    Boundedness::Bounded,
                ),
                children: Vec::default(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        // Never reached by these tests: only the `ForeignExecutionPlan`
        // wrapper is displayed, and it prints its own line.
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec {
                props: self.props.clone(),
                children,
            }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// A plan exported through FFI and read back as foreign keeps its name,
    /// and its one-line display comes from `ForeignExecutionPlan::fmt_as`.
    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        // Overriding the marker makes `TryFrom` treat the plan as coming from
        // another library, forcing the `ForeignExecutionPlan` path.
        let mut local_plan = FFI_ExecutionPlan::new(original_plan, None);
        local_plan.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_plan: Arc<dyn ExecutionPlan> = (&local_plan).try_into()?;

        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
            foreign_plan.as_ref(),
        );

        let buf = display.one_line().to_string();
        assert_eq!(
            buf.trim(),
            "FFI_ExecutionPlan: empty-exec, number_of_children=0"
        );

        Ok(())
    }

    /// Children are visible both when attached on the consumer side
    /// (`with_new_children` on a foreign plan) and when the provider-side plan
    /// already had children before export.
    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        // Version 1: Adding child to the foreign plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut child_local = FFI_ExecutionPlan::new(child_plan, None);
        child_local.library_marker_id = crate::mock_foreign_marker_id;
        let child_foreign = <Arc<dyn ExecutionPlan>>::try_from(&child_local)?;

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut parent_local = FFI_ExecutionPlan::new(parent_plan, None);
        parent_local.library_marker_id = crate::mock_foreign_marker_id;
        let parent_foreign = <Arc<dyn ExecutionPlan>>::try_from(&parent_local)?;

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        // Only the consumer-side cached child list changes here.
        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Version 2: Adding child to the local plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut child_local = FFI_ExecutionPlan::new(child_plan, None);
        child_local.library_marker_id = crate::mock_foreign_marker_id;
        let child_foreign = <Arc<dyn ExecutionPlan>>::try_from(&child_local)?;

        // The child is attached before export, so the provider's `children`
        // callback has to report it when the parent is read back.
        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
        let mut parent_local = FFI_ExecutionPlan::new(parent_plan, None);
        parent_local.library_marker_id = crate::mock_foreign_marker_id;
        let parent_foreign = <Arc<dyn ExecutionPlan>>::try_from(&parent_local)?;

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }

    /// With the real library marker, `TryFrom` returns the original plan,
    /// which downcasts to its concrete type. With a foreign marker, it
    /// returns a `ForeignExecutionPlan` wrapper.
    #[test]
    fn test_ffi_execution_plan_local_bypass() {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));

        let plan = Arc::new(EmptyExec::new(schema));

        let mut ffi_plan = FFI_ExecutionPlan::new(plan, None);

        // Verify local libraries can be downcast to their original
        let foreign_plan: Arc<dyn ExecutionPlan> = (&ffi_plan).try_into().unwrap();
        assert!(foreign_plan.as_any().downcast_ref::<EmptyExec>().is_some());

        // Verify different library markers generate foreign providers
        ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_plan: Arc<dyn ExecutionPlan> = (&ffi_plan).try_into().unwrap();
        assert!(
            foreign_plan
                .as_any()
                .downcast_ref::<ForeignExecutionPlan>()
                .is_some()
        );
    }
}
466}