Skip to main content

datafusion_python/sql/
logical.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::sync::Arc;
19
20use datafusion::logical_expr::{DdlStatement, LogicalPlan, Statement};
21use datafusion_proto::logical_plan::{AsLogicalPlan, DefaultLogicalExtensionCodec};
22use prost::Message;
23use pyo3::exceptions::PyRuntimeError;
24use pyo3::prelude::*;
25use pyo3::types::PyBytes;
26
27use crate::context::PySessionContext;
28use crate::errors::PyDataFusionResult;
29use crate::expr::aggregate::PyAggregate;
30use crate::expr::analyze::PyAnalyze;
31use crate::expr::copy_to::PyCopyTo;
32use crate::expr::create_catalog::PyCreateCatalog;
33use crate::expr::create_catalog_schema::PyCreateCatalogSchema;
34use crate::expr::create_external_table::PyCreateExternalTable;
35use crate::expr::create_function::PyCreateFunction;
36use crate::expr::create_index::PyCreateIndex;
37use crate::expr::create_memory_table::PyCreateMemoryTable;
38use crate::expr::create_view::PyCreateView;
39use crate::expr::describe_table::PyDescribeTable;
40use crate::expr::distinct::PyDistinct;
41use crate::expr::dml::PyDmlStatement;
42use crate::expr::drop_catalog_schema::PyDropCatalogSchema;
43use crate::expr::drop_function::PyDropFunction;
44use crate::expr::drop_table::PyDropTable;
45use crate::expr::drop_view::PyDropView;
46use crate::expr::empty_relation::PyEmptyRelation;
47use crate::expr::explain::PyExplain;
48use crate::expr::extension::PyExtension;
49use crate::expr::filter::PyFilter;
50use crate::expr::join::PyJoin;
51use crate::expr::limit::PyLimit;
52use crate::expr::logical_node::LogicalNode;
53use crate::expr::projection::PyProjection;
54use crate::expr::recursive_query::PyRecursiveQuery;
55use crate::expr::repartition::PyRepartition;
56use crate::expr::sort::PySort;
57use crate::expr::statement::{
58    PyDeallocate, PyExecute, PyPrepare, PyResetVariable, PySetVariable, PyTransactionEnd,
59    PyTransactionStart,
60};
61use crate::expr::subquery::PySubquery;
62use crate::expr::subquery_alias::PySubqueryAlias;
63use crate::expr::table_scan::PyTableScan;
64use crate::expr::union::PyUnion;
65use crate::expr::unnest::PyUnnest;
66use crate::expr::values::PyValues;
67use crate::expr::window::PyWindowExpr;
68
69#[pyclass(frozen, name = "LogicalPlan", module = "datafusion", subclass, eq)]
70#[derive(Debug, Clone, PartialEq, Eq)]
71pub struct PyLogicalPlan {
72    pub(crate) plan: Arc<LogicalPlan>,
73}
74
75impl PyLogicalPlan {
76    /// creates a new PyLogicalPlan
77    pub fn new(plan: LogicalPlan) -> Self {
78        Self {
79            plan: Arc::new(plan),
80        }
81    }
82
83    pub fn plan(&self) -> Arc<LogicalPlan> {
84        self.plan.clone()
85    }
86}
87
88#[pymethods]
89impl PyLogicalPlan {
90    /// Return the specific logical operator
91    pub fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
92        match self.plan.as_ref() {
93            LogicalPlan::Aggregate(plan) => PyAggregate::from(plan.clone()).to_variant(py),
94            LogicalPlan::Analyze(plan) => PyAnalyze::from(plan.clone()).to_variant(py),
95            LogicalPlan::Distinct(plan) => PyDistinct::from(plan.clone()).to_variant(py),
96            LogicalPlan::EmptyRelation(plan) => PyEmptyRelation::from(plan.clone()).to_variant(py),
97            LogicalPlan::Explain(plan) => PyExplain::from(plan.clone()).to_variant(py),
98            LogicalPlan::Extension(plan) => PyExtension::from(plan.clone()).to_variant(py),
99            LogicalPlan::Filter(plan) => PyFilter::from(plan.clone()).to_variant(py),
100            LogicalPlan::Join(plan) => PyJoin::from(plan.clone()).to_variant(py),
101            LogicalPlan::Limit(plan) => PyLimit::from(plan.clone()).to_variant(py),
102            LogicalPlan::Projection(plan) => PyProjection::from(plan.clone()).to_variant(py),
103            LogicalPlan::Sort(plan) => PySort::from(plan.clone()).to_variant(py),
104            LogicalPlan::TableScan(plan) => PyTableScan::from(plan.clone()).to_variant(py),
105            LogicalPlan::Subquery(plan) => PySubquery::from(plan.clone()).to_variant(py),
106            LogicalPlan::SubqueryAlias(plan) => PySubqueryAlias::from(plan.clone()).to_variant(py),
107            LogicalPlan::Unnest(plan) => PyUnnest::from(plan.clone()).to_variant(py),
108            LogicalPlan::Window(plan) => PyWindowExpr::from(plan.clone()).to_variant(py),
109            LogicalPlan::Repartition(plan) => PyRepartition::from(plan.clone()).to_variant(py),
110            LogicalPlan::Union(plan) => PyUnion::from(plan.clone()).to_variant(py),
111            LogicalPlan::Statement(plan) => match plan {
112                Statement::TransactionStart(plan) => {
113                    PyTransactionStart::from(plan.clone()).to_variant(py)
114                }
115                Statement::TransactionEnd(plan) => {
116                    PyTransactionEnd::from(plan.clone()).to_variant(py)
117                }
118                Statement::SetVariable(plan) => PySetVariable::from(plan.clone()).to_variant(py),
119                Statement::ResetVariable(plan) => {
120                    PyResetVariable::from(plan.clone()).to_variant(py)
121                }
122                Statement::Prepare(plan) => PyPrepare::from(plan.clone()).to_variant(py),
123                Statement::Execute(plan) => PyExecute::from(plan.clone()).to_variant(py),
124                Statement::Deallocate(plan) => PyDeallocate::from(plan.clone()).to_variant(py),
125            },
126            LogicalPlan::Values(plan) => PyValues::from(plan.clone()).to_variant(py),
127            LogicalPlan::Dml(plan) => PyDmlStatement::from(plan.clone()).to_variant(py),
128            LogicalPlan::Ddl(plan) => match plan {
129                DdlStatement::CreateExternalTable(plan) => {
130                    PyCreateExternalTable::from(plan.clone()).to_variant(py)
131                }
132                DdlStatement::CreateMemoryTable(plan) => {
133                    PyCreateMemoryTable::from(plan.clone()).to_variant(py)
134                }
135                DdlStatement::CreateView(plan) => PyCreateView::from(plan.clone()).to_variant(py),
136                DdlStatement::CreateCatalogSchema(plan) => {
137                    PyCreateCatalogSchema::from(plan.clone()).to_variant(py)
138                }
139                DdlStatement::CreateCatalog(plan) => {
140                    PyCreateCatalog::from(plan.clone()).to_variant(py)
141                }
142                DdlStatement::CreateIndex(plan) => PyCreateIndex::from(plan.clone()).to_variant(py),
143                DdlStatement::DropTable(plan) => PyDropTable::from(plan.clone()).to_variant(py),
144                DdlStatement::DropView(plan) => PyDropView::from(plan.clone()).to_variant(py),
145                DdlStatement::DropCatalogSchema(plan) => {
146                    PyDropCatalogSchema::from(plan.clone()).to_variant(py)
147                }
148                DdlStatement::CreateFunction(plan) => {
149                    PyCreateFunction::from(plan.clone()).to_variant(py)
150                }
151                DdlStatement::DropFunction(plan) => {
152                    PyDropFunction::from(plan.clone()).to_variant(py)
153                }
154            },
155            LogicalPlan::Copy(plan) => PyCopyTo::from(plan.clone()).to_variant(py),
156            LogicalPlan::DescribeTable(plan) => PyDescribeTable::from(plan.clone()).to_variant(py),
157            LogicalPlan::RecursiveQuery(plan) => {
158                PyRecursiveQuery::from(plan.clone()).to_variant(py)
159            }
160        }
161    }
162
163    /// Get the inputs to this plan
164    fn inputs(&self) -> Vec<PyLogicalPlan> {
165        let mut inputs = vec![];
166        for input in self.plan.inputs() {
167            inputs.push(input.to_owned().into());
168        }
169        inputs
170    }
171
172    fn __repr__(&self) -> PyResult<String> {
173        Ok(format!("{:?}", self.plan))
174    }
175
176    fn display(&self) -> String {
177        format!("{}", self.plan.display())
178    }
179
180    fn display_indent(&self) -> String {
181        format!("{}", self.plan.display_indent())
182    }
183
184    fn display_indent_schema(&self) -> String {
185        format!("{}", self.plan.display_indent_schema())
186    }
187
188    fn display_graphviz(&self) -> String {
189        format!("{}", self.plan.display_graphviz())
190    }
191
192    pub fn to_proto<'py>(&'py self, py: Python<'py>) -> PyDataFusionResult<Bound<'py, PyBytes>> {
193        let codec = DefaultLogicalExtensionCodec {};
194        let proto =
195            datafusion_proto::protobuf::LogicalPlanNode::try_from_logical_plan(&self.plan, &codec)?;
196
197        let bytes = proto.encode_to_vec();
198        Ok(PyBytes::new(py, &bytes))
199    }
200
201    #[staticmethod]
202    pub fn from_proto(
203        ctx: PySessionContext,
204        proto_msg: Bound<'_, PyBytes>,
205    ) -> PyDataFusionResult<Self> {
206        let bytes: &[u8] = proto_msg.extract()?;
207        let proto_plan =
208            datafusion_proto::protobuf::LogicalPlanNode::decode(bytes).map_err(|e| {
209                PyRuntimeError::new_err(format!(
210                    "Unable to decode logical node from serialized bytes: {e}"
211                ))
212            })?;
213
214        let codec = DefaultLogicalExtensionCodec {};
215        let plan = proto_plan.try_into_logical_plan(&ctx.ctx.task_ctx(), &codec)?;
216        Ok(Self::new(plan))
217    }
218}
219
220impl From<PyLogicalPlan> for LogicalPlan {
221    fn from(logical_plan: PyLogicalPlan) -> LogicalPlan {
222        logical_plan.plan.as_ref().clone()
223    }
224}
225
226impl From<LogicalPlan> for PyLogicalPlan {
227    fn from(logical_plan: LogicalPlan) -> PyLogicalPlan {
228        PyLogicalPlan {
229            plan: Arc::new(logical_plan),
230        }
231    }
232}