datafusion_python/
physical_plan.rs1use datafusion::physical_plan::{displayable, ExecutionPlan, ExecutionPlanProperties};
19use datafusion_proto::physical_plan::{AsExecutionPlan, DefaultPhysicalExtensionCodec};
20use prost::Message;
21use std::sync::Arc;
22
23use pyo3::{exceptions::PyRuntimeError, prelude::*, types::PyBytes};
24
25use crate::{context::PySessionContext, errors::PyDataFusionResult};
26
27#[pyclass(name = "ExecutionPlan", module = "datafusion", subclass)]
28#[derive(Debug, Clone)]
29pub struct PyExecutionPlan {
30 pub plan: Arc<dyn ExecutionPlan>,
31}
32
33impl PyExecutionPlan {
34 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
36 Self { plan }
37 }
38}
39
40#[pymethods]
41impl PyExecutionPlan {
42 pub fn children(&self) -> Vec<PyExecutionPlan> {
44 self.plan
45 .children()
46 .iter()
47 .map(|&p| p.to_owned().into())
48 .collect()
49 }
50
51 pub fn display(&self) -> String {
52 let d = displayable(self.plan.as_ref());
53 format!("{}", d.one_line())
54 }
55
56 pub fn display_indent(&self) -> String {
57 let d = displayable(self.plan.as_ref());
58 format!("{}", d.indent(false))
59 }
60
61 pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
62 let codec = DefaultPhysicalExtensionCodec {};
63 let proto = datafusion_proto::protobuf::PhysicalPlanNode::try_from_physical_plan(
64 self.plan.clone(),
65 &codec,
66 )?;
67
68 let bytes = proto.encode_to_vec();
69 Ok(PyBytes::new(py, &bytes))
70 }
71
72 #[staticmethod]
73 pub fn from_proto(
74 ctx: PySessionContext,
75 proto_msg: Bound<'_, PyBytes>,
76 ) -> PyDataFusionResult<Self> {
77 let bytes: &[u8] = proto_msg.extract()?;
78 let proto_plan =
79 datafusion_proto::protobuf::PhysicalPlanNode::decode(bytes).map_err(|e| {
80 PyRuntimeError::new_err(format!(
81 "Unable to decode logical node from serialized bytes: {}",
82 e
83 ))
84 })?;
85
86 let codec = DefaultPhysicalExtensionCodec {};
87 let plan = proto_plan.try_into_physical_plan(&ctx.ctx, &ctx.ctx.runtime_env(), &codec)?;
88 Ok(Self::new(plan))
89 }
90
91 fn __repr__(&self) -> String {
92 self.display_indent()
93 }
94
95 #[getter]
96 pub fn partition_count(&self) -> usize {
97 self.plan.output_partitioning().partition_count()
98 }
99}
100
101impl From<PyExecutionPlan> for Arc<dyn ExecutionPlan> {
102 fn from(plan: PyExecutionPlan) -> Arc<dyn ExecutionPlan> {
103 plan.plan.clone()
104 }
105}
106
107impl From<Arc<dyn ExecutionPlan>> for PyExecutionPlan {
108 fn from(plan: Arc<dyn ExecutionPlan>) -> PyExecutionPlan {
109 PyExecutionPlan { plan: plan.clone() }
110 }
111}