Skip to main content

vortex_datafusion/v2/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
//! [`VortexTable`] implements DataFusion's [`TableProvider`] trait, providing a direct
//! integration between a Vortex data source (a [`DataSourceRef`]) and DataFusion's query engine.
6
7use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion_catalog::Session;
14use datafusion_catalog::TableProvider;
15use datafusion_common::ColumnStatistics;
16use datafusion_common::DataFusionError;
17use datafusion_common::Result as DFResult;
18use datafusion_common::Statistics;
19use datafusion_common::stats::Precision;
20use datafusion_datasource::source::DataSourceExec;
21use datafusion_expr::Expr;
22use datafusion_expr::TableType;
23use datafusion_physical_plan::ExecutionPlan;
24use vortex::scan::api::DataSourceRef;
25use vortex::session::VortexSession;
26
27use crate::v2::source::VortexDataSource;
28
/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`].
///
/// The table holds everything needed to build a scan: the Vortex data source, the Vortex
/// session, and the Arrow schema that is reported to DataFusion.
pub struct VortexTable {
    /// Vortex data source that is cloned into every scan and queried for the row-count and
    /// byte-size figures reported as table statistics.
    data_source: DataSourceRef,
    /// Vortex session that is cloned into every scan alongside the data source.
    session: VortexSession,
    /// Arrow schema returned from [`TableProvider::schema`] and forwarded to each scan.
    arrow_schema: SchemaRef,
}
35
36impl fmt::Debug for VortexTable {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("VortexTable")
39            .field("schema", &self.arrow_schema)
40            .finish()
41    }
42}
43
44impl VortexTable {
45    /// Creates a new [`VortexTable`] from a Vortex data source and session.
46    ///
47    /// The Arrow schema will be used to emit the correct column names and types to DataFusion.
48    /// The Vortex DType of the data source should be compatible with this Arrow schema.
49    pub fn new(
50        data_source: DataSourceRef,
51        session: VortexSession,
52        arrow_schema: SchemaRef,
53    ) -> Self {
54        Self {
55            data_source,
56            session,
57            arrow_schema,
58        }
59    }
60}
61
62#[async_trait]
63impl TableProvider for VortexTable {
64    fn as_any(&self) -> &dyn Any {
65        self
66    }
67
68    fn schema(&self) -> SchemaRef {
69        self.arrow_schema.clone()
70    }
71
72    fn table_type(&self) -> TableType {
73        TableType::Base
74    }
75
76    async fn scan(
77        &self,
78        _state: &dyn Session,
79        projection: Option<&Vec<usize>>,
80        _filters: &[Expr],
81        _limit: Option<usize>,
82    ) -> DFResult<Arc<dyn ExecutionPlan>> {
83        // Construct the physical node representing this table.
84        let data_source = VortexDataSource::builder(self.data_source.clone(), self.session.clone())
85            .with_arrow_schema(self.arrow_schema.clone())
86            // We push down the projection now since it can make building the physical plan a lot
87            // cheaper, e.g. by only computing stats for the projected columns.
88            .with_some_projection(projection.cloned())
89            // We don't push down filters for two reasons:
90            //  1. Vortex requires a physical expression, not logical. DataFusion will try to push
91            //     the physical filters later.
92            //  2. There's nothing useful we can do with filters now to reduce the amount of work
93            //     we have to do.
94            //
95            // We also don't push down the limit for the same reason, there's nothing useful we
96            // can do with it.
97            .build()
98            .await
99            .map_err(|e| DataFusionError::External(Box::new(e)))?;
100
101        Ok(DataSourceExec::from_data_source(data_source))
102    }
103
104    /// Returns statistics for the full table, prior to any projection.
105    ///
106    /// We should not (and actually, cannot) perform I/O here, so the best we can do is return
107    /// cardinality and byte size estimates.
108    ///
109    // NOTE(ngates): it's not obvious these are actually used? I think DataFusion does join
110    //  planning over stats from the physical plan?
111    fn statistics(&self) -> Option<Statistics> {
112        let num_rows = match self.data_source.row_count() {
113            Some(vortex::expr::stats::Precision::Exact(v)) => {
114                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
115            }
116            _ => Precision::Absent,
117        };
118
119        let total_byte_size = match self.data_source.byte_size() {
120            Some(vortex::expr::stats::Precision::Exact(v)) => {
121                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
122            }
123            _ => Precision::Absent,
124        };
125
126        let column_statistics =
127            vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
128
129        Some(Statistics {
130            num_rows,
131            total_byte_size,
132            column_statistics,
133        })
134    }
135}