datafusion_ffi/tests/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use abi_stable::library::{LibraryError, RootModule};
21use abi_stable::prefix_type::PrefixTypeTrait;
22use abi_stable::sabi_types::VersionStrings;
23use abi_stable::{
24    StableAbi, declare_root_module_statics, export_root_module, package_version_strings,
25};
26use arrow::array::RecordBatch;
27use arrow_schema::{DataType, Field, Schema};
28use async_provider::create_async_table_provider;
29use catalog::create_catalog_provider;
30use datafusion_common::record_batch;
31use sync_provider::create_sync_table_provider;
32use udf_udaf_udwf::{
33    create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
34    create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
35};
36
37use super::table_provider::FFI_TableProvider;
38use super::udf::FFI_ScalarUDF;
39use crate::catalog_provider::FFI_CatalogProvider;
40use crate::catalog_provider_list::FFI_CatalogProviderList;
41use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
42use crate::tests::catalog::create_catalog_provider_list;
43use crate::udaf::FFI_AggregateUDF;
44use crate::udtf::FFI_TableFunction;
45use crate::udwf::FFI_WindowUDF;
46
47mod async_provider;
48pub mod catalog;
49mod sync_provider;
50mod udf_udaf_udwf;
51pub mod utils;
52
#[repr(C)]
#[derive(StableAbi)]
#[sabi(kind(Prefix(prefix_ref = ForeignLibraryModuleRef)))]
/// This struct defines the module interfaces. It is to be shared by
/// both the module loading program and library that implements the
/// module.
///
/// Every field is an `extern "C"` function pointer. The `sabi` attribute
/// above declares this as an `abi_stable` prefix type whose reference type is
/// named `ForeignLibraryModuleRef`; that reference type is what implements
/// `RootModule` in this file.
pub struct ForeignLibraryModule {
    /// Construct an opinionated catalog provider
    pub create_catalog:
        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProvider,

    /// Construct an opinionated catalog provider list
    pub create_catalog_list:
        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProviderList,

    /// Constructs the table provider.
    ///
    /// `synchronous` selects between the synchronous and asynchronous
    /// provider implementations; `codec` is handed to whichever one is built.
    pub create_table: extern "C" fn(
        synchronous: bool,
        codec: FFI_LogicalExtensionCodec,
    ) -> FFI_TableProvider,

    /// Create a scalar UDF
    pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,

    /// Create a nullary (zero-argument) scalar UDF.
    ///
    /// In this module's exported implementation it is backed by
    /// `create_ffi_random_func`.
    pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,

    /// Create a table function (UDTF) from the supplied codec.
    pub create_table_function:
        extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,

    /// Create an aggregate UDAF using sum
    pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,

    /// Create a grouping UDAF using stddev
    pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,

    /// Create a window UDF (UDWF).
    ///
    /// In this module's exported implementation it is backed by
    /// `create_ffi_rank_func`.
    pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,

    /// Returns a `u64` version number reported by the library.
    ///
    /// NOTE(review): wired to `super::version` below; presumably used by the
    /// loader to check compatibility — confirm against that function.
    pub version: extern "C" fn() -> u64,
}
92
/// Registers `ForeignLibraryModuleRef` as the `abi_stable` root module of
/// this library, i.e. the entry type a loading program obtains after opening
/// the dynamic library.
impl RootModule for ForeignLibraryModuleRef {
    // Generates the `root_module_statics` item required by `RootModule`.
    declare_root_module_statics! {ForeignLibraryModuleRef}
    /// Base name of the dynamic library. Per the `abi_stable` documentation
    /// this is platform independent and the platform-specific file name is
    /// derived from it.
    const BASE_NAME: &'static str = "datafusion_ffi";
    /// Library name; per the `abi_stable` documentation it is used in error
    /// messages.
    const NAME: &'static str = "datafusion_ffi";
    /// Version strings produced by `package_version_strings!()` for the crate
    /// this is compiled in.
    const VERSION_STRINGS: VersionStrings = package_version_strings!();

    /// Hook invoked with the loaded module; no extra setup is performed here,
    /// the module is returned unchanged.
    fn initialization(self) -> Result<Self, LibraryError> {
        Ok(self)
    }
}
103
104pub fn create_test_schema() -> Arc<Schema> {
105    Arc::new(Schema::new(vec![
106        Field::new("a", DataType::Int32, true),
107        Field::new("b", DataType::Float64, true),
108    ]))
109}
110
111pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
112    let end_value = start_value + num_values as i32;
113    let a_vals: Vec<i32> = (start_value..end_value).collect();
114    let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
115
116    record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
117}
118
119/// Here we only wish to create a simple table provider as an example.
120/// We create an in-memory table and convert it to it's FFI counterpart.
121extern "C" fn construct_table_provider(
122    synchronous: bool,
123    codec: FFI_LogicalExtensionCodec,
124) -> FFI_TableProvider {
125    match synchronous {
126        true => create_sync_table_provider(codec),
127        false => create_async_table_provider(codec),
128    }
129}
130
131#[export_root_module]
132/// This defines the entry point for using the module.
133pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
134    ForeignLibraryModule {
135        create_catalog: create_catalog_provider,
136        create_catalog_list: create_catalog_provider_list,
137        create_table: construct_table_provider,
138        create_scalar_udf: create_ffi_abs_func,
139        create_nullary_udf: create_ffi_random_func,
140        create_table_function: create_ffi_table_func,
141        create_sum_udaf: create_ffi_sum_func,
142        create_stddev_udaf: create_ffi_stddev_func,
143        create_rank_udwf: create_ffi_rank_func,
144        version: super::version,
145    }
146    .leak_into_prefix()
147}