1pub(crate) mod partitioning;
19pub(crate) mod sort;
20
21use std::any::Any;
22use std::ffi::c_void;
23use std::fmt::{Display, Formatter};
24use std::hash::{DefaultHasher, Hash, Hasher};
25use std::sync::Arc;
26
27use abi_stable::StableAbi;
28use abi_stable::std_types::{ROption, RResult, RString, RVec};
29use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
30use arrow::datatypes::SchemaRef;
31use arrow_schema::ffi::FFI_ArrowSchema;
32use arrow_schema::{DataType, Field, FieldRef, Schema};
33use datafusion_common::{Result, ffi_datafusion_err};
34use datafusion_expr::ColumnarValue;
35use datafusion_expr::interval_arithmetic::Interval;
36use datafusion_expr::sort_properties::ExprProperties;
37use datafusion_expr::statistics::Distribution;
38use datafusion_physical_expr::PhysicalExpr;
39use datafusion_physical_expr_common::physical_expr::fmt_sql;
40
41use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
42use crate::expr::columnar_value::FFI_ColumnarValue;
43use crate::expr::distribution::FFI_Distribution;
44use crate::expr::expr_properties::FFI_ExprProperties;
45use crate::expr::interval::FFI_Interval;
46use crate::record_batch_stream::{
47 record_batch_to_wrapped_array, wrapped_array_to_record_batch,
48};
49use crate::util::FFIResult;
50use crate::{df_result, rresult, rresult_return};
51
/// A stable-ABI, C-compatible function table that exposes a [`PhysicalExpr`]
/// across a library boundary.
///
/// Every callback receives the struct itself as its first argument. The
/// library that created the struct implements the callbacks against the
/// expression stored behind `private_data`. Fallible operations return an
/// `FFIResult`.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_PhysicalExpr {
    /// Returns the expression's output data type for `input_schema`, encoded
    /// as an Arrow C schema.
    pub data_type: unsafe extern "C" fn(
        &Self,
        input_schema: WrappedSchema,
    ) -> FFIResult<WrappedSchema>,

    /// Returns whether the expression may produce nulls for `input_schema`.
    pub nullable:
        unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFIResult<bool>,

    /// Evaluates the expression against a record batch transported as a
    /// `WrappedArray`.
    pub evaluate:
        unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFIResult<FFI_ColumnarValue>,

    /// Returns the expression's output field for `input_schema`, encoded as an
    /// Arrow C schema.
    pub return_field: unsafe extern "C" fn(
        &Self,
        input_schema: WrappedSchema,
    ) -> FFIResult<WrappedSchema>,

    /// Like `evaluate`, restricted by `selection`; the producer rejects a
    /// `selection` that is not a boolean array.
    pub evaluate_selection: unsafe extern "C" fn(
        &Self,
        batch: WrappedArray,
        selection: WrappedArray,
    ) -> FFIResult<FFI_ColumnarValue>,

    /// Returns the expression's children, each wrapped in a new
    /// `FFI_PhysicalExpr`.
    pub children: unsafe extern "C" fn(&Self) -> RVec<FFI_PhysicalExpr>,

    /// Rebuilds the expression with `children` replacing its current children.
    pub new_with_children:
        unsafe extern "C" fn(&Self, children: &RVec<FFI_PhysicalExpr>) -> FFIResult<Self>,

    /// Computes the output interval from the children's intervals.
    pub evaluate_bounds: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_Interval>,
    ) -> FFIResult<FFI_Interval>,

    /// Given an output interval and the children's intervals, returns the
    /// propagated child intervals, if the expression produces any.
    pub propagate_constraints:
        unsafe extern "C" fn(
            &Self,
            interval: FFI_Interval,
            children: RVec<FFI_Interval>,
        ) -> FFIResult<ROption<RVec<FFI_Interval>>>,

    /// Computes the output distribution from the children's distributions.
    pub evaluate_statistics: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_Distribution>,
    ) -> FFIResult<FFI_Distribution>,

    /// Given the parent distribution and the children's distributions, returns
    /// the propagated child distributions, if the expression produces any.
    pub propagate_statistics:
        unsafe extern "C" fn(
            &Self,
            parent: FFI_Distribution,
            children: RVec<FFI_Distribution>,
        ) -> FFIResult<ROption<RVec<FFI_Distribution>>>,

    /// Computes the expression's properties from its children's properties.
    pub get_properties: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_ExprProperties>,
    ) -> FFIResult<FFI_ExprProperties>,

    /// Renders the expression as SQL text.
    pub fmt_sql: unsafe extern "C" fn(&Self) -> FFIResult<RString>,

    /// Returns a snapshot of the expression, if it provides one.
    pub snapshot: unsafe extern "C" fn(&Self) -> FFIResult<ROption<FFI_PhysicalExpr>>,

    /// Forwards `PhysicalExpr::snapshot_generation`.
    pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,

    /// Forwards `PhysicalExpr::is_volatile_node`.
    pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,

    /// Returns the expression's `Display` output.
    pub display: unsafe extern "C" fn(&Self) -> RString,

    /// Returns a hash of the expression computed by the producing library.
    /// NOTE(review): the producer hashes with its own `DefaultHasher`, so
    /// values are presumably only comparable between identically-built
    /// libraries — confirm before relying on cross-library equality.
    pub hash: unsafe extern "C" fn(&Self) -> u64,

    /// Creates a new struct that shares the same underlying expression; backs
    /// the `Clone` impl.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Frees `private_data`; invoked from `Drop`.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Returns the version reported by the producing library.
    pub version: unsafe extern "C" fn() -> u64,

    /// Opaque data owned by the producing library. Only that library's
    /// callbacks may interpret it.
    pub private_data: *mut c_void,

    /// Identifies the producing library. When it matches the current library,
    /// conversions read `private_data` directly instead of calling through the
    /// function table.
    pub library_marker_id: extern "C" fn() -> usize,
}

// SAFETY: the raw `private_data` pointer is the only field that is not
// thread-safe by type. When produced by this library, it points to a
// `PhysicalExprPrivateData` holding an `Arc<dyn PhysicalExpr>`, which is only
// read through `&self`.
// NOTE(review): soundness also relies on `PhysicalExpr` implementors (and any
// foreign producer's private data) being thread-safe — confirm.
unsafe impl Send for FFI_PhysicalExpr {}
unsafe impl Sync for FFI_PhysicalExpr {}
146
147impl FFI_PhysicalExpr {
148 fn inner(&self) -> &Arc<dyn PhysicalExpr> {
149 unsafe {
150 let private_data = self.private_data as *const PhysicalExprPrivateData;
151 &(*private_data).expr
152 }
153 }
154}
155
/// Heap-allocated payload behind `FFI_PhysicalExpr::private_data`.
///
/// Boxed and leaked with `Box::into_raw` whenever this library creates or
/// clones an `FFI_PhysicalExpr`; reclaimed by `release_fn_wrapper`.
struct PhysicalExprPrivateData {
    // The wrapped expression that every FFI callback delegates to.
    expr: Arc<dyn PhysicalExpr>,
}
159
160unsafe extern "C" fn data_type_fn_wrapper(
161 expr: &FFI_PhysicalExpr,
162 input_schema: WrappedSchema,
163) -> FFIResult<WrappedSchema> {
164 let expr = expr.inner();
165 let schema: SchemaRef = input_schema.into();
166 let data_type = expr
167 .data_type(&schema)
168 .and_then(|dt| FFI_ArrowSchema::try_from(dt).map_err(Into::into))
169 .map(WrappedSchema);
170 rresult!(data_type)
171}
172
173unsafe extern "C" fn nullable_fn_wrapper(
174 expr: &FFI_PhysicalExpr,
175 input_schema: WrappedSchema,
176) -> FFIResult<bool> {
177 let expr = expr.inner();
178 let schema: SchemaRef = input_schema.into();
179 rresult!(expr.nullable(&schema))
180}
181
182unsafe extern "C" fn evaluate_fn_wrapper(
183 expr: &FFI_PhysicalExpr,
184 batch: WrappedArray,
185) -> FFIResult<FFI_ColumnarValue> {
186 let batch = rresult_return!(wrapped_array_to_record_batch(batch));
187 rresult!(
188 expr.inner()
189 .evaluate(&batch)
190 .and_then(FFI_ColumnarValue::try_from)
191 )
192}
193
194unsafe extern "C" fn return_field_fn_wrapper(
195 expr: &FFI_PhysicalExpr,
196 input_schema: WrappedSchema,
197) -> FFIResult<WrappedSchema> {
198 let expr = expr.inner();
199 let schema: SchemaRef = input_schema.into();
200 rresult!(
201 expr.return_field(&schema)
202 .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(Into::into))
203 .map(WrappedSchema)
204 )
205}
206
207unsafe extern "C" fn evaluate_selection_fn_wrapper(
208 expr: &FFI_PhysicalExpr,
209 batch: WrappedArray,
210 selection: WrappedArray,
211) -> FFIResult<FFI_ColumnarValue> {
212 let batch = rresult_return!(wrapped_array_to_record_batch(batch));
213 let selection: ArrayRef = rresult_return!(selection.try_into());
214 let selection = rresult_return!(
215 selection
216 .as_any()
217 .downcast_ref::<BooleanArray>()
218 .ok_or(ffi_datafusion_err!("Unexpected selection array type"))
219 );
220 rresult!(
221 expr.inner()
222 .evaluate_selection(&batch, selection)
223 .and_then(FFI_ColumnarValue::try_from)
224 )
225}
226
227unsafe extern "C" fn children_fn_wrapper(
228 expr: &FFI_PhysicalExpr,
229) -> RVec<FFI_PhysicalExpr> {
230 let expr = expr.inner();
231 let children = expr.children();
232 children
233 .into_iter()
234 .map(|child| FFI_PhysicalExpr::from(Arc::clone(child)))
235 .collect()
236}
237
238unsafe extern "C" fn new_with_children_fn_wrapper(
239 expr: &FFI_PhysicalExpr,
240 children: &RVec<FFI_PhysicalExpr>,
241) -> FFIResult<FFI_PhysicalExpr> {
242 let expr = Arc::clone(expr.inner());
243 let children = children.iter().map(Into::into).collect::<Vec<_>>();
244 rresult!(expr.with_new_children(children).map(FFI_PhysicalExpr::from))
245}
246
247unsafe extern "C" fn evaluate_bounds_fn_wrapper(
248 expr: &FFI_PhysicalExpr,
249 children: RVec<FFI_Interval>,
250) -> FFIResult<FFI_Interval> {
251 let expr = expr.inner();
252 let children = rresult_return!(
253 children
254 .into_iter()
255 .map(Interval::try_from)
256 .collect::<Result<Vec<_>>>()
257 );
258 let children_borrowed = children.iter().collect::<Vec<_>>();
259
260 rresult!(
261 expr.evaluate_bounds(&children_borrowed)
262 .and_then(FFI_Interval::try_from)
263 )
264}
265
266unsafe extern "C" fn propagate_constraints_fn_wrapper(
267 expr: &FFI_PhysicalExpr,
268 interval: FFI_Interval,
269 children: RVec<FFI_Interval>,
270) -> FFIResult<ROption<RVec<FFI_Interval>>> {
271 let expr = expr.inner();
272 let interval = rresult_return!(Interval::try_from(interval));
273 let children = rresult_return!(
274 children
275 .into_iter()
276 .map(Interval::try_from)
277 .collect::<Result<Vec<_>>>()
278 );
279 let children_borrowed = children.iter().collect::<Vec<_>>();
280
281 let result =
282 rresult_return!(expr.propagate_constraints(&interval, &children_borrowed));
283
284 let result = rresult_return!(
285 result
286 .map(|intervals| intervals
287 .into_iter()
288 .map(FFI_Interval::try_from)
289 .collect::<Result<RVec<_>>>())
290 .transpose()
291 );
292
293 RResult::ROk(result.into())
294}
295
296unsafe extern "C" fn evaluate_statistics_fn_wrapper(
297 expr: &FFI_PhysicalExpr,
298 children: RVec<FFI_Distribution>,
299) -> FFIResult<FFI_Distribution> {
300 let expr = expr.inner();
301 let children = rresult_return!(
302 children
303 .into_iter()
304 .map(Distribution::try_from)
305 .collect::<Result<Vec<_>>>()
306 );
307 let children_borrowed = children.iter().collect::<Vec<_>>();
308 rresult!(
309 expr.evaluate_statistics(&children_borrowed)
310 .and_then(|dist| FFI_Distribution::try_from(&dist))
311 )
312}
313
314unsafe extern "C" fn propagate_statistics_fn_wrapper(
315 expr: &FFI_PhysicalExpr,
316 parent: FFI_Distribution,
317 children: RVec<FFI_Distribution>,
318) -> FFIResult<ROption<RVec<FFI_Distribution>>> {
319 let expr = expr.inner();
320 let parent = rresult_return!(Distribution::try_from(parent));
321 let children = rresult_return!(
322 children
323 .into_iter()
324 .map(Distribution::try_from)
325 .collect::<Result<Vec<_>>>()
326 );
327 let children_borrowed = children.iter().collect::<Vec<_>>();
328
329 let result = rresult_return!(expr.propagate_statistics(&parent, &children_borrowed));
330 let result = rresult_return!(
331 result
332 .map(|dists| dists
333 .iter()
334 .map(FFI_Distribution::try_from)
335 .collect::<Result<RVec<_>>>())
336 .transpose()
337 );
338
339 RResult::ROk(result.into())
340}
341
342unsafe extern "C" fn get_properties_fn_wrapper(
343 expr: &FFI_PhysicalExpr,
344 children: RVec<FFI_ExprProperties>,
345) -> FFIResult<FFI_ExprProperties> {
346 let expr = expr.inner();
347 let children = rresult_return!(
348 children
349 .into_iter()
350 .map(ExprProperties::try_from)
351 .collect::<Result<Vec<_>>>()
352 );
353 rresult!(
354 expr.get_properties(&children)
355 .and_then(|p| FFI_ExprProperties::try_from(&p))
356 )
357}
358
359unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFIResult<RString> {
360 let expr = expr.inner();
361 let result = fmt_sql(expr.as_ref()).to_string();
362 RResult::ROk(result.into())
363}
364
365unsafe extern "C" fn snapshot_fn_wrapper(
366 expr: &FFI_PhysicalExpr,
367) -> FFIResult<ROption<FFI_PhysicalExpr>> {
368 let expr = expr.inner();
369 rresult!(
370 expr.snapshot()
371 .map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())
372 )
373}
374
375unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
376 let expr = expr.inner();
377 expr.snapshot_generation()
378}
379
380unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool {
381 let expr = expr.inner();
382 expr.is_volatile_node()
383}
384unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> RString {
385 let expr = expr.inner();
386 format!("{expr}").into()
387}
388
389unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
390 let expr = expr.inner();
391 let mut hasher = DefaultHasher::new();
392 expr.hash(&mut hasher);
393 hasher.finish()
394}
395
396unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {
397 unsafe {
398 debug_assert!(!expr.private_data.is_null());
399 let private_data =
400 Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData);
401 drop(private_data);
402 expr.private_data = std::ptr::null_mut();
403 }
404}
405
406unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalExpr {
407 unsafe {
408 let old_private_data = expr.private_data as *const PhysicalExprPrivateData;
409
410 let private_data = Box::into_raw(Box::new(PhysicalExprPrivateData {
411 expr: Arc::clone(&(*old_private_data).expr),
412 })) as *mut c_void;
413
414 FFI_PhysicalExpr {
415 data_type: data_type_fn_wrapper,
416 nullable: nullable_fn_wrapper,
417 evaluate: evaluate_fn_wrapper,
418 return_field: return_field_fn_wrapper,
419 evaluate_selection: evaluate_selection_fn_wrapper,
420 children: children_fn_wrapper,
421 new_with_children: new_with_children_fn_wrapper,
422 evaluate_bounds: evaluate_bounds_fn_wrapper,
423 propagate_constraints: propagate_constraints_fn_wrapper,
424 evaluate_statistics: evaluate_statistics_fn_wrapper,
425 propagate_statistics: propagate_statistics_fn_wrapper,
426 get_properties: get_properties_fn_wrapper,
427 fmt_sql: fmt_sql_fn_wrapper,
428 snapshot: snapshot_fn_wrapper,
429 snapshot_generation: snapshot_generation_fn_wrapper,
430 is_volatile_node: is_volatile_node_fn_wrapper,
431 display: display_fn_wrapper,
432 hash: hash_fn_wrapper,
433 clone: clone_fn_wrapper,
434 release: release_fn_wrapper,
435 version: super::version,
436 private_data,
437 library_marker_id: crate::get_library_marker_id,
438 }
439 }
440}
441
442impl Drop for FFI_PhysicalExpr {
443 fn drop(&mut self) {
444 unsafe { (self.release)(self) }
445 }
446}
447
448impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
449 fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
451 let private_data = Box::new(PhysicalExprPrivateData { expr });
452
453 Self {
454 data_type: data_type_fn_wrapper,
455 nullable: nullable_fn_wrapper,
456 evaluate: evaluate_fn_wrapper,
457 return_field: return_field_fn_wrapper,
458 evaluate_selection: evaluate_selection_fn_wrapper,
459 children: children_fn_wrapper,
460 new_with_children: new_with_children_fn_wrapper,
461 evaluate_bounds: evaluate_bounds_fn_wrapper,
462 propagate_constraints: propagate_constraints_fn_wrapper,
463 evaluate_statistics: evaluate_statistics_fn_wrapper,
464 propagate_statistics: propagate_statistics_fn_wrapper,
465 get_properties: get_properties_fn_wrapper,
466 fmt_sql: fmt_sql_fn_wrapper,
467 snapshot: snapshot_fn_wrapper,
468 snapshot_generation: snapshot_generation_fn_wrapper,
469 is_volatile_node: is_volatile_node_fn_wrapper,
470 display: display_fn_wrapper,
471 hash: hash_fn_wrapper,
472 clone: clone_fn_wrapper,
473 release: release_fn_wrapper,
474 version: super::version,
475 private_data: Box::into_raw(private_data) as *mut c_void,
476 library_marker_id: crate::get_library_marker_id,
477 }
478 }
479}
480
/// A `PhysicalExpr` implemented by calling through an `FFI_PhysicalExpr`,
/// used when the expression was produced by a different library.
///
/// `children` holds the child expressions, converted once when the wrapper is
/// built, so `PhysicalExpr::children` can hand out references without crossing
/// the FFI boundary on every call.
#[derive(Debug)]
pub struct ForeignPhysicalExpr {
    expr: FFI_PhysicalExpr,
    children: Vec<Arc<dyn PhysicalExpr>>,
}

// SAFETY: `expr` is already `Send + Sync` (see `FFI_PhysicalExpr`), and the
// children are `Arc<dyn PhysicalExpr>`.
// NOTE(review): this relies on `PhysicalExpr` implementors being thread-safe
// — confirm against the trait's bounds.
unsafe impl Send for ForeignPhysicalExpr {}
unsafe impl Sync for ForeignPhysicalExpr {}
493
494impl From<&FFI_PhysicalExpr> for Arc<dyn PhysicalExpr> {
495 fn from(ffi_expr: &FFI_PhysicalExpr) -> Self {
496 if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() {
497 Arc::clone(ffi_expr.inner())
498 } else {
499 let children = unsafe {
500 (ffi_expr.children)(ffi_expr)
501 .into_iter()
502 .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
503 .collect()
504 };
505
506 Arc::new(ForeignPhysicalExpr {
507 expr: ffi_expr.clone(),
508 children,
509 })
510 }
511 }
512}
513
514impl Clone for FFI_PhysicalExpr {
515 fn clone(&self) -> Self {
516 unsafe { (self.clone)(self) }
517 }
518}
519
520impl PhysicalExpr for ForeignPhysicalExpr {
521 fn as_any(&self) -> &dyn Any {
522 self
523 }
524
525 fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
526 unsafe {
527 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
528 df_result!((self.expr.data_type)(&self.expr, schema))
529 .and_then(|d| DataType::try_from(&d.0).map_err(Into::into))
530 }
531 }
532
533 fn nullable(&self, input_schema: &Schema) -> Result<bool> {
534 unsafe {
535 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
536 df_result!((self.expr.nullable)(&self.expr, schema))
537 }
538 }
539
540 fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
541 unsafe {
542 let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
543 df_result!((self.expr.evaluate)(&self.expr, batch))
544 .and_then(ColumnarValue::try_from)
545 }
546 }
547
548 fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
549 unsafe {
550 let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
551 let result = df_result!((self.expr.return_field)(&self.expr, schema))?;
552 Field::try_from(&result.0).map(Arc::new).map_err(Into::into)
553 }
554 }
555
556 fn evaluate_selection(
557 &self,
558 batch: &RecordBatch,
559 selection: &BooleanArray,
560 ) -> Result<ColumnarValue> {
561 unsafe {
562 let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
563 let selection: ArrayRef = Arc::new(selection.clone());
567 let selection = WrappedArray::try_from(&selection)?;
568 df_result!((self.expr.evaluate_selection)(&self.expr, batch, selection))
569 .and_then(ColumnarValue::try_from)
570 }
571 }
572
573 fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
574 self.children.iter().collect()
575 }
576
577 fn with_new_children(
578 self: Arc<Self>,
579 children: Vec<Arc<dyn PhysicalExpr>>,
580 ) -> Result<Arc<dyn PhysicalExpr>> {
581 unsafe {
582 let children = children.into_iter().map(FFI_PhysicalExpr::from).collect();
583 df_result!(
584 (self.expr.new_with_children)(&self.expr, &children).map(|expr| <Arc<
585 dyn PhysicalExpr,
586 >>::from(
587 &expr
588 ))
589 )
590 }
591 }
592
593 fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
594 unsafe {
595 let children = children
596 .iter()
597 .map(|interval| FFI_Interval::try_from(*interval))
598 .collect::<Result<RVec<_>>>()?;
599 df_result!((self.expr.evaluate_bounds)(&self.expr, children))
600 .and_then(Interval::try_from)
601 }
602 }
603
604 fn propagate_constraints(
605 &self,
606 interval: &Interval,
607 children: &[&Interval],
608 ) -> Result<Option<Vec<Interval>>> {
609 unsafe {
610 let interval = interval.try_into()?;
611 let children = children
612 .iter()
613 .map(|interval| FFI_Interval::try_from(*interval))
614 .collect::<Result<RVec<_>>>()?;
615 let result = df_result!((self.expr.propagate_constraints)(
616 &self.expr, interval, children
617 ))?;
618
619 let result: Option<_> = result
620 .map(|intervals| {
621 intervals
622 .into_iter()
623 .map(Interval::try_from)
624 .collect::<Result<Vec<_>>>()
625 })
626 .into();
627 result.transpose()
628 }
629 }
630
631 fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
632 unsafe {
633 let children = children
634 .iter()
635 .map(|dist| FFI_Distribution::try_from(*dist))
636 .collect::<Result<RVec<_>>>()?;
637
638 let result =
639 df_result!((self.expr.evaluate_statistics)(&self.expr, children))?;
640 Distribution::try_from(result)
641 }
642 }
643
644 fn propagate_statistics(
645 &self,
646 parent: &Distribution,
647 children: &[&Distribution],
648 ) -> Result<Option<Vec<Distribution>>> {
649 unsafe {
650 let parent = FFI_Distribution::try_from(parent)?;
651 let children = children
652 .iter()
653 .map(|dist| FFI_Distribution::try_from(*dist))
654 .collect::<Result<RVec<_>>>()?;
655 let result = df_result!((self.expr.propagate_statistics)(
656 &self.expr, parent, children
657 ))?;
658
659 let result: Option<Result<Vec<Distribution>>> = result
660 .map(|dists| {
661 dists
662 .into_iter()
663 .map(Distribution::try_from)
664 .collect::<Result<Vec<_>>>()
665 })
666 .into();
667
668 result.transpose()
669 }
670 }
671
672 fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
673 unsafe {
674 let children = children
675 .iter()
676 .map(FFI_ExprProperties::try_from)
677 .collect::<Result<RVec<_>>>()?;
678 df_result!((self.expr.get_properties)(&self.expr, children))
679 .and_then(ExprProperties::try_from)
680 }
681 }
682
683 fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
684 unsafe {
685 match (self.expr.fmt_sql)(&self.expr) {
686 RResult::ROk(sql) => write!(f, "{sql}"),
687 RResult::RErr(_) => Err(std::fmt::Error),
688 }
689 }
690 }
691
692 fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
693 unsafe {
694 let result = df_result!((self.expr.snapshot)(&self.expr))?;
695 Ok(result
696 .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
697 .into())
698 }
699 }
700
701 fn snapshot_generation(&self) -> u64 {
702 unsafe { (self.expr.snapshot_generation)(&self.expr) }
703 }
704
705 fn is_volatile_node(&self) -> bool {
706 unsafe { (self.expr.is_volatile_node)(&self.expr) }
707 }
708}
709
710impl Eq for ForeignPhysicalExpr {}
711impl PartialEq for ForeignPhysicalExpr {
712 fn eq(&self, other: &Self) -> bool {
713 std::ptr::eq(self, other)
715 }
716}
717impl Hash for ForeignPhysicalExpr {
718 fn hash<H: Hasher>(&self, state: &mut H) {
719 let value = unsafe { (self.expr.hash)(&self.expr) };
720 value.hash(state)
721 }
722}
723
724impl Display for ForeignPhysicalExpr {
725 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
726 let display = unsafe { (self.expr.display)(&self.expr) };
727 write!(f, "{display}")
728 }
729}
730
#[cfg(test)]
mod tests {
    //! Round-trip tests: each test compares a native expression with the same
    //! expression reached through `ForeignPhysicalExpr`. The foreign path is
    //! forced by overwriting `library_marker_id` with a mock marker.
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::Arc;

    use arrow::array::{BooleanArray, RecordBatch, record_batch};
    // Brings `with_new_arc_children` into scope for the children test.
    use datafusion_common::tree_node::DynTreeNode;
    use datafusion_common::{DataFusionError, ScalarValue};
    use datafusion_expr::interval_arithmetic::Interval;
    use datafusion_expr::statistics::Distribution;
    use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr};
    use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, fmt_sql};

    use crate::physical_expr::FFI_PhysicalExpr;

    /// Returns `(original, foreign)`, where `original` is `Column("a", 0)` and
    /// `foreign` is the same expression converted back through the FFI with a
    /// mock library marker, so it is wrapped in a `ForeignPhysicalExpr`.
    fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
        let original = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
        let mut ffi_expr = FFI_PhysicalExpr::from(Arc::clone(&original));
        // Pretend the struct came from another library.
        ffi_expr.library_marker_id = crate::mock_foreign_marker_id;

        let foreign_expr: Arc<dyn PhysicalExpr> = (&ffi_expr).into();

        (original, foreign_expr)
    }

    /// A three-row batch with a single Int32 column `a`.
    fn test_record_batch() -> RecordBatch {
        record_batch!(("a", Int32, [1, 2, 3])).unwrap()
    }

    // Schema-derived metadata must match; the two expressions themselves are
    // distinct types and so compare unequal.
    #[test]
    fn ffi_physical_expr_fields() -> Result<(), DataFusionError> {
        let (original, foreign_expr) = create_test_expr();
        let schema = test_record_batch().schema();

        assert_ne!(original.as_ref(), foreign_expr.as_ref());

        assert_eq!(
            original.return_field(&schema)?,
            foreign_expr.return_field(&schema)?
        );

        assert_eq!(
            original.data_type(&schema)?,
            foreign_expr.data_type(&schema)?
        );
        assert_eq!(original.nullable(&schema)?, foreign_expr.nullable(&schema)?);

        Ok(())
    }
    // Evaluation through the FFI yields the same array as native evaluation.
    #[test]
    fn ffi_physical_expr_evaluate() -> Result<(), DataFusionError> {
        let (original, foreign_expr) = create_test_expr();
        let rb = test_record_batch();

        assert_eq!(
            original.evaluate(&rb)?.to_array(3)?.as_ref(),
            foreign_expr.evaluate(&rb)?.to_array(3)?.as_ref()
        );

        Ok(())
    }
    // Evaluation with a selection mask round-trips the boolean array.
    #[test]
    fn ffi_physical_expr_selection() -> Result<(), DataFusionError> {
        let (original, foreign_expr) = create_test_expr();
        let rb = test_record_batch();

        let selection = BooleanArray::from(vec![true, false, true]);

        assert_eq!(
            original
                .evaluate_selection(&rb, &selection)?
                .to_array(3)?
                .as_ref(),
            foreign_expr
                .evaluate_selection(&rb, &selection)?
                .to_array(3)?
                .as_ref()
        );
        Ok(())
    }

    // Replacing children on a foreign expression, both directly and via
    // `with_new_arc_children`, produces the rebuilt native expression.
    #[test]
    fn ffi_physical_expr_with_children() -> Result<(), DataFusionError> {
        let (original, _) = create_test_expr();
        let not_expr =
            Arc::new(NotExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
        let mut ffi_not = FFI_PhysicalExpr::from(not_expr);
        ffi_not.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_not: Arc<dyn PhysicalExpr> = (&ffi_not).into();

        let replacement = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
        let updated =
            Arc::clone(&foreign_not).with_new_children(vec![Arc::clone(&replacement)])?;
        assert_eq!(
            format!("{updated:?}").as_str(),
            "NotExpr { arg: Column { name: \"b\", index: 1 } }"
        );

        let updated = foreign_not
            .with_new_arc_children(Arc::clone(&foreign_not), vec![replacement])?;
        assert_eq!(format!("{updated}").as_str(), "NOT b@1");

        Ok(())
    }

    /// Returns `(native, foreign)` handles to `NegativeExpr(Column("a", 0))`;
    /// used by the interval and statistics tests.
    fn create_test_negative_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
        let (original, _) = create_test_expr();

        let negative_expr =
            Arc::new(NegativeExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
        let mut ffi_neg = FFI_PhysicalExpr::from(Arc::clone(&negative_expr));
        ffi_neg.library_marker_id = crate::mock_foreign_marker_id;
        let foreign_neg: Arc<dyn PhysicalExpr> = (&ffi_neg).into();

        (negative_expr, foreign_neg)
    }

    // Interval bounds survive the FFI round trip.
    #[test]
    fn ffi_physical_expr_bounds() -> Result<(), DataFusionError> {
        let (negative_expr, foreign_neg) = create_test_negative_expr();

        let interval =
            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
        let left = negative_expr.evaluate_bounds(&[&interval])?;
        let right = foreign_neg.evaluate_bounds(&[&interval])?;

        assert_eq!(left, right);

        Ok(())
    }

    // Constraint propagation survives the FFI round trip.
    #[test]
    fn ffi_physical_expr_constraints() -> Result<(), DataFusionError> {
        let (negative_expr, foreign_neg) = create_test_negative_expr();

        let interval =
            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;

        let child =
            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
        let left = negative_expr.propagate_constraints(&interval, &[&child])?;
        let right = foreign_neg.propagate_constraints(&interval, &[&child])?;

        assert_eq!(left, right);
        Ok(())
    }

    // Statistics evaluation and propagation agree for every distribution kind.
    #[test]
    fn ffi_physical_expr_statistics() -> Result<(), DataFusionError> {
        let (negative_expr, foreign_neg) = create_test_negative_expr();
        let interval =
            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;

        for distribution in [
            Distribution::new_uniform(interval.clone())?,
            Distribution::new_exponential(
                ScalarValue::Int32(Some(10)),
                ScalarValue::Int32(Some(10)),
                true,
            )?,
            Distribution::new_gaussian(
                ScalarValue::Int32(Some(10)),
                ScalarValue::Int32(Some(10)),
            )?,
            Distribution::new_generic(
                ScalarValue::Int32(Some(10)),
                ScalarValue::Int32(Some(10)),
                ScalarValue::Int32(Some(10)),
                interval,
            )?,
        ] {
            let left = negative_expr.evaluate_statistics(&[&distribution])?;
            let right = foreign_neg.evaluate_statistics(&[&distribution])?;

            assert_eq!(left, right);

            let left =
                negative_expr.propagate_statistics(&distribution, &[&distribution])?;
            let right =
                foreign_neg.propagate_statistics(&distribution, &[&distribution])?;

            assert_eq!(left, right);
        }
        Ok(())
    }

    // Expression properties survive the FFI round trip.
    #[test]
    fn ffi_physical_expr_properties() -> Result<(), DataFusionError> {
        let (original, foreign_expr) = create_test_expr();

        let left = original.get_properties(&[])?;
        let right = foreign_expr.get_properties(&[])?;

        assert_eq!(left.sort_properties, right.sort_properties);
        assert_eq!(left.range, right.range);

        Ok(())
    }

    // SQL rendering matches.
    #[test]
    fn ffi_physical_formatting() {
        let (original, foreign_expr) = create_test_expr();

        let left = format!("{}", fmt_sql(original.as_ref()));
        let right = format!("{}", fmt_sql(foreign_expr.as_ref()));
        assert_eq!(left, right);
    }

    // Snapshot result and snapshot generation match.
    #[test]
    fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
        let (original, foreign_expr) = create_test_expr();

        let left = original.snapshot()?;
        let right = foreign_expr.snapshot()?;
        assert_eq!(left, right);

        assert_eq!(
            original.snapshot_generation(),
            foreign_expr.snapshot_generation()
        );

        Ok(())
    }

    // Volatility flag matches.
    #[test]
    fn ffi_physical_expr_volatility() {
        let (original, foreign_expr) = create_test_expr();
        assert_eq!(original.is_volatile_node(), foreign_expr.is_volatile_node());
    }

    // Two independent foreign wrappers are unequal (identity equality) yet
    // hash identically, because the hash comes from the wrapped expression.
    #[test]
    fn ffi_physical_expr_hash() {
        let (_, foreign_1) = create_test_expr();
        let (_, foreign_2) = create_test_expr();

        assert_ne!(&foreign_1, &foreign_2);

        let mut hasher = DefaultHasher::new();
        foreign_1.as_ref().hash(&mut hasher);
        let hash_1 = hasher.finish();

        let mut hasher = DefaultHasher::new();
        foreign_2.as_ref().hash(&mut hasher);
        let hash_2 = hasher.finish();

        assert_eq!(hash_1, hash_2);
    }

    // `Display` output matches.
    #[test]
    fn ffi_physical_expr_display() {
        let (original, foreign_expr) = create_test_expr();
        assert_eq!(format!("{original}"), format!("{foreign_expr}"));
    }
}