Skip to main content

vortex_datafusion/v2/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! [`VortexTable`] implements DataFusion's [`TableProvider`] trait, providing a direct
5//! integration between a Vortex [`DataSource`] and DataFusion's query engine.
6
7use std::any::Any;
8use std::fmt;
9use std::sync::Arc;
10
11use arrow_schema::SchemaRef;
12use async_trait::async_trait;
13use datafusion_catalog::Session;
14use datafusion_catalog::TableProvider;
15use datafusion_common::ColumnStatistics;
16use datafusion_common::DataFusionError;
17use datafusion_common::Result as DFResult;
18use datafusion_common::Statistics;
19use datafusion_common::stats::Precision;
20use datafusion_datasource::source::DataSourceExec;
21use datafusion_expr::Expr;
22use datafusion_expr::TableType;
23use datafusion_physical_plan::ExecutionPlan;
24use vortex::scan::DataSourceRef;
25use vortex::session::VortexSession;
26
27use crate::v2::source::VortexDataSource;
28
29/// A DataFusion [`TableProvider`] backed by a Vortex [`DataSourceRef`].
30pub struct VortexTable {
31    data_source: DataSourceRef,
32    session: VortexSession,
33    arrow_schema: SchemaRef,
34}
35
36impl fmt::Debug for VortexTable {
37    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
38        f.debug_struct("VortexTable")
39            .field("schema", &self.arrow_schema)
40            .finish()
41    }
42}
43
44impl VortexTable {
45    /// Creates a new [`VortexTable`] from a Vortex data source and session.
46    ///
47    /// The Arrow schema will be used to emit the correct column names and types to DataFusion.
48    /// The Vortex DType of the data source should be compatible with this Arrow schema.
49    pub fn new(
50        data_source: DataSourceRef,
51        session: VortexSession,
52        arrow_schema: SchemaRef,
53    ) -> Self {
54        Self {
55            data_source,
56            session,
57            arrow_schema,
58        }
59    }
60}
61
62#[async_trait]
63impl TableProvider for VortexTable {
64    fn as_any(&self) -> &dyn Any {
65        self
66    }
67
68    fn schema(&self) -> SchemaRef {
69        Arc::clone(&self.arrow_schema)
70    }
71
72    fn table_type(&self) -> TableType {
73        TableType::Base
74    }
75
76    async fn scan(
77        &self,
78        _state: &dyn Session,
79        projection: Option<&Vec<usize>>,
80        _filters: &[Expr],
81        _limit: Option<usize>,
82    ) -> DFResult<Arc<dyn ExecutionPlan>> {
83        // Construct the physical node representing this table.
84        let data_source =
85            VortexDataSource::builder(Arc::clone(&self.data_source), self.session.clone())
86                .with_arrow_schema(Arc::clone(&self.arrow_schema))
87                // We push down the projection now since it can make building the physical plan a lot
88                // cheaper, e.g. by only computing stats for the projected columns.
89                .with_some_projection(projection.cloned())
90                // We don't push down filters for two reasons:
91                //  1. Vortex requires a physical expression, not logical. DataFusion will try to push
92                //     the physical filters later.
93                //  2. There's nothing useful we can do with filters now to reduce the amount of work
94                //     we have to do.
95                //
96                // We also don't push down the limit for the same reason, there's nothing useful we
97                // can do with it.
98                .build()
99                .await
100                .map_err(|e| DataFusionError::External(Box::new(e)))?;
101
102        Ok(DataSourceExec::from_data_source(data_source))
103    }
104
105    /// Returns statistics for the full table, prior to any projection.
106    ///
107    /// We should not (and actually, cannot) perform I/O here, so the best we can do is return
108    /// cardinality and byte size estimates.
109    ///
110    // NOTE(ngates): it's not obvious these are actually used? I think DataFusion does join
111    //  planning over stats from the physical plan?
112    fn statistics(&self) -> Option<Statistics> {
113        let num_rows = match self.data_source.row_count() {
114            Some(vortex::expr::stats::Precision::Exact(v)) => {
115                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
116            }
117            _ => Precision::Absent,
118        };
119
120        let total_byte_size = match self.data_source.byte_size() {
121            Some(vortex::expr::stats::Precision::Exact(v)) => {
122                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
123            }
124            _ => Precision::Absent,
125        };
126
127        let column_statistics =
128            vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
129
130        Some(Statistics {
131            num_rows,
132            total_byte_size,
133            column_statistics,
134        })
135    }
136}