Skip to main content

vortex_datafusion/v2/
table.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

//! [`VortexTable`] adapts a Vortex [`DataSourceRef`] into a DataFusion
//! [`TableProvider`].
//!
//! [`DataSourceRef`]: vortex::scan::DataSourceRef
//! [`TableProvider`]: datafusion_catalog::TableProvider
9
10use std::fmt;
11use std::sync::Arc;
12
13use arrow_schema::SchemaRef;
14use async_trait::async_trait;
15use datafusion_catalog::Session;
16use datafusion_catalog::TableProvider;
17use datafusion_common::ColumnStatistics;
18use datafusion_common::DataFusionError;
19use datafusion_common::Result as DFResult;
20use datafusion_common::Statistics;
21use datafusion_common::stats::Precision;
22use datafusion_datasource::source::DataSourceExec;
23use datafusion_expr::Expr;
24use datafusion_expr::TableType;
25use datafusion_physical_plan::ExecutionPlan;
26use vortex::expr::stats::Precision as VortexPrecision;
27use vortex::scan::DataSourceRef;
28use vortex::session::VortexSession;
29
30use crate::v2::source::VortexDataSource;
31
32/// DataFusion [`TableProvider`] backed by a Vortex
33/// [`DataSourceRef`].
34///
35/// `VortexTable` is the usual entry point into [`crate::v2`] when you want to
36/// register an existing Vortex source with DataFusion.
37///
38/// Use it when another part of the system has already built a Vortex source and
39/// you want to expose that source through a
40/// [`SessionContext`].
41///
42/// `VortexTable` handles the `TableProvider` side of the integration:
43///
44/// - it exposes the table schema and coarse statistics to DataFusion,
45/// - it seeds the initial top-level projection during `scan`,
46/// - it hands execution off to [`VortexDataSource`] for later pushdown and
47///   execution.
48///
49/// # Example
50///
51/// ```no_run
52/// use std::sync::Arc;
53///
54/// use arrow_schema::Schema;
55/// use datafusion::prelude::SessionContext;
56/// use vortex::VortexSessionDefault;
57/// use vortex::scan::DataSourceRef;
58/// use vortex::session::VortexSession;
59/// use vortex_datafusion::v2::VortexTable;
60///
61/// # let data_source: DataSourceRef = todo!();
62/// let table = Arc::new(VortexTable::new(
63///     data_source,
64///     VortexSession::default(),
65///     Arc::new(Schema::empty()),
66/// ));
67///
68/// let ctx = SessionContext::new();
69/// ctx.register_table("vortex_data", table)?;
70/// # Ok::<(), datafusion_common::DataFusionError>(())
71/// ```
72///
73/// [`DataSourceRef`]: vortex::scan::DataSourceRef
74/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
75pub struct VortexTable {
76    data_source: DataSourceRef,
77    session: VortexSession,
78    arrow_schema: SchemaRef,
79}
80
81impl fmt::Debug for VortexTable {
82    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
83        f.debug_struct("VortexTable")
84            .field("schema", &self.arrow_schema)
85            .finish()
86    }
87}
88
89impl VortexTable {
90    /// Creates a new [`VortexTable`] from a Vortex data source and session.
91    ///
92    /// The Arrow schema is the schema DataFusion will observe for this table.
93    /// It should be compatible with the Vortex dtype exposed by `data_source`.
94    pub fn new(
95        data_source: DataSourceRef,
96        session: VortexSession,
97        arrow_schema: SchemaRef,
98    ) -> Self {
99        Self {
100            data_source,
101            session,
102            arrow_schema,
103        }
104    }
105}
106
107#[async_trait]
108impl TableProvider for VortexTable {
109    fn schema(&self) -> SchemaRef {
110        Arc::clone(&self.arrow_schema)
111    }
112
113    fn table_type(&self) -> TableType {
114        TableType::Base
115    }
116
117    async fn scan(
118        &self,
119        _state: &dyn Session,
120        projection: Option<&Vec<usize>>,
121        _filters: &[Expr],
122        _limit: Option<usize>,
123    ) -> DFResult<Arc<dyn ExecutionPlan>> {
124        // Construct the physical node representing this table.
125        let data_source =
126            VortexDataSource::builder(Arc::clone(&self.data_source), self.session.clone())
127                .with_arrow_schema(Arc::clone(&self.arrow_schema))
128                // We push down the projection now since it can make building the physical plan a lot
129                // cheaper, e.g. by only computing stats for the projected columns.
130                .with_some_projection(projection.cloned())
131                // We don't push down filters for two reasons:
132                //  1. Vortex requires a physical expression, not logical. DataFusion will try to push
133                //     the physical filters later.
134                //  2. There's nothing useful we can do with filters now to reduce the amount of work
135                //     we have to do.
136                //
137                // We also don't push down the limit for the same reason, there's nothing useful we
138                // can do with it.
139                .build()
140                .await
141                .map_err(|e| DataFusionError::External(Box::new(e)))?;
142
143        Ok(DataSourceExec::from_data_source(data_source))
144    }
145
146    /// Returns statistics for the full table, prior to any projection.
147    ///
148    /// We should not (and actually, cannot) perform I/O here, so the best we can do is return
149    /// cardinality and byte size estimates.
150    ///
151    // NOTE(ngates): it's not obvious these are actually used? I think DataFusion does join
152    //  planning over stats from the physical plan?
153    fn statistics(&self) -> Option<Statistics> {
154        let num_rows = match self.data_source.row_count() {
155            VortexPrecision::Exact(v) => {
156                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
157            }
158            _ => Precision::Absent,
159        };
160
161        let total_byte_size = match self.data_source.byte_size() {
162            VortexPrecision::Exact(v) => {
163                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
164            }
165            _ => Precision::Absent,
166        };
167
168        let column_statistics =
169            vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
170
171        Some(Statistics {
172            num_rows,
173            total_byte_size,
174            column_statistics,
175        })
176    }
177}