datafusion_python/
dataset_exec.rs1use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
19use pyo3::prelude::*;
22use pyo3::types::{PyDict, PyIterator, PyList};
23
24use std::any::Any;
25use std::sync::Arc;
26
27use futures::{stream, TryStreamExt};
28
29use datafusion::arrow::datatypes::SchemaRef;
30use datafusion::arrow::error::ArrowError;
31use datafusion::arrow::error::Result as ArrowResult;
32use datafusion::arrow::pyarrow::PyArrowType;
33use datafusion::arrow::record_batch::RecordBatch;
34use datafusion::error::{DataFusionError as InnerDataFusionError, Result as DFResult};
35use datafusion::execution::context::TaskContext;
36use datafusion::logical_expr::utils::conjunction;
37use datafusion::logical_expr::Expr;
38use datafusion::physical_expr::{EquivalenceProperties, LexOrdering};
39use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
40use datafusion::physical_plan::{
41 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, Partitioning,
42 SendableRecordBatchStream, Statistics,
43};
44
45use crate::errors::PyDataFusionResult;
46use crate::pyarrow_filter_expression::PyArrowFilterExpression;
47
48struct PyArrowBatchesAdapter {
49 batches: Py<PyIterator>,
50}
51
52impl Iterator for PyArrowBatchesAdapter {
53 type Item = ArrowResult<RecordBatch>;
54
55 fn next(&mut self) -> Option<Self::Item> {
56 Python::with_gil(|py| {
57 let mut batches = self.batches.clone_ref(py).into_bound(py);
58 Some(
59 batches
60 .next()?
61 .and_then(|batch| Ok(batch.extract::<PyArrowType<_>>()?.0))
62 .map_err(|err| ArrowError::ExternalError(Box::new(err))),
63 )
64 })
65 }
66}
67
68#[derive(Debug)]
70pub(crate) struct DatasetExec {
71 dataset: PyObject,
72 schema: SchemaRef,
73 fragments: Py<PyList>,
74 columns: Option<Vec<String>>,
75 filter_expr: Option<PyObject>,
76 projected_statistics: Statistics,
77 plan_properties: datafusion::physical_plan::PlanProperties,
78}
79
80impl DatasetExec {
81 pub fn new(
82 py: Python,
83 dataset: &Bound<'_, PyAny>,
84 projection: Option<Vec<usize>>,
85 filters: &[Expr],
86 ) -> PyDataFusionResult<Self> {
87 let columns: Option<PyDataFusionResult<Vec<String>>> = projection.map(|p| {
88 p.iter()
89 .map(|index| {
90 let name: String = dataset
91 .getattr("schema")?
92 .call_method1("field", (*index,))?
93 .getattr("name")?
94 .extract()?;
95 Ok(name)
96 })
97 .collect()
98 });
99 let columns: Option<Vec<String>> = columns.transpose()?;
100 let filter_expr: Option<PyObject> = conjunction(filters.to_owned())
101 .map(|filters| {
102 PyArrowFilterExpression::try_from(&filters)
103 .map(|filter_expr| filter_expr.inner().clone_ref(py))
104 })
105 .transpose()?;
106
107 let kwargs = PyDict::new(py);
108
109 kwargs.set_item("columns", columns.clone())?;
110 kwargs.set_item(
111 "filter",
112 filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
113 )?;
114
115 let scanner = dataset.call_method("scanner", (), Some(&kwargs))?;
116
117 let schema = Arc::new(
118 scanner
119 .getattr("projected_schema")?
120 .extract::<PyArrowType<_>>()?
121 .0,
122 );
123
124 let builtins = Python::import(py, "builtins")?;
125 let pylist = builtins.getattr("list")?;
126
127 let fragments_iterator: Bound<'_, PyAny> = dataset.call_method1(
129 "get_fragments",
130 (filter_expr.as_ref().map(|expr| expr.clone_ref(py)),),
131 )?;
132
133 let fragments_iter = pylist.call1((fragments_iterator,))?;
134 let fragments = fragments_iter.downcast::<PyList>().map_err(PyErr::from)?;
135
136 let projected_statistics = Statistics::new_unknown(&schema);
137 let plan_properties = datafusion::physical_plan::PlanProperties::new(
138 EquivalenceProperties::new(schema.clone()),
139 Partitioning::UnknownPartitioning(fragments.len()),
140 EmissionType::Final,
141 Boundedness::Bounded,
142 );
143
144 Ok(DatasetExec {
145 dataset: dataset.clone().unbind(),
146 schema,
147 fragments: fragments.clone().unbind(),
148 columns,
149 filter_expr,
150 projected_statistics,
151 plan_properties,
152 })
153 }
154}
155
156impl ExecutionPlan for DatasetExec {
157 fn name(&self) -> &str {
158 Self::static_name()
160 }
161
162 fn as_any(&self) -> &dyn Any {
164 self
165 }
166
167 fn schema(&self) -> SchemaRef {
169 self.schema.clone()
170 }
171
172 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
173 vec![]
175 }
176
177 fn with_new_children(
178 self: Arc<Self>,
179 _: Vec<Arc<dyn ExecutionPlan>>,
180 ) -> DFResult<Arc<dyn ExecutionPlan>> {
181 Ok(self)
182 }
183
184 fn execute(
185 &self,
186 partition: usize,
187 context: Arc<TaskContext>,
188 ) -> DFResult<SendableRecordBatchStream> {
189 let batch_size = context.session_config().batch_size();
190 Python::with_gil(|py| {
191 let dataset = self.dataset.bind(py);
192 let fragments = self.fragments.bind(py);
193 let fragment = fragments
194 .get_item(partition)
195 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
196
197 let dataset_schema = dataset
199 .getattr("schema")
200 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
201 let kwargs = PyDict::new(py);
202 kwargs
203 .set_item("columns", self.columns.clone())
204 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
205 kwargs
206 .set_item(
207 "filter",
208 self.filter_expr.as_ref().map(|expr| expr.clone_ref(py)),
209 )
210 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
211 kwargs
212 .set_item("batch_size", batch_size)
213 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
214 let scanner = fragment
215 .call_method("scanner", (dataset_schema,), Some(&kwargs))
216 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
217 let schema: SchemaRef = Arc::new(
218 scanner
219 .getattr("projected_schema")
220 .and_then(|schema| Ok(schema.extract::<PyArrowType<_>>()?.0))
221 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?,
222 );
223 let record_batches: Bound<'_, PyIterator> = scanner
224 .call_method0("to_batches")
225 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?
226 .try_iter()
227 .map_err(|err| InnerDataFusionError::External(Box::new(err)))?;
228
229 let record_batches = PyArrowBatchesAdapter {
230 batches: record_batches.into(),
231 };
232
233 let record_batch_stream = stream::iter(record_batches);
234 let record_batch_stream: SendableRecordBatchStream = Box::pin(
235 RecordBatchStreamAdapter::new(schema, record_batch_stream.map_err(|e| e.into())),
236 );
237 Ok(record_batch_stream)
238 })
239 }
240
241 fn statistics(&self) -> DFResult<Statistics> {
242 Ok(self.projected_statistics.clone())
243 }
244
245 fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
246 &self.plan_properties
247 }
248}
249
250impl ExecutionPlanProperties for DatasetExec {
251 fn output_partitioning(&self) -> &Partitioning {
253 self.plan_properties.output_partitioning()
254 }
255
256 fn output_ordering(&self) -> Option<&LexOrdering> {
257 None
258 }
259
260 fn boundedness(&self) -> Boundedness {
261 self.plan_properties.boundedness
262 }
263
264 fn pipeline_behavior(&self) -> EmissionType {
265 self.plan_properties.emission_type
266 }
267
268 fn equivalence_properties(&self) -> &datafusion::physical_expr::EquivalenceProperties {
269 &self.plan_properties.eq_properties
270 }
271}
272
273impl DisplayAs for DatasetExec {
274 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
275 Python::with_gil(|py| {
276 let number_of_fragments = self.fragments.bind(py).len();
277 match t {
278 DisplayFormatType::Default
279 | DisplayFormatType::Verbose
280 | DisplayFormatType::TreeRender => {
281 let projected_columns: Vec<String> = self
282 .schema
283 .fields()
284 .iter()
285 .map(|x| x.name().to_owned())
286 .collect();
287 if let Some(filter_expr) = &self.filter_expr {
288 let filter_expr = filter_expr.bind(py).str().or(Err(std::fmt::Error))?;
289 write!(
290 f,
291 "DatasetExec: number_of_fragments={}, filter_expr={}, projection=[{}]",
292 number_of_fragments,
293 filter_expr,
294 projected_columns.join(", "),
295 )
296 } else {
297 write!(
298 f,
299 "DatasetExec: number_of_fragments={}, projection=[{}]",
300 number_of_fragments,
301 projected_columns.join(", "),
302 )
303 }
304 }
305 }
306 })
307 }
308}