datafusion_python/expr/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::{
19    Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
20    TransactionEnd, TransactionIsolationLevel, TransactionStart,
21};
22use pyo3::{prelude::*, IntoPyObjectExt};
23
24use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
25
26use super::{logical_node::LogicalNode, PyExpr};
27
28#[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)]
29#[derive(Clone)]
30pub struct PyTransactionStart {
31    transaction_start: TransactionStart,
32}
33
34impl From<TransactionStart> for PyTransactionStart {
35    fn from(transaction_start: TransactionStart) -> PyTransactionStart {
36        PyTransactionStart { transaction_start }
37    }
38}
39
40impl TryFrom<PyTransactionStart> for TransactionStart {
41    type Error = PyErr;
42
43    fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
44        Ok(py.transaction_start)
45    }
46}
47
48impl LogicalNode for PyTransactionStart {
49    fn inputs(&self) -> Vec<PyLogicalPlan> {
50        vec![]
51    }
52
53    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
54        self.clone().into_bound_py_any(py)
55    }
56}
57
58#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
59#[pyclass(eq, eq_int, name = "TransactionAccessMode", module = "datafusion.expr")]
60pub enum PyTransactionAccessMode {
61    ReadOnly,
62    ReadWrite,
63}
64
65impl From<TransactionAccessMode> for PyTransactionAccessMode {
66    fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
67        match access_mode {
68            TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
69            TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
70        }
71    }
72}
73
74impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
75    type Error = PyErr;
76
77    fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
78        match py {
79            PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
80            PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
81        }
82    }
83}
84
85#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
86#[pyclass(
87    eq,
88    eq_int,
89    name = "TransactionIsolationLevel",
90    module = "datafusion.expr"
91)]
92pub enum PyTransactionIsolationLevel {
93    ReadUncommitted,
94    ReadCommitted,
95    RepeatableRead,
96    Serializable,
97    Snapshot,
98}
99
100impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
101    fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
102        match isolation_level {
103            TransactionIsolationLevel::ReadUncommitted => {
104                PyTransactionIsolationLevel::ReadUncommitted
105            }
106            TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
107            TransactionIsolationLevel::RepeatableRead => {
108                PyTransactionIsolationLevel::RepeatableRead
109            }
110            TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
111            TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
112        }
113    }
114}
115
116impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
117    type Error = PyErr;
118
119    fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
120        match value {
121            PyTransactionIsolationLevel::ReadUncommitted => {
122                Ok(TransactionIsolationLevel::ReadUncommitted)
123            }
124            PyTransactionIsolationLevel::ReadCommitted => {
125                Ok(TransactionIsolationLevel::ReadCommitted)
126            }
127            PyTransactionIsolationLevel::RepeatableRead => {
128                Ok(TransactionIsolationLevel::RepeatableRead)
129            }
130            PyTransactionIsolationLevel::Serializable => {
131                Ok(TransactionIsolationLevel::Serializable)
132            }
133            PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
134        }
135    }
136}
137
138#[pymethods]
139impl PyTransactionStart {
140    #[new]
141    pub fn new(
142        access_mode: PyTransactionAccessMode,
143        isolation_level: PyTransactionIsolationLevel,
144    ) -> PyResult<Self> {
145        let access_mode = access_mode.try_into()?;
146        let isolation_level = isolation_level.try_into()?;
147        Ok(PyTransactionStart {
148            transaction_start: TransactionStart {
149                access_mode,
150                isolation_level,
151            },
152        })
153    }
154
155    pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
156        Ok(self.transaction_start.access_mode.clone().into())
157    }
158
159    pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
160        Ok(self.transaction_start.isolation_level.clone().into())
161    }
162}
163
164#[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)]
165#[derive(Clone)]
166pub struct PyTransactionEnd {
167    transaction_end: TransactionEnd,
168}
169
170impl From<TransactionEnd> for PyTransactionEnd {
171    fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
172        PyTransactionEnd { transaction_end }
173    }
174}
175
176impl TryFrom<PyTransactionEnd> for TransactionEnd {
177    type Error = PyErr;
178
179    fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
180        Ok(py.transaction_end)
181    }
182}
183
184impl LogicalNode for PyTransactionEnd {
185    fn inputs(&self) -> Vec<PyLogicalPlan> {
186        vec![]
187    }
188
189    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
190        self.clone().into_bound_py_any(py)
191    }
192}
193
194#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
195#[pyclass(eq, eq_int, name = "TransactionConclusion", module = "datafusion.expr")]
196pub enum PyTransactionConclusion {
197    Commit,
198    Rollback,
199}
200
201impl From<TransactionConclusion> for PyTransactionConclusion {
202    fn from(value: TransactionConclusion) -> Self {
203        match value {
204            TransactionConclusion::Commit => PyTransactionConclusion::Commit,
205            TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
206        }
207    }
208}
209
210impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
211    type Error = PyErr;
212
213    fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
214        match value {
215            PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
216            PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
217        }
218    }
219}
220#[pymethods]
221impl PyTransactionEnd {
222    #[new]
223    pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
224        let conclusion = conclusion.try_into()?;
225        Ok(PyTransactionEnd {
226            transaction_end: TransactionEnd { conclusion, chain },
227        })
228    }
229
230    pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
231        Ok(self.transaction_end.conclusion.clone().into())
232    }
233
234    pub fn chain(&self) -> bool {
235        self.transaction_end.chain
236    }
237}
238
239#[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)]
240#[derive(Clone)]
241pub struct PySetVariable {
242    set_variable: SetVariable,
243}
244
245impl From<SetVariable> for PySetVariable {
246    fn from(set_variable: SetVariable) -> PySetVariable {
247        PySetVariable { set_variable }
248    }
249}
250
251impl TryFrom<PySetVariable> for SetVariable {
252    type Error = PyErr;
253
254    fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
255        Ok(py.set_variable)
256    }
257}
258
259impl LogicalNode for PySetVariable {
260    fn inputs(&self) -> Vec<PyLogicalPlan> {
261        vec![]
262    }
263
264    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
265        self.clone().into_bound_py_any(py)
266    }
267}
268
269#[pymethods]
270impl PySetVariable {
271    #[new]
272    pub fn new(variable: String, value: String) -> Self {
273        PySetVariable {
274            set_variable: SetVariable { variable, value },
275        }
276    }
277
278    pub fn variable(&self) -> String {
279        self.set_variable.variable.clone()
280    }
281
282    pub fn value(&self) -> String {
283        self.set_variable.value.clone()
284    }
285}
286
287#[pyclass(name = "Prepare", module = "datafusion.expr", subclass)]
288#[derive(Clone)]
289pub struct PyPrepare {
290    prepare: Prepare,
291}
292
293impl From<Prepare> for PyPrepare {
294    fn from(prepare: Prepare) -> PyPrepare {
295        PyPrepare { prepare }
296    }
297}
298
299impl TryFrom<PyPrepare> for Prepare {
300    type Error = PyErr;
301
302    fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
303        Ok(py.prepare)
304    }
305}
306
307impl LogicalNode for PyPrepare {
308    fn inputs(&self) -> Vec<PyLogicalPlan> {
309        vec![PyLogicalPlan::from((*self.prepare.input).clone())]
310    }
311
312    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
313        self.clone().into_bound_py_any(py)
314    }
315}
316
317#[pymethods]
318impl PyPrepare {
319    #[new]
320    pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
321        let input = input.plan().clone();
322        let data_types = data_types
323            .into_iter()
324            .map(|data_type| data_type.into())
325            .collect();
326        PyPrepare {
327            prepare: Prepare {
328                name,
329                data_types,
330                input,
331            },
332        }
333    }
334
335    pub fn name(&self) -> String {
336        self.prepare.name.clone()
337    }
338
339    pub fn data_types(&self) -> Vec<PyDataType> {
340        self.prepare
341            .data_types
342            .clone()
343            .into_iter()
344            .map(|t| t.into())
345            .collect()
346    }
347
348    pub fn input(&self) -> PyLogicalPlan {
349        PyLogicalPlan {
350            plan: self.prepare.input.clone(),
351        }
352    }
353}
354
355#[pyclass(name = "Execute", module = "datafusion.expr", subclass)]
356#[derive(Clone)]
357pub struct PyExecute {
358    execute: Execute,
359}
360
361impl From<Execute> for PyExecute {
362    fn from(execute: Execute) -> PyExecute {
363        PyExecute { execute }
364    }
365}
366
367impl TryFrom<PyExecute> for Execute {
368    type Error = PyErr;
369
370    fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
371        Ok(py.execute)
372    }
373}
374
375impl LogicalNode for PyExecute {
376    fn inputs(&self) -> Vec<PyLogicalPlan> {
377        vec![]
378    }
379
380    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
381        self.clone().into_bound_py_any(py)
382    }
383}
384
385#[pymethods]
386impl PyExecute {
387    #[new]
388    pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
389        let parameters = parameters
390            .into_iter()
391            .map(|parameter| parameter.into())
392            .collect();
393        PyExecute {
394            execute: Execute { name, parameters },
395        }
396    }
397
398    pub fn name(&self) -> String {
399        self.execute.name.clone()
400    }
401
402    pub fn parameters(&self) -> Vec<PyExpr> {
403        self.execute
404            .parameters
405            .clone()
406            .into_iter()
407            .map(|t| t.into())
408            .collect()
409    }
410}
411
412#[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)]
413#[derive(Clone)]
414pub struct PyDeallocate {
415    deallocate: Deallocate,
416}
417
418impl From<Deallocate> for PyDeallocate {
419    fn from(deallocate: Deallocate) -> PyDeallocate {
420        PyDeallocate { deallocate }
421    }
422}
423
424impl TryFrom<PyDeallocate> for Deallocate {
425    type Error = PyErr;
426
427    fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
428        Ok(py.deallocate)
429    }
430}
431
432impl LogicalNode for PyDeallocate {
433    fn inputs(&self) -> Vec<PyLogicalPlan> {
434        vec![]
435    }
436
437    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
438        self.clone().into_bound_py_any(py)
439    }
440}
441
442#[pymethods]
443impl PyDeallocate {
444    #[new]
445    pub fn new(name: String) -> Self {
446        PyDeallocate {
447            deallocate: Deallocate { name },
448        }
449    }
450
451    pub fn name(&self) -> String {
452        self.deallocate.name.clone()
453    }
454}