1use std::ffi::c_void;
19use std::pin::Pin;
20use std::sync::Arc;
21
22use abi_stable::StableAbi;
23use abi_stable::std_types::{RString, RVec};
24use datafusion_common::{DataFusionError, Result};
25use datafusion_execution::{SendableRecordBatchStream, TaskContext};
26use datafusion_physical_plan::{
27 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
28};
29use tokio::runtime::Handle;
30
31use crate::execution::FFI_TaskContext;
32use crate::plan_properties::FFI_PlanProperties;
33use crate::record_batch_stream::FFI_RecordBatchStream;
34use crate::util::FFIResult;
35use crate::{df_result, rresult};
36
/// C-layout, ABI-stable handle for passing an [`ExecutionPlan`] across a
/// library boundary.
///
/// The struct is a table of `extern "C"` callbacks plus an opaque
/// `private_data` pointer. Each callback (except `library_marker_id`) takes
/// the struct itself as its first argument. [`FFI_ExecutionPlan::new`] fills
/// the table with this crate's wrapper functions.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_ExecutionPlan {
    /// Returns the plan's properties as [`FFI_PlanProperties`].
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Returns the plan's children, each wrapped in its own
    /// [`FFI_ExecutionPlan`].
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Returns the plan's name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Executes `partition` of the plan with the given task context and
    /// returns the resulting record batch stream, or an error wrapped in
    /// [`FFIResult`].
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
        context: FFI_TaskContext,
    ) -> FFIResult<FFI_RecordBatchStream>,

    /// Produces a new `FFI_ExecutionPlan` from this one. The [`Clone`]
    /// implementation of this struct forwards to this callback.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Releases the resources behind `private_data`. The [`Drop`]
    /// implementation of this struct forwards to this callback.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Opaque pointer to provider-side state. For values built by
    /// [`FFI_ExecutionPlan::new`] it points to a heap-allocated
    /// [`ExecutionPlanPrivateData`].
    pub private_data: *mut c_void,

    /// Returns an identifier for the library that created this struct. The
    /// `TryFrom<&FFI_ExecutionPlan>` conversion compares it with
    /// `crate::get_library_marker_id()` to decide whether the plan is local.
    pub library_marker_id: extern "C" fn() -> usize,
}
74
// `FFI_ExecutionPlan` holds a raw `*mut c_void`, so the compiler does not
// derive `Send`/`Sync` for it; both are asserted manually here.
// NOTE(review): soundness relies on the state behind `private_data` being safe
// to move and share across threads — confirm for every provider of this struct.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
77
/// Provider-side state that [`FFI_ExecutionPlan::new`] boxes and stores behind
/// the `private_data` pointer.
pub struct ExecutionPlanPrivateData {
    /// The wrapped plan that the FFI callbacks delegate to.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Optional Tokio runtime handle. It is passed along to child plans,
    /// clones, and record batch streams created from this plan.
    pub runtime: Option<Handle>,
}
82
83impl FFI_ExecutionPlan {
84 fn inner(&self) -> &Arc<dyn ExecutionPlan> {
85 let private_data = self.private_data as *const ExecutionPlanPrivateData;
86 unsafe { &(*private_data).plan }
87 }
88}
89
90unsafe extern "C" fn properties_fn_wrapper(
91 plan: &FFI_ExecutionPlan,
92) -> FFI_PlanProperties {
93 plan.inner().properties().into()
94}
95
96unsafe extern "C" fn children_fn_wrapper(
97 plan: &FFI_ExecutionPlan,
98) -> RVec<FFI_ExecutionPlan> {
99 unsafe {
100 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
101 let plan = &(*private_data).plan;
102 let runtime = &(*private_data).runtime;
103
104 let children: Vec<_> = plan
105 .children()
106 .into_iter()
107 .map(|child| FFI_ExecutionPlan::new(Arc::clone(child), runtime.clone()))
108 .collect();
109
110 children.into()
111 }
112}
113
114unsafe extern "C" fn execute_fn_wrapper(
115 plan: &FFI_ExecutionPlan,
116 partition: usize,
117 context: FFI_TaskContext,
118) -> FFIResult<FFI_RecordBatchStream> {
119 unsafe {
120 let ctx = context.into();
121 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
122 let plan = &(*private_data).plan;
123 let runtime = (*private_data).runtime.clone();
124
125 rresult!(
126 plan.execute(partition, ctx)
127 .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime))
128 )
129 }
130}
131
132unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
133 plan.inner().name().into()
134}
135
136unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
137 unsafe {
138 debug_assert!(!plan.private_data.is_null());
139 let private_data =
140 Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
141 drop(private_data);
142 plan.private_data = std::ptr::null_mut();
143 }
144}
145
146unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
147 unsafe {
148 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
149 let plan_data = &(*private_data);
150
151 FFI_ExecutionPlan::new(Arc::clone(&plan_data.plan), plan_data.runtime.clone())
152 }
153}
154
155impl Clone for FFI_ExecutionPlan {
156 fn clone(&self) -> Self {
157 unsafe { (self.clone)(self) }
158 }
159}
160
161impl FFI_ExecutionPlan {
162 pub fn new(plan: Arc<dyn ExecutionPlan>, runtime: Option<Handle>) -> Self {
164 let private_data = Box::new(ExecutionPlanPrivateData { plan, runtime });
165
166 Self {
167 properties: properties_fn_wrapper,
168 children: children_fn_wrapper,
169 name: name_fn_wrapper,
170 execute: execute_fn_wrapper,
171 clone: clone_fn_wrapper,
172 release: release_fn_wrapper,
173 private_data: Box::into_raw(private_data) as *mut c_void,
174 library_marker_id: crate::get_library_marker_id,
175 }
176 }
177}
178
179impl Drop for FFI_ExecutionPlan {
180 fn drop(&mut self) {
181 unsafe { (self.release)(self) }
182 }
183}
184
/// An [`ExecutionPlan`] backed by an [`FFI_ExecutionPlan`] whose library
/// marker differs from this crate's.
///
/// Name, properties, and children are fetched through the FFI callbacks once,
/// when the value is built by `TryFrom<&FFI_ExecutionPlan>`, and cached here.
/// `execute` goes through the FFI `execute` callback on every call.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name obtained from the `name` callback.
    name: String,
    /// Clone of the FFI struct this plan was built from.
    plan: FFI_ExecutionPlan,
    /// Plan properties converted from the `properties` callback result.
    properties: PlanProperties,
    /// Child plans: converted from the `children` callback result, or supplied
    /// directly via `with_new_children`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
198
// Manual `Send`/`Sync` assertions for the foreign plan wrapper.
// NOTE(review): `FFI_ExecutionPlan` already has manual `Send`/`Sync` impls in
// this file; if the remaining fields are `Send + Sync` as well, these impls
// may be redundant — confirm before relying on or removing them.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
201
202impl DisplayAs for ForeignExecutionPlan {
203 fn fmt_as(
204 &self,
205 t: DisplayFormatType,
206 f: &mut std::fmt::Formatter,
207 ) -> std::fmt::Result {
208 match t {
209 DisplayFormatType::Default | DisplayFormatType::Verbose => {
210 write!(
211 f,
212 "FFI_ExecutionPlan: {}, number_of_children={}",
213 self.name,
214 self.children.len(),
215 )
216 }
217 DisplayFormatType::TreeRender => {
218 write!(f, "")
220 }
221 }
222 }
223}
224
225impl TryFrom<&FFI_ExecutionPlan> for Arc<dyn ExecutionPlan> {
226 type Error = DataFusionError;
227
228 fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
229 if (plan.library_marker_id)() == crate::get_library_marker_id() {
230 return Ok(Arc::clone(plan.inner()));
231 }
232
233 unsafe {
234 let name = (plan.name)(plan).into();
235
236 let properties: PlanProperties = (plan.properties)(plan).try_into()?;
237
238 let children_rvec = (plan.children)(plan);
239 let children = children_rvec
240 .iter()
241 .map(<Arc<dyn ExecutionPlan>>::try_from)
242 .collect::<Result<Vec<_>>>()?;
243
244 let plan = ForeignExecutionPlan {
245 name,
246 plan: plan.clone(),
247 properties,
248 children,
249 };
250
251 Ok(Arc::new(plan))
252 }
253 }
254}
255
256impl ExecutionPlan for ForeignExecutionPlan {
257 fn name(&self) -> &str {
258 &self.name
259 }
260
261 fn as_any(&self) -> &dyn std::any::Any {
262 self
263 }
264
265 fn properties(&self) -> &PlanProperties {
266 &self.properties
267 }
268
269 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
270 self.children.iter().collect()
271 }
272
273 fn with_new_children(
274 self: Arc<Self>,
275 children: Vec<Arc<dyn ExecutionPlan>>,
276 ) -> Result<Arc<dyn ExecutionPlan>> {
277 Ok(Arc::new(ForeignExecutionPlan {
278 plan: self.plan.clone(),
279 name: self.name.clone(),
280 children,
281 properties: self.properties.clone(),
282 }))
283 }
284
285 fn execute(
286 &self,
287 partition: usize,
288 context: Arc<TaskContext>,
289 ) -> Result<SendableRecordBatchStream> {
290 let context = FFI_TaskContext::from(context);
291 unsafe {
292 df_result!((self.plan.execute)(&self.plan, partition, context))
293 .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
294 }
295 }
296}
297
#[cfg(test)]
pub(crate) mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::physical_plan::Partitioning;
    use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};

    use super::*;

    /// Minimal plan used to exercise the FFI wrappers. It only stores
    /// properties and children; executing or displaying it is unimplemented.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        /// Creates a childless plan over `schema` with three unknown
        /// partitions, incremental emission, and bounded input.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            let equivalences =
                datafusion::physical_expr::EquivalenceProperties::new(schema);
            let props = PlanProperties::new(
                equivalences,
                Partitioning::UnknownPartitioning(3),
                EmissionType::Incremental,
                Boundedness::Bounded,
            );

            Self {
                props,
                children: Vec::new(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        /// Not needed by these tests.
        fn fmt_as(
            &self,
            _format: DisplayFormatType,
            _formatter: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            let mut borrowed = Vec::with_capacity(self.children.len());
            borrowed.extend(self.children.iter());
            borrowed
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            let props = self.props.clone();
            Ok(Arc::new(Self { props, children }))
        }

        /// Not needed by these tests.
        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        /// Not needed by these tests.
        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// Schema with a single non-nullable `Float32` column named `a`.
    fn single_float_schema() -> arrow::datatypes::SchemaRef {
        let column = Field::new("a", DataType::Float32, false);
        Arc::new(Schema::new(vec![column]))
    }

    /// Wraps `plan` for FFI and swaps in the mock marker so that conversion
    /// back takes the foreign path instead of the local shortcut.
    fn mock_foreign_ffi(plan: Arc<dyn ExecutionPlan>) -> FFI_ExecutionPlan {
        let mut ffi = FFI_ExecutionPlan::new(plan, None);
        ffi.library_marker_id = crate::mock_foreign_marker_id;
        ffi
    }

    /// A plan converted to FFI and back keeps its name and renders through
    /// `ForeignExecutionPlan`'s display implementation.
    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let original_plan = Arc::new(EmptyExec::new(single_float_schema()));
        let original_name = original_plan.name().to_string();

        let local_plan = mock_foreign_ffi(original_plan);
        let foreign_plan = <Arc<dyn ExecutionPlan>>::try_from(&local_plan)?;

        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
            foreign_plan.as_ref(),
        );
        let rendered = display.one_line().to_string();
        assert_eq!(
            rendered.trim(),
            "FFI_ExecutionPlan: empty-exec, number_of_children=0"
        );

        Ok(())
    }

    /// Children are visible both when attached after conversion and when
    /// attached to the plan before it is wrapped for FFI.
    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = single_float_schema();

        // Case 1: attach a child to an already-converted foreign parent.
        let leaf_ffi = mock_foreign_ffi(Arc::new(EmptyExec::new(Arc::clone(&schema))));
        let leaf_foreign = <Arc<dyn ExecutionPlan>>::try_from(&leaf_ffi)?;

        let root_ffi = mock_foreign_ffi(Arc::new(EmptyExec::new(Arc::clone(&schema))));
        let root_foreign = <Arc<dyn ExecutionPlan>>::try_from(&root_ffi)?;

        assert_eq!(root_foreign.children().len(), 0);
        assert_eq!(leaf_foreign.children().len(), 0);

        let root_with_leaf = root_foreign.with_new_children(vec![leaf_foreign])?;
        assert_eq!(root_with_leaf.children().len(), 1);

        // Case 2: attach the child first, then wrap the parent for FFI.
        let second_leaf_ffi =
            mock_foreign_ffi(Arc::new(EmptyExec::new(Arc::clone(&schema))));
        let second_leaf = <Arc<dyn ExecutionPlan>>::try_from(&second_leaf_ffi)?;

        let bare_root = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let populated_root = bare_root.with_new_children(vec![second_leaf])?;
        let populated_ffi = mock_foreign_ffi(populated_root);
        let populated_foreign = <Arc<dyn ExecutionPlan>>::try_from(&populated_ffi)?;

        assert_eq!(populated_foreign.children().len(), 1);

        Ok(())
    }

    /// With this crate's own marker the conversion hands back the original
    /// plan type; with the mock marker it yields a `ForeignExecutionPlan`.
    #[test]
    fn test_ffi_execution_plan_local_bypass() {
        let plan = Arc::new(EmptyExec::new(single_float_schema()));
        let mut ffi_plan = FFI_ExecutionPlan::new(plan, None);

        // Marker untouched: conversion should return the wrapped plan itself.
        let local_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
        let as_empty = local_view.as_any().downcast_ref::<EmptyExec>();
        assert!(as_empty.is_some());

        // Mock marker: conversion should go through the foreign wrapper.
        ffi_plan.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_view = <Arc<dyn ExecutionPlan>>::try_from(&ffi_plan).unwrap();
        let as_foreign = foreign_view.as_any().downcast_ref::<ForeignExecutionPlan>();
        assert!(as_foreign.is_some());
    }
}