datafusion_python/sql/
logical.rs1use std::sync::Arc;
19
20use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement};
21use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29use crate::expr::aggregate::PyAggregate;
30use crate::expr::analyze::PyAnalyze;
31use crate::expr::copy_to::PyCopyTo;
32use crate::expr::create_catalog::PyCreateCatalog;
33use crate::expr::create_catalog_schema::PyCreateCatalogSchema;
34use crate::expr::create_external_table::PyCreateExternalTable;
35use crate::expr::create_function::PyCreateFunction;
36use crate::expr::create_index::PyCreateIndex;
37use crate::expr::create_memory_table::PyCreateMemoryTable;
38use crate::expr::create_view::PyCreateView;
39use crate::expr::describe_table::PyDescribeTable;
40use crate::expr::distinct::PyDistinct;
41use crate::expr::dml::PyDmlStatement;
42use crate::expr::drop_catalog_schema::PyDropCatalogSchema;
43use crate::expr::drop_function::PyDropFunction;
44use crate::expr::drop_table::PyDropTable;
45use crate::expr::drop_view::PyDropView;
46use crate::expr::empty_relation::PyEmptyRelation;
47use crate::expr::explain::PyExplain;
48use crate::expr::extension::PyExtension;
49use crate::expr::filter::PyFilter;
50use crate::expr::join::PyJoin;
51use crate::expr::limit::PyLimit;
52use crate::expr::logical_node::LogicalNode;
53use crate::expr::projection::PyProjection;
54use crate::expr::recursive_query::PyRecursiveQuery;
55use crate::expr::repartition::PyRepartition;
56use crate::expr::sort::PySort;
57use crate::expr::statement::{
58 PyDeallocate, PyExecute, PyPrepare, PyResetVariable, PySetVariable, PyTransactionEnd,
59 PyTransactionStart,
60};
61use crate::expr::subquery::PySubquery;
62use crate::expr::subquery_alias::PySubqueryAlias;
63use crate::expr::table_scan::PyTableScan;
64use crate::expr::union::PyUnion;
65use crate::expr::unnest::PyUnnest;
66use crate::expr::values::PyValues;
67use crate::expr::window::PyWindowExpr;
68
69#[pyclass(
70 from_py_object,
71 frozen,
72 name = "LogicalPlan",
73 module = "datafusion",
74 subclass,
75 eq
76)]
77#[derive(Debug, Clone, PartialEq, Eq)]
78pub struct PyLogicalPlan {
79 pub(crate) plan: Arc<LogicalPlan>,
80}
81
82impl PyLogicalPlan {
83 pub fn new(plan: LogicalPlan) -> Self {
85 Self {
86 plan: Arc::new(plan),
87 }
88 }
89
90 pub fn plan(&self) -> Arc<LogicalPlan> {
91 self.plan.clone()
92 }
93}
94
95#[pymethods]
96impl PyLogicalPlan {
97 pub fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
99 match self.plan.as_ref() {
100 LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py),
101 LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py),
102 LogicalPlan::Distinct(plan) => PyDistinct::from(plan.clone()).to_variant(py),
103 LogicalPlan::EmptyRelation(plan) => PyEmptyRelation::from(plan.clone()).to_variant(py),
104 LogicalPlan::Explain(plan) => PyExplain::from(plan.clone()).to_variant(py),
105 LogicalPlan::Extension(plan) => PyExtension::from(plan.clone()).to_variant(py),
106 LogicalPlan::Filter(plan) => PyFilter::from(plan.clone()).to_variant(py),
107 LogicalPlan::Join(plan) => PyJoin::from(plan.clone()).to_variant(py),
108 LogicalPlan::Limit(plan) => PyLimit::from(plan.clone()).to_variant(py),
109 LogicalPlan::Projection(plan) => PyProjection::from(plan.clone()).to_variant(py),
110 LogicalPlan::Sort(plan) => PySort::from(plan.clone()).to_variant(py),
111 LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py),
112 LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
113 LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
114 LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
115 LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py),
116 LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py),
117 LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py),
118 LogicalPlan::Statement(plan) => match plan {
119 Statement::TransactionStart(plan) => {
120 PyTransactionStart::from(plan.clone()).to_variant(py)
121 }
122 Statement::TransactionEnd(plan) => {
123 PyTransactionEnd::from(plan.clone()).to_variant(py)
124 }
125 Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py),
126 Statement::ResetVariable(plan) => {
127 PyResetVariable::from(plan.clone()).to_variant(py)
128 }
129 Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py),
130 Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py),
131 Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py),
132 },
133 LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py),
134 LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py),
135 LogicalPlan::Ddl(plan) => match plan {
136 DdlStatement::CreateExternalTable(plan) => {
137 PyCreateExternalTable::from(plan.clone()).to_variant(py)
138 }
139 DdlStatement::CreateMemoryTable(plan) => {
140 PyCreateMemoryTable::from(plan.clone()).to_variant(py)
141 }
142 DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py),
143 DdlStatement::CreateCatalogSchema(plan) => {
144 PyCreateCatalogSchema::from(plan.clone()).to_variant(py)
145 }
146 DdlStatement::CreateCatalog(plan) => {
147 PyCreateCatalog::from(plan.clone()).to_variant(py)
148 }
149 DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py),
150 DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py),
151 DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py),
152 DdlStatement::DropCatalogSchema(plan) => {
153 PyDropCatalogSchema::from(plan.clone()).to_variant(py)
154 }
155 DdlStatement::CreateFunction(plan) => {
156 PyCreateFunction::from(plan.clone()).to_variant(py)
157 }
158 DdlStatement::DropFunction(plan) => {
159 PyDropFunction::from(plan.clone()).to_variant(py)
160 }
161 },
162 LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py),
163 LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py),
164 LogicalPlan::RecursiveQuery(plan) => {
165 PyRecursiveQuery::from(plan.clone()).to_variant(py)
166 }
167 }
168 }
169
170 fn inputs(&self) -> Vec<PyLogicalPlan> {
172 let mut inputs = vec![];
173 for input in self.plan.inputs() {
174 inputs.push(input.to_owned().into());
175 }
176 inputs
177 }
178
179 fn __repr__(&self) -> PyResult<String> {
180 Ok(format!("{:?}", self.plan))
181 }
182
183 fn display(&self) -> String {
184 format!("{}", self.plan.display())
185 }
186
187 fn display_indent(&self) -> String {
188 format!("{}", self.plan.display_indent())
189 }
190
191 fn display_indent_schema(&self) -> String {
192 format!("{}", self.plan.display_indent_schema())
193 }
194
195 fn display_graphviz(&self) -> String {
196 format!("{}", self.plan.display_graphviz())
197 }
198
199 pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
200 let codec = DefaultLogicalExtensionCodec {};
201 let proto =
202 datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?;
203
204 let bytes = proto.encode_to_vec();
205 Ok(PyBytes::new(py, &bytes))
206 }
207
208 #[staticmethod]
209 pub fn from_proto(
210 ctx: PySessionContext,
211 proto_msg: Bound<'_, PyBytes>,
212 ) -> PyDataFusionResult<Self> {
213 let bytes: &[u8] = proto_msg.extract().map_err(Into::<PyErr>::into)?;
214 let proto_plan =
215 datafusion_proto::protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
216 PyRuntimeError::new_err(format!(
217 "Unable to decode logical node from serialized bytes: {e}"
218 ))
219 })?;
220
221 let codec = DefaultLogicalExtensionCodec {};
222 let plan = proto_plan.try_into_logical_plan(&ctx.ctx.task_ctx(), &codec)?;
223 Ok(Self::new(plan))
224 }
225}
226
227impl From<PyLogicalPlan> for LogicalPlan {
228 fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
229 logical_plan.plan.as_ref().clone()
230 }
231}
232
233impl From<LogicalPlan> for PyLogicalPlan {
234 fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
235 PyLogicalPlan {
236 plan: Arc::new(logical_plan),
237 }
238 }
239}