datafusion_ffi/
physical_optimizer.rs1use std::ffi::c_void;
19use std::sync::Arc;
20
21use async_trait::async_trait;
22use datafusion_common::config::ConfigOptions;
23use datafusion_common::error::Result;
24use datafusion_physical_optimizer::PhysicalOptimizerRule;
25use datafusion_physical_plan::ExecutionPlan;
26use stabby::string::String as SString;
27use tokio::runtime::Handle;
28
29use crate::config::FFI_ConfigOptions;
30use crate::execution_plan::FFI_ExecutionPlan;
31use crate::util::FFI_Result;
32use crate::{df_result, sresult_return};
33
/// C-ABI-stable handle to a [`PhysicalOptimizerRule`] that can be passed
/// across an FFI boundary between separately compiled libraries.
///
/// Every operation is dispatched through a function pointer, so the library
/// that created the struct is the one that interprets its `private_data`.
#[repr(C)]
#[derive(Debug)]
pub struct FFI_PhysicalOptimizerRule {
    /// Runs the wrapped rule on `plan` using `config` and returns the
    /// optimized plan, or an error carried in the [`FFI_Result`].
    pub optimize: unsafe extern "C" fn(
        &Self,
        plan: &FFI_ExecutionPlan,
        config: FFI_ConfigOptions,
    ) -> FFI_Result<FFI_ExecutionPlan>,

    /// Returns the wrapped rule's name.
    pub name: unsafe extern "C" fn(&Self) -> SString,

    /// Returns the wrapped rule's `schema_check` flag.
    pub schema_check: unsafe extern "C" fn(&Self) -> bool,

    /// Produces a copy of the struct passed in (despite the parameter name,
    /// the argument is the rule being cloned). In this module the copy gets
    /// its own `private_data` that shares the same underlying rule.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees `private_data`. It is invoked automatically by `Drop`, so do not
    /// call it manually on a struct that will still be dropped.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns the version of the library that created this struct (set to
    /// `super::version` here). Presumably used to detect mismatched library
    /// versions; confirm against callers.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque pointer to the creating library's private state. It is only
    /// meaningful to the library identified by `library_marker_id`.
    pub private_data: *mut c_void,

    /// Identifies the library that created this struct. It is compared with
    /// `crate::get_library_marker_id()` to decide whether the native rule can
    /// be used directly instead of through FFI calls.
    pub library_marker_id: extern "C" fn() -> usize,
}

// SAFETY: the struct holds only function pointers and `private_data`. In this
// library the pointee is a `RulePrivateData`, whose rule is bounded by
// `Send + Sync` and which otherwise holds an optional tokio `Handle`. It is
// only read through `&self`; the one mutation (`release`) requires `&mut self`.
unsafe impl Send for FFI_PhysicalOptimizerRule {}
unsafe impl Sync for FFI_PhysicalOptimizerRule {}
69
70struct RulePrivateData {
71 rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
72 runtime: Option<Handle>,
73}
74
75impl FFI_PhysicalOptimizerRule {
76 fn inner(&self) -> &Arc<dyn PhysicalOptimizerRule + Send + Sync> {
77 let private_data = self.private_data as *const RulePrivateData;
78 unsafe { &(*private_data).rule }
79 }
80
81 fn runtime(&self) -> Option<Handle> {
82 let private_data = self.private_data as *const RulePrivateData;
83 unsafe { (*private_data).runtime.clone() }
84 }
85}
86
87unsafe extern "C" fn optimize_fn_wrapper(
88 rule: &FFI_PhysicalOptimizerRule,
89 plan: &FFI_ExecutionPlan,
90 config: FFI_ConfigOptions,
91) -> FFI_Result<FFI_ExecutionPlan> {
92 let runtime = rule.runtime();
93 let rule = rule.inner();
94 let plan: Arc<dyn ExecutionPlan> = sresult_return!(plan.try_into());
95 let config = sresult_return!(ConfigOptions::try_from(config));
96 let optimized_plan = sresult_return!(rule.optimize(plan, &config));
97
98 FFI_Result::Ok(FFI_ExecutionPlan::new(optimized_plan, runtime))
99}
100
101unsafe extern "C" fn name_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> SString {
102 let rule = rule.inner();
103 rule.name().into()
104}
105
106unsafe extern "C" fn schema_check_fn_wrapper(rule: &FFI_PhysicalOptimizerRule) -> bool {
107 rule.inner().schema_check()
108}
109
110unsafe extern "C" fn release_fn_wrapper(rule: &mut FFI_PhysicalOptimizerRule) {
111 unsafe {
112 debug_assert!(!rule.private_data.is_null());
113 let private_data = Box::from_raw(rule.private_data as *mut RulePrivateData);
114 drop(private_data);
115 rule.private_data = std::ptr::null_mut();
116 }
117}
118
119unsafe extern "C" fn clone_fn_wrapper(
120 rule: &FFI_PhysicalOptimizerRule,
121) -> FFI_PhysicalOptimizerRule {
122 let runtime = rule.runtime();
123 let rule = Arc::clone(rule.inner());
124
125 let private_data =
126 Box::into_raw(Box::new(RulePrivateData { rule, runtime })) as *mut c_void;
127
128 FFI_PhysicalOptimizerRule {
129 optimize: optimize_fn_wrapper,
130 name: name_fn_wrapper,
131 schema_check: schema_check_fn_wrapper,
132 clone: clone_fn_wrapper,
133 release: release_fn_wrapper,
134 version: super::version,
135 private_data,
136 library_marker_id: crate::get_library_marker_id,
137 }
138}
139
140impl Drop for FFI_PhysicalOptimizerRule {
141 fn drop(&mut self) {
142 unsafe { (self.release)(self) }
143 }
144}
145
146impl FFI_PhysicalOptimizerRule {
147 pub fn new(
149 rule: Arc<dyn PhysicalOptimizerRule + Send + Sync>,
150 runtime: Option<Handle>,
151 ) -> Self {
152 if let Some(rule) = (Arc::clone(&rule) as Arc<dyn std::any::Any>)
153 .downcast_ref::<ForeignPhysicalOptimizerRule>()
154 {
155 return rule.rule.clone();
156 }
157
158 let private_data = Box::new(RulePrivateData { rule, runtime });
159 let private_data = Box::into_raw(private_data) as *mut c_void;
160
161 Self {
162 optimize: optimize_fn_wrapper,
163 name: name_fn_wrapper,
164 schema_check: schema_check_fn_wrapper,
165 clone: clone_fn_wrapper,
166 release: release_fn_wrapper,
167 version: super::version,
168 private_data,
169 library_marker_id: crate::get_library_marker_id,
170 }
171 }
172}
173
174#[derive(Debug)]
179pub struct ForeignPhysicalOptimizerRule {
180 name: String,
181 rule: FFI_PhysicalOptimizerRule,
182}
183
184unsafe impl Send for ForeignPhysicalOptimizerRule {}
185unsafe impl Sync for ForeignPhysicalOptimizerRule {}
186
187impl From<&FFI_PhysicalOptimizerRule> for Arc<dyn PhysicalOptimizerRule + Send + Sync> {
188 fn from(rule: &FFI_PhysicalOptimizerRule) -> Self {
189 if (rule.library_marker_id)() == crate::get_library_marker_id() {
190 return Arc::clone(rule.inner());
191 }
192
193 let name: String = unsafe { (rule.name)(rule).into() };
194 Arc::new(ForeignPhysicalOptimizerRule {
195 name,
196 rule: rule.clone(),
197 }) as Arc<dyn PhysicalOptimizerRule + Send + Sync>
198 }
199}
200
201impl Clone for FFI_PhysicalOptimizerRule {
202 fn clone(&self) -> Self {
203 unsafe { (self.clone)(self) }
204 }
205}
206
207#[async_trait]
208impl PhysicalOptimizerRule for ForeignPhysicalOptimizerRule {
209 fn optimize(
210 &self,
211 plan: Arc<dyn ExecutionPlan>,
212 config: &ConfigOptions,
213 ) -> Result<Arc<dyn ExecutionPlan>> {
214 let config_options: FFI_ConfigOptions = config.into();
215 let plan = FFI_ExecutionPlan::new(plan, None);
216
217 let optimized_plan = unsafe {
218 df_result!((self.rule.optimize)(&self.rule, &plan, config_options))?
219 };
220 (&optimized_plan).try_into()
221 }
222
223 fn name(&self) -> &str {
224 &self.name
225 }
226
227 fn schema_check(&self) -> bool {
228 unsafe { (self.rule.schema_check)(&self.rule) }
229 }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::datatypes::{DataType, Field, Schema};
    use datafusion_common::config::ConfigOptions;
    use datafusion_common::error::Result;
    use datafusion_physical_optimizer::PhysicalOptimizerRule;
    use datafusion_physical_plan::ExecutionPlan;

    use super::*;
    use crate::execution_plan::tests::EmptyExec;

    /// Rule that returns its input plan unchanged. Its `schema_check` flag is
    /// configurable so tests can verify the flag survives an FFI round trip.
    #[derive(Debug)]
    struct NoOpRule {
        schema_check: bool,
    }

    impl PhysicalOptimizerRule for NoOpRule {
        fn optimize(
            &self,
            plan: Arc<dyn ExecutionPlan>,
            _config: &ConfigOptions,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            Ok(plan)
        }

        fn name(&self) -> &str {
            "no_op_rule"
        }

        fn schema_check(&self) -> bool {
            self.schema_check
        }
    }

    /// Builds a single-column empty plan to feed through the rule.
    fn create_test_plan() -> Arc<dyn ExecutionPlan> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
        Arc::new(EmptyExec::new(schema))
    }

    #[test]
    fn test_round_trip_ffi_physical_optimizer_rule() -> Result<()> {
        for expected_schema_check in [true, false] {
            let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> = Arc::new(NoOpRule {
                schema_check: expected_schema_check,
            });

            let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
            // Pretend the struct came from another library to force the FFI path.
            ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

            let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
                (&ffi_rule).into();

            assert_eq!(foreign_rule.name(), "no_op_rule");
            assert_eq!(foreign_rule.schema_check(), expected_schema_check);
        }

        Ok(())
    }

    #[test]
    fn test_round_trip_optimize() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule).into();

        let plan = create_test_plan();
        let expected_schema = plan.schema();
        let config = ConfigOptions::new();

        let optimized = foreign_rule.optimize(plan, &config)?;
        assert_eq!(optimized.name(), "empty-exec");
        // A no-op rule must not alter the plan's schema across the FFI hop.
        assert_eq!(optimized.schema(), expected_schema);

        Ok(())
    }

    #[test]
    fn test_local_bypass() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        // Same library: the original native rule comes back unwrapped.
        let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        let recovered: Arc<dyn PhysicalOptimizerRule + Send + Sync> = (&ffi_rule).into();
        let any_ref: &dyn std::any::Any = &*recovered;
        assert!(any_ref.downcast_ref::<NoOpRule>().is_some());

        // Foreign library: the rule is wrapped in `ForeignPhysicalOptimizerRule`.
        let rule2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });
        let mut ffi_rule2 = FFI_PhysicalOptimizerRule::new(rule2, None);
        ffi_rule2.library_marker_id = crate::mock_foreign_marker_id;
        let recovered2: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule2).into();
        let any_ref2: &dyn std::any::Any = &*recovered2;
        assert!(
            any_ref2
                .downcast_ref::<ForeignPhysicalOptimizerRule>()
                .is_some()
        );

        Ok(())
    }

    #[test]
    fn test_clone() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        let ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        let cloned = ffi_rule.clone();

        // Each copy must own separate private data so both can be released
        // independently, while still sharing the same underlying rule.
        assert_ne!(ffi_rule.private_data, cloned.private_data);
        assert!(Arc::ptr_eq(ffi_rule.inner(), cloned.inner()));

        let name1: String = unsafe { (ffi_rule.name)(&ffi_rule).into() };
        let name2: String = unsafe { (cloned.name)(&cloned).into() };
        assert_eq!(name1, name2);

        let check1 = unsafe { (ffi_rule.schema_check)(&ffi_rule) };
        let check2 = unsafe { (cloned.schema_check)(&cloned) };
        assert_eq!(check1, check2);

        Ok(())
    }

    #[test]
    fn test_foreign_rule_rewrap_bypass() -> Result<()> {
        let rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            Arc::new(NoOpRule { schema_check: true });

        let mut ffi_rule = FFI_PhysicalOptimizerRule::new(rule, None);
        ffi_rule.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_rule: Arc<dyn PhysicalOptimizerRule + Send + Sync> =
            (&ffi_rule).into();

        // Re-wrapping a foreign rule should reuse its FFI struct rather than
        // adding another FFI layer.
        let re_wrapped = FFI_PhysicalOptimizerRule::new(foreign_rule, None);
        let name: String = unsafe { (re_wrapped.name)(&re_wrapped).into() };
        assert_eq!(name, "no_op_rule");

        Ok(())
    }
}