datafusion_ffi/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21    std_types::{RResult, RString, RVec},
22    StableAbi,
23};
24use datafusion::{
25    error::DataFusionError,
26    execution::{SendableRecordBatchStream, TaskContext},
27    physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
28};
29use datafusion::{error::Result, physical_plan::DisplayFormatType};
30use tokio::runtime::Handle;
31
32use crate::{
33    df_result, plan_properties::FFI_PlanProperties,
34    record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
/// A stable struct for sharing an [`ExecutionPlan`] across FFI boundaries.
///
/// The struct is `#[repr(C)]` and derives [`StableAbi`], so the order and the
/// types of its fields are part of the ABI. Every callback receives the struct
/// itself, which lets the provider reach its own `private_data`.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_ExecutionPlan {
    /// Return the properties of the wrapped plan in their FFI-safe form.
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Return one `FFI_ExecutionPlan` wrapper for each child of the wrapped plan.
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Return the plan name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Execute the given partition of the plan and return a record batch
    /// stream. Errors will be returned as a string.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
    ) -> RResult<FFI_RecordBatchStream, RString>,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// This is invoked by the `Drop` impl of this struct. It frees
    /// `private_data`, so it must not run more than once for the same value.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignExecutionPlan`] should never attempt to access this data.
    pub private_data: *mut c_void,
}

// SAFETY: `private_data` is the only field that is not automatically Send/Sync.
// For structs built by `FFI_ExecutionPlan::new` it owns an
// `ExecutionPlanPrivateData`, which holds only `Arc`s and an `Option<Handle>`.
// The callbacks in this file only read that data through shared references,
// except `release`, which frees it.
// NOTE(review): soundness also relies on those field types being Send + Sync.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
72
/// Provider-side state referenced by [`FFI_ExecutionPlan::private_data`].
///
/// [`FFI_ExecutionPlan::new`] moves this value onto the heap and stores it as a
/// raw pointer. The `release` callback reclaims it and drops it.
pub struct ExecutionPlanPrivateData {
    /// The plan exposed across the FFI boundary.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Task context passed to `plan.execute` and to the wrappers built for the
    /// plan's children.
    pub context: Arc<TaskContext>,
    /// Optional tokio runtime handle. It is forwarded to each
    /// [`FFI_RecordBatchStream`] created by `execute` and to the wrappers built
    /// for the plan's children.
    pub runtime: Option<Handle>,
}
78
79unsafe extern "C" fn properties_fn_wrapper(
80    plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83    let plan = &(*private_data).plan;
84
85    plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89    plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92    let plan = &(*private_data).plan;
93    let ctx = &(*private_data).context;
94    let runtime = &(*private_data).runtime;
95
96    let children: Vec<_> = plan
97        .children()
98        .into_iter()
99        .map(|child| {
100            FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101        })
102        .collect();
103
104    children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108    plan: &FFI_ExecutionPlan,
109    partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112    let plan = &(*private_data).plan;
113    let ctx = &(*private_data).context;
114    let runtime = (*private_data).runtime.clone();
115
116    rresult!(plan
117        .execute(partition, Arc::clone(ctx))
118        .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123    let plan = &(*private_data).plan;
124
125    plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129    let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130    drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135    let plan_data = &(*private_data);
136
137    FFI_ExecutionPlan::new(
138        Arc::clone(&plan_data.plan),
139        Arc::clone(&plan_data.context),
140        plan_data.runtime.clone(),
141    )
142}
143
144impl Clone for FFI_ExecutionPlan {
145    fn clone(&self) -> Self {
146        unsafe { (self.clone)(self) }
147    }
148}
149
150impl FFI_ExecutionPlan {
151    /// This function is called on the provider's side.
152    pub fn new(
153        plan: Arc<dyn ExecutionPlan>,
154        context: Arc<TaskContext>,
155        runtime: Option<Handle>,
156    ) -> Self {
157        let private_data = Box::new(ExecutionPlanPrivateData {
158            plan,
159            context,
160            runtime,
161        });
162
163        Self {
164            properties: properties_fn_wrapper,
165            children: children_fn_wrapper,
166            name: name_fn_wrapper,
167            execute: execute_fn_wrapper,
168            clone: clone_fn_wrapper,
169            release: release_fn_wrapper,
170            private_data: Box::into_raw(private_data) as *mut c_void,
171        }
172    }
173}
174
175impl Drop for FFI_ExecutionPlan {
176    fn drop(&mut self) {
177        unsafe { (self.release)(self) }
178    }
179}
180
/// This struct is used to access an execution plan provided by a foreign
/// library across a FFI boundary.
///
/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
/// no knowledge or access to the private data. All interaction with the plan
/// must occur through the functions defined in FFI_ExecutionPlan. The name,
/// properties and children are fetched once during conversion from an
/// [`FFI_ExecutionPlan`] and cached in this struct.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name reported by the provider.
    name: String,
    /// FFI handle used to execute the plan on the provider's side.
    plan: FFI_ExecutionPlan,
    /// Properties converted from the provider's `FFI_PlanProperties`.
    properties: PlanProperties,
    /// Child plans. Each one is imported as its own `ForeignExecutionPlan`
    /// during conversion, or replaced through `with_new_children`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}

// SAFETY: the contained `FFI_ExecutionPlan` is declared Send + Sync in this
// file. The remaining fields are owned, plain Rust values.
// NOTE(review): these impls may be redundant if the compiler already derives
// Send/Sync for every field type — confirm before removing.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199    fn fmt_as(
200        &self,
201        t: DisplayFormatType,
202        f: &mut std::fmt::Formatter,
203    ) -> std::fmt::Result {
204        match t {
205            DisplayFormatType::Default | DisplayFormatType::Verbose => {
206                write!(
207                    f,
208                    "FFI_ExecutionPlan(number_of_children={})",
209                    self.children.len(),
210                )
211            }
212            DisplayFormatType::TreeRender => {
213                // TODO: collect info
214                write!(f, "")
215            }
216        }
217    }
218}
219
220impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
221    type Error = DataFusionError;
222
223    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
224        unsafe {
225            let name = (plan.name)(plan).into();
226
227            let properties: PlanProperties = (plan.properties)(plan).try_into()?;
228
229            let children_rvec = (plan.children)(plan);
230            let children = children_rvec
231                .iter()
232                .map(ForeignExecutionPlan::try_from)
233                .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
234                .collect::<Result<Vec<_>>>()?;
235
236            Ok(Self {
237                name,
238                plan: plan.clone(),
239                properties,
240                children,
241            })
242        }
243    }
244}
245
246impl ExecutionPlan for ForeignExecutionPlan {
247    fn name(&self) -> &str {
248        &self.name
249    }
250
251    fn as_any(&self) -> &dyn std::any::Any {
252        self
253    }
254
255    fn properties(&self) -> &PlanProperties {
256        &self.properties
257    }
258
259    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
260        self.children
261            .iter()
262            .map(|p| p as &Arc<dyn ExecutionPlan>)
263            .collect()
264    }
265
266    fn with_new_children(
267        self: Arc<Self>,
268        children: Vec<Arc<dyn ExecutionPlan>>,
269    ) -> Result<Arc<dyn ExecutionPlan>> {
270        Ok(Arc::new(ForeignExecutionPlan {
271            plan: self.plan.clone(),
272            name: self.name.clone(),
273            children,
274            properties: self.properties.clone(),
275        }))
276    }
277
278    fn execute(
279        &self,
280        partition: usize,
281        _context: Arc<TaskContext>,
282    ) -> Result<SendableRecordBatchStream> {
283        unsafe {
284            df_result!((self.plan.execute)(&self.plan, partition))
285                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
286        }
287    }
288}
289
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::{
        physical_plan::{
            execution_plan::{Boundedness, EmissionType},
            Partitioning,
        },
        prelude::SessionContext,
    };

    use super::*;

    /// A metadata-only plan used to drive the FFI round trip.
    ///
    /// Only the accessors the tests need are implemented. Display, execution
    /// and statistics panic via `unimplemented!()`.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            let equivalence =
                datafusion::physical_expr::EquivalenceProperties::new(schema);
            let props = PlanProperties::new(
                equivalence,
                Partitioning::UnknownPartitioning(3),
                EmissionType::Incremental,
                Boundedness::Bounded,
            );

            Self {
                props,
                children: Vec::new(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            let props = self.props.clone();
            Ok(Arc::new(EmptyExec { props, children }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// Single non-nullable `Float32` column named `a`, shared by the tests.
    fn float_schema() -> arrow::datatypes::SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]))
    }

    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let ctx = SessionContext::new();

        let source = Arc::new(EmptyExec::new(float_schema()));
        let expected_name = source.name().to_string();

        let exported = FFI_ExecutionPlan::new(source, ctx.task_ctx(), None);
        let imported = ForeignExecutionPlan::try_from(&exported)?;

        assert_eq!(imported.name(), expected_name.as_str());

        let rendered =
            datafusion::physical_plan::display::DisplayableExecutionPlan::new(&imported)
                .one_line()
                .to_string();
        assert_eq!(rendered.trim(), "FFI_ExecutionPlan(number_of_children=0)");

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = float_schema();
        let ctx = SessionContext::new();

        // Version 1: attach a child to the imported (receiver-side) parent.
        let child_ffi = FFI_ExecutionPlan::new(
            Arc::new(EmptyExec::new(Arc::clone(&schema))),
            ctx.task_ctx(),
            None,
        );
        let child = Arc::new(ForeignExecutionPlan::try_from(&child_ffi)?);

        let parent_ffi = FFI_ExecutionPlan::new(
            Arc::new(EmptyExec::new(Arc::clone(&schema))),
            ctx.task_ctx(),
            None,
        );
        let parent = Arc::new(ForeignExecutionPlan::try_from(&parent_ffi)?);

        assert_eq!(parent.children().len(), 0);
        assert_eq!(child.children().len(), 0);

        let parent = parent.with_new_children(vec![child])?;
        assert_eq!(parent.children().len(), 1);

        // Version 2: attach the child to the local (provider-side) parent
        // before exporting it, so the child travels through the FFI layer.
        let child_ffi = FFI_ExecutionPlan::new(
            Arc::new(EmptyExec::new(Arc::clone(&schema))),
            ctx.task_ctx(),
            None,
        );
        let child = Arc::new(ForeignExecutionPlan::try_from(&child_ffi)?);

        let local_parent = Arc::new(EmptyExec::new(Arc::clone(&schema)))
            .with_new_children(vec![child])?;
        let parent_ffi = FFI_ExecutionPlan::new(local_parent, ctx.task_ctx(), None);
        let parent = Arc::new(ForeignExecutionPlan::try_from(&parent_ffi)?);

        assert_eq!(parent.children().len(), 1);

        Ok(())
    }
}