zarr_datafusion/datasource/
factory.rs1use std::sync::Arc;
2
use async_trait::async_trait;
use datafusion::catalog::{Session, TableProviderFactory};
use datafusion::common::DataFusionError;
use datafusion::datasource::TableProvider;
use datafusion::error::Result;
use datafusion::logical_expr::CreateExternalTable;
use tracing::{debug, info, instrument, warn};

use crate::datasource::zarr::ZarrTable;
use crate::reader::schema_inference::{infer_schema_with_meta, infer_schema_with_meta_async};
use crate::reader::storage::{create_async_store, is_remote_url};
14
/// Factory that turns `CREATE EXTERNAL TABLE` commands into Zarr-backed
/// table providers.
///
/// This is a zero-sized unit struct: it carries no state of its own.
#[derive(Default, Debug)]
pub struct ZarrTableFactory;
17
18#[async_trait]
19impl TableProviderFactory for ZarrTableFactory {
20 #[instrument(level = "info", skip_all)]
21 async fn create(
22 &self,
23 _state: &dyn Session,
24 cmd: &CreateExternalTable,
25 ) -> Result<Arc<dyn TableProvider>> {
26 info!("Creating Zarr table");
27
28 if is_remote_url(&cmd.location) {
29 info!("Remote URL detected - using async schema inference");
30 let (store, prefix) = create_async_store(&cmd.location)
31 .await
32 .map_err(|e| DataFusionError::External(Box::new(e)))?;
33 debug!("Store created, inferring schema");
34 let (schema, metadata) = infer_schema_with_meta_async(&store, &prefix)
35 .await
36 .map_err(DataFusionError::External)?;
37 let schema = Arc::new(schema);
38 info!(
39 num_fields = schema.fields().len(),
40 "Table created successfully (with cached store and metadata)"
41 );
42 Ok(Arc::new(ZarrTable::with_cached_remote(
43 schema,
44 &cmd.location,
45 store,
46 prefix,
47 metadata,
48 )))
49 } else {
50 info!("Local path detected - using sync schema inference");
51 let (schema, metadata) = infer_schema_with_meta(&cmd.location)?;
52 let schema = Arc::new(schema);
53 info!(
54 num_fields = schema.fields().len(),
55 total_rows = metadata.total_rows,
56 "Table created successfully (with metadata for statistics)"
57 );
58 Ok(Arc::new(ZarrTable::with_metadata(
59 schema,
60 &cmd.location,
61 metadata,
62 )))
63 }
64 }
65}