datafusion_ffi/tests/
mod.rs1use std::sync::Arc;
19
20use abi_stable::{
21 declare_root_module_statics, export_root_module,
22 library::{LibraryError, RootModule},
23 package_version_strings,
24 prefix_type::PrefixTypeTrait,
25 sabi_types::VersionStrings,
26 StableAbi,
27};
28use catalog::create_catalog_provider;
29
30use crate::{catalog_provider::FFI_CatalogProvider, udtf::FFI_TableFunction};
31
32use crate::udaf::FFI_AggregateUDF;
33
34use crate::udwf::FFI_WindowUDF;
35
36use super::{table_provider::FFI_TableProvider, udf::FFI_ScalarUDF};
37use arrow::array::RecordBatch;
38use async_provider::create_async_table_provider;
39use datafusion::{
40 arrow::datatypes::{DataType, Field, Schema},
41 common::record_batch,
42};
43use sync_provider::create_sync_table_provider;
44use udf_udaf_udwf::{
45 create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
46 create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
47};
48
49mod async_provider;
50pub mod catalog;
51mod sync_provider;
52mod udf_udaf_udwf;
53pub mod utils;
54
55#[repr(C)]
56#[derive(StableAbi)]
57#[sabi(kind(Prefix(prefix_ref = ForeignLibraryModuleRef)))]
58pub struct ForeignLibraryModule {
62 pub create_catalog: extern "C" fn() -> FFI_CatalogProvider,
64
65 pub create_table: extern "C" fn(synchronous: bool) -> FFI_TableProvider,
67
68 pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
70
71 pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
72
73 pub create_table_function: extern "C" fn() -> FFI_TableFunction,
74
75 pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,
77
78 pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,
80
81 pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,
82
83 pub version: extern "C" fn() -> u64,
84}
85
86impl RootModule for ForeignLibraryModuleRef {
87 declare_root_module_statics! {ForeignLibraryModuleRef}
88 const BASE_NAME: &'static str = "datafusion_ffi";
89 const NAME: &'static str = "datafusion_ffi";
90 const VERSION_STRINGS: VersionStrings = package_version_strings!();
91
92 fn initialization(self) -> Result<Self, LibraryError> {
93 Ok(self)
94 }
95}
96
97pub fn create_test_schema() -> Arc<Schema> {
98 Arc::new(Schema::new(vec![
99 Field::new("a", DataType::Int32, true),
100 Field::new("b", DataType::Float64, true),
101 ]))
102}
103
104pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
105 let end_value = start_value + num_values as i32;
106 let a_vals: Vec<i32> = (start_value..end_value).collect();
107 let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
108
109 record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
110}
111
112extern "C" fn construct_table_provider(synchronous: bool) -> FFI_TableProvider {
115 match synchronous {
116 true => create_sync_table_provider(),
117 false => create_async_table_provider(),
118 }
119}
120
121#[export_root_module]
122pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
124 ForeignLibraryModule {
125 create_catalog: create_catalog_provider,
126 create_table: construct_table_provider,
127 create_scalar_udf: create_ffi_abs_func,
128 create_nullary_udf: create_ffi_random_func,
129 create_table_function: create_ffi_table_func,
130 create_sum_udaf: create_ffi_sum_func,
131 create_stddev_udaf: create_ffi_stddev_func,
132 create_rank_udwf: create_ffi_rank_func,
133 version: super::version,
134 }
135 .leak_into_prefix()
136}