datafusion_ffi/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21    std_types::{RResult, RString, RVec},
22    StableAbi,
23};
24use datafusion::{
25    error::DataFusionError,
26    execution::{SendableRecordBatchStream, TaskContext},
27    physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
28};
29use datafusion::{error::Result, physical_plan::DisplayFormatType};
30use tokio::runtime::Handle;
31
32use crate::{
33    df_result, plan_properties::FFI_PlanProperties,
34    record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
/// A stable struct for sharing an [`ExecutionPlan`] across FFI boundaries.
///
/// The layout is `#[repr(C)]`: a table of `extern "C"` function pointers, each
/// taking the struct itself as its first argument, followed by an opaque
/// pointer to provider-owned state. [`FFI_ExecutionPlan::new`] fills the table
/// on the provider side; [`ForeignExecutionPlan`] calls through it on the
/// consumer side.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_ExecutionPlan {
    /// Return the plan properties in their FFI representation.
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Return a vector of children plans, each wrapped as its own
    /// `FFI_ExecutionPlan`.
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Return the plan name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Execute the given partition of the plan and return a record batch
    /// stream. Errors will be returned as a string.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
    ) -> RResult<FFI_RecordBatchStream, RString>,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan. The `Clone` impl of
    /// this struct dispatches to this pointer.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    /// The `Drop` impl of this struct calls this pointer.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignExecutionPlan`] should never attempt to access this data.
    pub private_data: *mut c_void,
}
69
// `FFI_ExecutionPlan` holds a raw `*mut c_void`, which keeps the compiler from
// deriving `Send`/`Sync` automatically, so both are asserted by hand here.
// NOTE(review): this is only sound if whatever `private_data` points to may be
// used from any thread. For plans built by `FFI_ExecutionPlan::new` that is an
// `ExecutionPlanPrivateData`; confirm the same holds for foreign providers.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
72
/// Provider-side state that [`FFI_ExecutionPlan::new`] boxes and stores behind
/// `FFI_ExecutionPlan::private_data`.
pub struct ExecutionPlanPrivateData {
    /// The execution plan being exposed across the FFI boundary.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Task context passed to `plan.execute` when the `execute` callback runs.
    pub context: Arc<TaskContext>,
    /// Optional tokio runtime handle, forwarded to the
    /// `FFI_RecordBatchStream` created on execute and to the FFI wrappers
    /// created for child plans and clones.
    pub runtime: Option<Handle>,
}
78
79unsafe extern "C" fn properties_fn_wrapper(
80    plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83    let plan = &(*private_data).plan;
84
85    plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89    plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92    let plan = &(*private_data).plan;
93    let ctx = &(*private_data).context;
94    let runtime = &(*private_data).runtime;
95
96    let children: Vec<_> = plan
97        .children()
98        .into_iter()
99        .map(|child| {
100            FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101        })
102        .collect();
103
104    children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108    plan: &FFI_ExecutionPlan,
109    partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112    let plan = &(*private_data).plan;
113    let ctx = &(*private_data).context;
114    let runtime = (*private_data).runtime.clone();
115
116    rresult!(plan
117        .execute(partition, Arc::clone(ctx))
118        .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123    let plan = &(*private_data).plan;
124
125    plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129    let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130    drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134    let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135    let plan_data = &(*private_data);
136
137    FFI_ExecutionPlan::new(
138        Arc::clone(&plan_data.plan),
139        Arc::clone(&plan_data.context),
140        plan_data.runtime.clone(),
141    )
142}
143
144impl Clone for FFI_ExecutionPlan {
145    fn clone(&self) -> Self {
146        unsafe { (self.clone)(self) }
147    }
148}
149
150impl FFI_ExecutionPlan {
151    /// This function is called on the provider's side.
152    pub fn new(
153        plan: Arc<dyn ExecutionPlan>,
154        context: Arc<TaskContext>,
155        runtime: Option<Handle>,
156    ) -> Self {
157        let private_data = Box::new(ExecutionPlanPrivateData {
158            plan,
159            context,
160            runtime,
161        });
162
163        Self {
164            properties: properties_fn_wrapper,
165            children: children_fn_wrapper,
166            name: name_fn_wrapper,
167            execute: execute_fn_wrapper,
168            clone: clone_fn_wrapper,
169            release: release_fn_wrapper,
170            private_data: Box::into_raw(private_data) as *mut c_void,
171        }
172    }
173}
174
175impl Drop for FFI_ExecutionPlan {
176    fn drop(&mut self) {
177        unsafe { (self.release)(self) }
178    }
179}
180
/// This struct is used to access an execution plan provided by a foreign
/// library across a FFI boundary.
///
/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
/// no knowledge or access to the private data. All interaction with the plan
/// must occur through the functions defined in FFI_ExecutionPlan.
///
/// The name, properties and children are fetched once when the value is built
/// via `TryFrom<&FFI_ExecutionPlan>` and served from these fields afterwards.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name, obtained from the FFI `name` callback at construction.
    name: String,
    /// Clone of the FFI plan this wrapper was built from; used for `execute`.
    plan: FFI_ExecutionPlan,
    /// Plan properties, converted from the FFI `properties` callback result.
    properties: PlanProperties,
    /// Child plans; initially each FFI child wrapped as a
    /// `ForeignExecutionPlan`, replaceable through `with_new_children`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
194
// Manual assertion that `ForeignExecutionPlan` may be sent to and shared
// between threads.
// NOTE(review): the embedded `FFI_ExecutionPlan` is itself declared
// `Send + Sync` by hand in this file, so soundness here rests on the same
// assumption about the provider's private data; confirm.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199    fn fmt_as(
200        &self,
201        t: DisplayFormatType,
202        f: &mut std::fmt::Formatter,
203    ) -> std::fmt::Result {
204        match t {
205            DisplayFormatType::Default | DisplayFormatType::Verbose => {
206                write!(
207                    f,
208                    "FFI_ExecutionPlan: {}, number_of_children={}",
209                    self.name,
210                    self.children.len(),
211                )
212            }
213            DisplayFormatType::TreeRender => {
214                // TODO: collect info
215                write!(f, "")
216            }
217        }
218    }
219}
220
221impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
222    type Error = DataFusionError;
223
224    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
225        unsafe {
226            let name = (plan.name)(plan).into();
227
228            let properties: PlanProperties = (plan.properties)(plan).try_into()?;
229
230            let children_rvec = (plan.children)(plan);
231            let children = children_rvec
232                .iter()
233                .map(ForeignExecutionPlan::try_from)
234                .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
235                .collect::<Result<Vec<_>>>()?;
236
237            Ok(Self {
238                name,
239                plan: plan.clone(),
240                properties,
241                children,
242            })
243        }
244    }
245}
246
247impl ExecutionPlan for ForeignExecutionPlan {
248    fn name(&self) -> &str {
249        &self.name
250    }
251
252    fn as_any(&self) -> &dyn std::any::Any {
253        self
254    }
255
256    fn properties(&self) -> &PlanProperties {
257        &self.properties
258    }
259
260    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
261        self.children
262            .iter()
263            .map(|p| p as &Arc<dyn ExecutionPlan>)
264            .collect()
265    }
266
267    fn with_new_children(
268        self: Arc<Self>,
269        children: Vec<Arc<dyn ExecutionPlan>>,
270    ) -> Result<Arc<dyn ExecutionPlan>> {
271        Ok(Arc::new(ForeignExecutionPlan {
272            plan: self.plan.clone(),
273            name: self.name.clone(),
274            children,
275            properties: self.properties.clone(),
276        }))
277    }
278
279    fn execute(
280        &self,
281        partition: usize,
282        _context: Arc<TaskContext>,
283    ) -> Result<SendableRecordBatchStream> {
284        unsafe {
285            df_result!((self.plan.execute)(&self.plan, partition))
286                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
287        }
288    }
289}
290
291#[cfg(test)]
292mod tests {
293    use arrow::datatypes::{DataType, Field, Schema};
294    use datafusion::{
295        physical_plan::{
296            execution_plan::{Boundedness, EmissionType},
297            Partitioning,
298        },
299        prelude::SessionContext,
300    };
301
302    use super::*;
303
    /// Minimal [`ExecutionPlan`] used as a fixture by the tests in this module.
    /// It only carries plan properties and a list of children; its `execute`,
    /// `statistics` and `fmt_as` implementations are `unimplemented!()`.
    #[derive(Debug)]
    pub struct EmptyExec {
        /// Properties returned from `properties()`.
        props: PlanProperties,
        /// Child plans returned from `children()`.
        children: Vec<Arc<dyn ExecutionPlan>>,
    }
309
310    impl EmptyExec {
311        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
312            Self {
313                props: PlanProperties::new(
314                    datafusion::physical_expr::EquivalenceProperties::new(schema),
315                    Partitioning::UnknownPartitioning(3),
316                    EmissionType::Incremental,
317                    Boundedness::Bounded,
318                ),
319                children: Vec::default(),
320            }
321        }
322    }
323
324    impl DisplayAs for EmptyExec {
325        fn fmt_as(
326            &self,
327            _t: DisplayFormatType,
328            _f: &mut std::fmt::Formatter,
329        ) -> std::fmt::Result {
330            unimplemented!()
331        }
332    }
333
334    impl ExecutionPlan for EmptyExec {
335        fn name(&self) -> &'static str {
336            "empty-exec"
337        }
338
339        fn as_any(&self) -> &dyn std::any::Any {
340            self
341        }
342
343        fn properties(&self) -> &PlanProperties {
344            &self.props
345        }
346
347        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
348            self.children.iter().collect()
349        }
350
351        fn with_new_children(
352            self: Arc<Self>,
353            children: Vec<Arc<dyn ExecutionPlan>>,
354        ) -> Result<Arc<dyn ExecutionPlan>> {
355            Ok(Arc::new(EmptyExec {
356                props: self.props.clone(),
357                children,
358            }))
359        }
360
361        fn execute(
362            &self,
363            _partition: usize,
364            _context: Arc<TaskContext>,
365        ) -> Result<SendableRecordBatchStream> {
366            unimplemented!()
367        }
368
369        fn statistics(&self) -> Result<datafusion::common::Statistics> {
370            unimplemented!()
371        }
372    }
373
374    #[test]
375    fn test_round_trip_ffi_execution_plan() -> Result<()> {
376        let schema =
377            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
378        let ctx = SessionContext::new();
379
380        let original_plan = Arc::new(EmptyExec::new(schema));
381        let original_name = original_plan.name().to_string();
382
383        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);
384
385        let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;
386
387        assert!(original_name == foreign_plan.name());
388
389        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
390            &foreign_plan,
391        );
392
393        let buf = display.one_line().to_string();
394        assert_eq!(
395            buf.trim(),
396            "FFI_ExecutionPlan: empty-exec, number_of_children=0"
397        );
398
399        Ok(())
400    }
401
402    #[test]
403    fn test_ffi_execution_plan_children() -> Result<()> {
404        let schema =
405            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
406        let ctx = SessionContext::new();
407
408        // Version 1: Adding child to the foreign plan
409        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
410        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
411        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);
412
413        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
414        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
415        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);
416
417        assert_eq!(parent_foreign.children().len(), 0);
418        assert_eq!(child_foreign.children().len(), 0);
419
420        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
421        assert_eq!(parent_foreign.children().len(), 1);
422
423        // Version 2: Adding child to the local plan
424        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
425        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
426        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);
427
428        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
429        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
430        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
431        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);
432
433        assert_eq!(parent_foreign.children().len(), 1);
434
435        Ok(())
436    }
437}