datafusion_ffi/physical_expr/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub(crate) mod partitioning;
19pub(crate) mod sort;
20
21use std::any::Any;
22use std::ffi::c_void;
23use std::fmt::{Display, Formatter};
24use std::hash::{DefaultHasher, Hash, Hasher};
25use std::sync::Arc;
26
27use abi_stable::StableAbi;
28use abi_stable::std_types::{ROption, RResult, RString, RVec};
29use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
30use arrow::datatypes::SchemaRef;
31use arrow_schema::ffi::FFI_ArrowSchema;
32use arrow_schema::{DataType, Field, FieldRef, Schema};
33use datafusion_common::{Result, ffi_datafusion_err};
34use datafusion_expr::ColumnarValue;
35use datafusion_expr::interval_arithmetic::Interval;
36use datafusion_expr::sort_properties::ExprProperties;
37use datafusion_expr::statistics::Distribution;
38use datafusion_physical_expr::PhysicalExpr;
39use datafusion_physical_expr_common::physical_expr::fmt_sql;
40
41use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
42use crate::expr::columnar_value::FFI_ColumnarValue;
43use crate::expr::distribution::FFI_Distribution;
44use crate::expr::expr_properties::FFI_ExprProperties;
45use crate::expr::interval::FFI_Interval;
46use crate::record_batch_stream::{
47    record_batch_to_wrapped_array, wrapped_array_to_record_batch,
48};
49use crate::util::FFIResult;
50use crate::{df_result, rresult, rresult_return};
51
/// A stable-ABI table of function pointers, plus an opaque data pointer, that
/// exposes a [`PhysicalExpr`] across an FFI boundary.
///
/// Every function pointer that operates on an expression takes the struct itself
/// as its first argument. The receiving side is expected to go through these
/// function pointers only (see [`ForeignPhysicalExpr`]) and never to inspect
/// `private_data` directly.
#[repr(C)]
#[derive(Debug, StableAbi)]
pub struct FFI_PhysicalExpr {
    /// FFI counterpart of [`PhysicalExpr::data_type`]. The resulting data type
    /// is returned encoded inside a [`WrappedSchema`].
    pub data_type: unsafe extern "C" fn(
        &Self,
        input_schema: WrappedSchema,
    ) -> FFIResult<WrappedSchema>,

    /// FFI counterpart of [`PhysicalExpr::nullable`].
    pub nullable:
        unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFIResult<bool>,

    /// FFI counterpart of [`PhysicalExpr::evaluate`]. The record batch is passed
    /// as a [`WrappedArray`].
    pub evaluate:
        unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFIResult<FFI_ColumnarValue>,

    /// FFI counterpart of [`PhysicalExpr::return_field`]. The resulting field is
    /// returned encoded inside a [`WrappedSchema`].
    pub return_field: unsafe extern "C" fn(
        &Self,
        input_schema: WrappedSchema,
    ) -> FFIResult<WrappedSchema>,

    /// FFI counterpart of [`PhysicalExpr::evaluate_selection`]. `selection` must
    /// decode to a boolean array, otherwise an error is returned.
    pub evaluate_selection: unsafe extern "C" fn(
        &Self,
        batch: WrappedArray,
        selection: WrappedArray,
    ) -> FFIResult<FFI_ColumnarValue>,

    /// Returns the child expressions, each wrapped in its own
    /// [`FFI_PhysicalExpr`].
    pub children: unsafe extern "C" fn(&Self) -> RVec<FFI_PhysicalExpr>,

    /// FFI counterpart of [`PhysicalExpr::with_new_children`].
    pub new_with_children:
        unsafe extern "C" fn(&Self, children: &RVec<FFI_PhysicalExpr>) -> FFIResult<Self>,

    /// FFI counterpart of [`PhysicalExpr::evaluate_bounds`].
    pub evaluate_bounds: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_Interval>,
    ) -> FFIResult<FFI_Interval>,

    /// FFI counterpart of [`PhysicalExpr::propagate_constraints`].
    pub propagate_constraints:
        unsafe extern "C" fn(
            &Self,
            interval: FFI_Interval,
            children: RVec<FFI_Interval>,
        ) -> FFIResult<ROption<RVec<FFI_Interval>>>,

    /// FFI counterpart of [`PhysicalExpr::evaluate_statistics`].
    pub evaluate_statistics: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_Distribution>,
    ) -> FFIResult<FFI_Distribution>,

    /// FFI counterpart of [`PhysicalExpr::propagate_statistics`].
    pub propagate_statistics:
        unsafe extern "C" fn(
            &Self,
            parent: FFI_Distribution,
            children: RVec<FFI_Distribution>,
        ) -> FFIResult<ROption<RVec<FFI_Distribution>>>,

    /// FFI counterpart of [`PhysicalExpr::get_properties`].
    pub get_properties: unsafe extern "C" fn(
        &Self,
        children: RVec<FFI_ExprProperties>,
    ) -> FFIResult<FFI_ExprProperties>,

    /// Returns the SQL rendering of the expression as a string.
    pub fmt_sql: unsafe extern "C" fn(&Self) -> FFIResult<RString>,

    /// FFI counterpart of [`PhysicalExpr::snapshot`].
    pub snapshot: unsafe extern "C" fn(&Self) -> FFIResult<ROption<FFI_PhysicalExpr>>,

    /// FFI counterpart of [`PhysicalExpr::snapshot_generation`].
    pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,

    /// FFI counterpart of [`PhysicalExpr::is_volatile_node`].
    pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,

    /// Display trait: returns the `Display` rendering of the expression.
    pub display: unsafe extern "C" fn(&Self) -> RString,

    /// Hash trait: returns a 64-bit hash of the expression computed on the
    /// provider side.
    pub hash: unsafe extern "C" fn(&Self) -> u64,

    /// Used to create a clone on the provider of the expression. This should
    /// only need to be called by the receiver of the expression.
    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,

    /// Release the memory of the private data when it is no longer being used.
    pub release: unsafe extern "C" fn(arg: &mut Self),

    /// Return the major DataFusion version number of this provider.
    pub version: unsafe extern "C" fn() -> u64,

    /// Internal data. This is only to be accessed by the provider of the
    /// expression. A [`ForeignPhysicalExpr`] should never attempt to access
    /// this data.
    pub private_data: *mut c_void,

    /// Utility to identify when FFI objects are accessed locally through
    /// the foreign interface.
    pub library_marker_id: extern "C" fn() -> usize,
}
143
// The raw `private_data` pointer prevents `Send`/`Sync` from being derived
// automatically, so both are asserted manually here.
// NOTE(review): presumably sound because `private_data` only ever points at a
// boxed `Arc<dyn PhysicalExpr>` — confirm that `PhysicalExpr: Send + Sync` holds
// for every provider.
unsafe impl Send for FFI_PhysicalExpr {}
unsafe impl Sync for FFI_PhysicalExpr {}
146
147impl FFI_PhysicalExpr {
148    fn inner(&self) -> &Arc<dyn PhysicalExpr> {
149        unsafe {
150            let private_data = self.private_data as *const PhysicalExprPrivateData;
151            &(*private_data).expr
152        }
153    }
154}
155
/// Provider-side payload stored (boxed) behind `FFI_PhysicalExpr::private_data`.
struct PhysicalExprPrivateData {
    /// The expression that the FFI function pointers forward to.
    expr: Arc<dyn PhysicalExpr>,
}
159
160unsafe extern "C" fn data_type_fn_wrapper(
161    expr: &FFI_PhysicalExpr,
162    input_schema: WrappedSchema,
163) -> FFIResult<WrappedSchema> {
164    let expr = expr.inner();
165    let schema: SchemaRef = input_schema.into();
166    let data_type = expr
167        .data_type(&schema)
168        .and_then(|dt| FFI_ArrowSchema::try_from(dt).map_err(Into::into))
169        .map(WrappedSchema);
170    rresult!(data_type)
171}
172
173unsafe extern "C" fn nullable_fn_wrapper(
174    expr: &FFI_PhysicalExpr,
175    input_schema: WrappedSchema,
176) -> FFIResult<bool> {
177    let expr = expr.inner();
178    let schema: SchemaRef = input_schema.into();
179    rresult!(expr.nullable(&schema))
180}
181
182unsafe extern "C" fn evaluate_fn_wrapper(
183    expr: &FFI_PhysicalExpr,
184    batch: WrappedArray,
185) -> FFIResult<FFI_ColumnarValue> {
186    let batch = rresult_return!(wrapped_array_to_record_batch(batch));
187    rresult!(
188        expr.inner()
189            .evaluate(&batch)
190            .and_then(FFI_ColumnarValue::try_from)
191    )
192}
193
194unsafe extern "C" fn return_field_fn_wrapper(
195    expr: &FFI_PhysicalExpr,
196    input_schema: WrappedSchema,
197) -> FFIResult<WrappedSchema> {
198    let expr = expr.inner();
199    let schema: SchemaRef = input_schema.into();
200    rresult!(
201        expr.return_field(&schema)
202            .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(Into::into))
203            .map(WrappedSchema)
204    )
205}
206
207unsafe extern "C" fn evaluate_selection_fn_wrapper(
208    expr: &FFI_PhysicalExpr,
209    batch: WrappedArray,
210    selection: WrappedArray,
211) -> FFIResult<FFI_ColumnarValue> {
212    let batch = rresult_return!(wrapped_array_to_record_batch(batch));
213    let selection: ArrayRef = rresult_return!(selection.try_into());
214    let selection = rresult_return!(
215        selection
216            .as_any()
217            .downcast_ref::<BooleanArray>()
218            .ok_or(ffi_datafusion_err!("Unexpected selection array type"))
219    );
220    rresult!(
221        expr.inner()
222            .evaluate_selection(&batch, selection)
223            .and_then(FFI_ColumnarValue::try_from)
224    )
225}
226
227unsafe extern "C" fn children_fn_wrapper(
228    expr: &FFI_PhysicalExpr,
229) -> RVec<FFI_PhysicalExpr> {
230    let expr = expr.inner();
231    let children = expr.children();
232    children
233        .into_iter()
234        .map(|child| FFI_PhysicalExpr::from(Arc::clone(child)))
235        .collect()
236}
237
238unsafe extern "C" fn new_with_children_fn_wrapper(
239    expr: &FFI_PhysicalExpr,
240    children: &RVec<FFI_PhysicalExpr>,
241) -> FFIResult<FFI_PhysicalExpr> {
242    let expr = Arc::clone(expr.inner());
243    let children = children.iter().map(Into::into).collect::<Vec<_>>();
244    rresult!(expr.with_new_children(children).map(FFI_PhysicalExpr::from))
245}
246
247unsafe extern "C" fn evaluate_bounds_fn_wrapper(
248    expr: &FFI_PhysicalExpr,
249    children: RVec<FFI_Interval>,
250) -> FFIResult<FFI_Interval> {
251    let expr = expr.inner();
252    let children = rresult_return!(
253        children
254            .into_iter()
255            .map(Interval::try_from)
256            .collect::<Result<Vec<_>>>()
257    );
258    let children_borrowed = children.iter().collect::<Vec<_>>();
259
260    rresult!(
261        expr.evaluate_bounds(&children_borrowed)
262            .and_then(FFI_Interval::try_from)
263    )
264}
265
266unsafe extern "C" fn propagate_constraints_fn_wrapper(
267    expr: &FFI_PhysicalExpr,
268    interval: FFI_Interval,
269    children: RVec<FFI_Interval>,
270) -> FFIResult<ROption<RVec<FFI_Interval>>> {
271    let expr = expr.inner();
272    let interval = rresult_return!(Interval::try_from(interval));
273    let children = rresult_return!(
274        children
275            .into_iter()
276            .map(Interval::try_from)
277            .collect::<Result<Vec<_>>>()
278    );
279    let children_borrowed = children.iter().collect::<Vec<_>>();
280
281    let result =
282        rresult_return!(expr.propagate_constraints(&interval, &children_borrowed));
283
284    let result = rresult_return!(
285        result
286            .map(|intervals| intervals
287                .into_iter()
288                .map(FFI_Interval::try_from)
289                .collect::<Result<RVec<_>>>())
290            .transpose()
291    );
292
293    RResult::ROk(result.into())
294}
295
296unsafe extern "C" fn evaluate_statistics_fn_wrapper(
297    expr: &FFI_PhysicalExpr,
298    children: RVec<FFI_Distribution>,
299) -> FFIResult<FFI_Distribution> {
300    let expr = expr.inner();
301    let children = rresult_return!(
302        children
303            .into_iter()
304            .map(Distribution::try_from)
305            .collect::<Result<Vec<_>>>()
306    );
307    let children_borrowed = children.iter().collect::<Vec<_>>();
308    rresult!(
309        expr.evaluate_statistics(&children_borrowed)
310            .and_then(|dist| FFI_Distribution::try_from(&dist))
311    )
312}
313
314unsafe extern "C" fn propagate_statistics_fn_wrapper(
315    expr: &FFI_PhysicalExpr,
316    parent: FFI_Distribution,
317    children: RVec<FFI_Distribution>,
318) -> FFIResult<ROption<RVec<FFI_Distribution>>> {
319    let expr = expr.inner();
320    let parent = rresult_return!(Distribution::try_from(parent));
321    let children = rresult_return!(
322        children
323            .into_iter()
324            .map(Distribution::try_from)
325            .collect::<Result<Vec<_>>>()
326    );
327    let children_borrowed = children.iter().collect::<Vec<_>>();
328
329    let result = rresult_return!(expr.propagate_statistics(&parent, &children_borrowed));
330    let result = rresult_return!(
331        result
332            .map(|dists| dists
333                .iter()
334                .map(FFI_Distribution::try_from)
335                .collect::<Result<RVec<_>>>())
336            .transpose()
337    );
338
339    RResult::ROk(result.into())
340}
341
342unsafe extern "C" fn get_properties_fn_wrapper(
343    expr: &FFI_PhysicalExpr,
344    children: RVec<FFI_ExprProperties>,
345) -> FFIResult<FFI_ExprProperties> {
346    let expr = expr.inner();
347    let children = rresult_return!(
348        children
349            .into_iter()
350            .map(ExprProperties::try_from)
351            .collect::<Result<Vec<_>>>()
352    );
353    rresult!(
354        expr.get_properties(&children)
355            .and_then(|p| FFI_ExprProperties::try_from(&p))
356    )
357}
358
359unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFIResult<RString> {
360    let expr = expr.inner();
361    let result = fmt_sql(expr.as_ref()).to_string();
362    RResult::ROk(result.into())
363}
364
365unsafe extern "C" fn snapshot_fn_wrapper(
366    expr: &FFI_PhysicalExpr,
367) -> FFIResult<ROption<FFI_PhysicalExpr>> {
368    let expr = expr.inner();
369    rresult!(
370        expr.snapshot()
371            .map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())
372    )
373}
374
375unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
376    let expr = expr.inner();
377    expr.snapshot_generation()
378}
379
380unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool {
381    let expr = expr.inner();
382    expr.is_volatile_node()
383}
384unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> RString {
385    let expr = expr.inner();
386    format!("{expr}").into()
387}
388
389unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
390    let expr = expr.inner();
391    let mut hasher = DefaultHasher::new();
392    expr.hash(&mut hasher);
393    hasher.finish()
394}
395
396unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {
397    unsafe {
398        debug_assert!(!expr.private_data.is_null());
399        let private_data =
400            Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData);
401        drop(private_data);
402        expr.private_data = std::ptr::null_mut();
403    }
404}
405
406unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalExpr {
407    unsafe {
408        let old_private_data = expr.private_data as *const PhysicalExprPrivateData;
409
410        let private_data = Box::into_raw(Box::new(PhysicalExprPrivateData {
411            expr: Arc::clone(&(*old_private_data).expr),
412        })) as *mut c_void;
413
414        FFI_PhysicalExpr {
415            data_type: data_type_fn_wrapper,
416            nullable: nullable_fn_wrapper,
417            evaluate: evaluate_fn_wrapper,
418            return_field: return_field_fn_wrapper,
419            evaluate_selection: evaluate_selection_fn_wrapper,
420            children: children_fn_wrapper,
421            new_with_children: new_with_children_fn_wrapper,
422            evaluate_bounds: evaluate_bounds_fn_wrapper,
423            propagate_constraints: propagate_constraints_fn_wrapper,
424            evaluate_statistics: evaluate_statistics_fn_wrapper,
425            propagate_statistics: propagate_statistics_fn_wrapper,
426            get_properties: get_properties_fn_wrapper,
427            fmt_sql: fmt_sql_fn_wrapper,
428            snapshot: snapshot_fn_wrapper,
429            snapshot_generation: snapshot_generation_fn_wrapper,
430            is_volatile_node: is_volatile_node_fn_wrapper,
431            display: display_fn_wrapper,
432            hash: hash_fn_wrapper,
433            clone: clone_fn_wrapper,
434            release: release_fn_wrapper,
435            version: super::version,
436            private_data,
437            library_marker_id: crate::get_library_marker_id,
438        }
439    }
440}
441
442impl Drop for FFI_PhysicalExpr {
443    fn drop(&mut self) {
444        unsafe { (self.release)(self) }
445    }
446}
447
448impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
449    /// Creates a new [`FFI_PhysicalExpr`].
450    fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
451        let private_data = Box::new(PhysicalExprPrivateData { expr });
452
453        Self {
454            data_type: data_type_fn_wrapper,
455            nullable: nullable_fn_wrapper,
456            evaluate: evaluate_fn_wrapper,
457            return_field: return_field_fn_wrapper,
458            evaluate_selection: evaluate_selection_fn_wrapper,
459            children: children_fn_wrapper,
460            new_with_children: new_with_children_fn_wrapper,
461            evaluate_bounds: evaluate_bounds_fn_wrapper,
462            propagate_constraints: propagate_constraints_fn_wrapper,
463            evaluate_statistics: evaluate_statistics_fn_wrapper,
464            propagate_statistics: propagate_statistics_fn_wrapper,
465            get_properties: get_properties_fn_wrapper,
466            fmt_sql: fmt_sql_fn_wrapper,
467            snapshot: snapshot_fn_wrapper,
468            snapshot_generation: snapshot_generation_fn_wrapper,
469            is_volatile_node: is_volatile_node_fn_wrapper,
470            display: display_fn_wrapper,
471            hash: hash_fn_wrapper,
472            clone: clone_fn_wrapper,
473            release: release_fn_wrapper,
474            version: super::version,
475            private_data: Box::into_raw(private_data) as *mut c_void,
476            library_marker_id: crate::get_library_marker_id,
477        }
478    }
479}
480
/// This wrapper struct exists on the receiver side of the FFI interface, so it has
/// no guarantees about being able to access the data in `private_data`. Any functions
/// defined on this struct must only use the stable functions provided in
/// FFI_PhysicalExpr to interact with the expression.
#[derive(Debug)]
pub struct ForeignPhysicalExpr {
    /// The FFI handle that every trait method forwards to.
    expr: FFI_PhysicalExpr,
    /// Child expressions, converted once when this wrapper is constructed so
    /// that `children()` can hand out references.
    children: Vec<Arc<dyn PhysicalExpr>>,
}
490
// `Send`/`Sync` are asserted manually for the receiver-side wrapper.
// NOTE(review): presumably this relies on the provider's expression being
// thread-safe behind the FFI function pointers — confirm.
unsafe impl Send for ForeignPhysicalExpr {}
unsafe impl Sync for ForeignPhysicalExpr {}
493
494impl From<&FFI_PhysicalExpr> for Arc<dyn PhysicalExpr> {
495    fn from(ffi_expr: &FFI_PhysicalExpr) -> Self {
496        if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() {
497            Arc::clone(ffi_expr.inner())
498        } else {
499            let children = unsafe {
500                (ffi_expr.children)(ffi_expr)
501                    .into_iter()
502                    .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
503                    .collect()
504            };
505
506            Arc::new(ForeignPhysicalExpr {
507                expr: ffi_expr.clone(),
508                children,
509            })
510        }
511    }
512}
513
514impl Clone for FFI_PhysicalExpr {
515    fn clone(&self) -> Self {
516        unsafe { (self.clone)(self) }
517    }
518}
519
520impl PhysicalExpr for ForeignPhysicalExpr {
521    fn as_any(&self) -> &dyn Any {
522        self
523    }
524
525    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
526        unsafe {
527            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
528            df_result!((self.expr.data_type)(&self.expr, schema))
529                .and_then(|d| DataType::try_from(&d.0).map_err(Into::into))
530        }
531    }
532
533    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
534        unsafe {
535            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
536            df_result!((self.expr.nullable)(&self.expr, schema))
537        }
538    }
539
540    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
541        unsafe {
542            let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
543            df_result!((self.expr.evaluate)(&self.expr, batch))
544                .and_then(ColumnarValue::try_from)
545        }
546    }
547
548    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
549        unsafe {
550            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
551            let result = df_result!((self.expr.return_field)(&self.expr, schema))?;
552            Field::try_from(&result.0).map(Arc::new).map_err(Into::into)
553        }
554    }
555
556    fn evaluate_selection(
557        &self,
558        batch: &RecordBatch,
559        selection: &BooleanArray,
560    ) -> Result<ColumnarValue> {
561        unsafe {
562            let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
563            // This is not ideal - we are cloning the selection array
564            // This is not terrible since it will be a small array.
565            // The other alternative is to modify the trait signature.
566            let selection: ArrayRef = Arc::new(selection.clone());
567            let selection = WrappedArray::try_from(&selection)?;
568            df_result!((self.expr.evaluate_selection)(&self.expr, batch, selection))
569                .and_then(ColumnarValue::try_from)
570        }
571    }
572
573    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
574        self.children.iter().collect()
575    }
576
577    fn with_new_children(
578        self: Arc<Self>,
579        children: Vec<Arc<dyn PhysicalExpr>>,
580    ) -> Result<Arc<dyn PhysicalExpr>> {
581        unsafe {
582            let children = children.into_iter().map(FFI_PhysicalExpr::from).collect();
583            df_result!(
584                (self.expr.new_with_children)(&self.expr, &children).map(|expr| <Arc<
585                    dyn PhysicalExpr,
586                >>::from(
587                    &expr
588                ))
589            )
590        }
591    }
592
593    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
594        unsafe {
595            let children = children
596                .iter()
597                .map(|interval| FFI_Interval::try_from(*interval))
598                .collect::<Result<RVec<_>>>()?;
599            df_result!((self.expr.evaluate_bounds)(&self.expr, children))
600                .and_then(Interval::try_from)
601        }
602    }
603
604    fn propagate_constraints(
605        &self,
606        interval: &Interval,
607        children: &[&Interval],
608    ) -> Result<Option<Vec<Interval>>> {
609        unsafe {
610            let interval = interval.try_into()?;
611            let children = children
612                .iter()
613                .map(|interval| FFI_Interval::try_from(*interval))
614                .collect::<Result<RVec<_>>>()?;
615            let result = df_result!((self.expr.propagate_constraints)(
616                &self.expr, interval, children
617            ))?;
618
619            let result: Option<_> = result
620                .map(|intervals| {
621                    intervals
622                        .into_iter()
623                        .map(Interval::try_from)
624                        .collect::<Result<Vec<_>>>()
625                })
626                .into();
627            result.transpose()
628        }
629    }
630
631    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
632        unsafe {
633            let children = children
634                .iter()
635                .map(|dist| FFI_Distribution::try_from(*dist))
636                .collect::<Result<RVec<_>>>()?;
637
638            let result =
639                df_result!((self.expr.evaluate_statistics)(&self.expr, children))?;
640            Distribution::try_from(result)
641        }
642    }
643
644    fn propagate_statistics(
645        &self,
646        parent: &Distribution,
647        children: &[&Distribution],
648    ) -> Result<Option<Vec<Distribution>>> {
649        unsafe {
650            let parent = FFI_Distribution::try_from(parent)?;
651            let children = children
652                .iter()
653                .map(|dist| FFI_Distribution::try_from(*dist))
654                .collect::<Result<RVec<_>>>()?;
655            let result = df_result!((self.expr.propagate_statistics)(
656                &self.expr, parent, children
657            ))?;
658
659            let result: Option<Result<Vec<Distribution>>> = result
660                .map(|dists| {
661                    dists
662                        .into_iter()
663                        .map(Distribution::try_from)
664                        .collect::<Result<Vec<_>>>()
665                })
666                .into();
667
668            result.transpose()
669        }
670    }
671
672    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
673        unsafe {
674            let children = children
675                .iter()
676                .map(FFI_ExprProperties::try_from)
677                .collect::<Result<RVec<_>>>()?;
678            df_result!((self.expr.get_properties)(&self.expr, children))
679                .and_then(ExprProperties::try_from)
680        }
681    }
682
683    fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
684        unsafe {
685            match (self.expr.fmt_sql)(&self.expr) {
686                RResult::ROk(sql) => write!(f, "{sql}"),
687                RResult::RErr(_) => Err(std::fmt::Error),
688            }
689        }
690    }
691
692    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
693        unsafe {
694            let result = df_result!((self.expr.snapshot)(&self.expr))?;
695            Ok(result
696                .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
697                .into())
698        }
699    }
700
701    fn snapshot_generation(&self) -> u64 {
702        unsafe { (self.expr.snapshot_generation)(&self.expr) }
703    }
704
705    fn is_volatile_node(&self) -> bool {
706        unsafe { (self.expr.is_volatile_node)(&self.expr) }
707    }
708}
709
710impl Eq for ForeignPhysicalExpr {}
711impl PartialEq for ForeignPhysicalExpr {
712    fn eq(&self, other: &Self) -> bool {
713        // FFI_PhysicalExpr cannot be compared, so identity equality is the best we can do.
714        std::ptr::eq(self, other)
715    }
716}
717impl Hash for ForeignPhysicalExpr {
718    fn hash<H: Hasher>(&self, state: &mut H) {
719        let value = unsafe { (self.expr.hash)(&self.expr) };
720        value.hash(state)
721    }
722}
723
724impl Display for ForeignPhysicalExpr {
725    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
726        let display = unsafe { (self.expr.display)(&self.expr) };
727        write!(f, "{display}")
728    }
729}
730
#[cfg(test)]
mod tests {
    use std::hash::{DefaultHasher, Hash, Hasher};
    use std::sync::Arc;

    use arrow::array::{BooleanArray, RecordBatch, record_batch};
    use datafusion_common::tree_node::DynTreeNode;
    use datafusion_common::{DataFusionError, ScalarValue};
    use datafusion_expr::interval_arithmetic::Interval;
    use datafusion_expr::statistics::Distribution;
    use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr};
    use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, fmt_sql};

    use crate::physical_expr::FFI_PhysicalExpr;

    /// Sends `expr` through the FFI struct with a mocked foreign library marker,
    /// so the conversion back does not short-circuit to the local expression.
    fn into_foreign(expr: Arc<dyn PhysicalExpr>) -> Arc<dyn PhysicalExpr> {
        let mut ffi = FFI_PhysicalExpr::from(expr);
        ffi.library_marker_id = crate::mock_foreign_marker_id;
        (&ffi).into()
    }

    /// Hashes an expression with a fresh `DefaultHasher`.
    fn hash_of(expr: &Arc<dyn PhysicalExpr>) -> u64 {
        let mut hasher = DefaultHasher::new();
        expr.as_ref().hash(&mut hasher);
        hasher.finish()
    }

    /// The `[0, 10]` Int32 interval shared by several tests.
    fn zero_to_ten() -> Result<Interval, DataFusionError> {
        Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))
    }

    /// Returns a column expression and its foreign counterpart.
    fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
        let local: Arc<dyn PhysicalExpr> = Arc::new(Column::new("a", 0));
        let foreign = into_foreign(Arc::clone(&local));

        (local, foreign)
    }

    fn test_record_batch() -> RecordBatch {
        record_batch!(("a", Int32, [1, 2, 3])).unwrap()
    }

    #[test]
    fn ffi_physical_expr_fields() -> Result<(), DataFusionError> {
        let (local, foreign) = create_test_expr();
        let schema = test_record_batch().schema();

        // Verify the mock marker worked, otherwise tests to follow are not useful
        assert_ne!(local.as_ref(), foreign.as_ref());

        assert_eq!(local.return_field(&schema)?, foreign.return_field(&schema)?);
        assert_eq!(local.data_type(&schema)?, foreign.data_type(&schema)?);
        assert_eq!(local.nullable(&schema)?, foreign.nullable(&schema)?);

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_evaluate() -> Result<(), DataFusionError> {
        let (local, foreign) = create_test_expr();
        let batch = test_record_batch();

        let expected = local.evaluate(&batch)?.to_array(3)?;
        let actual = foreign.evaluate(&batch)?.to_array(3)?;
        assert_eq!(expected.as_ref(), actual.as_ref());

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_selection() -> Result<(), DataFusionError> {
        let (local, foreign) = create_test_expr();
        let batch = test_record_batch();
        let mask = BooleanArray::from(vec![true, false, true]);

        let expected = local.evaluate_selection(&batch, &mask)?.to_array(3)?;
        let actual = foreign.evaluate_selection(&batch, &mask)?.to_array(3)?;
        assert_eq!(expected.as_ref(), actual.as_ref());

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_with_children() -> Result<(), DataFusionError> {
        let (column, _) = create_test_expr();
        let not_expr: Arc<dyn PhysicalExpr> =
            Arc::new(NotExpr::new(Arc::clone(&column)));
        let foreign_not = into_foreign(not_expr);

        let replacement: Arc<dyn PhysicalExpr> = Arc::new(Column::new("b", 1));

        let via_trait =
            Arc::clone(&foreign_not).with_new_children(vec![Arc::clone(&replacement)])?;
        assert_eq!(
            format!("{via_trait:?}").as_str(),
            "NotExpr { arg: Column { name: \"b\", index: 1 } }"
        );

        let via_tree_node = foreign_not
            .with_new_arc_children(Arc::clone(&foreign_not), vec![replacement])?;
        assert_eq!(format!("{via_tree_node}").as_str(), "NOT b@1");

        Ok(())
    }

    /// Returns a negation expression and its foreign counterpart.
    fn create_test_negative_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
        let (column, _) = create_test_expr();

        let local_neg: Arc<dyn PhysicalExpr> =
            Arc::new(NegativeExpr::new(Arc::clone(&column)));
        let foreign_neg = into_foreign(Arc::clone(&local_neg));

        (local_neg, foreign_neg)
    }

    #[test]
    fn ffi_physical_expr_bounds() -> Result<(), DataFusionError> {
        let (local_neg, foreign_neg) = create_test_negative_expr();
        let interval = zero_to_ten()?;

        let expected = local_neg.evaluate_bounds(&[&interval])?;
        let actual = foreign_neg.evaluate_bounds(&[&interval])?;
        assert_eq!(expected, actual);

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_constraints() -> Result<(), DataFusionError> {
        let (local_neg, foreign_neg) = create_test_negative_expr();
        let parent = zero_to_ten()?;
        let child = zero_to_ten()?;

        let expected = local_neg.propagate_constraints(&parent, &[&child])?;
        let actual = foreign_neg.propagate_constraints(&parent, &[&child])?;
        assert_eq!(expected, actual);

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_statistics() -> Result<(), DataFusionError> {
        let (local_neg, foreign_neg) = create_test_negative_expr();
        let interval = zero_to_ten()?;
        let ten = || ScalarValue::Int32(Some(10));

        let distributions = [
            Distribution::new_uniform(interval.clone())?,
            Distribution::new_exponential(ten(), ten(), true)?,
            Distribution::new_gaussian(ten(), ten())?,
            Distribution::new_generic(ten(), ten(), ten(), interval)?,
        ];

        for dist in distributions {
            let expected = local_neg.evaluate_statistics(&[&dist])?;
            let actual = foreign_neg.evaluate_statistics(&[&dist])?;
            assert_eq!(expected, actual);

            let expected = local_neg.propagate_statistics(&dist, &[&dist])?;
            let actual = foreign_neg.propagate_statistics(&dist, &[&dist])?;
            assert_eq!(expected, actual);
        }

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_properties() -> Result<(), DataFusionError> {
        let (local, foreign) = create_test_expr();

        let expected = local.get_properties(&[])?;
        let actual = foreign.get_properties(&[])?;

        assert_eq!(expected.sort_properties, actual.sort_properties);
        assert_eq!(expected.range, actual.range);

        Ok(())
    }

    #[test]
    fn ffi_physical_formatting() {
        let (local, foreign) = create_test_expr();

        let expected = format!("{}", fmt_sql(local.as_ref()));
        let actual = format!("{}", fmt_sql(foreign.as_ref()));
        assert_eq!(expected, actual);
    }

    #[test]
    fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
        let (local, foreign) = create_test_expr();

        let expected = local.snapshot()?;
        let actual = foreign.snapshot()?;
        assert_eq!(expected, actual);

        assert_eq!(local.snapshot_generation(), foreign.snapshot_generation());

        Ok(())
    }

    #[test]
    fn ffi_physical_expr_volatility() {
        let (local, foreign) = create_test_expr();
        assert_eq!(local.is_volatile_node(), foreign.is_volatile_node());
    }

    #[test]
    fn ffi_physical_expr_hash() {
        let (_, foreign_1) = create_test_expr();
        let (_, foreign_2) = create_test_expr();

        assert_ne!(&foreign_1, &foreign_2);

        // We cannot compare a local object and a foreign object
        // so create two foreign objects that *should* be identical
        // even though they were created differently.
        assert_eq!(hash_of(&foreign_1), hash_of(&foreign_2));
    }

    #[test]
    fn ffi_physical_expr_display() {
        let (local, foreign) = create_test_expr();
        assert_eq!(format!("{local}"), format!("{foreign}"));
    }
}