1pub(crate) mod metrics;
19pub(crate) mod partitioning;
20pub(crate) mod sort;
21
22use std::ffi::c_void;
23use std::fmt::{Display, Formatter};
24use std::hash::{DefaultHasher, Hash, Hasher};
25use std::sync::Arc;
26
27use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
28use arrow::datatypes::SchemaRef;
29use arrow_schema::ffi::FFI_ArrowSchema;
30use arrow_schema::{DataType, Field, FieldRef, Schema};
31use datafusion_common::{Result, ffi_datafusion_err};
32use datafusion_expr::ColumnarValue;
33use datafusion_expr::interval_arithmetic::Interval;
34use datafusion_expr::sort_properties::ExprProperties;
35#[expect(deprecated)]
36use datafusion_expr::statistics::Distribution;
37use datafusion_physical_expr::PhysicalExpr;
38use datafusion_physical_expr_common::physical_expr::fmt_sql;
39
40use stabby::string::String as SString;
41use stabby::vec::Vec as SVec;
42
43use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
44use crate::expr::columnar_value::FFI_ColumnarValue;
45use crate::expr::distribution::FFI_Distribution;
46use crate::expr::expr_properties::FFI_ExprProperties;
47use crate::expr::interval::FFI_Interval;
48use crate::record_batch_stream::{
49 record_batch_to_wrapped_array, wrapped_array_to_record_batch,
50};
51use crate::util::{FFI_Option, FFI_Result};
52use crate::{df_result, sresult, sresult_return};
53
54#[repr(C)]
55#[derive(Debug)]
56pub struct FFI_PhysicalExpr {
57 pub data_type: unsafe extern "C" fn(
58 &Self,
59 input_schema: WrappedSchema,
60 ) -> FFI_Result<WrappedSchema>,
61
62 pub nullable:
63 unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFI_Result<bool>,
64
65 pub evaluate:
66 unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFI_Result<FFI_ColumnarValue>,
67
68 pub return_field: unsafe extern "C" fn(
69 &Self,
70 input_schema: WrappedSchema,
71 ) -> FFI_Result<WrappedSchema>,
72
73 pub evaluate_selection: unsafe extern "C" fn(
74 &Self,
75 batch: WrappedArray,
76 selection: WrappedArray,
77 ) -> FFI_Result<FFI_ColumnarValue>,
78
79 pub children: unsafe extern "C" fn(&Self) -> SVec<FFI_PhysicalExpr>,
80
81 pub new_with_children: unsafe extern "C" fn(
82 &Self,
83 children: &SVec<FFI_PhysicalExpr>,
84 ) -> FFI_Result<Self>,
85
86 pub evaluate_bounds: unsafe extern "C" fn(
87 &Self,
88 children: SVec<FFI_Interval>,
89 ) -> FFI_Result<FFI_Interval>,
90
91 pub propagate_constraints:
92 unsafe extern "C" fn(
93 &Self,
94 interval: FFI_Interval,
95 children: SVec<FFI_Interval>,
96 ) -> FFI_Result<FFI_Option<SVec<FFI_Interval>>>,
97
98 pub evaluate_statistics: unsafe extern "C" fn(
99 &Self,
100 children: SVec<FFI_Distribution>,
101 ) -> FFI_Result<FFI_Distribution>,
102
103 pub propagate_statistics:
104 unsafe extern "C" fn(
105 &Self,
106 parent: FFI_Distribution,
107 children: SVec<FFI_Distribution>,
108 ) -> FFI_Result<FFI_Option<SVec<FFI_Distribution>>>,
109
110 pub get_properties: unsafe extern "C" fn(
111 &Self,
112 children: SVec<FFI_ExprProperties>,
113 ) -> FFI_Result<FFI_ExprProperties>,
114
115 pub fmt_sql: unsafe extern "C" fn(&Self) -> FFI_Result<SString>,
116
117 pub snapshot: unsafe extern "C" fn(&Self) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>>,
118
119 pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,
120
121 pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,
122
123 pub display: unsafe extern "C" fn(&Self) -> SString,
125
126 pub hash: unsafe extern "C" fn(&Self) -> u64,
128
129 pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
132
133 pub release: unsafe extern "C" fn(arg: &mut Self),
135
136 pub version: unsafe extern "C" fn() -> u64,
138
139 pub private_data: *mut c_void,
142
143 pub library_marker_id: extern "C" fn() -> usize,
146}
147
148unsafe impl Send for FFI_PhysicalExpr {}
149unsafe impl Sync for FFI_PhysicalExpr {}
150
151impl FFI_PhysicalExpr {
152 fn inner(&self) -> &Arc<dyn PhysicalExpr> {
153 unsafe {
154 let private_data = self.private_data as *const PhysicalExprPrivateData;
155 &(*private_data).expr
156 }
157 }
158}
159
160struct PhysicalExprPrivateData {
161 expr: Arc<dyn PhysicalExpr>,
162}
163
164unsafe extern "C" fn data_type_fn_wrapper(
165 expr: &FFI_PhysicalExpr,
166 input_schema: WrappedSchema,
167) -> FFI_Result<WrappedSchema> {
168 let expr = expr.inner();
169 let schema: SchemaRef = input_schema.into();
170 let data_type = expr
171 .data_type(&schema)
172 .and_then(|dt| FFI_ArrowSchema::try_from(dt).map_err(Into::into))
173 .map(WrappedSchema);
174 sresult!(data_type)
175}
176
177unsafe extern "C" fn nullable_fn_wrapper(
178 expr: &FFI_PhysicalExpr,
179 input_schema: WrappedSchema,
180) -> FFI_Result<bool> {
181 let expr = expr.inner();
182 let schema: SchemaRef = input_schema.into();
183 sresult!(expr.nullable(&schema))
184}
185
186unsafe extern "C" fn evaluate_fn_wrapper(
187 expr: &FFI_PhysicalExpr,
188 batch: WrappedArray,
189) -> FFI_Result<FFI_ColumnarValue> {
190 let batch = sresult_return!(wrapped_array_to_record_batch(batch));
191 sresult!(
192 expr.inner()
193 .evaluate(&batch)
194 .and_then(FFI_ColumnarValue::try_from)
195 )
196}
197
198unsafe extern "C" fn return_field_fn_wrapper(
199 expr: &FFI_PhysicalExpr,
200 input_schema: WrappedSchema,
201) -> FFI_Result<WrappedSchema> {
202 let expr = expr.inner();
203 let schema: SchemaRef = input_schema.into();
204 sresult!(
205 expr.return_field(&schema)
206 .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(Into::into))
207 .map(WrappedSchema)
208 )
209}
210
211unsafe extern "C" fn evaluate_selection_fn_wrapper(
212 expr: &FFI_PhysicalExpr,
213 batch: WrappedArray,
214 selection: WrappedArray,
215) -> FFI_Result<FFI_ColumnarValue> {
216 let batch = sresult_return!(wrapped_array_to_record_batch(batch));
217 let selection: ArrayRef = sresult_return!(selection.try_into());
218 let selection = sresult_return!(
219 selection
220 .as_any()
221 .downcast_ref::<BooleanArray>()
222 .ok_or(ffi_datafusion_err!("Unexpected selection array type"))
223 );
224 sresult!(
225 expr.inner()
226 .evaluate_selection(&batch, selection)
227 .and_then(FFI_ColumnarValue::try_from)
228 )
229}
230
231unsafe extern "C" fn children_fn_wrapper(
232 expr: &FFI_PhysicalExpr,
233) -> SVec<FFI_PhysicalExpr> {
234 let expr = expr.inner();
235 let children = expr.children();
236 children
237 .into_iter()
238 .map(|child| FFI_PhysicalExpr::from(Arc::clone(child)))
239 .collect()
240}
241
242unsafe extern "C" fn new_with_children_fn_wrapper(
243 expr: &FFI_PhysicalExpr,
244 children: &SVec<FFI_PhysicalExpr>,
245) -> FFI_Result<FFI_PhysicalExpr> {
246 let expr = Arc::clone(expr.inner());
247 let children = children.iter().map(Into::into).collect::<Vec<_>>();
248 sresult!(expr.with_new_children(children).map(FFI_PhysicalExpr::from))
249}
250
251unsafe extern "C" fn evaluate_bounds_fn_wrapper(
252 expr: &FFI_PhysicalExpr,
253 children: SVec<FFI_Interval>,
254) -> FFI_Result<FFI_Interval> {
255 let expr = expr.inner();
256 let children = sresult_return!(
257 children
258 .into_iter()
259 .map(Interval::try_from)
260 .collect::<Result<Vec<_>>>()
261 );
262 let children_borrowed = children.iter().collect::<Vec<_>>();
263
264 sresult!(
265 expr.evaluate_bounds(&children_borrowed)
266 .and_then(FFI_Interval::try_from)
267 )
268}
269
270unsafe extern "C" fn propagate_constraints_fn_wrapper(
271 expr: &FFI_PhysicalExpr,
272 interval: FFI_Interval,
273 children: SVec<FFI_Interval>,
274) -> FFI_Result<FFI_Option<SVec<FFI_Interval>>> {
275 let expr = expr.inner();
276 let interval = sresult_return!(Interval::try_from(interval));
277 let children = sresult_return!(
278 children
279 .into_iter()
280 .map(Interval::try_from)
281 .collect::<Result<Vec<_>>>()
282 );
283 let children_borrowed = children.iter().collect::<Vec<_>>();
284
285 let result =
286 sresult_return!(expr.propagate_constraints(&interval, &children_borrowed));
287
288 let result = sresult_return!(
289 result
290 .map(|intervals| intervals
291 .into_iter()
292 .map(FFI_Interval::try_from)
293 .collect::<Result<SVec<_>>>())
294 .transpose()
295 );
296
297 FFI_Result::Ok(result.into())
298}
299
300#[expect(deprecated)]
301unsafe extern "C" fn evaluate_statistics_fn_wrapper(
302 expr: &FFI_PhysicalExpr,
303 children: SVec<FFI_Distribution>,
304) -> FFI_Result<FFI_Distribution> {
305 let expr = expr.inner();
306 let children = sresult_return!(
307 children
308 .into_iter()
309 .map(Distribution::try_from)
310 .collect::<Result<Vec<_>>>()
311 );
312 let children_borrowed = children.iter().collect::<Vec<_>>();
313 sresult!(
314 expr.evaluate_statistics(&children_borrowed)
315 .and_then(|dist| FFI_Distribution::try_from(&dist))
316 )
317}
318
319#[expect(deprecated)]
320unsafe extern "C" fn propagate_statistics_fn_wrapper(
321 expr: &FFI_PhysicalExpr,
322 parent: FFI_Distribution,
323 children: SVec<FFI_Distribution>,
324) -> FFI_Result<FFI_Option<SVec<FFI_Distribution>>> {
325 let expr = expr.inner();
326 let parent = sresult_return!(Distribution::try_from(parent));
327 let children = sresult_return!(
328 children
329 .into_iter()
330 .map(Distribution::try_from)
331 .collect::<Result<Vec<_>>>()
332 );
333 let children_borrowed = children.iter().collect::<Vec<_>>();
334
335 let result = sresult_return!(expr.propagate_statistics(&parent, &children_borrowed));
336 let result = sresult_return!(
337 result
338 .map(|dists| dists
339 .iter()
340 .map(FFI_Distribution::try_from)
341 .collect::<Result<SVec<_>>>())
342 .transpose()
343 );
344
345 FFI_Result::Ok(result.into())
346}
347
348unsafe extern "C" fn get_properties_fn_wrapper(
349 expr: &FFI_PhysicalExpr,
350 children: SVec<FFI_ExprProperties>,
351) -> FFI_Result<FFI_ExprProperties> {
352 let expr = expr.inner();
353 let children = sresult_return!(
354 children
355 .into_iter()
356 .map(ExprProperties::try_from)
357 .collect::<Result<Vec<_>>>()
358 );
359 sresult!(
360 expr.get_properties(&children)
361 .and_then(|p| FFI_ExprProperties::try_from(&p))
362 )
363}
364
365unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_Result<SString> {
366 let expr = expr.inner();
367 let result = fmt_sql(expr.as_ref()).to_string();
368 FFI_Result::Ok(result.into())
369}
370
371unsafe extern "C" fn snapshot_fn_wrapper(
372 expr: &FFI_PhysicalExpr,
373) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>> {
374 let expr = expr.inner();
375 sresult!(
376 expr.snapshot()
377 .map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())
378 )
379}
380
381unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
382 let expr = expr.inner();
383 expr.snapshot_generation()
384}
385
386unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool {
387 let expr = expr.inner();
388 expr.is_volatile_node()
389}
390unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> SString {
391 let expr = expr.inner();
392 format!("{expr}").into()
393}
394
395unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
396 let expr = expr.inner();
397 let mut hasher = DefaultHasher::new();
398 expr.hash(&mut hasher);
399 hasher.finish()
400}
401
402unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {
403 unsafe {
404 debug_assert!(!expr.private_data.is_null());
405 let private_data =
406 Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData);
407 drop(private_data);
408 expr.private_data = std::ptr::null_mut();
409 }
410}
411
412unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalExpr {
413 unsafe {
414 let old_private_data = expr.private_data as *const PhysicalExprPrivateData;
415
416 let private_data = Box::into_raw(Box::new(PhysicalExprPrivateData {
417 expr: Arc::clone(&(*old_private_data).expr),
418 })) as *mut c_void;
419
420 FFI_PhysicalExpr {
421 data_type: data_type_fn_wrapper,
422 nullable: nullable_fn_wrapper,
423 evaluate: evaluate_fn_wrapper,
424 return_field: return_field_fn_wrapper,
425 evaluate_selection: evaluate_selection_fn_wrapper,
426 children: children_fn_wrapper,
427 new_with_children: new_with_children_fn_wrapper,
428 evaluate_bounds: evaluate_bounds_fn_wrapper,
429 propagate_constraints: propagate_constraints_fn_wrapper,
430 evaluate_statistics: evaluate_statistics_fn_wrapper,
431 propagate_statistics: propagate_statistics_fn_wrapper,
432 get_properties: get_properties_fn_wrapper,
433 fmt_sql: fmt_sql_fn_wrapper,
434 snapshot: snapshot_fn_wrapper,
435 snapshot_generation: snapshot_generation_fn_wrapper,
436 is_volatile_node: is_volatile_node_fn_wrapper,
437 display: display_fn_wrapper,
438 hash: hash_fn_wrapper,
439 clone: clone_fn_wrapper,
440 release: release_fn_wrapper,
441 version: super::version,
442 private_data,
443 library_marker_id: crate::get_library_marker_id,
444 }
445 }
446}
447
448impl Drop for FFI_PhysicalExpr {
449 fn drop(&mut self) {
450 unsafe { (self.release)(self) }
451 }
452}
453
454impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
455 fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
457 if let Some(expr) = expr.downcast_ref::<ForeignPhysicalExpr>() {
458 return expr.expr.clone();
459 }
460
461 let private_data = Box::new(PhysicalExprPrivateData { expr });
462
463 Self {
464 data_type: data_type_fn_wrapper,
465 nullable: nullable_fn_wrapper,
466 evaluate: evaluate_fn_wrapper,
467 return_field: return_field_fn_wrapper,
468 evaluate_selection: evaluate_selection_fn_wrapper,
469 children: children_fn_wrapper,
470 new_with_children: new_with_children_fn_wrapper,
471 evaluate_bounds: evaluate_bounds_fn_wrapper,
472 propagate_constraints: propagate_constraints_fn_wrapper,
473 evaluate_statistics: evaluate_statistics_fn_wrapper,
474 propagate_statistics: propagate_statistics_fn_wrapper,
475 get_properties: get_properties_fn_wrapper,
476 fmt_sql: fmt_sql_fn_wrapper,
477 snapshot: snapshot_fn_wrapper,
478 snapshot_generation: snapshot_generation_fn_wrapper,
479 is_volatile_node: is_volatile_node_fn_wrapper,
480 display: display_fn_wrapper,
481 hash: hash_fn_wrapper,
482 clone: clone_fn_wrapper,
483 release: release_fn_wrapper,
484 version: super::version,
485 private_data: Box::into_raw(private_data) as *mut c_void,
486 library_marker_id: crate::get_library_marker_id,
487 }
488 }
489}
490
491#[derive(Debug)]
496pub struct ForeignPhysicalExpr {
497 expr: FFI_PhysicalExpr,
498 children: Vec<Arc<dyn PhysicalExpr>>,
499}
500
501unsafe impl Send for ForeignPhysicalExpr {}
502unsafe impl Sync for ForeignPhysicalExpr {}
503
504impl From<&FFI_PhysicalExpr> for Arc<dyn PhysicalExpr> {
505 fn from(ffi_expr: &FFI_PhysicalExpr) -> Self {
506 if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() {
507 Arc::clone(ffi_expr.inner())
508 } else {
509 let children = unsafe {
510 (ffi_expr.children)(ffi_expr)
511 .into_iter()
512 .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
513 .collect()
514 };
515
516 Arc::new(ForeignPhysicalExpr {
517 expr: ffi_expr.clone(),
518 children,
519 })
520 }
521 }
522}
523
524impl Clone for FFI_PhysicalExpr {
525 fn clone(&self) -> Self {
526 unsafe { (self.clone)(self) }
527 }
528}
529
530impl PhysicalExpr for ForeignPhysicalExpr {
531 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
532 unsafe {
533 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
534 df_result!((self.expr.data_type)(&self.expr, schema))
535 .and_then(|d| DataType::try_from(&d.0).map_err(Into::into))
536 }
537 }
538
539 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
540 unsafe {
541 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
542 df_result!((self.expr.nullable)(&self.expr, schema))
543 }
544 }
545
546 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
547 unsafe {
548 let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
549 df_result!((self.expr.evaluate)(&self.expr, batch))
550 .and_then(ColumnarValue::try_from)
551 }
552 }
553
554 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
555 unsafe {
556 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
557 let result = df_result!((self.expr.return_field)(&self.expr, schema))?;
558 Field::try_from(&result.0).map(Arc::new).map_err(Into::into)
559 }
560 }
561
562 fn evaluate_selection(
563 &self,
564 batch: &RecordBatch,
565 selection: &BooleanArray,
566 ) -> Result<ColumnarValue> {
567 unsafe {
568 let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
569 let selection: ArrayRef = Arc::new(selection.clone());
573 let selection = WrappedArray::try_from(&selection)?;
574 df_result!((self.expr.evaluate_selection)(&self.expr, batch, selection))
575 .and_then(ColumnarValue::try_from)
576 }
577 }
578
579 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
580 self.children.iter().collect()
581 }
582
583 fn with_new_children(
584 self: Arc<Self>,
585 children: Vec<Arc<dyn PhysicalExpr>>,
586 ) -> Result<Arc<dyn PhysicalExpr>> {
587 unsafe {
588 let children = children.into_iter().map(FFI_PhysicalExpr::from).collect();
589 df_result!(
590 (self.expr.new_with_children)(&self.expr, &children).map(|expr| <Arc<
591 dyn PhysicalExpr,
592 >>::from(
593 &expr
594 ))
595 )
596 }
597 }
598
599 fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
600 unsafe {
601 let children = children
602 .iter()
603 .map(|interval| FFI_Interval::try_from(*interval))
604 .collect::<Result<SVec<_>>>()?;
605 df_result!((self.expr.evaluate_bounds)(&self.expr, children))
606 .and_then(Interval::try_from)
607 }
608 }
609
610 fn propagate_constraints(
611 &self,
612 interval: &Interval,
613 children: &[&Interval],
614 ) -> Result<Option<Vec<Interval>>> {
615 unsafe {
616 let interval = interval.try_into()?;
617 let children = children
618 .iter()
619 .map(|interval| FFI_Interval::try_from(*interval))
620 .collect::<Result<SVec<_>>>()?;
621 let result = df_result!((self.expr.propagate_constraints)(
622 &self.expr, interval, children
623 ))?;
624
625 let result: Option<_> = result
626 .map(|intervals| {
627 intervals
628 .into_iter()
629 .map(Interval::try_from)
630 .collect::<Result<Vec<_>>>()
631 })
632 .into();
633 result.transpose()
634 }
635 }
636
637 #[expect(deprecated)]
638 fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
639 unsafe {
640 let children = children
641 .iter()
642 .map(|dist| FFI_Distribution::try_from(*dist))
643 .collect::<Result<SVec<_>>>()?;
644
645 let result =
646 df_result!((self.expr.evaluate_statistics)(&self.expr, children))?;
647 Distribution::try_from(result)
648 }
649 }
650
651 #[expect(deprecated)]
652 fn propagate_statistics(
653 &self,
654 parent: &Distribution,
655 children: &[&Distribution],
656 ) -> Result<Option<Vec<Distribution>>> {
657 unsafe {
658 let parent = FFI_Distribution::try_from(parent)?;
659 let children = children
660 .iter()
661 .map(|dist| FFI_Distribution::try_from(*dist))
662 .collect::<Result<SVec<_>>>()?;
663 let result = df_result!((self.expr.propagate_statistics)(
664 &self.expr, parent, children
665 ))?;
666
667 let result: Option<Result<Vec<Distribution>>> = result
668 .map(|dists| {
669 dists
670 .into_iter()
671 .map(Distribution::try_from)
672 .collect::<Result<Vec<_>>>()
673 })
674 .into();
675
676 result.transpose()
677 }
678 }
679
680 fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
681 unsafe {
682 let children = children
683 .iter()
684 .map(FFI_ExprProperties::try_from)
685 .collect::<Result<SVec<_>>>()?;
686 df_result!((self.expr.get_properties)(&self.expr, children))
687 .and_then(ExprProperties::try_from)
688 }
689 }
690
691 fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
692 unsafe {
693 match (self.expr.fmt_sql)(&self.expr) {
694 FFI_Result::Ok(sql) => write!(f, "{sql}"),
695 FFI_Result::Err(_) => Err(std::fmt::Error),
696 }
697 }
698 }
699
700 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
701 unsafe {
702 let result = df_result!((self.expr.snapshot)(&self.expr))?;
703 Ok(result
704 .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
705 .into())
706 }
707 }
708
709 fn snapshot_generation(&self) -> u64 {
710 unsafe { (self.expr.snapshot_generation)(&self.expr) }
711 }
712
713 fn is_volatile_node(&self) -> bool {
714 unsafe { (self.expr.is_volatile_node)(&self.expr) }
715 }
716}
717
718impl Eq for ForeignPhysicalExpr {}
719impl PartialEq for ForeignPhysicalExpr {
720 fn eq(&self, other: &Self) -> bool {
721 std::ptr::eq(self, other)
723 }
724}
725impl Hash for ForeignPhysicalExpr {
726 fn hash<H: Hasher>(&self, state: &mut H) {
727 let value = unsafe { (self.expr.hash)(&self.expr) };
728 value.hash(state)
729 }
730}
731
732impl Display for ForeignPhysicalExpr {
733 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
734 let display = unsafe { (self.expr.display)(&self.expr) };
735 write!(f, "{display}")
736 }
737}
738
739#[cfg(test)]
740mod tests {
741 use std::hash::{DefaultHasher, Hash, Hasher};
742 use std::sync::Arc;
743
744 use arrow::array::{BooleanArray, RecordBatch, record_batch};
745 use datafusion_common::tree_node::DynTreeNode;
746 use datafusion_common::{DataFusionError, ScalarValue};
747 use datafusion_expr::interval_arithmetic::Interval;
748 #[expect(deprecated)]
749 use datafusion_expr::statistics::Distribution;
750 use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr};
751 use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, fmt_sql};
752
753 use crate::physical_expr::FFI_PhysicalExpr;
754
755 fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
756 let original = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
757 let mut ffi_expr = FFI_PhysicalExpr::from(Arc::clone(&original));
758 ffi_expr.library_marker_id = crate::mock_foreign_marker_id;
759
760 let foreign_expr: Arc<dyn PhysicalExpr> = (&ffi_expr).into();
761
762 (original, foreign_expr)
763 }
764
765 fn test_record_batch() -> RecordBatch {
766 record_batch!(("a", Int32, [1, 2, 3])).unwrap()
767 }
768
769 #[test]
770 fn ffi_physical_expr_fields() -> Result<(), DataFusionError> {
771 let (original, foreign_expr) = create_test_expr();
772 let schema = test_record_batch().schema();
773
774 assert_ne!(original.as_ref(), foreign_expr.as_ref());
776
777 assert_eq!(
778 original.return_field(&schema)?,
779 foreign_expr.return_field(&schema)?
780 );
781
782 assert_eq!(
783 original.data_type(&schema)?,
784 foreign_expr.data_type(&schema)?
785 );
786 assert_eq!(original.nullable(&schema)?, foreign_expr.nullable(&schema)?);
787
788 Ok(())
789 }
790 #[test]
791 fn ffi_physical_expr_evaluate() -> Result<(), DataFusionError> {
792 let (original, foreign_expr) = create_test_expr();
793 let rb = test_record_batch();
794
795 assert_eq!(
796 original.evaluate(&rb)?.to_array(3)?.as_ref(),
797 foreign_expr.evaluate(&rb)?.to_array(3)?.as_ref()
798 );
799
800 Ok(())
801 }
802 #[test]
803 fn ffi_physical_expr_selection() -> Result<(), DataFusionError> {
804 let (original, foreign_expr) = create_test_expr();
805 let rb = test_record_batch();
806
807 let selection = BooleanArray::from(vec![true, false, true]);
808
809 assert_eq!(
810 original
811 .evaluate_selection(&rb, &selection)?
812 .to_array(3)?
813 .as_ref(),
814 foreign_expr
815 .evaluate_selection(&rb, &selection)?
816 .to_array(3)?
817 .as_ref()
818 );
819 Ok(())
820 }
821
822 #[test]
823 fn ffi_physical_expr_with_children() -> Result<(), DataFusionError> {
824 let (original, _) = create_test_expr();
825 let not_expr =
826 Arc::new(NotExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
827 let mut ffi_not = FFI_PhysicalExpr::from(not_expr);
828 ffi_not.library_marker_id = crate::mock_foreign_marker_id;
829 let foreign_not: Arc<dyn PhysicalExpr> = (&ffi_not).into();
830
831 let replacement = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
832 let updated =
833 Arc::clone(&foreign_not).with_new_children(vec![Arc::clone(&replacement)])?;
834 assert_eq!(
835 format!("{updated:?}").as_str(),
836 "NotExpr { arg: Column { name: \"b\", index: 1 } }"
837 );
838
839 let updated = foreign_not
840 .with_new_arc_children(Arc::clone(&foreign_not), vec![replacement])?;
841 assert_eq!(format!("{updated}").as_str(), "NOT b@1");
842
843 Ok(())
844 }
845
846 fn create_test_negative_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
847 let (original, _) = create_test_expr();
848
849 let negative_expr =
850 Arc::new(NegativeExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
851 let mut ffi_neg = FFI_PhysicalExpr::from(Arc::clone(&negative_expr));
852 ffi_neg.library_marker_id = crate::mock_foreign_marker_id;
853 let foreign_neg: Arc<dyn PhysicalExpr> = (&ffi_neg).into();
854
855 (negative_expr, foreign_neg)
856 }
857
858 #[test]
859 fn ffi_physical_expr_bounds() -> Result<(), DataFusionError> {
860 let (negative_expr, foreign_neg) = create_test_negative_expr();
861
862 let interval =
863 Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
864 let left = negative_expr.evaluate_bounds(&[&interval])?;
865 let right = foreign_neg.evaluate_bounds(&[&interval])?;
866
867 assert_eq!(left, right);
868
869 Ok(())
870 }
871
872 #[test]
873 fn ffi_physical_expr_constraints() -> Result<(), DataFusionError> {
874 let (negative_expr, foreign_neg) = create_test_negative_expr();
875
876 let interval =
877 Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
878
879 let child =
880 Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
881 let left = negative_expr.propagate_constraints(&interval, &[&child])?;
882 let right = foreign_neg.propagate_constraints(&interval, &[&child])?;
883
884 assert_eq!(left, right);
885 Ok(())
886 }
887
888 #[test]
889 #[expect(deprecated)]
890 fn ffi_physical_expr_statistics() -> Result<(), DataFusionError> {
891 let (negative_expr, foreign_neg) = create_test_negative_expr();
892 let interval =
893 Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
894
895 for distribution in [
896 Distribution::new_uniform(interval.clone())?,
897 Distribution::new_exponential(
898 ScalarValue::Int32(Some(10)),
899 ScalarValue::Int32(Some(10)),
900 true,
901 )?,
902 Distribution::new_gaussian(
903 ScalarValue::Int32(Some(10)),
904 ScalarValue::Int32(Some(10)),
905 )?,
906 Distribution::new_generic(
907 ScalarValue::Int32(Some(10)),
908 ScalarValue::Int32(Some(10)),
909 ScalarValue::Int32(Some(10)),
910 interval,
911 )?,
912 ] {
913 let left = negative_expr.evaluate_statistics(&[&distribution])?;
914 let right = foreign_neg.evaluate_statistics(&[&distribution])?;
915
916 assert_eq!(left, right);
917
918 let left =
919 negative_expr.propagate_statistics(&distribution, &[&distribution])?;
920 let right =
921 foreign_neg.propagate_statistics(&distribution, &[&distribution])?;
922
923 assert_eq!(left, right);
924 }
925 Ok(())
926 }
927
928 #[test]
929 fn ffi_physical_expr_properties() -> Result<(), DataFusionError> {
930 let (original, foreign_expr) = create_test_expr();
931
932 let left = original.get_properties(&[])?;
933 let right = foreign_expr.get_properties(&[])?;
934
935 assert_eq!(left.sort_properties, right.sort_properties);
936 assert_eq!(left.range, right.range);
937
938 Ok(())
939 }
940
941 #[test]
942 fn ffi_physical_formatting() {
943 let (original, foreign_expr) = create_test_expr();
944
945 let left = format!("{}", fmt_sql(original.as_ref()));
946 let right = format!("{}", fmt_sql(foreign_expr.as_ref()));
947 assert_eq!(left, right);
948 }
949
950 #[test]
951 fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
952 let (original, foreign_expr) = create_test_expr();
953
954 let left = original.snapshot()?;
955 let right = foreign_expr.snapshot()?;
956 assert_eq!(left, right);
957
958 assert_eq!(
959 original.snapshot_generation(),
960 foreign_expr.snapshot_generation()
961 );
962
963 Ok(())
964 }
965
966 #[test]
967 fn ffi_physical_expr_volatility() {
968 let (original, foreign_expr) = create_test_expr();
969 assert_eq!(original.is_volatile_node(), foreign_expr.is_volatile_node());
970 }
971
972 #[test]
973 fn ffi_physical_expr_hash() {
974 let (_, foreign_1) = create_test_expr();
975 let (_, foreign_2) = create_test_expr();
976
977 assert_ne!(&foreign_1, &foreign_2);
978
979 let mut hasher = DefaultHasher::new();
980 foreign_1.as_ref().hash(&mut hasher);
981 let hash_1 = hasher.finish();
982
983 let mut hasher = DefaultHasher::new();
984 foreign_2.as_ref().hash(&mut hasher);
985 let hash_2 = hasher.finish();
986
987 assert_eq!(hash_1, hash_2);
991 }
992
993 #[test]
994 fn ffi_physical_expr_display() {
995 let (original, foreign_expr) = create_test_expr();
996 assert_eq!(format!("{original}"), format!("{foreign_expr}"));
997 }
998}