Skip to main content

datafusion_python/
physical_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties, displayable};
21use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29
30#[pyclass(
31    from_py_object,
32    frozen,
33    name = "ExecutionPlan",
34    module = "datafusion",
35    subclass
36)]
37#[derive(Debug, Clone)]
38pub struct PyExecutionPlan {
39    pub plan: Arc<dyn ExecutionPlan>,
40}
41
42impl PyExecutionPlan {
43    /// creates a new PyPhysicalPlan
44    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
45        Self { plan }
46    }
47}
48
49#[pymethods]
50impl PyExecutionPlan {
51    /// Get the inputs to this plan
52    pub fn children(&self) -> Vec<PyExecutionPlan> {
53        self.plan
54            .children()
55            .iter()
56            .map(|&p| p.to_owned().into())
57            .collect()
58    }
59
60    pub fn display(&self) -> String {
61        let d = displayable(self.plan.as_ref());
62        format!("{}", d.one_line())
63    }
64
65    pub fn display_indent(&self) -> String {
66        let d = displayable(self.plan.as_ref());
67        format!("{}", d.indent(false))
68    }
69
70    pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
71        let codec = DefaultPhysicalExtensionCodec {};
72        let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
73            self.plan.clone(),
74            &codec,
75        )?;
76
77        let bytes = proto.encode_to_vec();
78        Ok(PyBytes::new(py, &bytes))
79    }
80
81    #[staticmethod]
82    pub fn from_proto(
83        ctx: PySessionContext,
84        proto_msg: Bound<'_, PyBytes>,
85    ) -> PyDataFusionResult<Self> {
86        let bytes: &[u8] = proto_msg.extract().map_err(Into::<PyErr>::into)?;
87        let proto_plan =
88            datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
89                PyRuntimeError::new_err(format!(
90                    "Unable to decode logical node from serialized bytes: {e}"
91                ))
92            })?;
93
94        let codec = DefaultPhysicalExtensionCodec {};
95        let plan = proto_plan.try_into_physical_plan(ctx.ctx.task_ctx().as_ref(), &codec)?;
96        Ok(Self::new(plan))
97    }
98
99    fn __repr__(&self) -> String {
100        self.display_indent()
101    }
102
103    #[getter]
104    pub fn partition_count(&self) -> usize {
105        self.plan.output_partitioning().partition_count()
106    }
107}
108
109impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
110    fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
111        plan.plan.clone()
112    }
113}
114
115impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
116    fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
117        PyExecutionPlan { plan: plan.clone() }
118    }
119}