Skip to main content

datafusion_ffi/tests/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use arrow::array::RecordBatch;
21use arrow_schema::{DataType, Field, Schema};
22use async_provider::create_async_table_provider;
23use async_trait::async_trait;
24use catalog::create_catalog_provider;
25use datafusion_catalog::MemTable;
26use datafusion_catalog::{Session, TableProvider};
27use datafusion_common::record_batch;
28use datafusion_common::stats::Precision;
29use datafusion_common::{ColumnStatistics, Statistics};
30use datafusion_common::{Result, ScalarValue};
31use datafusion_expr::{Expr, TableType};
32use datafusion_physical_plan::ExecutionPlan;
33use sync_provider::create_sync_table_provider;
34use udf_udaf_udwf::{
35    create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
36    create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
37};
38
39use crate::catalog_provider::FFI_CatalogProvider;
40use crate::catalog_provider_list::FFI_CatalogProviderList;
41use crate::config::extension_options::FFI_ExtensionOptions;
42use crate::execution_plan::FFI_ExecutionPlan;
43use crate::execution_plan::tests::EmptyExec;
44use crate::physical_optimizer::FFI_PhysicalOptimizerRule;
45use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
46use crate::table_provider::FFI_TableProvider;
47use crate::table_provider_factory::FFI_TableProviderFactory;
48use crate::tests::catalog::create_catalog_provider_list;
49use crate::udaf::FFI_AggregateUDF;
50use crate::udf::FFI_ScalarUDF;
51use crate::udtf::FFI_TableFunction;
52use crate::udwf::FFI_WindowUDF;
53
54mod async_provider;
55pub mod catalog;
56pub mod config;
57mod physical_optimizer;
58mod sync_provider;
59mod table_provider_factory;
60mod udf_udaf_udwf;
61pub mod utils;
62
63#[repr(C)]
64/// This struct defines the module interfaces. It is to be shared by
65/// both the module loading program and library that implements the
66/// module.
67pub struct ForeignLibraryModule {
68    /// Construct an opinionated catalog provider
69    pub create_catalog:
70        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProvider,
71
72    /// Construct an opinionated catalog provider list
73    pub create_catalog_list:
74        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProviderList,
75
76    /// Constructs the table provider
77    pub create_table: extern "C" fn(
78        synchronous: bool,
79        codec: FFI_LogicalExtensionCodec,
80    ) -> FFI_TableProvider,
81
82    /// Constructs the table provider factory
83    pub create_table_factory:
84        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory,
85
86    /// Create a scalar UDF
87    pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
88
89    pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
90
91    pub create_timezone_udf: extern "C" fn() -> FFI_ScalarUDF,
92
93    pub create_table_function:
94        extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,
95
96    /// Create an aggregate UDAF using sum
97    pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,
98
99    /// Create  grouping UDAF using stddev
100    pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,
101
102    pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,
103
104    /// Create extension options, for either ConfigOptions or TableOptions
105    pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,
106
107    pub create_empty_exec: extern "C" fn() -> FFI_ExecutionPlan,
108
109    pub create_exec_with_statistics: extern "C" fn() -> FFI_ExecutionPlan,
110
111    pub create_table_with_statistics:
112        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProvider,
113
114    pub create_physical_optimizer_rule: extern "C" fn() -> FFI_PhysicalOptimizerRule,
115
116    pub version: extern "C" fn() -> u64,
117}
118
119pub fn create_test_schema() -> Arc<Schema> {
120    Arc::new(Schema::new(vec![
121        Field::new("a", DataType::Int32, true),
122        Field::new("b", DataType::Float64, true),
123    ]))
124}
125
126pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
127    let end_value = start_value + num_values as i32;
128    let a_vals: Vec<i32> = (start_value..end_value).collect();
129    let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
130
131    record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
132}
133
134/// Here we only wish to create a simple table provider as an example.
135/// We create an in-memory table and convert it to it's FFI counterpart.
136extern "C" fn construct_table_provider(
137    synchronous: bool,
138    codec: FFI_LogicalExtensionCodec,
139) -> FFI_TableProvider {
140    match synchronous {
141        true => create_sync_table_provider(codec),
142        false => create_async_table_provider(codec),
143    }
144}
145
146/// Here we only wish to create a simple table provider as an example.
147/// We create an in-memory table and convert it to it's FFI counterpart.
148extern "C" fn construct_table_provider_factory(
149    codec: FFI_LogicalExtensionCodec,
150) -> FFI_TableProviderFactory {
151    table_provider_factory::create(codec)
152}
153
154pub(crate) extern "C" fn create_empty_exec() -> FFI_ExecutionPlan {
155    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, false)]));
156
157    let plan = Arc::new(EmptyExec::new(schema));
158    FFI_ExecutionPlan::new(plan, None)
159}
160
161/// Returns canonical statistics used by both the producer and consumer sides of
162/// the integration tests so round-trips can be asserted without hard-coding
163/// the values in two places.
164pub fn make_test_statistics() -> Statistics {
165    Statistics {
166        num_rows: Precision::Exact(42),
167        total_byte_size: Precision::Exact(672),
168        column_statistics: vec![
169            ColumnStatistics {
170                null_count: Precision::Exact(0),
171                max_value: Precision::Exact(ScalarValue::Int32(Some(100))),
172                min_value: Precision::Exact(ScalarValue::Int32(Some(-10))),
173                sum_value: Precision::Exact(ScalarValue::Int64(Some(1890))),
174                distinct_count: Precision::Inexact(40),
175                byte_size: Precision::Exact(168),
176            },
177            ColumnStatistics {
178                null_count: Precision::Exact(1),
179                max_value: Precision::Exact(ScalarValue::Float64(Some(99.5))),
180                min_value: Precision::Exact(ScalarValue::Float64(Some(-1.5))),
181                sum_value: Precision::Absent,
182                distinct_count: Precision::Absent,
183                byte_size: Precision::Exact(328),
184            },
185        ],
186    }
187}
188
189pub(crate) extern "C" fn create_exec_with_statistics() -> FFI_ExecutionPlan {
190    let schema = create_test_schema();
191    let plan = Arc::new(EmptyExec::new(schema).with_statistics(make_test_statistics()));
192    FFI_ExecutionPlan::new(plan, None)
193}
194
195/// Thin wrapper that attaches a fixed [`Statistics`] snapshot to any inner
196/// [`TableProvider`] without changing its scan behaviour.
197#[derive(Debug)]
198struct TableWithStats {
199    inner: Arc<dyn TableProvider>,
200    stats: Statistics,
201}
202
203#[async_trait]
204impl TableProvider for TableWithStats {
205    fn schema(&self) -> arrow_schema::SchemaRef {
206        self.inner.schema()
207    }
208
209    fn table_type(&self) -> TableType {
210        self.inner.table_type()
211    }
212
213    fn statistics(&self) -> Option<Statistics> {
214        Some(self.stats.clone())
215    }
216
217    async fn scan(
218        &self,
219        session: &dyn Session,
220        projection: Option<&Vec<usize>>,
221        filters: &[Expr],
222        limit: Option<usize>,
223    ) -> Result<Arc<dyn ExecutionPlan>> {
224        self.inner.scan(session, projection, filters, limit).await
225    }
226}
227
228pub(crate) extern "C" fn create_table_with_statistics(
229    codec: FFI_LogicalExtensionCodec,
230) -> FFI_TableProvider {
231    let schema = create_test_schema();
232    let batch = create_record_batch(1, 5);
233    let inner = Arc::new(MemTable::try_new(schema, vec![vec![batch]]).unwrap());
234    let provider = Arc::new(TableWithStats {
235        inner,
236        stats: make_test_statistics(),
237    });
238    FFI_TableProvider::new_with_ffi_codec(provider, true, None, codec)
239}
240
241/// This defines the entry point for using the module.
242#[unsafe(no_mangle)]
243pub extern "C" fn datafusion_ffi_get_module() -> ForeignLibraryModule {
244    ForeignLibraryModule {
245        create_catalog: create_catalog_provider,
246        create_catalog_list: create_catalog_provider_list,
247        create_table: construct_table_provider,
248        create_table_factory: construct_table_provider_factory,
249        create_scalar_udf: create_ffi_abs_func,
250        create_nullary_udf: create_ffi_random_func,
251        create_timezone_udf: udf_udaf_udwf::create_timezone_func,
252        create_table_function: create_ffi_table_func,
253        create_sum_udaf: create_ffi_sum_func,
254        create_stddev_udaf: create_ffi_stddev_func,
255        create_rank_udwf: create_ffi_rank_func,
256        create_extension_options: config::create_extension_options,
257        create_empty_exec,
258        create_exec_with_statistics,
259        create_table_with_statistics,
260        create_physical_optimizer_rule:
261            physical_optimizer::create_physical_optimizer_rule,
262        version: super::version,
263    }
264}