datafusion_python/dataset.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// Implements a DataFusion TableProvider that delegates to a PyArrow Dataset.
// This allows us to use PyArrow Datasets as DataFusion tables while pushing
// down projections and filters.
use datafusion::catalog::Session;
use pyo3::exceptions::PyValueError;
use pyo3::prelude::*;
use pyo3::types::PyType;

use std::any::Any;
use std::sync::Arc;

use async_trait::async_trait;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::datasource::{TableProvider, TableType};
use datafusion::error::{DataFusionError, Result as DFResult};
use datafusion::logical_expr::Expr;
use datafusion::logical_expr::TableProviderFilterPushDown;
use datafusion::physical_plan::ExecutionPlan;

use crate::dataset_exec::DatasetExec;
use crate::pyarrow_filter_expression::PyArrowFilterExpression;

// Wraps a pyarrow.dataset.Dataset instance and implements a DataFusion TableProvider around it
#[derive(Debug)]
pub(crate) struct Dataset {
    dataset: PyObject,
}

impl Dataset {
    // Wraps an existing pyarrow.dataset.Dataset object, validating its type first
    pub fn new(dataset: &Bound<'_, PyAny>, py: Python) -> PyResult<Self> {
        // Ensure that we were passed an instance of pyarrow.dataset.Dataset
        let ds = PyModule::import(py, "pyarrow.dataset")?;
        let ds_attr = ds.getattr("Dataset")?;
        let ds_type = ds_attr.downcast::<PyType>()?;
        if dataset.is_instance(ds_type)? {
            Ok(Dataset {
                dataset: dataset.clone().unbind(),
            })
        } else {
            Err(PyValueError::new_err(
                "dataset argument must be a pyarrow.dataset.Dataset object",
            ))
        }
    }
}
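
// A minimal usage sketch (an assumption, not code from this crate): wrapping a
// pyarrow.dataset.Dataset created on the Python side. The data path is
// hypothetical, and registering the provider with a SessionContext is omitted:
//
//     Python::with_gil(|py| -> PyResult<()> {
//         let ds_mod = PyModule::import(py, "pyarrow.dataset")?;
//         let ds = ds_mod.call_method1("dataset", ("path/to/table.parquet",))?;
//         let provider = Dataset::new(&ds, py)?;
//         // `provider` can now be used as an `Arc<dyn TableProvider>`
//         Ok(())
//     })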

#[async_trait]
impl TableProvider for Dataset {
    /// Returns the table provider as [`Any`](std::any::Any) so that it can be
    /// downcast to a specific implementation.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get a reference to the schema for this table
    fn schema(&self) -> SchemaRef {
        Python::with_gil(|py| {
            let dataset = self.dataset.bind(py);
            // These unwraps can panic, but since the constructor checked that
            // self.dataset is a pyarrow.dataset.Dataset, they never should
            Arc::new(
                dataset
                    .getattr("schema")
                    .unwrap()
                    .extract::<PyArrowType<_>>()
                    .unwrap()
                    .0,
            )
        })
    }

    /// Get the type of this table for metadata/catalog purposes.
    fn table_type(&self) -> TableType {
        TableType::Base
    }

    /// Create an ExecutionPlan that will scan the table.
    /// The table provider is usually responsible for grouping
    /// the source data into partitions that can be efficiently
    /// parallelized or distributed.
    async fn scan(
        &self,
        _ctx: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        // limit can be used to reduce the amount scanned
        // from the datasource as a performance optimization.
        // If set, it contains the number of rows needed by the `LogicalPlan`.
        // The datasource should return *at least* this number of rows, if available.
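        // Note: this implementation does not currently apply `limit`; the
        // parameter is accepted but ignored.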
        _limit: Option<usize>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Python::with_gil(|py| {
            let plan: Arc<dyn ExecutionPlan> = Arc::new(
                DatasetExec::new(py, self.dataset.bind(py), projection.cloned(), filters)
                    .map_err(|err| DataFusionError::External(Box::new(err)))?,
            );
            Ok(plan)
        })
    }

    /// Tests whether the table provider can make use of a filter expression
    /// to optimize data retrieval.
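    ///
    /// For example, a filter that `PyArrowFilterExpression` can translate into
    /// a PyArrow compute expression (such as a simple column comparison) is
    /// reported as `Exact`; anything it cannot translate is reported as
    /// `Unsupported` and is instead evaluated by DataFusion after the scan.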
    fn supports_filters_pushdown(
        &self,
        filters: &[&Expr],
    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
        filters
            .iter()
            .map(|&f| match PyArrowFilterExpression::try_from(f) {
                Ok(_) => Ok(TableProviderFilterPushDown::Exact),
                _ => Ok(TableProviderFilterPushDown::Unsupported),
            })
            .collect()
    }
}
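
// A minimal smoke-test sketch (an assumption, not part of the original file):
// it assumes pyo3's `auto-initialize` feature is enabled and that `pyarrow` is
// importable in the active Python environment, and it only exercises the type
// validation in `Dataset::new`.
#[cfg(test)]
mod tests {
    use super::*;
    use pyo3::types::PyList;

    #[test]
    fn rejects_non_dataset_objects() {
        Python::with_gil(|py| {
            // A plain Python list is not a pyarrow.dataset.Dataset, so
            // construction should fail with a ValueError.
            let not_a_dataset = PyList::empty(py);
            assert!(Dataset::new(&not_a_dataset, py).is_err());
        });
    }
}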