datafusion_ffi/tests/
mod.rs1use std::sync::Arc;
19
20use abi_stable::library::{LibraryError, RootModule};
21use abi_stable::prefix_type::PrefixTypeTrait;
22use abi_stable::sabi_types::VersionStrings;
23use abi_stable::{
24 StableAbi, declare_root_module_statics, export_root_module, package_version_strings,
25};
26use arrow::array::RecordBatch;
27use arrow_schema::{DataType, Field, Schema};
28use async_provider::create_async_table_provider;
29use catalog::create_catalog_provider;
30use datafusion_common::record_batch;
31use sync_provider::create_sync_table_provider;
32use udf_udaf_udwf::{
33 create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
34 create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
35};
36
37use crate::catalog_provider::FFI_CatalogProvider;
38use crate::catalog_provider_list::FFI_CatalogProviderList;
39use crate::config::extension_options::FFI_ExtensionOptions;
40use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
41use crate::table_provider::FFI_TableProvider;
42use crate::table_provider_factory::FFI_TableProviderFactory;
43use crate::tests::catalog::create_catalog_provider_list;
44use crate::udaf::FFI_AggregateUDF;
45use crate::udf::FFI_ScalarUDF;
46use crate::udtf::FFI_TableFunction;
47use crate::udwf::FFI_WindowUDF;
48
49mod async_provider;
50pub mod catalog;
51pub mod config;
52mod sync_provider;
53mod table_provider_factory;
54mod udf_udaf_udwf;
55pub mod utils;
56
/// Table of `extern "C"` constructor functions that this library exposes as
/// its `abi_stable` root module.
///
/// The `#[sabi(kind(Prefix(..)))]` attribute declares this as a prefix type
/// and names `ForeignLibraryModuleRef` as the reference type through which
/// the table is accessed (see the `RootModule` impl in this file).
///
/// `#[repr(C)]` lays the fields out in declaration order, so reordering or
/// retyping a field changes the exported ABI.
#[repr(C)]
#[derive(StableAbi)]
#[sabi(kind(Prefix(prefix_ref = ForeignLibraryModuleRef)))]
pub struct ForeignLibraryModule {
    /// Builds an `FFI_CatalogProvider` using the supplied logical extension
    /// codec.
    pub create_catalog:
        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProvider,

    /// Builds an `FFI_CatalogProviderList` using the supplied logical
    /// extension codec.
    pub create_catalog_list:
        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProviderList,

    /// Builds an `FFI_TableProvider`. In this module the entry is backed by
    /// `construct_table_provider`, where `synchronous == true` selects the
    /// sync provider and `false` selects the async one.
    pub create_table: extern "C" fn(
        synchronous: bool,
        codec: FFI_LogicalExtensionCodec,
    ) -> FFI_TableProvider,

    /// Builds an `FFI_TableProviderFactory` using the supplied logical
    /// extension codec.
    pub create_table_factory:
        extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_TableProviderFactory,

    /// Builds an `FFI_ScalarUDF`; takes no arguments. Populated with
    /// `create_ffi_abs_func` by `get_foreign_library_module`.
    pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,

    /// Builds a second `FFI_ScalarUDF`; takes no arguments. Populated with
    /// `create_ffi_random_func` by `get_foreign_library_module`.
    // NOTE(review): "nullary" presumably means the UDF itself accepts zero
    // arguments -- confirm against `udf_udaf_udwf`.
    pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,

    /// Builds an `FFI_TableFunction` using the supplied logical extension
    /// codec.
    pub create_table_function:
        extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,

    /// Builds an `FFI_AggregateUDF`; takes no arguments. Populated with
    /// `create_ffi_sum_func` by `get_foreign_library_module`.
    pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,

    /// Builds an `FFI_AggregateUDF`; takes no arguments. Populated with
    /// `create_ffi_stddev_func` by `get_foreign_library_module`.
    pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,

    /// Builds an `FFI_WindowUDF`; takes no arguments. Populated with
    /// `create_ffi_rank_func` by `get_foreign_library_module`.
    pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,

    /// Builds an `FFI_ExtensionOptions`; takes no arguments. Populated with
    /// `config::create_extension_options` by `get_foreign_library_module`.
    pub create_extension_options: extern "C" fn() -> FFI_ExtensionOptions,

    /// Returns a `u64` version number. Populated with `super::version` by
    /// `get_foreign_library_module`; what the number encodes is defined
    /// there, not in this file.
    pub version: extern "C" fn() -> u64,
}
103
/// Registers `ForeignLibraryModuleRef` as the `abi_stable` root module of
/// this library, so that a host program can load it by name.
impl RootModule for ForeignLibraryModuleRef {
    // Generates the statics the `RootModule` trait requires for this type.
    declare_root_module_statics! {ForeignLibraryModuleRef}
    // Both the library base name and the name reported for this module are
    // `datafusion_ffi`.
    const BASE_NAME: &'static str = "datafusion_ffi";
    const NAME: &'static str = "datafusion_ffi";
    // Version strings are taken from the package where this macro is invoked.
    const VERSION_STRINGS: VersionStrings = package_version_strings!();

    /// Post-load hook: performs no extra validation and returns the module
    /// unchanged.
    fn initialization(self) -> Result<Self, LibraryError> {
        Ok(self)
    }
}
114
115pub fn create_test_schema() -> Arc<Schema> {
116 Arc::new(Schema::new(vec![
117 Field::new("a", DataType::Int32, true),
118 Field::new("b", DataType::Float64, true),
119 ]))
120}
121
122pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
123 let end_value = start_value + num_values as i32;
124 let a_vals: Vec<i32> = (start_value..end_value).collect();
125 let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
126
127 record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
128}
129
130extern "C" fn construct_table_provider(
133 synchronous: bool,
134 codec: FFI_LogicalExtensionCodec,
135) -> FFI_TableProvider {
136 match synchronous {
137 true => create_sync_table_provider(codec),
138 false => create_async_table_provider(codec),
139 }
140}
141
/// FFI entry point that builds the test table provider factory.
///
/// Thin `extern "C"` wrapper: forwards `codec` unchanged to
/// `table_provider_factory::create` and returns its result.
extern "C" fn construct_table_provider_factory(
    codec: FFI_LogicalExtensionCodec,
) -> FFI_TableProviderFactory {
    table_provider_factory::create(codec)
}
149
150#[export_root_module]
151pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
153 ForeignLibraryModule {
154 create_catalog: create_catalog_provider,
155 create_catalog_list: create_catalog_provider_list,
156 create_table: construct_table_provider,
157 create_table_factory: construct_table_provider_factory,
158 create_scalar_udf: create_ffi_abs_func,
159 create_nullary_udf: create_ffi_random_func,
160 create_table_function: create_ffi_table_func,
161 create_sum_udaf: create_ffi_sum_func,
162 create_stddev_udaf: create_ffi_stddev_func,
163 create_rank_udwf: create_ffi_rank_func,
164 create_extension_options: config::create_extension_options,
165 version: super::version,
166 }
167 .leak_into_prefix()
168}