zarr-datafusion 0.1.0

Extending DataFusion to do SQL queries on Zarr data.
Documentation
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::common::DataFusionError;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::CreateExternalTable;
use tracing::{debug, info, instrument};

use crate::datasource::zarr::ZarrTable;
use crate::reader::schema_inference::{infer_schema_with_meta, infer_schema_with_meta_async};
use crate::reader::storage::{create_async_store, is_remote_url};

#[derive(Debug, Default)]
pub struct ZarrTableFactory;

#[async_trait]
impl TableProviderFactory for ZarrTableFactory {
    #[instrument(level = "info", skip_all)]
    async fn create(
        &self,
        _state: &dyn Session,
        cmd: &CreateExternalTable,
    ) -> Result<Arc<dyn TableProvider>> {
        info!("Creating Zarr table");

        if is_remote_url(&cmd.location) {
            info!("Remote URL detected - using async schema inference");
            let (store, prefix) = create_async_store(&cmd.location)
                .await
                .map_err(|e| DataFusionError::External(Box::new(e)))?;
            debug!("Store created, inferring schema");
            let (schema, metadata) = infer_schema_with_meta_async(&store, &prefix)
                .await
                .map_err(DataFusionError::External)?;
            let schema = Arc::new(schema);
            info!(
                num_fields = schema.fields().len(),
                "Table created successfully (with cached store and metadata)"
            );
            Ok(Arc::new(ZarrTable::with_cached_remote(
                schema,
                &cmd.location,
                store,
                prefix,
                metadata,
            )))
        } else {
            info!("Local path detected - using sync schema inference");
            let (schema, metadata) = infer_schema_with_meta(&cmd.location)?;
            let schema = Arc::new(schema);
            info!(
                num_fields = schema.fields().len(),
                total_rows = metadata.total_rows,
                "Table created successfully (with metadata for statistics)"
            );
            Ok(Arc::new(ZarrTable::with_metadata(
                schema,
                &cmd.location,
                metadata,
            )))
        }
    }
}