datafusion_ffi/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21    std_types::{RResult, RString, RVec},
22    StableAbi,
23};
24use datafusion::error::Result;
25use datafusion::{
26    error::DataFusionError,
27    execution::{SendableRecordBatchStream, TaskContext},
28    physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
29};
30use tokio::runtime::Handle;
31
32use crate::{
33    df_result, plan_properties::FFI_PlanProperties,
34    record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
/// A stable struct for sharing an [`ExecutionPlan`] across FFI boundaries.
///
/// The struct is a table of `extern "C"` function pointers plus an opaque
/// `private_data` pointer. Every function pointer takes the struct itself as
/// its first argument, and dropping the struct invokes `release`.
#[repr(C)]
#[derive(Debug, StableAbi)]
// The `FFI_` prefix intentionally breaks the usual type naming convention.
#[allow(non_camel_case_types)]
pub struct FFI_ExecutionPlan {
    /// Return the plan properties
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Return a vector of children plans. Each child is returned as its own
    /// [`FFI_ExecutionPlan`].
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Return the plan name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Execute the given `partition` of the plan and return a record batch
    /// stream. Errors will be returned as a string.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
    ) -> RResult<FFI_RecordBatchStream, RString>,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// This is invoked by the `Drop` implementation of this struct.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignExecutionPlan`] should never attempt to access this data.
    pub private_data: *mut c_void,
}
69
// The raw `private_data` pointer prevents the compiler from deriving `Send`
// and `Sync` automatically, so both are asserted manually here.
// NOTE(review): soundness relies on the provider's private data being safe to
// move and share across threads — confirm for every provider implementation.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
72
/// Provider-side state stored behind [`FFI_ExecutionPlan::private_data`].
///
/// It is boxed by [`FFI_ExecutionPlan::new`] and read back by the
/// `*_fn_wrapper` functions in this module.
pub struct ExecutionPlanPrivateData {
    /// The execution plan being exposed across the FFI boundary.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Task context handed to `plan.execute` when the plan is executed.
    pub context: Arc<TaskContext>,
    /// Optional tokio runtime handle, forwarded to child plans, clones and
    /// the [`FFI_RecordBatchStream`] created on execution.
    pub runtime: Option<Handle>,
}
78
79unsafe extern "C" fn properties_fn_wrapper(
80    plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83    let plan = &(*private_data).plan;
84
85    plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89    plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92    let plan = &(*private_data).plan;
93    let ctx = &(*private_data).context;
94    let runtime = &(*private_data).runtime;
95
96    let children: Vec<_> = plan
97        .children()
98        .into_iter()
99        .map(|child| {
100            FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101        })
102        .collect();
103
104    children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108    plan: &FFI_ExecutionPlan,
109    partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112    let plan = &(*private_data).plan;
113    let ctx = &(*private_data).context;
114    let runtime = (*private_data).runtime.clone();
115
116    rresult!(plan
117        .execute(partition, Arc::clone(ctx))
118        .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123    let plan = &(*private_data).plan;
124
125    plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129    let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130    drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135    let plan_data = &(*private_data);
136
137    FFI_ExecutionPlan::new(
138        Arc::clone(&plan_data.plan),
139        Arc::clone(&plan_data.context),
140        plan_data.runtime.clone(),
141    )
142}
143
144impl Clone for FFI_ExecutionPlan {
145    fn clone(&self) -> Self {
146        unsafe { (self.clone)(self) }
147    }
148}
149
150impl FFI_ExecutionPlan {
151    /// This function is called on the provider's side.
152    pub fn new(
153        plan: Arc<dyn ExecutionPlan>,
154        context: Arc<TaskContext>,
155        runtime: Option<Handle>,
156    ) -> Self {
157        let private_data = Box::new(ExecutionPlanPrivateData {
158            plan,
159            context,
160            runtime,
161        });
162
163        Self {
164            properties: properties_fn_wrapper,
165            children: children_fn_wrapper,
166            name: name_fn_wrapper,
167            execute: execute_fn_wrapper,
168            clone: clone_fn_wrapper,
169            release: release_fn_wrapper,
170            private_data: Box::into_raw(private_data) as *mut c_void,
171        }
172    }
173}
174
175impl Drop for FFI_ExecutionPlan {
176    fn drop(&mut self) {
177        unsafe { (self.release)(self) }
178    }
179}
180
/// This struct is used to access an execution plan provided by a foreign
/// library across a FFI boundary.
///
/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
/// no knowledge or access to the private data. All interaction with the plan
/// must occur through the functions defined in FFI_ExecutionPlan.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name, fetched once through the FFI `name` callback at construction.
    name: String,
    /// Clone of the foreign plan handle; `execute` is forwarded to it.
    plan: FFI_ExecutionPlan,
    /// Plan properties, converted once from the FFI representation.
    properties: PlanProperties,
    /// Child plans: either the foreign children wrapped at construction or the
    /// ones supplied through `with_new_children`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
194
// NOTE(review): `FFI_ExecutionPlan` is already asserted to be `Send + Sync` in
// this file; these impls look redundant if the remaining fields are
// auto-`Send`/`Sync` — confirm before removing.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199    fn fmt_as(
200        &self,
201        _t: datafusion::physical_plan::DisplayFormatType,
202        f: &mut std::fmt::Formatter,
203    ) -> std::fmt::Result {
204        write!(
205            f,
206            "FFI_ExecutionPlan(number_of_children={})",
207            self.children.len(),
208        )
209    }
210}
211
212impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
213    type Error = DataFusionError;
214
215    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
216        unsafe {
217            let name = (plan.name)(plan).into();
218
219            let properties: PlanProperties = (plan.properties)(plan).try_into()?;
220
221            let children_rvec = (plan.children)(plan);
222            let children = children_rvec
223                .iter()
224                .map(ForeignExecutionPlan::try_from)
225                .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
226                .collect::<Result<Vec<_>>>()?;
227
228            Ok(Self {
229                name,
230                plan: plan.clone(),
231                properties,
232                children,
233            })
234        }
235    }
236}
237
238impl ExecutionPlan for ForeignExecutionPlan {
239    fn name(&self) -> &str {
240        &self.name
241    }
242
243    fn as_any(&self) -> &dyn std::any::Any {
244        self
245    }
246
247    fn properties(&self) -> &PlanProperties {
248        &self.properties
249    }
250
251    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
252        self.children
253            .iter()
254            .map(|p| p as &Arc<dyn ExecutionPlan>)
255            .collect()
256    }
257
258    fn with_new_children(
259        self: Arc<Self>,
260        children: Vec<Arc<dyn ExecutionPlan>>,
261    ) -> Result<Arc<dyn ExecutionPlan>> {
262        Ok(Arc::new(ForeignExecutionPlan {
263            plan: self.plan.clone(),
264            name: self.name.clone(),
265            children,
266            properties: self.properties.clone(),
267        }))
268    }
269
270    fn execute(
271        &self,
272        partition: usize,
273        _context: Arc<TaskContext>,
274    ) -> Result<SendableRecordBatchStream> {
275        unsafe {
276            df_result!((self.plan.execute)(&self.plan, partition))
277                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
278        }
279    }
280}
281
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::{
        physical_plan::{
            execution_plan::{Boundedness, EmissionType},
            Partitioning,
        },
        prelude::SessionContext,
    };

    use super::*;

    /// Minimal execution plan used as the provider-side plan in these tests.
    ///
    /// Only the metadata accessors are implemented; formatting, execution and
    /// statistics are left `unimplemented!()` because the tests never call them.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        /// Creates a childless plan over `schema` with three unknown partitions.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            Self {
                props: PlanProperties::new(
                    datafusion::physical_expr::EquivalenceProperties::new(schema),
                    Partitioning::UnknownPartitioning(3),
                    EmissionType::Incremental,
                    Boundedness::Bounded,
                ),
                children: Vec::default(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: datafusion::physical_plan::DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec {
                props: self.props.clone(),
                children,
            }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// Single-column schema shared by every test in this module.
    fn test_schema() -> arrow::datatypes::SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]))
    }

    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema = test_schema();
        let ctx = SessionContext::new();

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

        let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

        // `assert_eq!` prints both names on failure, unlike `assert!(a == b)`.
        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
            &foreign_plan,
        );

        let buf = display.one_line().to_string();
        assert_eq!(buf.trim(), "FFI_ExecutionPlan(number_of_children=0)");

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = test_schema();
        let ctx = SessionContext::new();

        // Version 1: Adding child to the foreign plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Version 2: Adding child to the local plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_clone_outlives_original() -> Result<()> {
        let ctx = SessionContext::new();

        let original_plan = Arc::new(EmptyExec::new(test_schema()));
        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

        // The clone owns its own private data, so releasing the original must
        // leave the clone fully usable.
        let cloned_plan = local_plan.clone();
        drop(local_plan);

        let foreign_plan = ForeignExecutionPlan::try_from(&cloned_plan)?;
        assert_eq!(foreign_plan.name(), "empty-exec");
        assert_eq!(foreign_plan.children().len(), 0);

        Ok(())
    }
}