datafusion_python/dataset_exec.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Implements a DataFusion physical `ExecutionPlan` that delegates to a PyArrow
//! Dataset: it performs the projection, filtering, and scanning of the Dataset.

use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use pyo3::prelude::*;
use pyo3::types::{PyDict, PyIterator, PyList};

use std::any::Any;
use std::sync::Arc;

use futures::{stream, TryStreamExt};

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::pyarrow::PyArrowType;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::utils::conjunction;
use datafusion::logical_expr::Expr;
use datafusion::physical_expr::{EquivalenceProperties, LexOrdering};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
    SendableRecordBatchStream, Statistics,
};

use crate::errors::PyDataFusionResult;
use crate::pyarrow_filter_expression::PyArrowFilterExpression;

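// A minimal sketch of how this plan is typically reached from Python (the table
// name and path are illustrative; `register_dataset` is the datafusion-python
// `SessionContext` method that exposes a pyarrow Dataset as a table):
//
//     import pyarrow.dataset as ds
//     from datafusion import SessionContext
//
//     ctx = SessionContext()
//     ctx.register_dataset("my_table", ds.dataset("/path/to/data"))
//     ctx.sql("SELECT a FROM my_table WHERE a > 1").collect()
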
/// Adapts a Python iterator of pyarrow record batches into a Rust iterator of
/// Arrow `RecordBatch`es, reacquiring the GIL on each call to `next`.
struct PyArrowBatchesAdapter {
    batches: Py<PyIterator>,
}

impl Iterator for PyArrowBatchesAdapter {
    type Item = ArrowResult<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        Python::with_gil(|py| {
            let mut batches = self.batches.clone_ref(py).into_bound(py);
            Some(
                batches
                    .next()?
                    .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
                    .map_err(|err| ArrowError::ExternalError(Box::new(err))),
            )
        })
    }
}

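// A minimal sketch of how the adapter is used in `execute` below, assuming
// `scanner` already holds a bound pyarrow Scanner:
//
//     let batches = scanner.call_method0("to_batches")?.try_iter()?;
//     let adapter = PyArrowBatchesAdapter { batches: batches.into() };
//     // Each `next()` reacquires the GIL, pulls one batch from Python, and
//     // converts it to an Arrow `RecordBatch`.
//     let stream = futures::stream::iter(adapter);
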
/// Wraps a `pyarrow.dataset.Dataset` and implements a DataFusion `ExecutionPlan`
/// around it.
#[derive(Debug)]
pub(crate) struct DatasetExec {
    dataset: PyObject,
    schema: SchemaRef,
    fragments: Py<PyList>,
    columns: Option<Vec<String>>,
    filter_expr: Option<PyObject>,
    projected_statistics: Statistics,
    plan_properties: datafusion::physical_plan::PlanProperties,
}

impl DatasetExec {
    pub fn new(
        py: Python,
        dataset: &Bound<'_, PyAny>,
        projection: Option<Vec<usize>>,
        filters: &[Expr],
    ) -> PyDataFusionResult<Self> {
        // Resolve the projected column indices to field names via the dataset schema
        let columns: Option<PyDataFusionResult<Vec<String>>> = projection.map(|p| {
            p.iter()
                .map(|index| {
                    let name: String = dataset
                        .getattr("schema")?
                        .call_method1("field", (*index,))?
                        .getattr("name")?
                        .extract()?;
                    Ok(name)
                })
                .collect()
        });
        let columns: Option<Vec<String>> = columns.transpose()?;
        // Combine the pushed-down filters into a single conjunction and convert
        // it to a PyArrow expression
        let filter_expr: Option<PyObject> = conjunction(filters.to_owned())
            .map(|filters| {
                PyArrowFilterExpression::try_from(&filters)
                    .map(|filter_expr| filter_expr.inner().clone_ref(py))
            })
            .transpose()?;

        let kwargs = PyDict::new(py);

        kwargs.set_item("columns", columns.clone())?;
        kwargs.set_item(
            "filter",
            filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
        )?;

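        // The equivalent call on the Python side (column names and filter are
        // illustrative; `pc` is pyarrow.compute):
        //     dataset.scanner(columns=["a", "b"], filter=pc.field("a") > 1)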
        let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;

        let schema = Arc::new(
            scanner
                .getattr("projected_schema")?
                .extract::<PyArrowType<_>>()?
                .0,
        );

        let builtins = Python::import(py, "builtins")?;
        let pylist = builtins.getattr("list")?;

        // Get the fragments or partitions of the dataset
        let fragments_iterator: Bound<'_, PyAny> = dataset.call_method1(
            "get_fragments",
            (filter_expr.as_ref().map(|expr| expr.clone_ref(py)),),
        )?;

        // Materialize the fragment iterator into a list so it can be indexed
        // by partition number in `execute`
        let fragments_iter = pylist.call1((fragments_iterator,))?;
        let fragments = fragments_iter.downcast::<PyList>().map_err(PyErr::from)?;

        let projected_statistics = Statistics::new_unknown(&schema);
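        // One DataFusion partition per PyArrow fragment: `execute(partition)`
        // scans the fragment at that index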
        let plan_properties = datafusion::physical_plan::PlanProperties::new(
            EquivalenceProperties::new(schema.clone()),
            Partitioning::UnknownPartitioning(fragments.len()),
            EmissionType::Final,
            Boundedness::Bounded,
        );

        Ok(DatasetExec {
            dataset: dataset.clone().unbind(),
            schema,
            fragments: fragments.clone().unbind(),
            columns,
            filter_expr,
            projected_statistics,
            plan_properties,
        })
    }
}

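// A minimal construction sketch, assuming `py` is an acquired GIL token and
// `dataset` is a bound `pyarrow.dataset.Dataset` (the projection indices are
// illustrative):
//
//     let plan = DatasetExec::new(py, &dataset, Some(vec![0, 1]), &[])?;
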
impl ExecutionPlan for DatasetExec {
    fn name(&self) -> &str {
        // The [ExecutionPlan::name] docs recommend forwarding to `static_name`
        Self::static_name()
    }

    /// Return a reference to Any that can be used for downcasting
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Get the schema for this execution plan
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        // this is a leaf node and has no children
        vec![]
    }

    fn with_new_children(
        self: Arc<Self>,
        _: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(self)
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DFResult<SendableRecordBatchStream> {
        let batch_size = context.session_config().batch_size();
        Python::with_gil(|py| {
            let dataset = self.dataset.bind(py);
            let fragments = self.fragments.bind(py);
            let fragment = fragments
                .get_item(partition)
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;

            // Pass the dataset schema so PyArrow unifies the fragment schema
            // with the dataset schema, per the PyArrow docs
            let dataset_schema = dataset
                .getattr("schema")
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            let kwargs = PyDict::new(py);
            kwargs
                .set_item("columns", self.columns.clone())
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            kwargs
                .set_item(
                    "filter",
                    self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
                )
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            kwargs
                .set_item("batch_size", batch_size)
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
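            // The equivalent call on the Python side (values illustrative;
            // `pc` is pyarrow.compute):
            //     fragment.scanner(schema=dataset.schema, columns=["a"],
            //                      filter=pc.field("a") > 1, batch_size=8192)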
            let scanner = fragment
                .call_method("scanner", (dataset_schema,), Some(&kwargs))
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
            let schema: SchemaRef = Arc::new(
                scanner
                    .getattr("projected_schema")
                    .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
                    .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
            );
            let record_batches: Bound<'_, PyIterator> = scanner
                .call_method0("to_batches")
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
                .try_iter()
                .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;

            let record_batches = PyArrowBatchesAdapter {
                batches: record_batches.into(),
            };

            // Expose the Python batch iterator as a DataFusion record batch stream
            let record_batch_stream = stream::iter(record_batches);
            let record_batch_stream: SendableRecordBatchStream = Box::pin(
                RecordBatchStreamAdapter::new(schema, record_batch_stream.map_err(|e| e.into())),
            );
            Ok(record_batch_stream)
        })
    }

    fn statistics(&self) -> DFResult<Statistics> {
        Ok(self.projected_statistics.clone())
    }

    fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
        &self.plan_properties
    }
}

impl ExecutionPlanProperties for DatasetExec {
    /// Get the output partitioning of this plan
    fn output_partitioning(&self) -> &Partitioning {
        self.plan_properties.output_partitioning()
    }

    fn output_ordering(&self) -> Option<&LexOrdering> {
        None
    }

    fn boundedness(&self) -> Boundedness {
        self.plan_properties.boundedness
    }

    fn pipeline_behavior(&self) -> EmissionType {
        self.plan_properties.emission_type
    }

    fn equivalence_properties(&self) -> &datafusion::physical_expr::EquivalenceProperties {
        &self.plan_properties.eq_properties
    }
}

impl DisplayAs for DatasetExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        Python::with_gil(|py| {
            let number_of_fragments = self.fragments.bind(py).len();
            match t {
                DisplayFormatType::Default
                | DisplayFormatType::Verbose
                | DisplayFormatType::TreeRender => {
                    let projected_columns: Vec<String> = self
                        .schema
                        .fields()
                        .iter()
                        .map(|x| x.name().to_owned())
                        .collect();
                    if let Some(filter_expr) = &self.filter_expr {
                        let filter_expr = filter_expr.bind(py).str().or(Err(std::fmt::Error))?;
                        write!(
                            f,
                            "DatasetExec: number_of_fragments={}, filter_expr={}, projection=[{}]",
                            number_of_fragments,
                            filter_expr,
                            projected_columns.join(", "),
                        )
                    } else {
                        write!(
                            f,
                            "DatasetExec: number_of_fragments={}, projection=[{}]",
                            number_of_fragments,
                            projected_columns.join(", "),
                        )
                    }
                }
            }
        })
    }
}
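
// Example rendered output for this node (values illustrative):
//     DatasetExec: number_of_fragments=2, filter_expr=(a > 1), projection=[a, b]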