supertable_core/
datafusion.rs1use crate::table::Table;
7use arrow::datatypes::SchemaRef;
8use async_trait::async_trait;
9use datafusion::catalog::Session;
10use datafusion::datasource::TableProvider;
11use datafusion::logical_expr::{Expr, TableType};
12use datafusion::physical_plan::ExecutionPlan;
13use std::any::Any;
14use std::sync::Arc;
15
16#[derive(Debug)]
18pub struct SuperTableProvider {
19 table: Table,
20}
21
22impl SuperTableProvider {
23 pub fn new(table: Table) -> Self {
24 Self { table }
25 }
26}
27
28#[async_trait]
29impl TableProvider for SuperTableProvider {
30 fn as_any(&self) -> &dyn Any {
31 self
32 }
33
34 fn schema(&self) -> SchemaRef {
35 self.table.metadata.current_schema().to_arrow_schema_ref()
36 }
37
38 fn table_type(&self) -> TableType {
39 TableType::Base
40 }
41
42 async fn scan(
43 &self,
44 _state: &dyn Session,
45 _projection: Option<&Vec<usize>>,
46 _filters: &[Expr],
47 _limit: Option<usize>,
48 ) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
49 let snapshot = self.table.metadata.current_snapshot().ok_or_else(|| {
50 datafusion::error::DataFusionError::Plan("No current snapshot found".to_string())
51 })?;
52
53 let _files = snapshot
59 .all_data_files(&self.table.storage)
60 .await
61 .map_err(|e| datafusion::error::DataFusionError::External(e.into()))?;
62
63 Err(datafusion::error::DataFusionError::NotImplemented(
64 "Physical scan execution requires further integration with DataFusion's ParquetExec"
65 .to_string(),
66 ))
67 }
68}