Skip to main content

supertable_core/
datafusion.rs

1//! # SuperTable DataFusion Integration
2//!
3//! This module provides the `TableProvider` implementation for DataFusion,
4//! enabling SuperTable to be used as a source for SQL queries and DataFrame operations.
5
6use crate::table::Table;
7use arrow::datatypes::SchemaRef;
8use async_trait::async_trait;
9use datafusion::catalog::Session;
10use datafusion::datasource::TableProvider;
11use datafusion::logical_expr::{Expr, TableType};
12use datafusion::physical_plan::ExecutionPlan;
13use std::any::Any;
14use std::sync::Arc;
15
16/// A DataFusion TableProvider for SuperTable.
17#[derive(Debug)]
18pub struct SuperTableProvider {
19    table: Table,
20}
21
22impl SuperTableProvider {
23    pub fn new(table: Table) -> Self {
24        Self { table }
25    }
26}
27
28#[async_trait]
29impl TableProvider for SuperTableProvider {
30    fn as_any(&self) -> &dyn Any {
31        self
32    }
33
34    fn schema(&self) -> SchemaRef {
35        self.table.metadata.current_schema().to_arrow_schema_ref()
36    }
37
38    fn table_type(&self) -> TableType {
39        TableType::Base
40    }
41
42    async fn scan(
43        &self,
44        _state: &dyn Session,
45        _projection: Option<&Vec<usize>>,
46        _filters: &[Expr],
47        _limit: Option<usize>,
48    ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
49        let snapshot = self.table.metadata.current_snapshot().ok_or_else(|| {
50            datafusion::error::DataFusionError::Plan("No current snapshot found".to_string())
51        })?;
52
53        // This is a placeholder. In a production implementation, we would:
54        // 1. Get the list of data files from the snapshot.
55        // 2. Apply partition pruning based on `_filters`.
56        // 3. Apply file pruning based on `_statistics`.
57        // 4. Create a ParquetExec with the filtered files.
58        let _files = snapshot
59            .all_data_files(&self.table.storage)
60            .await
61            .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
62
63        Err(datafusion::error::DataFusionError::NotImplemented(
64            "Physical scan execution requires further integration with DataFusion's ParquetExec"
65                .to_string(),
66        ))
67    }
68}