use std::sync::Arc;
use abi_stable::library::{LibraryError, RootModule};
use abi_stable::prefix_type::PrefixTypeTrait;
use abi_stable::sabi_types::VersionStrings;
use abi_stable::{
StableAbi, declare_root_module_statics, export_root_module, package_version_strings,
};
use arrow::array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};
use async_provider::create_async_table_provider;
use catalog::create_catalog_provider;
use datafusion_common::record_batch;
use sync_provider::create_sync_table_provider;
use udf_udaf_udwf::{
create_ffi_abs_func, create_ffi_random_func, create_ffi_rank_func,
create_ffi_stddev_func, create_ffi_sum_func, create_ffi_table_func,
};
use super::table_provider::FFI_TableProvider;
use super::udf::FFI_ScalarUDF;
use crate::catalog_provider::FFI_CatalogProvider;
use crate::catalog_provider_list::FFI_CatalogProviderList;
use crate::proto::logical_extension_codec::FFI_LogicalExtensionCodec;
use crate::tests::catalog::create_catalog_provider_list;
use crate::udaf::FFI_AggregateUDF;
use crate::udtf::FFI_TableFunction;
use crate::udwf::FFI_WindowUDF;
mod async_provider;
pub mod catalog;
mod sync_provider;
mod udf_udaf_udwf;
pub mod utils;
#[repr(C)]
#[derive(StableAbi)]
#[sabi(kind(Prefix(prefix_ref = ForeignLibraryModuleRef)))]
pub struct ForeignLibraryModule {
pub create_catalog:
extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProvider,
pub create_catalog_list:
extern "C" fn(codec: FFI_LogicalExtensionCodec) -> FFI_CatalogProviderList,
pub create_table: extern "C" fn(
synchronous: bool,
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableProvider,
pub create_scalar_udf: extern "C" fn() -> FFI_ScalarUDF,
pub create_nullary_udf: extern "C" fn() -> FFI_ScalarUDF,
pub create_table_function:
extern "C" fn(FFI_LogicalExtensionCodec) -> FFI_TableFunction,
pub create_sum_udaf: extern "C" fn() -> FFI_AggregateUDF,
pub create_stddev_udaf: extern "C" fn() -> FFI_AggregateUDF,
pub create_rank_udwf: extern "C" fn() -> FFI_WindowUDF,
pub version: extern "C" fn() -> u64,
}
impl RootModule for ForeignLibraryModuleRef {
declare_root_module_statics! {ForeignLibraryModuleRef}
const BASE_NAME: &'static str = "datafusion_ffi";
const NAME: &'static str = "datafusion_ffi";
const VERSION_STRINGS: VersionStrings = package_version_strings!();
fn initialization(self) -> Result<Self, LibraryError> {
Ok(self)
}
}
pub fn create_test_schema() -> Arc<Schema> {
Arc::new(Schema::new(vec![
Field::new("a", DataType::Int32, true),
Field::new("b", DataType::Float64, true),
]))
}
pub fn create_record_batch(start_value: i32, num_values: usize) -> RecordBatch {
let end_value = start_value + num_values as i32;
let a_vals: Vec<i32> = (start_value..end_value).collect();
let b_vals: Vec<f64> = a_vals.iter().map(|v| *v as f64).collect();
record_batch!(("a", Int32, a_vals), ("b", Float64, b_vals)).unwrap()
}
extern "C" fn construct_table_provider(
synchronous: bool,
codec: FFI_LogicalExtensionCodec,
) -> FFI_TableProvider {
match synchronous {
true => create_sync_table_provider(codec),
false => create_async_table_provider(codec),
}
}
#[export_root_module]
pub fn get_foreign_library_module() -> ForeignLibraryModuleRef {
ForeignLibraryModule {
create_catalog: create_catalog_provider,
create_catalog_list: create_catalog_provider_list,
create_table: construct_table_provider,
create_scalar_udf: create_ffi_abs_func,
create_nullary_udf: create_ffi_random_func,
create_table_function: create_ffi_table_func,
create_sum_udaf: create_ffi_sum_func,
create_stddev_udaf: create_ffi_stddev_func,
create_rank_udwf: create_ffi_rank_func,
version: super::version,
}
.leak_into_prefix()
}