1use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21 std_types::{RResult, RString, RVec},
22 StableAbi,
23};
24use datafusion::{
25 error::DataFusionError,
26 execution::{SendableRecordBatchStream, TaskContext},
27 physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
28};
29use datafusion::{error::Result, physical_plan::DisplayFormatType};
30use tokio::runtime::Handle;
31
32use crate::{
33 df_result, plan_properties::FFI_PlanProperties,
34 record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
37#[repr(C)]
39#[derive(Debug, StableAbi)]
40#[allow(non_camel_case_types)]
41pub struct FFI_ExecutionPlan {
42 pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,
44
45 pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,
47
48 pub name: unsafe extern "C" fn(plan: &Self) -> RString,
50
51 pub execute: unsafe extern "C" fn(
54 plan: &Self,
55 partition: usize,
56 ) -> RResult<FFI_RecordBatchStream, RString>,
57
58 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
61
62 pub release: unsafe extern "C" fn(arg: &mut Self),
64
65 pub private_data: *mut c_void,
68}
69
70unsafe impl Send for FFI_ExecutionPlan {}
71unsafe impl Sync for FFI_ExecutionPlan {}
72
73pub struct ExecutionPlanPrivateData {
74 pub plan: Arc<dyn ExecutionPlan>,
75 pub context: Arc<TaskContext>,
76 pub runtime: Option<Handle>,
77}
78
79unsafe extern "C" fn properties_fn_wrapper(
80 plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83 let plan = &(*private_data).plan;
84
85 plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89 plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92 let plan = &(*private_data).plan;
93 let ctx = &(*private_data).context;
94 let runtime = &(*private_data).runtime;
95
96 let children: Vec<_> = plan
97 .children()
98 .into_iter()
99 .map(|child| {
100 FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101 })
102 .collect();
103
104 children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108 plan: &FFI_ExecutionPlan,
109 partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112 let plan = &(*private_data).plan;
113 let ctx = &(*private_data).context;
114 let runtime = (*private_data).runtime.clone();
115
116 rresult!(plan
117 .execute(partition, Arc::clone(ctx))
118 .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123 let plan = &(*private_data).plan;
124
125 plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129 let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130 drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135 let plan_data = &(*private_data);
136
137 FFI_ExecutionPlan::new(
138 Arc::clone(&plan_data.plan),
139 Arc::clone(&plan_data.context),
140 plan_data.runtime.clone(),
141 )
142}
143
144impl Clone for FFI_ExecutionPlan {
145 fn clone(&self) -> Self {
146 unsafe { (self.clone)(self) }
147 }
148}
149
150impl FFI_ExecutionPlan {
151 pub fn new(
153 plan: Arc<dyn ExecutionPlan>,
154 context: Arc<TaskContext>,
155 runtime: Option<Handle>,
156 ) -> Self {
157 let private_data = Box::new(ExecutionPlanPrivateData {
158 plan,
159 context,
160 runtime,
161 });
162
163 Self {
164 properties: properties_fn_wrapper,
165 children: children_fn_wrapper,
166 name: name_fn_wrapper,
167 execute: execute_fn_wrapper,
168 clone: clone_fn_wrapper,
169 release: release_fn_wrapper,
170 private_data: Box::into_raw(private_data) as *mut c_void,
171 }
172 }
173}
174
175impl Drop for FFI_ExecutionPlan {
176 fn drop(&mut self) {
177 unsafe { (self.release)(self) }
178 }
179}
180
181#[derive(Debug)]
188pub struct ForeignExecutionPlan {
189 name: String,
190 plan: FFI_ExecutionPlan,
191 properties: PlanProperties,
192 children: Vec<Arc<dyn ExecutionPlan>>,
193}
194
195unsafe impl Send for ForeignExecutionPlan {}
196unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199 fn fmt_as(
200 &self,
201 t: DisplayFormatType,
202 f: &mut std::fmt::Formatter,
203 ) -> std::fmt::Result {
204 match t {
205 DisplayFormatType::Default | DisplayFormatType::Verbose => {
206 write!(
207 f,
208 "FFI_ExecutionPlan: {}, number_of_children={}",
209 self.name,
210 self.children.len(),
211 )
212 }
213 DisplayFormatType::TreeRender => {
214 write!(f, "")
216 }
217 }
218 }
219}
220
221impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
222 type Error = DataFusionError;
223
224 fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
225 unsafe {
226 let name = (plan.name)(plan).into();
227
228 let properties: PlanProperties = (plan.properties)(plan).try_into()?;
229
230 let children_rvec = (plan.children)(plan);
231 let children = children_rvec
232 .iter()
233 .map(ForeignExecutionPlan::try_from)
234 .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
235 .collect::<Result<Vec<_>>>()?;
236
237 Ok(Self {
238 name,
239 plan: plan.clone(),
240 properties,
241 children,
242 })
243 }
244 }
245}
246
247impl ExecutionPlan for ForeignExecutionPlan {
248 fn name(&self) -> &str {
249 &self.name
250 }
251
252 fn as_any(&self) -> &dyn std::any::Any {
253 self
254 }
255
256 fn properties(&self) -> &PlanProperties {
257 &self.properties
258 }
259
260 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
261 self.children
262 .iter()
263 .map(|p| p as &Arc<dyn ExecutionPlan>)
264 .collect()
265 }
266
267 fn with_new_children(
268 self: Arc<Self>,
269 children: Vec<Arc<dyn ExecutionPlan>>,
270 ) -> Result<Arc<dyn ExecutionPlan>> {
271 Ok(Arc::new(ForeignExecutionPlan {
272 plan: self.plan.clone(),
273 name: self.name.clone(),
274 children,
275 properties: self.properties.clone(),
276 }))
277 }
278
279 fn execute(
280 &self,
281 partition: usize,
282 _context: Arc<TaskContext>,
283 ) -> Result<SendableRecordBatchStream> {
284 unsafe {
285 df_result!((self.plan.execute)(&self.plan, partition))
286 .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
287 }
288 }
289}
290
291#[cfg(test)]
292mod tests {
293 use arrow::datatypes::{DataType, Field, Schema};
294 use datafusion::{
295 physical_plan::{
296 execution_plan::{Boundedness, EmissionType},
297 Partitioning,
298 },
299 prelude::SessionContext,
300 };
301
302 use super::*;
303
304 #[derive(Debug)]
305 pub struct EmptyExec {
306 props: PlanProperties,
307 children: Vec<Arc<dyn ExecutionPlan>>,
308 }
309
310 impl EmptyExec {
311 pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
312 Self {
313 props: PlanProperties::new(
314 datafusion::physical_expr::EquivalenceProperties::new(schema),
315 Partitioning::UnknownPartitioning(3),
316 EmissionType::Incremental,
317 Boundedness::Bounded,
318 ),
319 children: Vec::default(),
320 }
321 }
322 }
323
324 impl DisplayAs for EmptyExec {
325 fn fmt_as(
326 &self,
327 _t: DisplayFormatType,
328 _f: &mut std::fmt::Formatter,
329 ) -> std::fmt::Result {
330 unimplemented!()
331 }
332 }
333
334 impl ExecutionPlan for EmptyExec {
335 fn name(&self) -> &'static str {
336 "empty-exec"
337 }
338
339 fn as_any(&self) -> &dyn std::any::Any {
340 self
341 }
342
343 fn properties(&self) -> &PlanProperties {
344 &self.props
345 }
346
347 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
348 self.children.iter().collect()
349 }
350
351 fn with_new_children(
352 self: Arc<Self>,
353 children: Vec<Arc<dyn ExecutionPlan>>,
354 ) -> Result<Arc<dyn ExecutionPlan>> {
355 Ok(Arc::new(EmptyExec {
356 props: self.props.clone(),
357 children,
358 }))
359 }
360
361 fn execute(
362 &self,
363 _partition: usize,
364 _context: Arc<TaskContext>,
365 ) -> Result<SendableRecordBatchStream> {
366 unimplemented!()
367 }
368
369 fn statistics(&self) -> Result<datafusion::common::Statistics> {
370 unimplemented!()
371 }
372 }
373
374 #[test]
375 fn test_round_trip_ffi_execution_plan() -> Result<()> {
376 let schema =
377 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
378 let ctx = SessionContext::new();
379
380 let original_plan = Arc::new(EmptyExec::new(schema));
381 let original_name = original_plan.name().to_string();
382
383 let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);
384
385 let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;
386
387 assert!(original_name == foreign_plan.name());
388
389 let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
390 &foreign_plan,
391 );
392
393 let buf = display.one_line().to_string();
394 assert_eq!(
395 buf.trim(),
396 "FFI_ExecutionPlan: empty-exec, number_of_children=0"
397 );
398
399 Ok(())
400 }
401
402 #[test]
403 fn test_ffi_execution_plan_children() -> Result<()> {
404 let schema =
405 Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
406 let ctx = SessionContext::new();
407
408 let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
410 let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
411 let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);
412
413 let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
414 let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
415 let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);
416
417 assert_eq!(parent_foreign.children().len(), 0);
418 assert_eq!(child_foreign.children().len(), 0);
419
420 let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
421 assert_eq!(parent_foreign.children().len(), 1);
422
423 let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
425 let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
426 let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);
427
428 let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
429 let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
430 let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
431 let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);
432
433 assert_eq!(parent_foreign.children().len(), 1);
434
435 Ok(())
436 }
437}