1use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21 std_types::{RResult, RString, RVec},
22 StableAbi,
23};
24use datafusion::{
25 error::DataFusionError,
26 execution::{SendableRecordBatchStream, TaskContext},
27 physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
28};
29use datafusion::{error::Result, physical_plan::DisplayFormatType};
30use tokio::runtime::Handle;
31
32use crate::{
33 df_result, plan_properties::FFI_PlanProperties,
34 record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
/// FFI-safe, C-layout view of an [`ExecutionPlan`].
///
/// The struct is a table of `extern "C"` callbacks plus an opaque
/// `private_data` pointer that those callbacks interpret. Because the struct
/// is `#[repr(C)]`, the order of the fields is part of its layout and must not
/// be changed.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_ExecutionPlan {
    /// Returns the plan's properties in their FFI representation.
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Returns one `FFI_ExecutionPlan` per child of this plan.
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Returns the plan's name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Executes the given `partition`, returning either the resulting record
    /// batch stream or an error message.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
    ) -> RResult<FFI_RecordBatchStream, RString>,

    /// Produces a new `FFI_ExecutionPlan` from this one; this is what the
    /// [`Clone`] implementation calls.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Releases whatever `private_data` refers to; this is what the [`Drop`]
    /// implementation calls.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Opaque state interpreted only by the callbacks above. Instances built
    /// with `FFI_ExecutionPlan::new` store a boxed `ExecutionPlanPrivateData`
    /// here.
    pub private_data: *mut c_void,
}
69
// `private_data` is a raw pointer, so the compiler does not derive `Send` or
// `Sync` for this struct; both are asserted by hand here.
// NOTE(review): soundness relies on the state behind `private_data` being safe
// to use from several threads — confirm for every provider of this struct.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
72
/// State stored behind `FFI_ExecutionPlan::private_data` for plans created
/// with `FFI_ExecutionPlan::new`.
pub struct ExecutionPlanPrivateData {
    /// The wrapped plan that the FFI callbacks delegate to.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Task context handed to `plan.execute` by the `execute` callback.
    pub context: Arc<TaskContext>,
    /// Optional Tokio runtime handle; forwarded to child plans, clones and to
    /// the `FFI_RecordBatchStream` created by the `execute` callback.
    pub runtime: Option<Handle>,
}
78
79unsafe extern "C" fn properties_fn_wrapper(
80 plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83 let plan = &(*private_data).plan;
84
85 plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89 plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92 let plan = &(*private_data).plan;
93 let ctx = &(*private_data).context;
94 let runtime = &(*private_data).runtime;
95
96 let children: Vec<_> = plan
97 .children()
98 .into_iter()
99 .map(|child| {
100 FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101 })
102 .collect();
103
104 children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108 plan: &FFI_ExecutionPlan,
109 partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112 let plan = &(*private_data).plan;
113 let ctx = &(*private_data).context;
114 let runtime = (*private_data).runtime.clone();
115
116 rresult!(plan
117 .execute(partition, Arc::clone(ctx))
118 .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123 let plan = &(*private_data).plan;
124
125 plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129 let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130 drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135 let plan_data = &(*private_data);
136
137 FFI_ExecutionPlan::new(
138 Arc::clone(&plan_data.plan),
139 Arc::clone(&plan_data.context),
140 plan_data.runtime.clone(),
141 )
142}
143
144impl Clone for FFI_ExecutionPlan {
145 fn clone(&self) -> Self {
146 unsafe { (self.clone)(self) }
147 }
148}
149
150impl FFI_ExecutionPlan {
151 pub fn new(
153 plan: Arc<dyn ExecutionPlan>,
154 context: Arc<TaskContext>,
155 runtime: Option<Handle>,
156 ) -> Self {
157 let private_data = Box::new(ExecutionPlanPrivateData {
158 plan,
159 context,
160 runtime,
161 });
162
163 Self {
164 properties: properties_fn_wrapper,
165 children: children_fn_wrapper,
166 name: name_fn_wrapper,
167 execute: execute_fn_wrapper,
168 clone: clone_fn_wrapper,
169 release: release_fn_wrapper,
170 private_data: Box::into_raw(private_data) as *mut c_void,
171 }
172 }
173}
174
175impl Drop for FFI_ExecutionPlan {
176 fn drop(&mut self) {
177 unsafe { (self.release)(self) }
178 }
179}
180
/// An [`ExecutionPlan`] backed by an [`FFI_ExecutionPlan`].
///
/// The name, properties and children are read through the FFI callbacks once,
/// in `TryFrom<&FFI_ExecutionPlan>`, and cached in this struct; only `execute`
/// goes back through the FFI plan afterwards.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name as reported by the `name` callback at construction time.
    name: String,
    /// Clone of the FFI plan this wrapper was built from; used by `execute`.
    plan: FFI_ExecutionPlan,
    /// Properties converted from the `properties` callback's result.
    properties: PlanProperties,
    /// Child plans, each itself a `ForeignExecutionPlan` when built via
    /// `TryFrom`, or whatever was passed to `with_new_children`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
194
// Manually asserts that `ForeignExecutionPlan` may be sent to and shared
// between threads.
// NOTE(review): the contained `FFI_ExecutionPlan` already has manual
// `Send`/`Sync` impls, so these may be redundant if `PlanProperties` is itself
// `Send + Sync` — confirm before relying on or removing them.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199 fn fmt_as(
200 &self,
201 t: DisplayFormatType,
202 f: &mut std::fmt::Formatter,
203 ) -> std::fmt::Result {
204 match t {
205 DisplayFormatType::Default | DisplayFormatType::Verbose => {
206 write!(
207 f,
208 "FFI_ExecutionPlan(number_of_children={})",
209 self.children.len(),
210 )
211 }
212 DisplayFormatType::TreeRender => {
213 write!(f, "")
215 }
216 }
217 }
218}
219
220impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
221 type Error = DataFusionError;
222
223 fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
224 unsafe {
225 let name = (plan.name)(plan).into();
226
227 let properties: PlanProperties = (plan.properties)(plan).try_into()?;
228
229 let children_rvec = (plan.children)(plan);
230 let children = children_rvec
231 .iter()
232 .map(ForeignExecutionPlan::try_from)
233 .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
234 .collect::<Result<Vec<_>>>()?;
235
236 Ok(Self {
237 name,
238 plan: plan.clone(),
239 properties,
240 children,
241 })
242 }
243 }
244}
245
246impl ExecutionPlan for ForeignExecutionPlan {
247 fn name(&self) -> &str {
248 &self.name
249 }
250
251 fn as_any(&self) -> &dyn std::any::Any {
252 self
253 }
254
255 fn properties(&self) -> &PlanProperties {
256 &self.properties
257 }
258
259 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
260 self.children
261 .iter()
262 .map(|p| p as &Arc<dyn ExecutionPlan>)
263 .collect()
264 }
265
266 fn with_new_children(
267 self: Arc<Self>,
268 children: Vec<Arc<dyn ExecutionPlan>>,
269 ) -> Result<Arc<dyn ExecutionPlan>> {
270 Ok(Arc::new(ForeignExecutionPlan {
271 plan: self.plan.clone(),
272 name: self.name.clone(),
273 children,
274 properties: self.properties.clone(),
275 }))
276 }
277
278 fn execute(
279 &self,
280 partition: usize,
281 _context: Arc<TaskContext>,
282 ) -> Result<SendableRecordBatchStream> {
283 unsafe {
284 df_result!((self.plan.execute)(&self.plan, partition))
285 .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
286 }
287 }
288}
289
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::{
        physical_plan::{
            execution_plan::{Boundedness, EmissionType},
            Partitioning,
        },
        prelude::SessionContext,
    };

    use super::*;

    /// Minimal plan used as the "local" side of the FFI round trip. Only the
    /// name, properties and children are functional; display, execution and
    /// statistics are left unimplemented because the tests never reach them.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        /// Creates a childless plan over `schema` with three partitions of
        /// unknown partitioning, incremental emission and bounded input.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            Self {
                props: PlanProperties::new(
                    datafusion::physical_expr::EquivalenceProperties::new(schema),
                    Partitioning::UnknownPartitioning(3),
                    EmissionType::Incremental,
                    Boundedness::Bounded,
                ),
                children: Vec::default(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec {
                props: self.props.clone(),
                children,
            }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// Wraps a local plan, converts it back into a `ForeignExecutionPlan` and
    /// checks that the name and the one-line display survive the round trip.
    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        let ctx = SessionContext::new();

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

        let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

        // `assert_eq!` reports both names on failure, unlike `assert!(a == b)`.
        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
            &foreign_plan,
        );

        let buf = display.one_line().to_string();
        assert_eq!(buf.trim(), "FFI_ExecutionPlan(number_of_children=0)");

        Ok(())
    }

    /// Checks child handling on both sides of the FFI boundary.
    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        let ctx = SessionContext::new();

        // Case 1: the child is attached to a parent that is already a
        // `ForeignExecutionPlan`, via `with_new_children`.
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Case 2: the child is attached to the local parent before it is
        // wrapped, so it has to come back through the FFI `children` callback.
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }
}