datafusion_python/dataset.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implements a DataFusion [`TableProvider`] that delegates to a PyArrow
//! `Dataset`. This allows PyArrow datasets to be used as DataFusion tables
//! while pushing down projections and filters.

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;

use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyType;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::catalog::Session;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
use datafusion::physical_plan::ExecutionPlan;

use crate::dataset_exec::DatasetExec;
use crate::pyarrow_filter_expression::PyArrowFilterExpression;

/// Wraps a `pyarrow.dataset.Dataset` instance and implements a DataFusion
/// [`TableProvider`] around it.
#[derive(Debug)]
pub(crate) struct Dataset {
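    // Stored unbound (a `PyObject`, i.e. `Py<PyAny>`) so the provider is
    // `Send + Sync` and re-acquires the GIL whenever it touches the dataset.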
    dataset: PyObject,
}

impl Dataset {
    /// Creates a new `Dataset`, validating that the given Python object is a
    /// `pyarrow.dataset.Dataset` instance
    pub fn new(dataset: &Bound<'_, PyAny>, py: Python) -> PyResult<Self> {
        // Ensure that we were passed an instance of pyarrow.dataset.Dataset
        let ds = PyModule::import(py, "pyarrow.dataset")?;
        let ds_attr = ds.getattr("Dataset")?;
        let ds_type = ds_attr.downcast::<PyType>()?;
        if dataset.is_instance(ds_type)? {
            Ok(Dataset {
                dataset: dataset.clone().unbind(),
            })
        } else {
            Err(PyValueError::new_err(
                "dataset argument must be a pyarrow.dataset.Dataset object",
            ))
        }
    }
}
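
// A minimal sketch of how this provider can be registered with a DataFusion
// `SessionContext` (illustrative only: `ctx`, `ds_obj`, and the table name
// below are assumptions, not part of this module):
//
//     Python::with_gil(|py| -> PyResult<()> {
//         let provider = Arc::new(Dataset::new(&ds_obj, py)?);
//         ctx.register_table("pyarrow_table", provider)
//             .map_err(|e| PyValueError::new_err(e.to_string()))?;
//         Ok(())
//     })?;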

#[async_trait]
impl TableProvider for Dataset {
    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get a reference to the schema for this table
    fn schema(&self) -> SchemaRef {
        Python::with_gil(|py| {
            let dataset = self.dataset.bind(py);
            // These unwraps can panic, but since `Dataset::new` verified that
            // `self.dataset` is a `pyarrow.dataset.Dataset`, the `schema`
            // attribute should always be present and convertible.
            Arc::new(
                dataset
                    .getattr("schema")
                    .unwrap()
                    .extract::<PyArrowType<_>>()
                    .unwrap()
                    .0,
            )
        })
    }

    /// Get the type of this table for metadata/catalog purposes.
    fn table_type(&self) -> TableType {
        TableType::Base
    }
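    // `TableType::Base` marks this as a regular data-backed table (as opposed
    // to `View` or `Temporary`); this is what shows up in catalog metadata
    // such as `information_schema.tables`.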

    /// Create an ExecutionPlan that will scan the table.
    /// The table provider is usually responsible for grouping
    /// the source data into partitions that can be efficiently
    /// parallelized or distributed.
    async fn scan(
        &self,
        _ctx: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        // limit can be used to reduce the amount scanned
        // from the datasource as a performance optimization.
        // If set, it contains the number of rows needed by the `LogicalPlan`;
        // the datasource should return *at least* this number of rows, if available.
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Python::with_gil(|py| {
            let plan: Arc<dyn ExecutionPlan> = Arc::new(
                DatasetExec::new(py, self.dataset.bind(py), projection.cloned(), filters)
                    .map_err(|err| DataFusionError::External(Box::new(err)))?,
            );
            Ok(plan)
        })
    }
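
    // Illustrative example (not part of the API): for a query like
    // `SELECT x FROM pyarrow_table WHERE x > 5`, DataFusion calls `scan` with
    // `projection = Some(vec![index_of_x])` and `filters` containing
    // `col("x").gt(lit(5))` (because `supports_filters_pushdown` below reports
    // it as `Exact`), so DatasetExec can restrict the PyArrow scan to that
    // column and filter.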

    /// Tests whether the table provider can make use of a filter expression
    /// to optimize data retrieval.
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        filters
            .iter()
            .map(|&f| match PyArrowFilterExpression::try_from(f) {
                Ok(_) => Ok(TableProviderFilterPushDown::Exact),
                _ => Ok(TableProviderFilterPushDown::Unsupported),
            })
            .collect()
    }
}
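
// Sketch of the resulting behavior (illustrative): a filter that
// `PyArrowFilterExpression::try_from` can translate into a PyArrow compute
// expression, e.g. `col("x").gt(lit(5))`, is reported as `Exact`, so
// DataFusion removes it from the plan and relies on PyArrow to apply it
// during the scan. Any filter the conversion rejects is reported as
// `Unsupported`, and DataFusion evaluates it itself above the scan.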