1use std::{ffi::c_void, pin::Pin, sync::Arc};
19
20use abi_stable::{
21 std_types::{RResult, RString, RVec},
22 StableAbi,
23};
24use datafusion::error::Result;
25use datafusion::{
26 error::DataFusionError,
27 execution::{SendableRecordBatchStream, TaskContext},
28 physical_plan::{DisplayAs, ExecutionPlan, PlanProperties},
29};
30use tokio::runtime::Handle;
31
32use crate::{
33 df_result, plan_properties::FFI_PlanProperties,
34 record_batch_stream::FFI_RecordBatchStream, rresult,
35};
36
/// A C-compatible, stable-ABI handle to an [`ExecutionPlan`].
///
/// Every operation is exposed as an `unsafe extern "C"` function pointer that
/// receives the struct itself as its first argument. The struct is
/// `#[repr(C)]`, so the order of the fields below is part of its layout and
/// must not be changed.
#[repr(C)]
#[derive(Debug, StableAbi)]
#[allow(non_camel_case_types)]
pub struct FFI_ExecutionPlan {
    /// Returns the plan's properties in their FFI representation.
    pub properties: unsafe extern "C" fn(plan: &Self) -> FFI_PlanProperties,

    /// Returns one FFI handle per child of the plan.
    pub children: unsafe extern "C" fn(plan: &Self) -> RVec<FFI_ExecutionPlan>,

    /// Returns the plan's name.
    pub name: unsafe extern "C" fn(plan: &Self) -> RString,

    /// Executes partition `partition` of the plan, yielding a record batch
    /// stream on success or an error message on failure.
    pub execute: unsafe extern "C" fn(
        plan: &Self,
        partition: usize,
    ) -> RResult<FFI_RecordBatchStream, RString>,

    /// Produces a new handle from an existing one; backs the `Clone` impl.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Releases whatever `private_data` refers to; invoked from the `Drop`
    /// impl.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Opaque pointer to the state the callbacks operate on. For handles
    /// built by [`FFI_ExecutionPlan::new`] it points at a heap-allocated
    /// [`ExecutionPlanPrivateData`].
    pub private_data: *mut c_void,
}
69
// The raw `private_data` pointer makes this type `!Send`/`!Sync` by default,
// so thread-safety is asserted by hand.
// NOTE(review): this relies on whatever `private_data` points at being safe
// to move and share across threads — confirm this holds for handles that were
// not produced by `FFI_ExecutionPlan::new`.
unsafe impl Send for FFI_ExecutionPlan {}
unsafe impl Sync for FFI_ExecutionPlan {}
72
/// State stored behind [`FFI_ExecutionPlan::private_data`] for handles created
/// by [`FFI_ExecutionPlan::new`].
pub struct ExecutionPlanPrivateData {
    /// The wrapped plan that all callbacks delegate to.
    pub plan: Arc<dyn ExecutionPlan>,
    /// Task context handed to the wrapped plan when the `execute` callback
    /// runs.
    pub context: Arc<TaskContext>,
    /// Optional tokio runtime handle, forwarded to the record batch streams
    /// and child/cloned handles created from this plan.
    pub runtime: Option<Handle>,
}
78
79unsafe extern "C" fn properties_fn_wrapper(
80 plan: &FFI_ExecutionPlan,
81) -> FFI_PlanProperties {
82 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
83 let plan = &(*private_data).plan;
84
85 plan.properties().into()
86}
87
88unsafe extern "C" fn children_fn_wrapper(
89 plan: &FFI_ExecutionPlan,
90) -> RVec<FFI_ExecutionPlan> {
91 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
92 let plan = &(*private_data).plan;
93 let ctx = &(*private_data).context;
94 let runtime = &(*private_data).runtime;
95
96 let children: Vec<_> = plan
97 .children()
98 .into_iter()
99 .map(|child| {
100 FFI_ExecutionPlan::new(Arc::clone(child), Arc::clone(ctx), runtime.clone())
101 })
102 .collect();
103
104 children.into()
105}
106
107unsafe extern "C" fn execute_fn_wrapper(
108 plan: &FFI_ExecutionPlan,
109 partition: usize,
110) -> RResult<FFI_RecordBatchStream, RString> {
111 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
112 let plan = &(*private_data).plan;
113 let ctx = &(*private_data).context;
114 let runtime = (*private_data).runtime.clone();
115
116 rresult!(plan
117 .execute(partition, Arc::clone(ctx))
118 .map(|rbs| FFI_RecordBatchStream::new(rbs, runtime)))
119}
120
121unsafe extern "C" fn name_fn_wrapper(plan: &FFI_ExecutionPlan) -> RString {
122 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
123 let plan = &(*private_data).plan;
124
125 plan.name().into()
126}
127
128unsafe extern "C" fn release_fn_wrapper(plan: &mut FFI_ExecutionPlan) {
129 let private_data = Box::from_raw(plan.private_data as *mut ExecutionPlanPrivateData);
130 drop(private_data);
131}
132
133unsafe extern "C" fn clone_fn_wrapper(plan: &FFI_ExecutionPlan) -> FFI_ExecutionPlan {
134 let private_data = plan.private_data as *const ExecutionPlanPrivateData;
135 let plan_data = &(*private_data);
136
137 FFI_ExecutionPlan::new(
138 Arc::clone(&plan_data.plan),
139 Arc::clone(&plan_data.context),
140 plan_data.runtime.clone(),
141 )
142}
143
144impl Clone for FFI_ExecutionPlan {
145 fn clone(&self) -> Self {
146 unsafe { (self.clone)(self) }
147 }
148}
149
150impl FFI_ExecutionPlan {
151 pub fn new(
153 plan: Arc<dyn ExecutionPlan>,
154 context: Arc<TaskContext>,
155 runtime: Option<Handle>,
156 ) -> Self {
157 let private_data = Box::new(ExecutionPlanPrivateData {
158 plan,
159 context,
160 runtime,
161 });
162
163 Self {
164 properties: properties_fn_wrapper,
165 children: children_fn_wrapper,
166 name: name_fn_wrapper,
167 execute: execute_fn_wrapper,
168 clone: clone_fn_wrapper,
169 release: release_fn_wrapper,
170 private_data: Box::into_raw(private_data) as *mut c_void,
171 }
172 }
173}
174
175impl Drop for FFI_ExecutionPlan {
176 fn drop(&mut self) {
177 unsafe { (self.release)(self) }
178 }
179}
180
/// An [`ExecutionPlan`] implementation backed by an [`FFI_ExecutionPlan`]
/// handle.
///
/// The name, properties and children are read from the handle once, when the
/// value is built via `TryFrom<&FFI_ExecutionPlan>`, and served from these
/// cached copies afterwards; only `execute` calls back into the handle.
#[derive(Debug)]
pub struct ForeignExecutionPlan {
    /// Plan name as reported by the handle's `name` callback.
    name: String,
    /// Clone of the handle this plan was created from; used by `execute`.
    plan: FFI_ExecutionPlan,
    /// Properties converted from the handle's FFI representation.
    properties: PlanProperties,
    /// Child plans, as returned by `children()`.
    children: Vec<Arc<dyn ExecutionPlan>>,
}
194
// Thread-safety is asserted by hand for the foreign-plan wrapper.
// NOTE(review): `FFI_ExecutionPlan` already has manual `Send`/`Sync` impls and
// the remaining fields look thread-safe, so these may be redundant — confirm
// before removing.
unsafe impl Send for ForeignExecutionPlan {}
unsafe impl Sync for ForeignExecutionPlan {}
197
198impl DisplayAs for ForeignExecutionPlan {
199 fn fmt_as(
200 &self,
201 _t: datafusion::physical_plan::DisplayFormatType,
202 f: &mut std::fmt::Formatter,
203 ) -> std::fmt::Result {
204 write!(
205 f,
206 "FFI_ExecutionPlan(number_of_children={})",
207 self.children.len(),
208 )
209 }
210}
211
212impl TryFrom<&FFI_ExecutionPlan> for ForeignExecutionPlan {
213 type Error = DataFusionError;
214
215 fn try_from(plan: &FFI_ExecutionPlan) -> Result<Self, Self::Error> {
216 unsafe {
217 let name = (plan.name)(plan).into();
218
219 let properties: PlanProperties = (plan.properties)(plan).try_into()?;
220
221 let children_rvec = (plan.children)(plan);
222 let children = children_rvec
223 .iter()
224 .map(ForeignExecutionPlan::try_from)
225 .map(|child| child.map(|c| Arc::new(c) as Arc<dyn ExecutionPlan>))
226 .collect::<Result<Vec<_>>>()?;
227
228 Ok(Self {
229 name,
230 plan: plan.clone(),
231 properties,
232 children,
233 })
234 }
235 }
236}
237
238impl ExecutionPlan for ForeignExecutionPlan {
239 fn name(&self) -> &str {
240 &self.name
241 }
242
243 fn as_any(&self) -> &dyn std::any::Any {
244 self
245 }
246
247 fn properties(&self) -> &PlanProperties {
248 &self.properties
249 }
250
251 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
252 self.children
253 .iter()
254 .map(|p| p as &Arc<dyn ExecutionPlan>)
255 .collect()
256 }
257
258 fn with_new_children(
259 self: Arc<Self>,
260 children: Vec<Arc<dyn ExecutionPlan>>,
261 ) -> Result<Arc<dyn ExecutionPlan>> {
262 Ok(Arc::new(ForeignExecutionPlan {
263 plan: self.plan.clone(),
264 name: self.name.clone(),
265 children,
266 properties: self.properties.clone(),
267 }))
268 }
269
270 fn execute(
271 &self,
272 partition: usize,
273 _context: Arc<TaskContext>,
274 ) -> Result<SendableRecordBatchStream> {
275 unsafe {
276 df_result!((self.plan.execute)(&self.plan, partition))
277 .map(|stream| Pin::new(Box::new(stream)) as SendableRecordBatchStream)
278 }
279 }
280}
281
#[cfg(test)]
mod tests {
    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion::{
        physical_plan::{
            execution_plan::{Boundedness, EmissionType},
            Partitioning,
        },
        prelude::SessionContext,
    };

    use super::*;

    /// Minimal plan used to exercise the FFI wrappers. Only the metadata
    /// accessors are implemented; displaying, executing or asking it for
    /// statistics panics via `unimplemented!()`.
    #[derive(Debug)]
    pub struct EmptyExec {
        props: PlanProperties,
        children: Vec<Arc<dyn ExecutionPlan>>,
    }

    impl EmptyExec {
        /// Creates a childless plan over `schema` with three unknown
        /// partitions, incremental emission and bounded input.
        pub fn new(schema: arrow::datatypes::SchemaRef) -> Self {
            Self {
                props: PlanProperties::new(
                    datafusion::physical_expr::EquivalenceProperties::new(schema),
                    Partitioning::UnknownPartitioning(3),
                    EmissionType::Incremental,
                    Boundedness::Bounded,
                ),
                children: Vec::default(),
            }
        }
    }

    impl DisplayAs for EmptyExec {
        fn fmt_as(
            &self,
            _t: datafusion::physical_plan::DisplayFormatType,
            _f: &mut std::fmt::Formatter,
        ) -> std::fmt::Result {
            unimplemented!()
        }
    }

    impl ExecutionPlan for EmptyExec {
        fn name(&self) -> &'static str {
            "empty-exec"
        }

        fn as_any(&self) -> &dyn std::any::Any {
            self
        }

        fn properties(&self) -> &PlanProperties {
            &self.props
        }

        fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
            self.children.iter().collect()
        }

        fn with_new_children(
            self: Arc<Self>,
            children: Vec<Arc<dyn ExecutionPlan>>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(Arc::new(EmptyExec {
                props: self.props.clone(),
                children,
            }))
        }

        fn execute(
            &self,
            _partition: usize,
            _context: Arc<TaskContext>,
        ) -> Result<SendableRecordBatchStream> {
            unimplemented!()
        }

        fn statistics(&self) -> Result<datafusion::common::Statistics> {
            unimplemented!()
        }
    }

    /// Schema shared by the tests: a single non-nullable `Float32` column
    /// named `a`.
    fn test_schema() -> arrow::datatypes::SchemaRef {
        Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]))
    }

    /// A local plan wrapped for FFI and read back as a foreign plan keeps its
    /// name and renders the expected one-line display.
    #[test]
    fn test_round_trip_ffi_execution_plan() -> Result<()> {
        let schema = test_schema();
        let ctx = SessionContext::new();

        let original_plan = Arc::new(EmptyExec::new(schema));
        let original_name = original_plan.name().to_string();

        let local_plan = FFI_ExecutionPlan::new(original_plan, ctx.task_ctx(), None);

        let foreign_plan: ForeignExecutionPlan = (&local_plan).try_into()?;

        // `assert_eq!` so that a mismatch reports both names.
        assert_eq!(original_name, foreign_plan.name());

        let display = datafusion::physical_plan::display::DisplayableExecutionPlan::new(
            &foreign_plan,
        );

        let buf = display.one_line().to_string();
        assert_eq!(buf.trim(), "FFI_ExecutionPlan(number_of_children=0)");

        Ok(())
    }

    /// Children are visible both when attached to a foreign plan via
    /// `with_new_children` and when the wrapped local plan already has them.
    #[test]
    fn test_ffi_execution_plan_children() -> Result<()> {
        let schema = test_schema();
        let ctx = SessionContext::new();

        // Version 1: attach the child to the foreign-side parent.
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 0);
        assert_eq!(child_foreign.children().len(), 0);

        let parent_foreign = parent_foreign.with_new_children(vec![child_foreign])?;
        assert_eq!(parent_foreign.children().len(), 1);

        // Version 2: attach the child to the local parent before wrapping it.
        let child_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let child_local = FFI_ExecutionPlan::new(child_plan, ctx.task_ctx(), None);
        let child_foreign = Arc::new(ForeignExecutionPlan::try_from(&child_local)?);

        let parent_plan = Arc::new(EmptyExec::new(Arc::clone(&schema)));
        let parent_plan = parent_plan.with_new_children(vec![child_foreign])?;
        let parent_local = FFI_ExecutionPlan::new(parent_plan, ctx.task_ctx(), None);
        let parent_foreign = Arc::new(ForeignExecutionPlan::try_from(&parent_local)?);

        assert_eq!(parent_foreign.children().len(), 1);

        Ok(())
    }
}