Skip to main content

datafusion_ffi/physical_expr/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub(crate) mod metrics;
19pub(crate) mod partitioning;
20pub(crate) mod sort;
21
22use std::ffi::c_void;
23use std::fmt::{Display, Formatter};
24use std::hash::{DefaultHasher, Hash, Hasher};
25use std::sync::Arc;
26
27use arrow::array::{ArrayRef, BooleanArray, RecordBatch};
28use arrow::datatypes::SchemaRef;
29use arrow_schema::ffi::FFI_ArrowSchema;
30use arrow_schema::{DataType, Field, FieldRef, Schema};
31use datafusion_common::{Result, ffi_datafusion_err};
32use datafusion_expr::ColumnarValue;
33use datafusion_expr::interval_arithmetic::Interval;
34use datafusion_expr::sort_properties::ExprProperties;
35#[expect(deprecated)]
36use datafusion_expr::statistics::Distribution;
37use datafusion_physical_expr::PhysicalExpr;
38use datafusion_physical_expr_common::physical_expr::fmt_sql;
39
40use stabby::string::String as SString;
41use stabby::vec::Vec as SVec;
42
43use crate::arrow_wrappers::{WrappedArray, WrappedSchema};
44use crate::expr::columnar_value::FFI_ColumnarValue;
45use crate::expr::distribution::FFI_Distribution;
46use crate::expr::expr_properties::FFI_ExprProperties;
47use crate::expr::interval::FFI_Interval;
48use crate::record_batch_stream::{
49    record_batch_to_wrapped_array, wrapped_array_to_record_batch,
50};
51use crate::util::{FFI_Option, FFI_Result};
52use crate::{df_result, sresult, sresult_return};
53
54#[repr(C)]
55#[derive(Debug)]
56pub struct FFI_PhysicalExpr {
57    pub data_type: unsafe extern "C" fn(
58        &Self,
59        input_schema: WrappedSchema,
60    ) -> FFI_Result<WrappedSchema>,
61
62    pub nullable:
63        unsafe extern "C" fn(&Self, input_schema: WrappedSchema) -> FFI_Result<bool>,
64
65    pub evaluate:
66        unsafe extern "C" fn(&Self, batch: WrappedArray) -> FFI_Result<FFI_ColumnarValue>,
67
68    pub return_field: unsafe extern "C" fn(
69        &Self,
70        input_schema: WrappedSchema,
71    ) -> FFI_Result<WrappedSchema>,
72
73    pub evaluate_selection: unsafe extern "C" fn(
74        &Self,
75        batch: WrappedArray,
76        selection: WrappedArray,
77    ) -> FFI_Result<FFI_ColumnarValue>,
78
79    pub children: unsafe extern "C" fn(&Self) -> SVec<FFI_PhysicalExpr>,
80
81    pub new_with_children: unsafe extern "C" fn(
82        &Self,
83        children: &SVec<FFI_PhysicalExpr>,
84    ) -> FFI_Result<Self>,
85
86    pub evaluate_bounds: unsafe extern "C" fn(
87        &Self,
88        children: SVec<FFI_Interval>,
89    ) -> FFI_Result<FFI_Interval>,
90
91    pub propagate_constraints:
92        unsafe extern "C" fn(
93            &Self,
94            interval: FFI_Interval,
95            children: SVec<FFI_Interval>,
96        ) -> FFI_Result<FFI_Option<SVec<FFI_Interval>>>,
97
98    pub evaluate_statistics: unsafe extern "C" fn(
99        &Self,
100        children: SVec<FFI_Distribution>,
101    ) -> FFI_Result<FFI_Distribution>,
102
103    pub propagate_statistics:
104        unsafe extern "C" fn(
105            &Self,
106            parent: FFI_Distribution,
107            children: SVec<FFI_Distribution>,
108        ) -> FFI_Result<FFI_Option<SVec<FFI_Distribution>>>,
109
110    pub get_properties: unsafe extern "C" fn(
111        &Self,
112        children: SVec<FFI_ExprProperties>,
113    ) -> FFI_Result<FFI_ExprProperties>,
114
115    pub fmt_sql: unsafe extern "C" fn(&Self) -> FFI_Result<SString>,
116
117    pub snapshot: unsafe extern "C" fn(&Self) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>>,
118
119    pub snapshot_generation: unsafe extern "C" fn(&Self) -> u64,
120
121    pub is_volatile_node: unsafe extern "C" fn(&Self) -> bool,
122
123    // Display trait
124    pub display: unsafe extern "C" fn(&Self) -> SString,
125
126    // Hash trait
127    pub hash: unsafe extern "C" fn(&Self) -> u64,
128
129    /// Used to create a clone on the provider of the execution plan. This should
130    /// only need to be called by the receiver of the plan.
131    pub clone: unsafe extern "C" fn(plan: &Self) -> Self,
132
133    /// Release the memory of the private data when it is no longer being used.
134    pub release: unsafe extern "C" fn(arg: &mut Self),
135
136    /// Return the major DataFusion version number of this provider.
137    pub version: unsafe extern "C" fn() -> u64,
138
139    /// Internal data. This is only to be accessed by the provider of the plan.
140    /// A [`ForeignPhysicalExpr`] should never attempt to access this data.
141    pub private_data: *mut c_void,
142
143    /// Utility to identify when FFI objects are accessed locally through
144    /// the foreign interface.
145    pub library_marker_id: extern "C" fn() -> usize,
146}
147
148unsafe impl Send for FFI_PhysicalExpr {}
149unsafe impl Sync for FFI_PhysicalExpr {}
150
151impl FFI_PhysicalExpr {
152    fn inner(&self) -> &Arc<dyn PhysicalExpr> {
153        unsafe {
154            let private_data = self.private_data as *const PhysicalExprPrivateData;
155            &(*private_data).expr
156        }
157    }
158}
159
160struct PhysicalExprPrivateData {
161    expr: Arc<dyn PhysicalExpr>,
162}
163
164unsafe extern "C" fn data_type_fn_wrapper(
165    expr: &FFI_PhysicalExpr,
166    input_schema: WrappedSchema,
167) -> FFI_Result<WrappedSchema> {
168    let expr = expr.inner();
169    let schema: SchemaRef = input_schema.into();
170    let data_type = expr
171        .data_type(&schema)
172        .and_then(|dt| FFI_ArrowSchema::try_from(dt).map_err(Into::into))
173        .map(WrappedSchema);
174    sresult!(data_type)
175}
176
177unsafe extern "C" fn nullable_fn_wrapper(
178    expr: &FFI_PhysicalExpr,
179    input_schema: WrappedSchema,
180) -> FFI_Result<bool> {
181    let expr = expr.inner();
182    let schema: SchemaRef = input_schema.into();
183    sresult!(expr.nullable(&schema))
184}
185
186unsafe extern "C" fn evaluate_fn_wrapper(
187    expr: &FFI_PhysicalExpr,
188    batch: WrappedArray,
189) -> FFI_Result<FFI_ColumnarValue> {
190    let batch = sresult_return!(wrapped_array_to_record_batch(batch));
191    sresult!(
192        expr.inner()
193            .evaluate(&batch)
194            .and_then(FFI_ColumnarValue::try_from)
195    )
196}
197
198unsafe extern "C" fn return_field_fn_wrapper(
199    expr: &FFI_PhysicalExpr,
200    input_schema: WrappedSchema,
201) -> FFI_Result<WrappedSchema> {
202    let expr = expr.inner();
203    let schema: SchemaRef = input_schema.into();
204    sresult!(
205        expr.return_field(&schema)
206            .and_then(|f| FFI_ArrowSchema::try_from(&f).map_err(Into::into))
207            .map(WrappedSchema)
208    )
209}
210
211unsafe extern "C" fn evaluate_selection_fn_wrapper(
212    expr: &FFI_PhysicalExpr,
213    batch: WrappedArray,
214    selection: WrappedArray,
215) -> FFI_Result<FFI_ColumnarValue> {
216    let batch = sresult_return!(wrapped_array_to_record_batch(batch));
217    let selection: ArrayRef = sresult_return!(selection.try_into());
218    let selection = sresult_return!(
219        selection
220            .as_any()
221            .downcast_ref::<BooleanArray>()
222            .ok_or(ffi_datafusion_err!("Unexpected selection array type"))
223    );
224    sresult!(
225        expr.inner()
226            .evaluate_selection(&batch, selection)
227            .and_then(FFI_ColumnarValue::try_from)
228    )
229}
230
231unsafe extern "C" fn children_fn_wrapper(
232    expr: &FFI_PhysicalExpr,
233) -> SVec<FFI_PhysicalExpr> {
234    let expr = expr.inner();
235    let children = expr.children();
236    children
237        .into_iter()
238        .map(|child| FFI_PhysicalExpr::from(Arc::clone(child)))
239        .collect()
240}
241
242unsafe extern "C" fn new_with_children_fn_wrapper(
243    expr: &FFI_PhysicalExpr,
244    children: &SVec<FFI_PhysicalExpr>,
245) -> FFI_Result<FFI_PhysicalExpr> {
246    let expr = Arc::clone(expr.inner());
247    let children = children.iter().map(Into::into).collect::<Vec<_>>();
248    sresult!(expr.with_new_children(children).map(FFI_PhysicalExpr::from))
249}
250
251unsafe extern "C" fn evaluate_bounds_fn_wrapper(
252    expr: &FFI_PhysicalExpr,
253    children: SVec<FFI_Interval>,
254) -> FFI_Result<FFI_Interval> {
255    let expr = expr.inner();
256    let children = sresult_return!(
257        children
258            .into_iter()
259            .map(Interval::try_from)
260            .collect::<Result<Vec<_>>>()
261    );
262    let children_borrowed = children.iter().collect::<Vec<_>>();
263
264    sresult!(
265        expr.evaluate_bounds(&children_borrowed)
266            .and_then(FFI_Interval::try_from)
267    )
268}
269
270unsafe extern "C" fn propagate_constraints_fn_wrapper(
271    expr: &FFI_PhysicalExpr,
272    interval: FFI_Interval,
273    children: SVec<FFI_Interval>,
274) -> FFI_Result<FFI_Option<SVec<FFI_Interval>>> {
275    let expr = expr.inner();
276    let interval = sresult_return!(Interval::try_from(interval));
277    let children = sresult_return!(
278        children
279            .into_iter()
280            .map(Interval::try_from)
281            .collect::<Result<Vec<_>>>()
282    );
283    let children_borrowed = children.iter().collect::<Vec<_>>();
284
285    let result =
286        sresult_return!(expr.propagate_constraints(&interval, &children_borrowed));
287
288    let result = sresult_return!(
289        result
290            .map(|intervals| intervals
291                .into_iter()
292                .map(FFI_Interval::try_from)
293                .collect::<Result<SVec<_>>>())
294            .transpose()
295    );
296
297    FFI_Result::Ok(result.into())
298}
299
300#[expect(deprecated)]
301unsafe extern "C" fn evaluate_statistics_fn_wrapper(
302    expr: &FFI_PhysicalExpr,
303    children: SVec<FFI_Distribution>,
304) -> FFI_Result<FFI_Distribution> {
305    let expr = expr.inner();
306    let children = sresult_return!(
307        children
308            .into_iter()
309            .map(Distribution::try_from)
310            .collect::<Result<Vec<_>>>()
311    );
312    let children_borrowed = children.iter().collect::<Vec<_>>();
313    sresult!(
314        expr.evaluate_statistics(&children_borrowed)
315            .and_then(|dist| FFI_Distribution::try_from(&dist))
316    )
317}
318
319#[expect(deprecated)]
320unsafe extern "C" fn propagate_statistics_fn_wrapper(
321    expr: &FFI_PhysicalExpr,
322    parent: FFI_Distribution,
323    children: SVec<FFI_Distribution>,
324) -> FFI_Result<FFI_Option<SVec<FFI_Distribution>>> {
325    let expr = expr.inner();
326    let parent = sresult_return!(Distribution::try_from(parent));
327    let children = sresult_return!(
328        children
329            .into_iter()
330            .map(Distribution::try_from)
331            .collect::<Result<Vec<_>>>()
332    );
333    let children_borrowed = children.iter().collect::<Vec<_>>();
334
335    let result = sresult_return!(expr.propagate_statistics(&parent, &children_borrowed));
336    let result = sresult_return!(
337        result
338            .map(|dists| dists
339                .iter()
340                .map(FFI_Distribution::try_from)
341                .collect::<Result<SVec<_>>>())
342            .transpose()
343    );
344
345    FFI_Result::Ok(result.into())
346}
347
348unsafe extern "C" fn get_properties_fn_wrapper(
349    expr: &FFI_PhysicalExpr,
350    children: SVec<FFI_ExprProperties>,
351) -> FFI_Result<FFI_ExprProperties> {
352    let expr = expr.inner();
353    let children = sresult_return!(
354        children
355            .into_iter()
356            .map(ExprProperties::try_from)
357            .collect::<Result<Vec<_>>>()
358    );
359    sresult!(
360        expr.get_properties(&children)
361            .and_then(|p| FFI_ExprProperties::try_from(&p))
362    )
363}
364
365unsafe extern "C" fn fmt_sql_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_Result<SString> {
366    let expr = expr.inner();
367    let result = fmt_sql(expr.as_ref()).to_string();
368    FFI_Result::Ok(result.into())
369}
370
371unsafe extern "C" fn snapshot_fn_wrapper(
372    expr: &FFI_PhysicalExpr,
373) -> FFI_Result<FFI_Option<FFI_PhysicalExpr>> {
374    let expr = expr.inner();
375    sresult!(
376        expr.snapshot()
377            .map(|snapshot| snapshot.map(FFI_PhysicalExpr::from).into())
378    )
379}
380
381unsafe extern "C" fn snapshot_generation_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
382    let expr = expr.inner();
383    expr.snapshot_generation()
384}
385
386unsafe extern "C" fn is_volatile_node_fn_wrapper(expr: &FFI_PhysicalExpr) -> bool {
387    let expr = expr.inner();
388    expr.is_volatile_node()
389}
390unsafe extern "C" fn display_fn_wrapper(expr: &FFI_PhysicalExpr) -> SString {
391    let expr = expr.inner();
392    format!("{expr}").into()
393}
394
395unsafe extern "C" fn hash_fn_wrapper(expr: &FFI_PhysicalExpr) -> u64 {
396    let expr = expr.inner();
397    let mut hasher = DefaultHasher::new();
398    expr.hash(&mut hasher);
399    hasher.finish()
400}
401
402unsafe extern "C" fn release_fn_wrapper(expr: &mut FFI_PhysicalExpr) {
403    unsafe {
404        debug_assert!(!expr.private_data.is_null());
405        let private_data =
406            Box::from_raw(expr.private_data as *mut PhysicalExprPrivateData);
407        drop(private_data);
408        expr.private_data = std::ptr::null_mut();
409    }
410}
411
412unsafe extern "C" fn clone_fn_wrapper(expr: &FFI_PhysicalExpr) -> FFI_PhysicalExpr {
413    unsafe {
414        let old_private_data = expr.private_data as *const PhysicalExprPrivateData;
415
416        let private_data = Box::into_raw(Box::new(PhysicalExprPrivateData {
417            expr: Arc::clone(&(*old_private_data).expr),
418        })) as *mut c_void;
419
420        FFI_PhysicalExpr {
421            data_type: data_type_fn_wrapper,
422            nullable: nullable_fn_wrapper,
423            evaluate: evaluate_fn_wrapper,
424            return_field: return_field_fn_wrapper,
425            evaluate_selection: evaluate_selection_fn_wrapper,
426            children: children_fn_wrapper,
427            new_with_children: new_with_children_fn_wrapper,
428            evaluate_bounds: evaluate_bounds_fn_wrapper,
429            propagate_constraints: propagate_constraints_fn_wrapper,
430            evaluate_statistics: evaluate_statistics_fn_wrapper,
431            propagate_statistics: propagate_statistics_fn_wrapper,
432            get_properties: get_properties_fn_wrapper,
433            fmt_sql: fmt_sql_fn_wrapper,
434            snapshot: snapshot_fn_wrapper,
435            snapshot_generation: snapshot_generation_fn_wrapper,
436            is_volatile_node: is_volatile_node_fn_wrapper,
437            display: display_fn_wrapper,
438            hash: hash_fn_wrapper,
439            clone: clone_fn_wrapper,
440            release: release_fn_wrapper,
441            version: super::version,
442            private_data,
443            library_marker_id: crate::get_library_marker_id,
444        }
445    }
446}
447
448impl Drop for FFI_PhysicalExpr {
449    fn drop(&mut self) {
450        unsafe { (self.release)(self) }
451    }
452}
453
454impl From<Arc<dyn PhysicalExpr>> for FFI_PhysicalExpr {
455    /// Creates a new [`FFI_PhysicalExpr`].
456    fn from(expr: Arc<dyn PhysicalExpr>) -> Self {
457        if let Some(expr) = expr.downcast_ref::<ForeignPhysicalExpr>() {
458            return expr.expr.clone();
459        }
460
461        let private_data = Box::new(PhysicalExprPrivateData { expr });
462
463        Self {
464            data_type: data_type_fn_wrapper,
465            nullable: nullable_fn_wrapper,
466            evaluate: evaluate_fn_wrapper,
467            return_field: return_field_fn_wrapper,
468            evaluate_selection: evaluate_selection_fn_wrapper,
469            children: children_fn_wrapper,
470            new_with_children: new_with_children_fn_wrapper,
471            evaluate_bounds: evaluate_bounds_fn_wrapper,
472            propagate_constraints: propagate_constraints_fn_wrapper,
473            evaluate_statistics: evaluate_statistics_fn_wrapper,
474            propagate_statistics: propagate_statistics_fn_wrapper,
475            get_properties: get_properties_fn_wrapper,
476            fmt_sql: fmt_sql_fn_wrapper,
477            snapshot: snapshot_fn_wrapper,
478            snapshot_generation: snapshot_generation_fn_wrapper,
479            is_volatile_node: is_volatile_node_fn_wrapper,
480            display: display_fn_wrapper,
481            hash: hash_fn_wrapper,
482            clone: clone_fn_wrapper,
483            release: release_fn_wrapper,
484            version: super::version,
485            private_data: Box::into_raw(private_data) as *mut c_void,
486            library_marker_id: crate::get_library_marker_id,
487        }
488    }
489}
490
491/// This wrapper struct exists on the receiver side of the FFI interface, so it has
492/// no guarantees about being able to access the data in `private_data`. Any functions
493/// defined on this struct must only use the stable functions provided in
494/// FFI_PhysicalExpr to interact with the expression.
495#[derive(Debug)]
496pub struct ForeignPhysicalExpr {
497    expr: FFI_PhysicalExpr,
498    children: Vec<Arc<dyn PhysicalExpr>>,
499}
500
501unsafe impl Send for ForeignPhysicalExpr {}
502unsafe impl Sync for ForeignPhysicalExpr {}
503
504impl From<&FFI_PhysicalExpr> for Arc<dyn PhysicalExpr> {
505    fn from(ffi_expr: &FFI_PhysicalExpr) -> Self {
506        if (ffi_expr.library_marker_id)() == crate::get_library_marker_id() {
507            Arc::clone(ffi_expr.inner())
508        } else {
509            let children = unsafe {
510                (ffi_expr.children)(ffi_expr)
511                    .into_iter()
512                    .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
513                    .collect()
514            };
515
516            Arc::new(ForeignPhysicalExpr {
517                expr: ffi_expr.clone(),
518                children,
519            })
520        }
521    }
522}
523
524impl Clone for FFI_PhysicalExpr {
525    fn clone(&self) -> Self {
526        unsafe { (self.clone)(self) }
527    }
528}
529
530impl PhysicalExpr for ForeignPhysicalExpr {
531    fn data_type(&self, input_schema: &Schema) -> Result<DataType> {
532        unsafe {
533            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
534            df_result!((self.expr.data_type)(&self.expr, schema))
535                .and_then(|d| DataType::try_from(&d.0).map_err(Into::into))
536        }
537    }
538
539    fn nullable(&self, input_schema: &Schema) -> Result<bool> {
540        unsafe {
541            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
542            df_result!((self.expr.nullable)(&self.expr, schema))
543        }
544    }
545
546    fn evaluate(&self, batch: &RecordBatch) -> Result<ColumnarValue> {
547        unsafe {
548            let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
549            df_result!((self.expr.evaluate)(&self.expr, batch))
550                .and_then(ColumnarValue::try_from)
551        }
552    }
553
554    fn return_field(&self, input_schema: &Schema) -> Result<FieldRef> {
555        unsafe {
556            let schema = WrappedSchema::from(Arc::new(input_schema.clone()));
557            let result = df_result!((self.expr.return_field)(&self.expr, schema))?;
558            Field::try_from(&result.0).map(Arc::new).map_err(Into::into)
559        }
560    }
561
562    fn evaluate_selection(
563        &self,
564        batch: &RecordBatch,
565        selection: &BooleanArray,
566    ) -> Result<ColumnarValue> {
567        unsafe {
568            let batch = df_result!(record_batch_to_wrapped_array(batch.clone()))?;
569            // This is not ideal - we are cloning the selection array
570            // This is not terrible since it will be a small array.
571            // The other alternative is to modify the trait signature.
572            let selection: ArrayRef = Arc::new(selection.clone());
573            let selection = WrappedArray::try_from(&selection)?;
574            df_result!((self.expr.evaluate_selection)(&self.expr, batch, selection))
575                .and_then(ColumnarValue::try_from)
576        }
577    }
578
579    fn children(&self) -> Vec<&Arc<dyn PhysicalExpr>> {
580        self.children.iter().collect()
581    }
582
583    fn with_new_children(
584        self: Arc<Self>,
585        children: Vec<Arc<dyn PhysicalExpr>>,
586    ) -> Result<Arc<dyn PhysicalExpr>> {
587        unsafe {
588            let children = children.into_iter().map(FFI_PhysicalExpr::from).collect();
589            df_result!(
590                (self.expr.new_with_children)(&self.expr, &children).map(|expr| <Arc<
591                    dyn PhysicalExpr,
592                >>::from(
593                    &expr
594                ))
595            )
596        }
597    }
598
599    fn evaluate_bounds(&self, children: &[&Interval]) -> Result<Interval> {
600        unsafe {
601            let children = children
602                .iter()
603                .map(|interval| FFI_Interval::try_from(*interval))
604                .collect::<Result<SVec<_>>>()?;
605            df_result!((self.expr.evaluate_bounds)(&self.expr, children))
606                .and_then(Interval::try_from)
607        }
608    }
609
610    fn propagate_constraints(
611        &self,
612        interval: &Interval,
613        children: &[&Interval],
614    ) -> Result<Option<Vec<Interval>>> {
615        unsafe {
616            let interval = interval.try_into()?;
617            let children = children
618                .iter()
619                .map(|interval| FFI_Interval::try_from(*interval))
620                .collect::<Result<SVec<_>>>()?;
621            let result = df_result!((self.expr.propagate_constraints)(
622                &self.expr, interval, children
623            ))?;
624
625            let result: Option<_> = result
626                .map(|intervals| {
627                    intervals
628                        .into_iter()
629                        .map(Interval::try_from)
630                        .collect::<Result<Vec<_>>>()
631                })
632                .into();
633            result.transpose()
634        }
635    }
636
637    #[expect(deprecated)]
638    fn evaluate_statistics(&self, children: &[&Distribution]) -> Result<Distribution> {
639        unsafe {
640            let children = children
641                .iter()
642                .map(|dist| FFI_Distribution::try_from(*dist))
643                .collect::<Result<SVec<_>>>()?;
644
645            let result =
646                df_result!((self.expr.evaluate_statistics)(&self.expr, children))?;
647            Distribution::try_from(result)
648        }
649    }
650
651    #[expect(deprecated)]
652    fn propagate_statistics(
653        &self,
654        parent: &Distribution,
655        children: &[&Distribution],
656    ) -> Result<Option<Vec<Distribution>>> {
657        unsafe {
658            let parent = FFI_Distribution::try_from(parent)?;
659            let children = children
660                .iter()
661                .map(|dist| FFI_Distribution::try_from(*dist))
662                .collect::<Result<SVec<_>>>()?;
663            let result = df_result!((self.expr.propagate_statistics)(
664                &self.expr, parent, children
665            ))?;
666
667            let result: Option<Result<Vec<Distribution>>> = result
668                .map(|dists| {
669                    dists
670                        .into_iter()
671                        .map(Distribution::try_from)
672                        .collect::<Result<Vec<_>>>()
673                })
674                .into();
675
676            result.transpose()
677        }
678    }
679
680    fn get_properties(&self, children: &[ExprProperties]) -> Result<ExprProperties> {
681        unsafe {
682            let children = children
683                .iter()
684                .map(FFI_ExprProperties::try_from)
685                .collect::<Result<SVec<_>>>()?;
686            df_result!((self.expr.get_properties)(&self.expr, children))
687                .and_then(ExprProperties::try_from)
688        }
689    }
690
691    fn fmt_sql(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
692        unsafe {
693            match (self.expr.fmt_sql)(&self.expr) {
694                FFI_Result::Ok(sql) => write!(f, "{sql}"),
695                FFI_Result::Err(_) => Err(std::fmt::Error),
696            }
697        }
698    }
699
700    fn snapshot(&self) -> Result<Option<Arc<dyn PhysicalExpr>>> {
701        unsafe {
702            let result = df_result!((self.expr.snapshot)(&self.expr))?;
703            Ok(result
704                .map(|expr| <Arc<dyn PhysicalExpr>>::from(&expr))
705                .into())
706        }
707    }
708
709    fn snapshot_generation(&self) -> u64 {
710        unsafe { (self.expr.snapshot_generation)(&self.expr) }
711    }
712
713    fn is_volatile_node(&self) -> bool {
714        unsafe { (self.expr.is_volatile_node)(&self.expr) }
715    }
716}
717
718impl Eq for ForeignPhysicalExpr {}
719impl PartialEq for ForeignPhysicalExpr {
720    fn eq(&self, other: &Self) -> bool {
721        // FFI_PhysicalExpr cannot be compared, so identity equality is the best we can do.
722        std::ptr::eq(self, other)
723    }
724}
725impl Hash for ForeignPhysicalExpr {
726    fn hash<H: Hasher>(&self, state: &mut H) {
727        let value = unsafe { (self.expr.hash)(&self.expr) };
728        value.hash(state)
729    }
730}
731
732impl Display for ForeignPhysicalExpr {
733    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
734        let display = unsafe { (self.expr.display)(&self.expr) };
735        write!(f, "{display}")
736    }
737}
738
739#[cfg(test)]
740mod tests {
741    use std::hash::{DefaultHasher, Hash, Hasher};
742    use std::sync::Arc;
743
744    use arrow::array::{BooleanArray, RecordBatch, record_batch};
745    use datafusion_common::tree_node::DynTreeNode;
746    use datafusion_common::{DataFusionError, ScalarValue};
747    use datafusion_expr::interval_arithmetic::Interval;
748    #[expect(deprecated)]
749    use datafusion_expr::statistics::Distribution;
750    use datafusion_physical_expr::expressions::{Column, NegativeExpr, NotExpr};
751    use datafusion_physical_expr_common::physical_expr::{PhysicalExpr, fmt_sql};
752
753    use crate::physical_expr::FFI_PhysicalExpr;
754
755    fn create_test_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
756        let original = Arc::new(Column::new("a", 0)) as Arc<dyn PhysicalExpr>;
757        let mut ffi_expr = FFI_PhysicalExpr::from(Arc::clone(&original));
758        ffi_expr.library_marker_id = crate::mock_foreign_marker_id;
759
760        let foreign_expr: Arc<dyn PhysicalExpr> = (&ffi_expr).into();
761
762        (original, foreign_expr)
763    }
764
765    fn test_record_batch() -> RecordBatch {
766        record_batch!(("a", Int32, [1, 2, 3])).unwrap()
767    }
768
769    #[test]
770    fn ffi_physical_expr_fields() -> Result<(), DataFusionError> {
771        let (original, foreign_expr) = create_test_expr();
772        let schema = test_record_batch().schema();
773
774        // Verify the mock marker worked, otherwise tests to follow are not useful
775        assert_ne!(original.as_ref(), foreign_expr.as_ref());
776
777        assert_eq!(
778            original.return_field(&schema)?,
779            foreign_expr.return_field(&schema)?
780        );
781
782        assert_eq!(
783            original.data_type(&schema)?,
784            foreign_expr.data_type(&schema)?
785        );
786        assert_eq!(original.nullable(&schema)?, foreign_expr.nullable(&schema)?);
787
788        Ok(())
789    }
790    #[test]
791    fn ffi_physical_expr_evaluate() -> Result<(), DataFusionError> {
792        let (original, foreign_expr) = create_test_expr();
793        let rb = test_record_batch();
794
795        assert_eq!(
796            original.evaluate(&rb)?.to_array(3)?.as_ref(),
797            foreign_expr.evaluate(&rb)?.to_array(3)?.as_ref()
798        );
799
800        Ok(())
801    }
802    #[test]
803    fn ffi_physical_expr_selection() -> Result<(), DataFusionError> {
804        let (original, foreign_expr) = create_test_expr();
805        let rb = test_record_batch();
806
807        let selection = BooleanArray::from(vec![true, false, true]);
808
809        assert_eq!(
810            original
811                .evaluate_selection(&rb, &selection)?
812                .to_array(3)?
813                .as_ref(),
814            foreign_expr
815                .evaluate_selection(&rb, &selection)?
816                .to_array(3)?
817                .as_ref()
818        );
819        Ok(())
820    }
821
822    #[test]
823    fn ffi_physical_expr_with_children() -> Result<(), DataFusionError> {
824        let (original, _) = create_test_expr();
825        let not_expr =
826            Arc::new(NotExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
827        let mut ffi_not = FFI_PhysicalExpr::from(not_expr);
828        ffi_not.library_marker_id = crate::mock_foreign_marker_id;
829        let foreign_not: Arc<dyn PhysicalExpr> = (&ffi_not).into();
830
831        let replacement = Arc::new(Column::new("b", 1)) as Arc<dyn PhysicalExpr>;
832        let updated =
833            Arc::clone(&foreign_not).with_new_children(vec![Arc::clone(&replacement)])?;
834        assert_eq!(
835            format!("{updated:?}").as_str(),
836            "NotExpr { arg: Column { name: \"b\", index: 1 } }"
837        );
838
839        let updated = foreign_not
840            .with_new_arc_children(Arc::clone(&foreign_not), vec![replacement])?;
841        assert_eq!(format!("{updated}").as_str(), "NOT b@1");
842
843        Ok(())
844    }
845
846    fn create_test_negative_expr() -> (Arc<dyn PhysicalExpr>, Arc<dyn PhysicalExpr>) {
847        let (original, _) = create_test_expr();
848
849        let negative_expr =
850            Arc::new(NegativeExpr::new(Arc::clone(&original))) as Arc<dyn PhysicalExpr>;
851        let mut ffi_neg = FFI_PhysicalExpr::from(Arc::clone(&negative_expr));
852        ffi_neg.library_marker_id = crate::mock_foreign_marker_id;
853        let foreign_neg: Arc<dyn PhysicalExpr> = (&ffi_neg).into();
854
855        (negative_expr, foreign_neg)
856    }
857
858    #[test]
859    fn ffi_physical_expr_bounds() -> Result<(), DataFusionError> {
860        let (negative_expr, foreign_neg) = create_test_negative_expr();
861
862        let interval =
863            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
864        let left = negative_expr.evaluate_bounds(&[&interval])?;
865        let right = foreign_neg.evaluate_bounds(&[&interval])?;
866
867        assert_eq!(left, right);
868
869        Ok(())
870    }
871
872    #[test]
873    fn ffi_physical_expr_constraints() -> Result<(), DataFusionError> {
874        let (negative_expr, foreign_neg) = create_test_negative_expr();
875
876        let interval =
877            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
878
879        let child =
880            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
881        let left = negative_expr.propagate_constraints(&interval, &[&child])?;
882        let right = foreign_neg.propagate_constraints(&interval, &[&child])?;
883
884        assert_eq!(left, right);
885        Ok(())
886    }
887
888    #[test]
889    #[expect(deprecated)]
890    fn ffi_physical_expr_statistics() -> Result<(), DataFusionError> {
891        let (negative_expr, foreign_neg) = create_test_negative_expr();
892        let interval =
893            Interval::try_new(ScalarValue::Int32(Some(0)), ScalarValue::Int32(Some(10)))?;
894
895        for distribution in [
896            Distribution::new_uniform(interval.clone())?,
897            Distribution::new_exponential(
898                ScalarValue::Int32(Some(10)),
899                ScalarValue::Int32(Some(10)),
900                true,
901            )?,
902            Distribution::new_gaussian(
903                ScalarValue::Int32(Some(10)),
904                ScalarValue::Int32(Some(10)),
905            )?,
906            Distribution::new_generic(
907                ScalarValue::Int32(Some(10)),
908                ScalarValue::Int32(Some(10)),
909                ScalarValue::Int32(Some(10)),
910                interval,
911            )?,
912        ] {
913            let left = negative_expr.evaluate_statistics(&[&distribution])?;
914            let right = foreign_neg.evaluate_statistics(&[&distribution])?;
915
916            assert_eq!(left, right);
917
918            let left =
919                negative_expr.propagate_statistics(&distribution, &[&distribution])?;
920            let right =
921                foreign_neg.propagate_statistics(&distribution, &[&distribution])?;
922
923            assert_eq!(left, right);
924        }
925        Ok(())
926    }
927
928    #[test]
929    fn ffi_physical_expr_properties() -> Result<(), DataFusionError> {
930        let (original, foreign_expr) = create_test_expr();
931
932        let left = original.get_properties(&[])?;
933        let right = foreign_expr.get_properties(&[])?;
934
935        assert_eq!(left.sort_properties, right.sort_properties);
936        assert_eq!(left.range, right.range);
937
938        Ok(())
939    }
940
941    #[test]
942    fn ffi_physical_formatting() {
943        let (original, foreign_expr) = create_test_expr();
944
945        let left = format!("{}", fmt_sql(original.as_ref()));
946        let right = format!("{}", fmt_sql(foreign_expr.as_ref()));
947        assert_eq!(left, right);
948    }
949
950    #[test]
951    fn ffi_physical_expr_snapshots() -> Result<(), DataFusionError> {
952        let (original, foreign_expr) = create_test_expr();
953
954        let left = original.snapshot()?;
955        let right = foreign_expr.snapshot()?;
956        assert_eq!(left, right);
957
958        assert_eq!(
959            original.snapshot_generation(),
960            foreign_expr.snapshot_generation()
961        );
962
963        Ok(())
964    }
965
966    #[test]
967    fn ffi_physical_expr_volatility() {
968        let (original, foreign_expr) = create_test_expr();
969        assert_eq!(original.is_volatile_node(), foreign_expr.is_volatile_node());
970    }
971
972    #[test]
973    fn ffi_physical_expr_hash() {
974        let (_, foreign_1) = create_test_expr();
975        let (_, foreign_2) = create_test_expr();
976
977        assert_ne!(&foreign_1, &foreign_2);
978
979        let mut hasher = DefaultHasher::new();
980        foreign_1.as_ref().hash(&mut hasher);
981        let hash_1 = hasher.finish();
982
983        let mut hasher = DefaultHasher::new();
984        foreign_2.as_ref().hash(&mut hasher);
985        let hash_2 = hasher.finish();
986
987        // We cannot compare a local object and a foreign object
988        // so create two foreign objects that *should* be identical
989        // even though they were created differently.
990        assert_eq!(hash_1, hash_2);
991    }
992
993    #[test]
994    fn ffi_physical_expr_display() {
995        let (original, foreign_expr) = create_test_expr();
996        assert_eq!(format!("{original}"), format!("{foreign_expr}"));
997    }
998}