Skip to main content

vortex_datafusion/v2/
table.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4//! [`VortexTable`] adapts a Vortex [`DataSourceRef`] into a DataFusion
5//! [`TableProvider`].
6//!
7//! [`DataSourceRef`]: vortex::scan::DataSourceRef
8//! [`TableProvider`]: datafusion_catalog::TableProvider
9
10use std::any::Any;
11use std::fmt;
12use std::sync::Arc;
13
14use arrow_schema::SchemaRef;
15use async_trait::async_trait;
16use datafusion_catalog::Session;
17use datafusion_catalog::TableProvider;
18use datafusion_common::ColumnStatistics;
19use datafusion_common::DataFusionError;
20use datafusion_common::Result as DFResult;
21use datafusion_common::Statistics;
22use datafusion_common::stats::Precision;
23use datafusion_datasource::source::DataSourceExec;
24use datafusion_expr::Expr;
25use datafusion_expr::TableType;
26use datafusion_physical_plan::ExecutionPlan;
27use vortex::expr::stats::Precision as VortexPrecision;
28use vortex::scan::DataSourceRef;
29use vortex::session::VortexSession;
30
31use crate::v2::source::VortexDataSource;
32
33/// DataFusion [`TableProvider`] backed by a Vortex
34/// [`DataSourceRef`].
35///
36/// `VortexTable` is the usual entry point into [`crate::v2`] when you want to
37/// register an existing Vortex source with DataFusion.
38///
39/// Use it when another part of the system has already built a Vortex source and
40/// you want to expose that source through a
41/// [`SessionContext`].
42///
43/// `VortexTable` handles the `TableProvider` side of the integration:
44///
45/// - it exposes the table schema and coarse statistics to DataFusion,
46/// - it seeds the initial top-level projection during `scan`,
47/// - it hands execution off to [`VortexDataSource`] for later pushdown and
48///   execution.
49///
50/// # Example
51///
52/// ```no_run
53/// use std::sync::Arc;
54///
55/// use arrow_schema::Schema;
56/// use datafusion::prelude::SessionContext;
57/// use vortex::VortexSessionDefault;
58/// use vortex::scan::DataSourceRef;
59/// use vortex::session::VortexSession;
60/// use vortex_datafusion::v2::VortexTable;
61///
62/// # let data_source: DataSourceRef = todo!();
63/// let table = Arc::new(VortexTable::new(
64///     data_source,
65///     VortexSession::default(),
66///     Arc::new(Schema::empty()),
67/// ));
68///
69/// let ctx = SessionContext::new();
70/// ctx.register_table("vortex_data", table)?;
71/// # Ok::<(), datafusion_common::DataFusionError>(())
72/// ```
73///
74/// [`DataSourceRef`]: vortex::scan::DataSourceRef
75/// [`SessionContext`]: https://docs.rs/datafusion/latest/datafusion/prelude/struct.SessionContext.html
pub struct VortexTable {
    /// Vortex source behind this table. `scan` passes a clone of it to the
    /// [`VortexDataSource`] builder, and `statistics` asks it for the row count and byte size.
    data_source: DataSourceRef,
    /// Vortex session, cloned into the [`VortexDataSource`] builder on every `scan`.
    session: VortexSession,
    /// Arrow schema returned by `schema` and forwarded to the [`VortexDataSource`] builder; its
    /// field count also sizes the per-column statistics.
    arrow_schema: SchemaRef,
}
81
82impl fmt::Debug for VortexTable {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("VortexTable")
85            .field("schema", &self.arrow_schema)
86            .finish()
87    }
88}
89
90impl VortexTable {
91    /// Creates a new [`VortexTable`] from a Vortex data source and session.
92    ///
93    /// The Arrow schema is the schema DataFusion will observe for this table.
94    /// It should be compatible with the Vortex dtype exposed by `data_source`.
95    pub fn new(
96        data_source: DataSourceRef,
97        session: VortexSession,
98        arrow_schema: SchemaRef,
99    ) -> Self {
100        Self {
101            data_source,
102            session,
103            arrow_schema,
104        }
105    }
106}
107
108#[async_trait]
109impl TableProvider for VortexTable {
110    fn as_any(&self) -> &dyn Any {
111        self
112    }
113
114    fn schema(&self) -> SchemaRef {
115        Arc::clone(&self.arrow_schema)
116    }
117
118    fn table_type(&self) -> TableType {
119        TableType::Base
120    }
121
122    async fn scan(
123        &self,
124        _state: &dyn Session,
125        projection: Option<&Vec<usize>>,
126        _filters: &[Expr],
127        _limit: Option<usize>,
128    ) -> DFResult<Arc<dyn ExecutionPlan>> {
129        // Construct the physical node representing this table.
130        let data_source =
131            VortexDataSource::builder(Arc::clone(&self.data_source), self.session.clone())
132                .with_arrow_schema(Arc::clone(&self.arrow_schema))
133                // We push down the projection now since it can make building the physical plan a lot
134                // cheaper, e.g. by only computing stats for the projected columns.
135                .with_some_projection(projection.cloned())
136                // We don't push down filters for two reasons:
137                //  1. Vortex requires a physical expression, not logical. DataFusion will try to push
138                //     the physical filters later.
139                //  2. There's nothing useful we can do with filters now to reduce the amount of work
140                //     we have to do.
141                //
142                // We also don't push down the limit for the same reason, there's nothing useful we
143                // can do with it.
144                .build()
145                .await
146                .map_err(|e| DataFusionError::External(Box::new(e)))?;
147
148        Ok(DataSourceExec::from_data_source(data_source))
149    }
150
151    /// Returns statistics for the full table, prior to any projection.
152    ///
153    /// We should not (and actually, cannot) perform I/O here, so the best we can do is return
154    /// cardinality and byte size estimates.
155    ///
156    // NOTE(ngates): it's not obvious these are actually used? I think DataFusion does join
157    //  planning over stats from the physical plan?
158    fn statistics(&self) -> Option<Statistics> {
159        let num_rows = match self.data_source.row_count() {
160            Some(VortexPrecision::Exact(v)) => {
161                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
162            }
163            _ => Precision::Absent,
164        };
165
166        let total_byte_size = match self.data_source.byte_size() {
167            Some(VortexPrecision::Exact(v)) => {
168                usize::try_from(v).map(Precision::Exact).unwrap_or_default()
169            }
170            _ => Precision::Absent,
171        };
172
173        let column_statistics =
174            vec![ColumnStatistics::new_unknown(); self.arrow_schema.fields.len()];
175
176        Some(Statistics {
177            num_rows,
178            total_byte_size,
179            column_statistics,
180        })
181    }
182}