1use std::ffi::c_void;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use datafusion_common::config::ConfigOptions;
23use datafusion_common::{DataFusionError, Result, Statistics};
24use datafusion_execution::{SendableRecordBatchStream, TaskContext};
25use datafusion_physical_expr_common::metrics::MetricsSet;
26use datafusion_physical_plan::{
27 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
28};
29use stabby::string::String as SString;
30use stabby::vec::Vec as SVec;
31use tokio::runtime::Handle;
32
33use crate::config::FFI_ConfigOptions;
34use crate::execution::FFI_TaskContext;
35use crate::physical_expr::metrics::FFI_MetricsSet;
36use crate::plan_properties::FFI_PlanProperties;
37use crate::record_batch_stream::FFI_RecordBatchStream;
38use crate::statistics::{deserialize_statistics, serialize_statistics};
39use crate::util::{FFI_Option, FFI_Result};
40use crate::{df_result, sresult, sresult_return};
41
42#[repr(C)]
44#[derive(Debug)]
45pub struct FFI_ExecutionPlan {
46 pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,
48
49 pub children: unsafe extern "C" fn(plan: &Self) -> SVec<FFI_ExecutionPlan>,
51
52 pub with_new_children:
53 unsafe extern "C" fn(plan: &Self, children: SVec<Self>) -> FFI_Result<Self>,
54
55 pub name: unsafe extern "C" fn(plan: &Self) -> SString,
57
58 pub execute: unsafe extern "C" fn(
61 plan: &Self,
62 partition: usize,
63 context: FFI_TaskContext,
64 ) -> FFI_Result<FFI_RecordBatchStream>,
65
66 pub repartitioned: unsafe extern "C" fn(
67 plan: &Self,
68 target_partitions: usize,
69 config: FFI_ConfigOptions,
70 )
71 -> FFI_Result<FFI_Option<FFI_ExecutionPlan>>,
72
73 pub metrics: unsafe extern "C" fn(plan: &Self) -> FFI_Option<FFI_MetricsSet>,
76
77 pub partition_statistics: unsafe extern "C" fn(
82 plan: &Self,
83 partition: FFI_Option<usize>,
84 ) -> FFI_Result<SVec<u8>>,
85
86 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
89
90 pub release: unsafe extern "C" fn(arg: &mut Self),
92
93 pub private_data: *mut c_void,
96
97 pub library_marker_id: extern "C" fn() -> usize,
101}
102
// The raw `private_data` pointer prevents the compiler from deriving these
// auto traits, so they are asserted manually.
// NOTE(review): soundness presumably rests on the pointee being an
// `ExecutionPlanPrivateData` whose contents are themselves thread-safe —
// confirm for producers other than `FFI_ExecutionPlan::new`.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
105
/// State that [`FFI_ExecutionPlan::new`] boxes and stores behind the
/// `private_data` pointer of an [`FFI_ExecutionPlan`].
pub struct ExecutionPlanPrivateData {
    /// The plan that the FFI function pointers forward to.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Optional Tokio runtime handle. It is entered while `execute` runs and
    /// handed on to the FFI plans and record batch streams created from
    /// this plan.
    pub runtime: Option<Handle>,
}
110
111impl FFI_ExecutionPlan {
112 fn inner(&self) -> &Arc<dyn ExecutionPlan> {
113 let private_data = self.private_data as *const ExecutionPlanPrivateData;
114 unsafe { &(*private_data).plan }
115 }
116
117 fn runtime(&self) -> Option<Handle> {
118 let private_data = self.private_data as *const ExecutionPlanPrivateData;
119 unsafe { (*private_data).runtime.clone() }
120 }
121}
122
123unsafe extern "C" fn properties_fn_wrapper(
124 plan: &FFI_ExecutionPlan,
125) -> FFI_PlanProperties {
126 plan.inner().properties().as_ref().into()
127}
128
129unsafe extern "C" fn children_fn_wrapper(
130 plan: &FFI_ExecutionPlan,
131) -> SVec<FFI_ExecutionPlan> {
132 let runtime = plan.runtime();
133 plan.inner()
134 .children()
135 .into_iter()
136 .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
137 .collect()
138}
139
140unsafe extern "C" fn with_new_children_fn_wrapper(
141 plan: &FFI_ExecutionPlan,
142 children: SVec<FFI_ExecutionPlan>,
143) -> FFI_Result<FFI_ExecutionPlan> {
144 let runtime = plan.runtime();
145 let inner_plan = Arc::clone(plan.inner());
146
147 let children: Result<Vec<Arc<dyn ExecutionPlan>>> = children
148 .iter()
149 .map(<Arc<dyn ExecutionPlan>>::try_from)
150 .collect();
151
152 let children = sresult_return!(children);
153 let new_plan = sresult_return!(inner_plan.with_new_children(children));
154
155 FFI_Result::Ok(FFI_ExecutionPlan::new(new_plan, runtime))
156}
157
158unsafe extern "C" fn execute_fn_wrapper(
159 plan: &FFI_ExecutionPlan,
160 partition: usize,
161 context: FFI_TaskContext,
162) -> FFI_Result<FFI_RecordBatchStream> {
163 let ctx = context.into();
164 let runtime = plan.runtime();
165 let plan = plan.inner();
166
167 let _runtime_guard = runtime.as_ref().map(|rt| rt.enter());
168
169 sresult!(
170 plan.execute(partition, ctx)
171 .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
172 )
173}
174
175unsafe extern "C" fn repartitioned_fn_wrapper(
176 plan: &FFI_ExecutionPlan,
177 target_partitions: usize,
178 config: FFI_ConfigOptions,
179) -> FFI_Result<FFI_Option<FFI_ExecutionPlan>> {
180 let maybe_config: Result<ConfigOptions, DataFusionError> = config.try_into();
181 let config = sresult_return!(maybe_config);
182 let runtime = plan.runtime();
183 let plan = plan.inner();
184
185 sresult!(
186 plan.repartitioned(target_partitions, &config)
187 .map(|maybe_plan| maybe_plan
188 .map(|plan| FFI_ExecutionPlan::new(plan, runtime))
189 .into())
190 )
191}
192
193unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> SString {
194 plan.inner().name().into()
195}
196
197unsafe extern "C" fn metrics_fn_wrapper(
198 plan: &FFI_ExecutionPlan,
199) -> FFI_Option<FFI_MetricsSet> {
200 plan.inner()
201 .metrics()
202 .as_ref()
203 .map(FFI_MetricsSet::from)
204 .into()
205}
206
207unsafe extern "C" fn partition_statistics_fn_wrapper(
208 plan: &FFI_ExecutionPlan,
209 partition: FFI_Option<usize>,
210) -> FFI_Result<SVec<u8>> {
211 let partition: Option<usize> = partition.into();
212 plan.inner()
213 .partition_statistics(partition)
214 .map(|stats| SVec::from(serialize_statistics(stats.as_ref()).as_slice()))
215 .into()
216}
217
218unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
219 unsafe {
220 debug_assert!(!plan.private_data.is_null());
221 let private_data =
222 Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
223 drop(private_data);
224 plan.private_data = std::ptr::null_mut();
225 }
226}
227
228unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
229 let runtime = plan.runtime();
230 let plan = plan.inner();
231
232 FFI_ExecutionPlan::new(Arc::clone(plan), runtime)
233}
234
235impl Clone for FFI_ExecutionPlan {
236 fn clone(&self) -> Self {
237 unsafe { (self.clone)(self) }
238 }
239}
240
241fn pass_runtime_to_children(
246 plan: &Arc<dyn ExecutionPlan>,
247 runtime: &Handle,
248) -> Result<Option<Arc<dyn ExecutionPlan>>> {
249 let mut updated_children = false;
250 let plan_is_foreign = plan.is::<ForeignExecutionPlan>();
251
252 let children = plan
253 .children()
254 .into_iter()
255 .map(|child| {
256 let child = match pass_runtime_to_children(child, runtime)? {
257 Some(child) => {
258 updated_children = true;
259 child
260 }
261 None => Arc::clone(child),
262 };
263
264 if plan_is_foreign && !child.is::<ForeignExecutionPlan>() {
270 updated_children = true;
271 let ffi_child = FFI_ExecutionPlan::new(child, Some(runtime.clone()));
272 let foreign_child = ForeignExecutionPlan::try_from(ffi_child);
273 foreign_child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>)
274 } else {
275 Ok(child)
276 }
277 })
278 .collect::<Result<Vec<_>>>()?;
279 if updated_children {
280 Arc::clone(plan).with_new_children(children).map(Some)
281 } else {
282 Ok(None)
283 }
284}
285
286impl FFI_ExecutionPlan {
287 pub fn new(mut plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
289 if let Some(plan) = plan.downcast_ref::<ForeignExecutionPlan>() {
292 return plan.plan.clone();
293 }
294
295 if let Some(rt) = &runtime
296 && let Ok(Some(p)) = pass_runtime_to_children(&plan, rt)
297 {
298 plan = p;
299 }
300
301 let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
302 Self {
303 properties: properties_fn_wrapper,
304 children: children_fn_wrapper,
305 with_new_children: with_new_children_fn_wrapper,
306 name: name_fn_wrapper,
307 execute: execute_fn_wrapper,
308 repartitioned: repartitioned_fn_wrapper,
309 metrics: metrics_fn_wrapper,
310 partition_statistics: partition_statistics_fn_wrapper,
311 clone: clone_fn_wrapper,
312 release: release_fn_wrapper,
313 private_data: Box::into_raw(private_data) as *mut c_void,
314 library_marker_id: crate::get_library_marker_id,
315 }
316 }
317}
318
319impl Drop for FFI_ExecutionPlan {
320 fn drop(&mut self) {
321 unsafe { (self.release)(self) }
322 }
323}
324
/// An [`ExecutionPlan`] implementation backed by an [`FFI_ExecutionPlan`].
///
/// The name, properties and children are read through the FFI struct once,
/// when the value is built via `TryFrom<FFI_ExecutionPlan>`, and then served
/// from these cached fields. Other trait methods call through the function
/// pointers on `plan`.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name obtained through the FFI `name` function.
    name: String,
    /// The FFI struct that the trait methods forward to.
    plan: FFI_ExecutionPlan,
    /// Plan properties converted from the FFI `properties` function.
    properties: Arc<PlanProperties>,
    /// Children converted from the FFI `children` function.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
338
// NOTE(review): asserted manually; presumably justified by the contained
// `FFI_ExecutionPlan` being declared `Send + Sync` — confirm the provider
// library upholds that.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
341
342impl DisplayAs for ForeignExecutionPlan {
343 fn fmt_as(
344 &self,
345 t: DisplayFormatType,
346 f: &mut std::fmt::Formatter,
347 ) -> std::fmt::Result {
348 match t {
349 DisplayFormatType::Default | DisplayFormatType::Verbose => {
350 write!(
351 f,
352 "FFI_ExecutionPlan: {}, number_of_children={}",
353 self.name,
354 self.children.len(),
355 )
356 }
357 DisplayFormatType::TreeRender => {
358 write!(f, "")
360 }
361 }
362 }
363}
364
365impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
366 type Error = DataFusionError;
367
368 fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
369 if (plan.library_marker_id)() == crate::get_library_marker_id() {
370 Ok(Arc::clone(plan.inner()))
371 } else {
372 let plan = ForeignExecutionPlan::try_from(plan.clone())?;
373 Ok(Arc::new(plan))
374 }
375 }
376}
377
378impl TryFrom<FFI_ExecutionPlan> for ForeignExecutionPlan {
379 type Error = DataFusionError;
380 fn try_from(plan: FFI_ExecutionPlan) -> Result<Self, Self::Error> {
381 unsafe {
382 let name = (plan.name)(&plan).into();
383
384 let properties: PlanProperties = (plan.properties)(&plan).try_into()?;
385
386 let children_rvec = (plan.children)(&plan);
387 let children = children_rvec
388 .iter()
389 .map(<Arc<dyn ExecutionPlan>>::try_from)
390 .collect::<Result<Vec<_>>>()?;
391
392 Ok(ForeignExecutionPlan {
393 name,
394 plan,
395 properties: Arc::new(properties),
396 children,
397 })
398 }
399 }
400}
401
402impl ExecutionPlan for ForeignExecutionPlan {
403 fn name(&self) -> &str {
404 &self.name
405 }
406
407 fn properties(&self) -> &Arc<PlanProperties> {
408 &self.properties
409 }
410
411 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
412 self.children.iter().collect()
413 }
414
415 fn with_new_children(
416 self: Arc<Self>,
417 children: Vec<Arc<dyn ExecutionPlan>>,
418 ) -> Result<Arc<dyn ExecutionPlan>> {
419 let children = children
420 .into_iter()
421 .map(|child| FFI_ExecutionPlan::new(child, None))
422 .collect::<SVec<_>>();
423 let new_plan =
424 unsafe { df_result!((self.plan.with_new_children)(&self.plan, children))? };
425
426 (&new_plan).try_into()
427 }
428
429 fn execute(
430 &self,
431 partition: usize,
432 context: Arc<TaskContext>,
433 ) -> Result<SendableRecordBatchStream> {
434 let context = FFI_TaskContext::from(context);
435 unsafe {
436 df_result!((self.plan.execute)(&self.plan, partition, context))
437 .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
438 }
439 }
440
441 fn repartitioned(
442 &self,
443 target_partitions: usize,
444 config: &ConfigOptions,
445 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
446 let config = config.into();
447 let maybe_plan: Option<FFI_ExecutionPlan> = df_result!(unsafe {
448 (self.plan.repartitioned)(&self.plan, target_partitions, config)
449 })?
450 .into();
451
452 maybe_plan
453 .map(|plan| <Arc<dyn ExecutionPlan>>::try_from(&plan))
454 .transpose()
455 }
456
457 fn metrics(&self) -> Option<MetricsSet> {
458 let ffi: Option<FFI_MetricsSet> =
459 unsafe { (self.plan.metrics)(&self.plan) }.into();
460 ffi.map(MetricsSet::from)
461 }
462
463 fn partition_statistics(&self, partition: Option<usize>) -> Result<Arc<Statistics>> {
464 let bytes = df_result!(unsafe {
465 (self.plan.partition_statistics)(&self.plan, partition.into())
466 })?;
467 Ok(Arc::new(deserialize_statistics(bytes.as_slice())?))
468 }
469}
470
/// Test fixture and round-trip tests for [`FFI_ExecutionPlan`].
///
/// The module is also compiled when the `integration-tests` feature is
/// enabled, which makes the public [`EmptyExec`](tests::EmptyExec) fixture
/// available outside of unit tests.
#[cfg(any(test, feature = "integration-tests"))]
pub mod tests {
    use datafusion_physical_plan::Partitioning;
    use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};

    use super::*;

    /// Minimal plan used as a fixture: it stores properties, children,
    /// optional metrics and optional statistics, and cannot be executed or
    /// displayed.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: Arc<PlanProperties>,
        children: Vec<Arc<dyn ExecutionPlan>>,
        metrics: Option<MetricsSet>,
        statistics: Option<Statistics>,
    }

    impl EmptyExec {
        /// Creates a childless fixture over `schema` with three unknown
        /// partitions, incremental emission and bounded input, and with no
        /// metrics or statistics attached.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            let eq_properties =
                datafusion_physical_expr::EquivalenceProperties::new(schema);
            let props = PlanProperties::new(
                eq_properties,
                Partitioning::UnknownPartitioning(3),
                EmissionType::Incremental,
                Boundedness::Bounded,
            );

            Self {
                props: Arc::new(props),
                children: Vec::default(),
                metrics: None,
                statistics: None,
            }
        }

        /// Attaches the metrics that `metrics()` will report.
        pub fn with_metrics(self, metrics: MetricsSet) -> Self {
            Self {
                metrics: Some(metrics),
                ..self
            }
        }

        /// Attaches the statistics that `partition_statistics()` will report.
        pub fn with_statistics(self, statistics: Statistics) -> Self {
            Self {
                statistics: Some(statistics),
                ..self
            }
        }
    }

    impl DisplayAs for EmptyExec {
        /// Not needed by the tests. Calling it panics.
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn properties(&self) -> &Arc<PlanProperties> {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        /// Returns a copy of this fixture with `children` swapped in.
        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            let replacement = EmptyExec {
                props: Arc::clone(&self.props),
                children,
                metrics: self.metrics.clone(),
                statistics: self.statistics.clone(),
            };
            Ok(Arc::new(replacement))
        }

        /// Not needed by the tests. Calling it panics.
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn metrics(&self) -> Option<MetricsSet> {
            self.metrics.clone()
        }

        /// Reports the attached statistics for every partition, or unknown
        /// statistics for the schema when none were attached.
        fn partition_statistics(
            &self,
            _partition: Option<usize>,
        ) -> Result<Arc<Statistics>> {
            let statistics = match &self.statistics {
                Some(known) => known.clone(),
                None => Statistics::new_unknown(self.props.eq_properties.schema()),
            };
            Ok(Arc::new(statistics))
        }
    }

    /// Builds a schema with a single field named `a`.
    #[cfg(test)]
    fn single_field_schema(
        data_type: arrow::datatypes::DataType,
        nullable: bool,
    ) -> arrow::datatypes::SchemaRef {
        let field = arrow::datatypes::Field::new("a", data_type, nullable);
        Arc::new(arrow::datatypes::Schema::new(vec![field]))
    }

    /// Wraps `plan` for FFI, swaps in the mock marker so the struct looks
    /// like it came from another library, and converts it back.
    #[cfg(test)]
    fn as_foreign(plan: Arc<dyn ExecutionPlan>) -> Result<Arc<dyn ExecutionPlan>> {
        let mut ffi_plan = FFI_ExecutionPlan::new(plan, None);
        ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
        <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan)
    }

    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema = single_field_schema(arrow::datatypes::DataType::Float32, false);

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        let foreign_plan = as_foreign(original_plan)?;
        assert_eq!(original_name, foreign_plan.name());

        let rendered = datafusion_physical_plan::display::DisplayableExecutionPlan::new(
            foreign_plan.as_ref(),
        )
        .one_line()
        .to_string();
        assert_eq!(
            rendered.trim(),
            "FFI_ExecutionPlan: empty-exec, number_of_children=0"
        );

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = single_field_schema(arrow::datatypes::DataType::Float32, false);

        // Scenario 1: attach a child to a parent that is already foreign.
        let child_foreign = as_foreign(Arc::new(EmptyExec::new(Arc::clone(&schema))))?;
        let parent_foreign = as_foreign(Arc::new(EmptyExec::new(Arc::clone(&schema))))?;

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Scenario 2: attach the child to a local parent first, then send
        // the parent through FFI.
        let child_foreign = as_foreign(Arc::new(EmptyExec::new(Arc::clone(&schema))))?;
        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)))
            .with_new_children(vec![child_foreign])?;
        let parent_foreign = as_foreign(parent_plan)?;

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_metrics_round_trip() -> Result<()> {
        use datafusion_physical_expr_common::metrics::{Count, Metric, MetricValue};

        let schema = single_field_schema(arrow::datatypes::DataType::Float32, false);

        // A plan without metrics reports none after the round trip.
        let bare_foreign = as_foreign(Arc::new(EmptyExec::new(Arc::clone(&schema))))?;
        assert!(bare_foreign.metrics().is_none());

        // Two output-row counters on different partitions: 11 + 31 = 42.
        let mut original_metrics = MetricsSet::new();
        for (partition, rows) in [(0, 11), (1, 31)] {
            let counter = Count::new();
            counter.add(rows);
            original_metrics.push(Arc::new(Metric::new(
                MetricValue::OutputRows(counter),
                Some(partition),
            )));
        }

        let metric_plan = Arc::new(EmptyExec::new(schema).with_metrics(original_metrics));
        let metric_foreign = as_foreign(metric_plan)?;

        let observed = metric_foreign.metrics().expect("metrics should be present");
        assert_eq!(observed.output_rows(), Some(42));

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_partition_statistics_round_trip() -> Result<()> {
        use datafusion_common::stats::Precision;
        use datafusion_common::{ColumnStatistics, ScalarValue};

        let schema = single_field_schema(arrow::datatypes::DataType::Int32, true);

        // Without attached statistics the fixture reports unknown statistics.
        let bare_foreign = as_foreign(Arc::new(EmptyExec::new(Arc::clone(&schema))))?;
        let bare_stats = bare_foreign.partition_statistics(None)?;
        assert_eq!(bare_stats.as_ref(), &Statistics::new_unknown(&schema));

        let column = ColumnStatistics {
            null_count: Precision::Exact(1),
            max_value: Precision::Exact(ScalarValue::Int32(Some(10))),
            min_value: Precision::Exact(ScalarValue::Int32(Some(-3))),
            sum_value: Precision::Absent,
            distinct_count: Precision::Inexact(6),
            byte_size: Precision::Exact(28),
        };
        let original_stats = Statistics {
            num_rows: Precision::Exact(7),
            total_byte_size: Precision::Inexact(128),
            column_statistics: vec![column],
        };

        let stats_plan = Arc::new(
            EmptyExec::new(Arc::clone(&schema)).with_statistics(original_stats.clone()),
        );
        let stats_foreign = as_foreign(stats_plan)?;

        let whole_plan = stats_foreign.partition_statistics(None)?;
        assert_eq!(whole_plan.as_ref(), &original_stats);

        // The fixture ignores the partition index, so the same value is
        // expected for a specific partition.
        let single_partition = stats_foreign.partition_statistics(Some(1))?;
        assert_eq!(single_partition.as_ref(), &original_stats);

        Ok(())
    }

    #[test]
    fn test_ffi_execution_plan_local_bypass() {
        let schema = single_field_schema(arrow::datatypes::DataType::Float32, false);
        let plan = Arc::new(EmptyExec::new(schema));

        let mut ffi_plan = FFI_ExecutionPlan::new(plan, None);

        // With the local marker, the original plan is handed back.
        let local_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
        assert!(local_view.is::<EmptyExec>());

        // With a foreign marker, the plan is wrapped instead.
        ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
        assert!(foreign_view.is::<ForeignExecutionPlan>());
    }
}