mod utils;
#[cfg(feature = "integration-tests")]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use arrow::datatypes::Schema;
use datafusion::catalog::{TableProvider, TableProviderFactory};
use datafusion::error::{DataFusionError, Result};
use datafusion_common::TableReference;
use datafusion_common::ToDFSchema;
use datafusion_expr::CreateExternalTable;
use datafusion_ffi::tests::create_record_batch;
use datafusion_ffi::tests::utils::get_module;
/// Exercises a table provider obtained from the FFI test module end to end.
///
/// Loads the module with `get_module`, asks it for a table provider through
/// its `create_table` entry point, registers the resulting provider in the
/// session context as `external_table`, and verifies that collecting the
/// table yields exactly the three expected record batches.
///
/// `synchronous` is forwarded unchanged to the module's constructor.
/// NOTE(review): presumably it selects between a synchronous and an
/// asynchronous provider implementation — confirm against the module.
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` when the module does not expose
/// `create_table`, and propagates any error from loading the module,
/// registering the table, or executing the query.
async fn test_table_provider(synchronous: bool) -> Result<()> {
    let table_provider_module = get_module()?;
    let (ctx, codec) = super::utils::ctx_and_codec();

    // `ok_or_else` builds the error (and allocates its message) only when the
    // entry point is actually missing, instead of on every call.
    let create_table = table_provider_module.create_table().ok_or_else(|| {
        DataFusionError::NotImplemented(
            "External table provider failed to implement create_table".to_string(),
        )
    })?;
    let ffi_table_provider = create_table(synchronous, codec);

    // Wrap the FFI struct so the session can use it like any other provider.
    let foreign_table_provider: Arc<dyn TableProvider> = (&ffi_table_provider).into();
    ctx.register_table("external_table", foreign_table_provider)?;

    let df = ctx.table("external_table").await?;
    let results = df.collect().await?;

    // The scan must return these three batches, in this order.
    assert_eq!(results.len(), 3);
    assert_eq!(results[0], create_record_batch(1, 5));
    assert_eq!(results[1], create_record_batch(6, 1));
    assert_eq!(results[2], create_record_batch(7, 5));
    Ok(())
}
/// Runs the shared table provider check with `synchronous` set to `false`.
#[tokio::test]
async fn async_test_table_provider() -> Result<()> {
    let synchronous = false;
    test_table_provider(synchronous).await
}
/// Runs the shared table provider check with `synchronous` set to `true`.
#[tokio::test]
async fn sync_test_table_provider() -> Result<()> {
    let synchronous = true;
    test_table_provider(synchronous).await
}
/// Checks that a table provider factory obtained from the FFI test module can
/// create a provider from a `CreateExternalTable` command.
///
/// The command carries an empty schema and placeholder location/file type;
/// the test only asserts that the provider returned by the foreign factory
/// reports a schema with two fields.
///
/// # Errors
///
/// Returns `DataFusionError::NotImplemented` when the module does not expose
/// `create_table_factory`, and propagates any error from loading the module,
/// building the command, or creating the provider.
#[tokio::test]
async fn test_table_provider_factory() -> Result<()> {
    let table_provider_module = get_module()?;
    let (ctx, codec) = super::utils::ctx_and_codec();

    // `ok_or_else` builds the error (and allocates its message) only when the
    // entry point is actually missing, instead of on every call.
    let create_table_factory = table_provider_module
        .create_table_factory()
        .ok_or_else(|| {
            DataFusionError::NotImplemented(
                "External table provider factory failed to implement create".to_string(),
            )
        })?;
    let ffi_table_provider_factory = create_table_factory(codec);

    // Wrap the FFI struct so it can be driven through the regular trait.
    let foreign_table_provider_factory: Arc<dyn TableProviderFactory> =
        (&ffi_table_provider_factory).into();

    let cmd = CreateExternalTable {
        schema: Schema::empty().to_dfschema_ref()?,
        name: TableReference::bare("cloned_test"),
        location: "test".to_string(),
        file_type: "test".to_string(),
        table_partition_cols: vec![],
        if_not_exists: false,
        or_replace: false,
        temporary: false,
        definition: None,
        order_exprs: vec![],
        unbounded: false,
        options: HashMap::new(),
        constraints: Default::default(),
        column_defaults: HashMap::new(),
    };

    let provider = foreign_table_provider_factory
        .create(&ctx.state(), &cmd)
        .await?;

    // The created provider is expected to expose a two-field schema.
    assert_eq!(provider.schema().fields().len(), 2);
    Ok(())
}
}