datafusion_python/
physical_plan.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::sync::Arc;
19
20use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
21use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29
30#[pyclass(frozen, name = "ExecutionPlan", module = "datafusion", subclass)]
31#[derive(Debug, Clone)]
32pub struct PyExecutionPlan {
33    pub plan: Arc<dyn ExecutionPlan>,
34}
35
36impl PyExecutionPlan {
37    /// creates a new PyPhysicalPlan
38    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
39        Self { plan }
40    }
41}
42
43#[pymethods]
44impl PyExecutionPlan {
45    /// Get the inputs to this plan
46    pub fn children(&self) -> Vec<PyExecutionPlan> {
47        self.plan
48            .children()
49            .iter()
50            .map(|&p| p.to_owned().into())
51            .collect()
52    }
53
54    pub fn display(&self) -> String {
55        let d = displayable(self.plan.as_ref());
56        format!("{}", d.one_line())
57    }
58
59    pub fn display_indent(&self) -> String {
60        let d = displayable(self.plan.as_ref());
61        format!("{}", d.indent(false))
62    }
63
64    pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
65        let codec = DefaultPhysicalExtensionCodec {};
66        let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
67            self.plan.clone(),
68            &codec,
69        )?;
70
71        let bytes = proto.encode_to_vec();
72        Ok(PyBytes::new(py, &bytes))
73    }
74
75    #[staticmethod]
76    pub fn from_proto(
77        ctx: PySessionContext,
78        proto_msg: Bound<'_, PyBytes>,
79    ) -> PyDataFusionResult<Self> {
80        let bytes: &[u8] = proto_msg.extract()?;
81        let proto_plan =
82            datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
83                PyRuntimeError::new_err(format!(
84                    "Unable to decode logical node from serialized bytes: {e}"
85                ))
86            })?;
87
88        let codec = DefaultPhysicalExtensionCodec {};
89        let plan = proto_plan.try_into_physical_plan(ctx.ctx.task_ctx().as_ref(), &codec)?;
90        Ok(Self::new(plan))
91    }
92
93    fn __repr__(&self) -> String {
94        self.display_indent()
95    }
96
97    #[getter]
98    pub fn partition_count(&self) -> usize {
99        self.plan.output_partitioning().partition_count()
100    }
101}
102
103impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
104    fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
105        plan.plan.clone()
106    }
107}
108
109impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
110    fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
111        PyExecutionPlan { plan: plan.clone() }
112    }
113}