datafusion-ffi 53.1.0

Foreign Function Interface implementation for DataFusion
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

mod utils;

/// Integration tests for the FFI table provider and table provider factory.
///
/// The tests are wrapped in their own module so that a single attribute can
/// restrict all of them to builds where the `integration-tests` feature is
/// enabled.
#[cfg(feature = "integration-tests")]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow::datatypes::Schema;
    use datafusion::catalog::{TableProvider, TableProviderFactory};
    use datafusion::error::{DataFusionError, Result};
    use datafusion_common::{TableReference, ToDFSchema};
    use datafusion_expr::CreateExternalTable;
    use datafusion_ffi::tests::create_record_batch;
    use datafusion_ffi::tests::utils::get_module;

    /// Shared body of the table provider round-trip tests.
    ///
    /// This test must live in the `tests` directory rather than inside the
    /// library, so that it verifies a dynamic library is being built and is
    /// exercised from a different executable.
    ///
    /// `synchronous` is forwarded unchanged to the module's `create_table`
    /// entry point.
    /// NOTE(review): presumably it selects a synchronous vs. asynchronous
    /// provider implementation inside the module — confirm against the module.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::NotImplemented` when the module does not
    /// expose `create_table`, and propagates any error from obtaining the
    /// module, registering the table, or collecting its contents.
    async fn test_table_provider(synchronous: bool) -> Result<()> {
        let module = get_module()?;
        let (ctx, codec) = super::utils::ctx_and_codec();

        // The constructor is an optional entry point of the module. Calling
        // it means the table provider is created by the module's own code.
        let create_table = module.create_table().ok_or_else(|| {
            DataFusionError::NotImplemented(
                "External table provider failed to implement create_table".to_string(),
            )
        })?;
        let ffi_provider = create_table(synchronous, codec);

        // Convert the FFI handle into a regular `TableProvider` so that it
        // can be used from within this executable.
        let provider: Arc<dyn TableProvider> = (&ffi_provider).into();

        // Register and read the table back to show the full cycle works.
        ctx.register_table("external_table", provider)?;
        let results = ctx.table("external_table").await?.collect().await?;

        let expected = [
            create_record_batch(1, 5),
            create_record_batch(6, 1),
            create_record_batch(7, 5),
        ];
        assert_eq!(results.len(), expected.len());
        for (actual, wanted) in results.iter().zip(expected.iter()) {
            assert_eq!(actual, wanted);
        }

        Ok(())
    }

    /// Runs the table provider round trip with `synchronous` set to `false`.
    #[tokio::test]
    async fn async_test_table_provider() -> Result<()> {
        let synchronous = false;
        test_table_provider(synchronous).await
    }

    /// Runs the table provider round trip with `synchronous` set to `true`.
    #[tokio::test]
    async fn sync_test_table_provider() -> Result<()> {
        let synchronous = true;
        test_table_provider(synchronous).await
    }

    /// Obtains a table provider factory from the module, converts it into a
    /// `TableProviderFactory`, and checks that the provider it creates for a
    /// `CreateExternalTable` command reports a two-field schema.
    ///
    /// # Errors
    ///
    /// Returns `DataFusionError::NotImplemented` when the module does not
    /// expose `create_table_factory`, and propagates any error from obtaining
    /// the module, building the command schema, or creating the provider.
    #[tokio::test]
    async fn test_table_provider_factory() -> Result<()> {
        let module = get_module()?;
        let (ctx, codec) = super::utils::ctx_and_codec();

        // As with `create_table`, the factory constructor is optional.
        let create_factory = module.create_table_factory().ok_or_else(|| {
            DataFusionError::NotImplemented(
                "External table provider factory failed to implement create".to_string(),
            )
        })?;
        let ffi_factory = create_factory(codec);

        // Convert the FFI handle into a regular `TableProviderFactory`.
        let factory: Arc<dyn TableProviderFactory> = (&ffi_factory).into();

        // The command is built with an empty schema; the assertion below
        // expects the created provider to report two fields regardless.
        let empty_schema = Schema::empty().to_dfschema_ref()?;
        let cmd = CreateExternalTable {
            schema: empty_schema,
            name: TableReference::bare("cloned_test"),
            location: "test".to_string(),
            file_type: "test".to_string(),
            table_partition_cols: vec![],
            if_not_exists: false,
            or_replace: false,
            temporary: false,
            definition: None,
            order_exprs: vec![],
            unbounded: false,
            options: HashMap::new(),
            constraints: Default::default(),
            column_defaults: HashMap::new(),
        };

        let state = ctx.state();
        let provider = factory.create(&state, &cmd).await?;

        let field_count = provider.schema().fields().len();
        assert_eq!(field_count, 2);

        Ok(())
    }
}