zarr_datafusion/datasource/
factory.rs

1use std::sync::Arc;
2
3use async_trait::async_trait;
4use datafusion::catalog::{Session, TableProviderFactory};
5use datafusion::common::DataFusionError;
6use datafusion::datasource::TableProvider;
7use datafusion::error::Result;
8use datafusion::logical_expr::CreateExternalTable;
9use tracing::{debug, info, instrument};
10
11use crate::datasource::zarr::ZarrTable;
12use crate::reader::schema_inference::{infer_schema_with_meta, infer_schema_with_meta_async};
13use crate::reader::storage::{create_async_store, is_remote_url};
14
/// Stateless factory that produces Zarr-backed table providers.
///
/// The type carries no fields; all inputs arrive through the
/// `CreateExternalTable` command passed to `TableProviderFactory::create`.
#[derive(Default, Debug)]
pub struct ZarrTableFactory;
17
18#[async_trait]
19impl TableProviderFactory for ZarrTableFactory {
20    #[instrument(level = "info", skip_all)]
21    async fn create(
22        &self,
23        _state: &dyn Session,
24        cmd: &CreateExternalTable,
25    ) -> Result<Arc<dyn TableProvider>> {
26        info!("Creating Zarr table");
27
28        if is_remote_url(&cmd.location) {
29            info!("Remote URL detected - using async schema inference");
30            let (store, prefix) = create_async_store(&cmd.location)
31                .await
32                .map_err(|e| DataFusionError::External(Box::new(e)))?;
33            debug!("Store created, inferring schema");
34            let (schema, metadata) = infer_schema_with_meta_async(&store, &prefix)
35                .await
36                .map_err(DataFusionError::External)?;
37            let schema = Arc::new(schema);
38            info!(
39                num_fields = schema.fields().len(),
40                "Table created successfully (with cached store and metadata)"
41            );
42            Ok(Arc::new(ZarrTable::with_cached_remote(
43                schema,
44                &cmd.location,
45                store,
46                prefix,
47                metadata,
48            )))
49        } else {
50            info!("Local path detected - using sync schema inference");
51            let (schema, metadata) = infer_schema_with_meta(&cmd.location)?;
52            let schema = Arc::new(schema);
53            info!(
54                num_fields = schema.fields().len(),
55                total_rows = metadata.total_rows,
56                "Table created successfully (with metadata for statistics)"
57            );
58            Ok(Arc::new(ZarrTable::with_metadata(
59                schema,
60                &cmd.location,
61                metadata,
62            )))
63        }
64    }
65}