Skip to main content

datafusion_ffi/
execution_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::ffi::c_void;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use datafusion_common::config::ConfigOptions;
23use datafusion_common::{DataFusionError, Result, Statistics};
24use datafusion_execution::{SendableRecordBatchStream, TaskContext};
25use datafusion_physical_expr_common::metrics::MetricsSet;
26use datafusion_physical_plan::{
27    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
28};
29use stabby::string::String as SString;
30use stabby::vec::Vec as SVec;
31use tokio::runtime::Handle;
32
33use crate::config::FFI_ConfigOptions;
34use crate::execution::FFI_TaskContext;
35use crate::physical_expr::metrics::FFI_MetricsSet;
36use crate::plan_properties::FFI_PlanProperties;
37use crate::record_batch_stream::FFI_RecordBatchStream;
38use crate::statistics::{deserialize_statistics, serialize_statistics};
39use crate::util::{FFI_Option, FFI_Result};
40use crate::{df_result, sresult, sresult_return};
41
/// A stable struct for sharing an [`ExecutionPlan`] across FFI boundaries.
///
/// The provider builds one with [`FFI_ExecutionPlan::new`]; the receiver is
/// expected to interact with it only through the function pointers below
/// (typically via a [`ForeignExecutionPlan`]). Dropping this struct invokes
/// the `release` callback.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_ExecutionPlan {
    /// Return the plan properties
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Return a vector of children plans
    pub children: unsafe extern "C" fn(plan: &Self) -> SVec<FFI_ExecutionPlan>,

    /// Return a copy of this plan whose children are replaced by `children`.
    /// Failures converting the children, or from the underlying
    /// [`ExecutionPlan::with_new_children`], are reported in the [`FFI_Result`].
    pub with_new_children:
        unsafe extern "C" fn(plan: &Self, children: SVec<Self>) -> FFI_Result<Self>,

    /// Return the plan name.
    pub name: unsafe extern "C" fn(plan: &Self) -> SString,

    /// Execute the given `partition` of the plan with the supplied task
    /// context and return a record batch stream. Errors will be returned as
    /// a string inside the [`FFI_Result`].
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
        context: FFI_TaskContext,
    ) -> FFI_Result<FFI_RecordBatchStream>,

    /// Ask the plan to repartition itself into `target_partitions` using the
    /// given configuration. The inner option is `None` when the underlying
    /// [`ExecutionPlan::repartitioned`] returned `None`; configuration
    /// decoding and repartitioning errors are reported in the [`FFI_Result`].
    pub repartitioned: unsafe extern "C" fn(
        plan: &Self,
        target_partitions: usize,
        config: FFI_ConfigOptions,
    )
        -> FFI_Result<FFI_Option<FFI_ExecutionPlan>>,

    /// Snapshot the plan's execution metrics. Returns `None` when the
    /// underlying [`ExecutionPlan::metrics`] returned `None`.
    pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,

    /// Snapshot partition statistics. `partition == None` corresponds to
    /// statistics over all partitions; `Some(idx)` corresponds to a specific
    /// partition. The returned bytes are a prost-encoded
    /// `datafusion_proto_common::Statistics`.
    pub partition_statistics: unsafe extern "C" fn(
        plan: &Self,
        partition: FFI_Option<usize>,
    ) -> FFI_Result<SVec<u8>>,

    /// Used to create a clone on the provider of the execution plan. This should
    /// only need to be called by the receiver of the plan.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Internal data. This is only to be accessed by the provider of the plan.
    /// A [`ForeignExecutionPlan`] should never attempt to access this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface. See [`crate::get_library_marker_id`] and
    /// the crate's `README.md` for more information.
    pub library_marker_id: extern "C" fn() -> usize,
}
102
// SAFETY: the raw `private_data` pointer is what prevents the auto traits.
// It is only ever set to an `ExecutionPlanPrivateData` (an
// `Arc<dyn ExecutionPlan>` plus an `Option<Handle>`), which is only read
// through shared references. This assumes both of those are Send + Sync
// (DataFusion's `ExecutionPlan` trait is expected to require it; confirm).
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
105
/// Provider-side state stored behind [`FFI_ExecutionPlan::private_data`].
///
/// Boxed and leaked into a raw pointer by [`FFI_ExecutionPlan::new`], and
/// reclaimed by the `release` callback.
pub struct ExecutionPlanPrivateData {
    /// The local plan being exposed across the FFI boundary.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Runtime entered while executing the plan. It is also handed to the
    /// child plans and record batch streams created from this plan.
    pub runtime: Option<Handle>,
}
110
111impl FFI_ExecutionPlan {
112    fn inner(&self) -> &Arc<dyn ExecutionPlan> {
113        let private_data = self.private_data as *const ExecutionPlanPrivateData;
114        unsafe { &(*private_data).plan }
115    }
116
117    fn runtime(&self) -> Option<Handle> {
118        let private_data = self.private_data as *const ExecutionPlanPrivateData;
119        unsafe { (*private_data).runtime.clone() }
120    }
121}
122
123unsafe extern "C" fn properties_fn_wrapper(
124    plan: &FFI_ExecutionPlan,
125) -> FFI_PlanProperties {
126    plan.inner().properties().as_ref().into()
127}
128
129unsafe extern "C" fn children_fn_wrapper(
130    plan: &FFI_ExecutionPlan,
131) -> SVec<FFI_ExecutionPlan> {
132    let runtime = plan.runtime();
133    plan.inner()
134        .children()
135        .into_iter()
136        .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
137        .collect()
138}
139
140unsafe extern "C" fn with_new_children_fn_wrapper(
141    plan: &FFI_ExecutionPlan,
142    children: SVec<FFI_ExecutionPlan>,
143) -> FFI_Result<FFI_ExecutionPlan> {
144    let runtime = plan.runtime();
145    let inner_plan = Arc::clone(plan.inner());
146
147    let children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
148        .iter()
149        .map(<Arc<dyn ExecutionPlan>>::try_from)
150        .collect();
151
152    let children = sresult_return!(children);
153    let new_plan = sresult_return!(inner_plan.with_new_children(children));
154
155    FFI_Result::Ok(FFI_ExecutionPlan::new(new_plan, runtime))
156}
157
158unsafe extern "C" fn execute_fn_wrapper(
159    plan: &FFI_ExecutionPlan,
160    partition: usize,
161    context: FFI_TaskContext,
162) -> FFI_Result<FFI_RecordBatchStream> {
163    let ctx = context.into();
164    let runtime = plan.runtime();
165    let plan = plan.inner();
166
167    let _runtime_guard = runtime.as_ref().map(|rt| rt.enter());
168
169    sresult!(
170        plan.execute(partition, ctx)
171            .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
172    )
173}
174
175unsafe extern "C" fn repartitioned_fn_wrapper(
176    plan: &FFI_ExecutionPlan,
177    target_partitions: usize,
178    config: FFI_ConfigOptions,
179) -> FFI_Result<FFI_Option<FFI_ExecutionPlan>> {
180    let maybe_config: Result<ConfigOptions, DataFusionError> = config.try_into();
181    let config = sresult_return!(maybe_config);
182    let runtime = plan.runtime();
183    let plan = plan.inner();
184
185    sresult!(
186        plan.repartitioned(target_partitions, &config)
187            .map(|maybe_plan| maybe_plan
188                .map(|plan| FFI_ExecutionPlan::new(plan, runtime))
189                .into())
190    )
191}
192
193unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> SString {
194    plan.inner().name().into()
195}
196
197unsafe extern "C" fn metrics_fn_wrapper(
198    plan: &FFI_ExecutionPlan,
199) -> FFI_Option<FFI_MetricsSet> {
200    plan.inner()
201        .metrics()
202        .as_ref()
203        .map(FFI_MetricsSet::from)
204        .into()
205}
206
207unsafe extern "C" fn partition_statistics_fn_wrapper(
208    plan: &FFI_ExecutionPlan,
209    partition: FFI_Option<usize>,
210) -> FFI_Result<SVec<u8>> {
211    let partition: Option<usize> = partition.into();
212    plan.inner()
213        .partition_statistics(partition)
214        .map(|stats| SVec::from(serialize_statistics(stats.as_ref()).as_slice()))
215        .into()
216}
217
218unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
219    unsafe {
220        debug_assert!(!plan.private_data.is_null());
221        let private_data =
222            Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
223        drop(private_data);
224        plan.private_data = std::ptr::null_mut();
225    }
226}
227
228unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
229    let runtime = plan.runtime();
230    let plan = plan.inner();
231
232    FFI_ExecutionPlan::new(Arc::clone(plan), runtime)
233}
234
235impl Clone for FFI_ExecutionPlan {
236    fn clone(&self) -> Self {
237        unsafe { (self.clone)(self) }
238    }
239}
240
241/// Helper function to recursively identify any children that do not
242/// have a runtime set but should because they are local to this same
243/// library. This does imply a restriction that all execution plans
244/// in this chain that are within the same library use the same runtime.
245fn pass_runtime_to_children(
246    plan: &Arc<dyn ExecutionPlan>,
247    runtime: &Handle,
248) -> Result<Option<Arc<dyn ExecutionPlan>>> {
249    let mut updated_children = false;
250    let plan_is_foreign = plan.is::<ForeignExecutionPlan>();
251
252    let children = plan
253        .children()
254        .into_iter()
255        .map(|child| {
256            let child = match pass_runtime_to_children(child, runtime)? {
257                Some(child) => {
258                    updated_children = true;
259                    child
260                }
261                None => Arc::clone(child),
262            };
263
264            // If the parent is foreign and the child is local to this library, then when
265            // we called `children()` above we will get something other than a
266            // `ForeignExecutionPlan`. In this case wrap the plan in a `ForeignExecutionPlan`
267            // because when we call `with_new_children` below it will extract the
268            // FFI plan that does contain the runtime.
269            if plan_is_foreign && !child.is::<ForeignExecutionPlan>() {
270                updated_children = true;
271                let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone()));
272                let foreign_child = ForeignExecutionPlan::try_from(ffi_child);
273                foreign_child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
274            } else {
275                Ok(child)
276            }
277        })
278        .collect::<Result<Vec<_>>>()?;
279    if updated_children {
280        Arc::clone(plan).with_new_children(children).map(Some)
281    } else {
282        Ok(None)
283    }
284}
285
286impl FFI_ExecutionPlan {
287    /// This function is called on the provider's side.
288    pub fn new(mut plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
289        // Note to developers: `pass_runtime_to_children` relies on the logic here to
290        // get the underlying FFI plan during calls to `new_with_children`.
291        if let Some(plan) = plan.downcast_ref::<ForeignExecutionPlan>() {
292            return plan.plan.clone();
293        }
294
295        if let Some(rt) = &runtime
296            && let Ok(Some(p)) = pass_runtime_to_children(&plan, rt)
297        {
298            plan = p;
299        }
300
301        let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
302        Self {
303            properties: properties_fn_wrapper,
304            children: children_fn_wrapper,
305            with_new_children: with_new_children_fn_wrapper,
306            name: name_fn_wrapper,
307            execute: execute_fn_wrapper,
308            repartitioned: repartitioned_fn_wrapper,
309            metrics: metrics_fn_wrapper,
310            partition_statistics: partition_statistics_fn_wrapper,
311            clone: clone_fn_wrapper,
312            release: release_fn_wrapper,
313            private_data: Box::into_raw(private_data) as *mut c_void,
314            library_marker_id: crate::get_library_marker_id,
315        }
316    }
317}
318
319impl Drop for FFI_ExecutionPlan {
320    fn drop(&mut self) {
321        unsafe { (self.release)(self) }
322    }
323}
324
/// This struct is used to access an execution plan provided by a foreign
/// library across a FFI boundary.
///
/// The ForeignExecutionPlan is to be used by the caller of the plan, so it has
/// no knowledge or access to the private data. All interaction with the plan
/// must occur through the functions defined in FFI_ExecutionPlan.
///
/// The name, properties and children are fetched once, when this struct is
/// built from an [`FFI_ExecutionPlan`], and are cached afterwards.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name returned by the `name` callback at construction time.
    name: String,
    /// The FFI handle; every call into the foreign plan goes through it.
    plan: FFI_ExecutionPlan,
    /// Plan properties decoded from the `properties` callback.
    properties: Arc<PlanProperties>,
    /// Children converted from the `children` callback. A child whose library
    /// marker matches this library is stored as its original local plan
    /// rather than being wrapped again.
    children: Vec<Arc<dyn ExecutionPlan>>,
}

// SAFETY: all access to the foreign plan goes through `plan`, and
// `FFI_ExecutionPlan` is itself declared Send + Sync. The other fields are
// owned Rust values. This assumes the `dyn ExecutionPlan` children are
// Send + Sync (as DataFusion's trait is expected to require; confirm).
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
341
342impl DisplayAs for ForeignExecutionPlan {
343    fn fmt_as(
344        &self,
345        t: DisplayFormatType,
346        f: &mut std::fmt::Formatter,
347    ) -> std::fmt::Result {
348        match t {
349            DisplayFormatType::Default | DisplayFormatType::Verbose => {
350                write!(
351                    f,
352                    "FFI_ExecutionPlan: {}, number_of_children={}",
353                    self.name,
354                    self.children.len(),
355                )
356            }
357            DisplayFormatType::TreeRender => {
358                // TODO: collect info
359                write!(f, "")
360            }
361        }
362    }
363}
364
365impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
366    type Error = DataFusionError;
367
368    fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
369        if (plan.library_marker_id)() == crate::get_library_marker_id() {
370            Ok(Arc::clone(plan.inner()))
371        } else {
372            let plan = ForeignExecutionPlan::try_from(plan.clone())?;
373            Ok(Arc::new(plan))
374        }
375    }
376}
377
378impl TryFrom<FFI_ExecutionPlan> for ForeignExecutionPlan {
379    type Error = DataFusionError;
380    fn try_from(plan: FFI_ExecutionPlan) -> Result<Self, Self::Error> {
381        unsafe {
382            let name = (plan.name)(&plan).into();
383
384            let properties: PlanProperties = (plan.properties)(&plan).try_into()?;
385
386            let children_rvec = (plan.children)(&plan);
387            let children = children_rvec
388                .iter()
389                .map(<Arc<dyn ExecutionPlan>>::try_from)
390                .collect::<Result<Vec<_>>>()?;
391
392            Ok(ForeignExecutionPlan {
393                name,
394                plan,
395                properties: Arc::new(properties),
396                children,
397            })
398        }
399    }
400}
401
402impl ExecutionPlan for ForeignExecutionPlan {
403    fn name(&self) -> &str {
404        &self.name
405    }
406
407    fn properties(&self) -> &Arc<PlanProperties> {
408        &self.properties
409    }
410
411    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
412        self.children.iter().collect()
413    }
414
415    fn with_new_children(
416        self: Arc<Self>,
417        children: Vec<Arc<dyn ExecutionPlan>>,
418    ) -> Result<Arc<dyn ExecutionPlan>> {
419        let children = children
420            .into_iter()
421            .map(|child| FFI_ExecutionPlan::new(child, None))
422            .collect::<SVec<_>>();
423        let new_plan =
424            unsafe { df_result!((self.plan.with_new_children)(&self.plan, children))? };
425
426        (&new_plan).try_into()
427    }
428
429    fn execute(
430        &self,
431        partition: usize,
432        context: Arc<TaskContext>,
433    ) -> Result<SendableRecordBatchStream> {
434        let context = FFI_TaskContext::from(context);
435        unsafe {
436            df_result!((self.plan.execute)(&self.plan, partition, context))
437                .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
438        }
439    }
440
441    fn repartitioned(
442        &self,
443        target_partitions: usize,
444        config: &ConfigOptions,
445    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
446        let config = config.into();
447        let maybe_plan: Option<FFI_ExecutionPlan> = df_result!(unsafe {
448            (self.plan.repartitioned)(&self.plan, target_partitions, config)
449        })?
450        .into();
451
452        maybe_plan
453            .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
454            .transpose()
455    }
456
457    fn metrics(&self) -> Option<MetricsSet> {
458        let ffi: Option<FFI_MetricsSet> =
459            unsafe { (self.plan.metrics)(&self.plan) }.into();
460        ffi.map(MetricsSet::from)
461    }
462
463    fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
464        let bytes = df_result!(unsafe {
465            (self.plan.partition_statistics)(&self.plan, partition.into())
466        })?;
467        Ok(Arc::new(deserialize_statistics(bytes.as_slice())?))
468    }
469}
470
// Unit tests for the FFI execution-plan wrappers. The module is also compiled
// under the `integration-tests` feature, which is why it and `EmptyExec` are
// `pub`.
#[cfg(any(test, feature = "integration-tests"))]
pub mod tests {
    use datafusion_physical_plan::Partitioning;
    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

    use super::*;

    /// Minimal test-only [`ExecutionPlan`] with configurable children, metrics
    /// and statistics. `execute` and `fmt_as` are `unimplemented!()` and panic
    /// if called.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: Arc<PlanProperties>,
        children: Vec<Arc<dyn ExecutionPlan>>,
        metrics: Option<MetricsSet>,
        statistics: Option<Statistics>,
    }

    impl EmptyExec {
        /// Create a childless plan over `schema` with `UnknownPartitioning(3)`,
        /// incremental emission, bounded input, and no metrics or statistics.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            Self {
                props: Arc::new(PlanProperties::new(
                    datafusion_physical_expr::EquivalenceProperties::new(schema),
                    Partitioning::UnknownPartitioning(3),
                    EmissionType::Incremental,
                    Boundedness::Bounded,
                )),
                children: Vec::default(),
                metrics: None,
                statistics: None,
            }
        }

        /// Make `metrics()` return `Some(metrics)`.
        pub fn with_metrics(mut self, metrics: MetricsSet) -> Self {
            self.metrics = Some(metrics);
            self
        }

        /// Make `partition_statistics()` return `statistics` for every partition.
        pub fn with_statistics(mut self, statistics: Statistics) -> Self {
            self.statistics = Some(statistics);
            self
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        // Keeps properties, metrics and statistics; only the children change.
        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec {
                props: Arc::clone(&self.props),
                children,
                metrics: self.metrics.clone(),
                statistics: self.statistics.clone(),
            }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn metrics(&self) -> Option<MetricsSet> {
            self.metrics.clone()
        }

        // Ignores `_partition`: the same statistics (or unknown statistics for
        // the schema when none were configured) are returned for every request.
        fn partition_statistics(
            &self,
            _partition: Option<usize>,
        ) -> Result<Arc<Statistics>> {
            Ok(Arc::new(self.statistics.clone().unwrap_or_else(|| {
                Statistics::new_unknown(self.props.eq_properties.schema())
            })))
        }
    }

    // Name and one-line display survive a trip through the foreign wrapper.
    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
        ]));

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        let mut local_plan = FFI_ExecutionPlan::new(original_plan, None);
        // Pretend the plan came from another library so the conversion below
        // produces a `ForeignExecutionPlan` instead of the local shortcut.
        local_plan.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_plan: Arc<dyn ExecutionPlan> = (&local_plan).try_into()?;

        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion_physical_plan::display::DisplayableExecutionPlan::new(
            foreign_plan.as_ref(),
        );

        let buf = display.one_line().to_string();
        assert_eq!(
            buf.trim(),
            "FFI_ExecutionPlan: empty-exec, number_of_children=0"
        );

        Ok(())
    }

    // Children can be attached either to an already-foreign parent or to a
    // local parent before it is exported.
    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
        ]));

        // Version 1: Adding child to the foreign plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut child_local = FFI_ExecutionPlan::new(child_plan, None);
        child_local.library_marker_id = crate::mock_foreign_marker_id;
        let child_foreign = <Arc<dyn ExecutionPlan>>::try_from(&child_local)?;

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut parent_local = FFI_ExecutionPlan::new(parent_plan, None);
        parent_local.library_marker_id = crate::mock_foreign_marker_id;
        let parent_foreign = <Arc<dyn ExecutionPlan>>::try_from(&parent_local)?;

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Version 2: Adding child to the local plan
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut child_local = FFI_ExecutionPlan::new(child_plan, None);
        child_local.library_marker_id = crate::mock_foreign_marker_id;
        let child_foreign = <Arc<dyn ExecutionPlan>>::try_from(&child_local)?;

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
        let mut parent_local = FFI_ExecutionPlan::new(parent_plan, None);
        parent_local.library_marker_id = crate::mock_foreign_marker_id;
        let parent_foreign = <Arc<dyn ExecutionPlan>>::try_from(&parent_local)?;

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }

    // Metrics cross the boundary: absent stays absent, and per-partition output
    // rows (11 + 31) aggregate to the same total on the receiving side.
    #[test]
    fn test_ffi_execution_plan_metrics_round_trip() -> Result<()> {
        use datafusion_physical_expr_common::metrics::{Count, Metric, MetricValue};

        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
        ]));

        // Plans without metrics still return None across the boundary.
        let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None);
        bare_local.library_marker_id = crate::mock_foreign_marker_id;
        let bare_foreign: Arc<dyn ExecutionPlan> = (&bare_local).try_into()?;
        assert!(bare_foreign.metrics().is_none());

        // Plans with metrics produce equivalent MetricsSets after a round trip.
        let mut original_metrics = MetricsSet::new();
        let c0 = Count::new();
        c0.add(11);
        original_metrics
            .push(Arc::new(Metric::new(MetricValue::OutputRows(c0), Some(0))));
        let c1 = Count::new();
        c1.add(31);
        original_metrics
            .push(Arc::new(Metric::new(MetricValue::OutputRows(c1), Some(1))));

        let metric_plan = Arc::new(EmptyExec::new(schema).with_metrics(original_metrics));
        let mut metric_local = FFI_ExecutionPlan::new(metric_plan, None);
        metric_local.library_marker_id = crate::mock_foreign_marker_id;
        let metric_foreign: Arc<dyn ExecutionPlan> = (&metric_local).try_into()?;

        let observed = metric_foreign.metrics().expect("metrics should be present");
        assert_eq!(observed.output_rows(), Some(42));

        Ok(())
    }

    // Statistics are serialized and deserialized faithfully, both for the
    // all-partitions request and for a specific partition.
    #[test]
    fn test_ffi_execution_plan_partition_statistics_round_trip() -> Result<()> {
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue};

        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Int32, true),
        ]));

        // Plans without explicit statistics return Statistics::new_unknown across
        // the boundary.
        let bare_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let mut bare_local = FFI_ExecutionPlan::new(bare_plan, None);
        bare_local.library_marker_id = crate::mock_foreign_marker_id;
        let bare_foreign: Arc<dyn ExecutionPlan> = (&bare_local).try_into()?;
        let bare_stats = bare_foreign.partition_statistics(None)?;
        assert_eq!(bare_stats.as_ref(), &Statistics::new_unknown(&schema));

        // Plans with statistics round-trip them faithfully, including
        // ScalarValue-typed min/max.
        let original_stats = Statistics {
            num_rows: Precision::Exact(7),
            total_byte_size: Precision::Inexact(128),
            column_statistics: vec![ColumnStatistics {
                null_count: Precision::Exact(1),
                max_value: Precision::Exact(ScalarValue::Int32(Some(10))),
                min_value: Precision::Exact(ScalarValue::Int32(Some(-3))),
                sum_value: Precision::Absent,
                distinct_count: Precision::Inexact(6),
                byte_size: Precision::Exact(28),
            }],
        };
        let stats_plan = Arc::new(
            EmptyExec::new(Arc::clone(&schema)).with_statistics(original_stats.clone()),
        );
        let mut stats_local = FFI_ExecutionPlan::new(stats_plan, None);
        stats_local.library_marker_id = crate::mock_foreign_marker_id;
        let stats_foreign: Arc<dyn ExecutionPlan> = (&stats_local).try_into()?;

        let observed = stats_foreign.partition_statistics(None)?;
        assert_eq!(observed.as_ref(), &original_stats);

        // `EmptyExec` ignores the partition index, so a specific partition
        // yields the same statistics.
        let observed_partition = stats_foreign.partition_statistics(Some(1))?;
        assert_eq!(observed_partition.as_ref(), &original_stats);

        Ok(())
    }

    // The library-marker check decides between returning the original local
    // plan and wrapping it in a `ForeignExecutionPlan`.
    #[test]
    fn test_ffi_execution_plan_local_bypass() {
        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
            arrow::datatypes::Field::new("a", arrow::datatypes::DataType::Float32, false),
        ]));

        let plan = Arc::new(EmptyExec::new(schema));

        let mut ffi_plan = FFI_ExecutionPlan::new(plan, None);

        // Verify local libraries can be downcast to their original
        let foreign_plan: Arc<dyn ExecutionPlan> = (&ffi_plan).try_into().unwrap();
        assert!(foreign_plan.is::<EmptyExec>());

        // Verify different library markers generate foreign providers
        ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_plan: Arc<dyn ExecutionPlan> = (&ffi_plan).try_into().unwrap();
        assert!(foreign_plan.is::<ForeignExecutionPlan>());
    }
}