datafusion_python/expr/
statement.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::{
19    Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
20    TransactionEnd, TransactionIsolationLevel, TransactionStart,
21};
22use pyo3::{prelude::*, IntoPyObjectExt};
23
24use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
25
26use super::{logical_node::LogicalNode, PyExpr};
27
// Python-facing wrapper around DataFusion's `TransactionStart`, exported to
// Python as `datafusion.expr.TransactionStart`. The class is declared `frozen`
// and `subclass` in its `#[pyclass]` options.
#[pyclass(
    frozen,
    name = "TransactionStart",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyTransactionStart {
    // The wrapped DataFusion value; every accessor reads from it.
    transaction_start: TransactionStart,
}
38
39impl From<TransactionStart> for PyTransactionStart {
40    fn from(transaction_start: TransactionStart) -> PyTransactionStart {
41        PyTransactionStart { transaction_start }
42    }
43}
44
45impl TryFrom<PyTransactionStart> for TransactionStart {
46    type Error = PyErr;
47
48    fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
49        Ok(py.transaction_start)
50    }
51}
52
53impl LogicalNode for PyTransactionStart {
54    fn inputs(&self) -> Vec<PyLogicalPlan> {
55        vec![]
56    }
57
58    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
59        self.clone().into_bound_py_any(py)
60    }
61}
62
// Python-facing counterpart of DataFusion's `TransactionAccessMode`, exported
// as `datafusion.expr.TransactionAccessMode`.
// NOTE(review): `eq_int` presumably makes the variants comparable to their
// integer discriminants from Python, so the variant order should be kept
// stable — confirm against the PyO3 `#[pyclass]` documentation.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    frozen,
    eq,
    eq_int,
    name = "TransactionAccessMode",
    module = "datafusion.expr"
)]
pub enum PyTransactionAccessMode {
    ReadOnly,
    ReadWrite,
}
75
76impl From<TransactionAccessMode> for PyTransactionAccessMode {
77    fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
78        match access_mode {
79            TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
80            TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
81        }
82    }
83}
84
85impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
86    type Error = PyErr;
87
88    fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
89        match py {
90            PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
91            PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
92        }
93    }
94}
95
// Python-facing counterpart of DataFusion's `TransactionIsolationLevel`,
// exported as `datafusion.expr.TransactionIsolationLevel`.
// NOTE(review): `eq_int` presumably makes the variants comparable to their
// integer discriminants from Python, so the variant order should be kept
// stable — confirm against the PyO3 `#[pyclass]` documentation.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    frozen,
    eq,
    eq_int,
    name = "TransactionIsolationLevel",
    module = "datafusion.expr"
)]
pub enum PyTransactionIsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
    Snapshot,
}
111
112impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
113    fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
114        match isolation_level {
115            TransactionIsolationLevel::ReadUncommitted => {
116                PyTransactionIsolationLevel::ReadUncommitted
117            }
118            TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
119            TransactionIsolationLevel::RepeatableRead => {
120                PyTransactionIsolationLevel::RepeatableRead
121            }
122            TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
123            TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
124        }
125    }
126}
127
128impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
129    type Error = PyErr;
130
131    fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
132        match value {
133            PyTransactionIsolationLevel::ReadUncommitted => {
134                Ok(TransactionIsolationLevel::ReadUncommitted)
135            }
136            PyTransactionIsolationLevel::ReadCommitted => {
137                Ok(TransactionIsolationLevel::ReadCommitted)
138            }
139            PyTransactionIsolationLevel::RepeatableRead => {
140                Ok(TransactionIsolationLevel::RepeatableRead)
141            }
142            PyTransactionIsolationLevel::Serializable => {
143                Ok(TransactionIsolationLevel::Serializable)
144            }
145            PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
146        }
147    }
148}
149
150#[pymethods]
151impl PyTransactionStart {
152    #[new]
153    pub fn new(
154        access_mode: PyTransactionAccessMode,
155        isolation_level: PyTransactionIsolationLevel,
156    ) -> PyResult<Self> {
157        let access_mode = access_mode.try_into()?;
158        let isolation_level = isolation_level.try_into()?;
159        Ok(PyTransactionStart {
160            transaction_start: TransactionStart {
161                access_mode,
162                isolation_level,
163            },
164        })
165    }
166
167    pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
168        Ok(self.transaction_start.access_mode.clone().into())
169    }
170
171    pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
172        Ok(self.transaction_start.isolation_level.clone().into())
173    }
174}
175
// Python-facing wrapper around DataFusion's `TransactionEnd`, exported to
// Python as `datafusion.expr.TransactionEnd`. The class is declared `frozen`
// and `subclass` in its `#[pyclass]` options.
#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTransactionEnd {
    // The wrapped DataFusion value; every accessor reads from it.
    transaction_end: TransactionEnd,
}
181
182impl From<TransactionEnd> for PyTransactionEnd {
183    fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
184        PyTransactionEnd { transaction_end }
185    }
186}
187
188impl TryFrom<PyTransactionEnd> for TransactionEnd {
189    type Error = PyErr;
190
191    fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
192        Ok(py.transaction_end)
193    }
194}
195
196impl LogicalNode for PyTransactionEnd {
197    fn inputs(&self) -> Vec<PyLogicalPlan> {
198        vec![]
199    }
200
201    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
202        self.clone().into_bound_py_any(py)
203    }
204}
205
// Python-facing counterpart of DataFusion's `TransactionConclusion`, exported
// as `datafusion.expr.TransactionConclusion`.
// NOTE(review): `eq_int` presumably makes the variants comparable to their
// integer discriminants from Python, so the variant order should be kept
// stable — confirm against the PyO3 `#[pyclass]` documentation.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    frozen,
    eq,
    eq_int,
    name = "TransactionConclusion",
    module = "datafusion.expr"
)]
pub enum PyTransactionConclusion {
    Commit,
    Rollback,
}
218
219impl From<TransactionConclusion> for PyTransactionConclusion {
220    fn from(value: TransactionConclusion) -> Self {
221        match value {
222            TransactionConclusion::Commit => PyTransactionConclusion::Commit,
223            TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
224        }
225    }
226}
227
228impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
229    type Error = PyErr;
230
231    fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
232        match value {
233            PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
234            PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
235        }
236    }
237}
238#[pymethods]
239impl PyTransactionEnd {
240    #[new]
241    pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
242        let conclusion = conclusion.try_into()?;
243        Ok(PyTransactionEnd {
244            transaction_end: TransactionEnd { conclusion, chain },
245        })
246    }
247
248    pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
249        Ok(self.transaction_end.conclusion.clone().into())
250    }
251
252    pub fn chain(&self) -> bool {
253        self.transaction_end.chain
254    }
255}
256
// Python-facing wrapper around DataFusion's `SetVariable`, exported to Python
// as `datafusion.expr.SetVariable`. The class is declared `frozen` and
// `subclass` in its `#[pyclass]` options.
#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PySetVariable {
    // The wrapped DataFusion value; every accessor reads from it.
    set_variable: SetVariable,
}
262
263impl From<SetVariable> for PySetVariable {
264    fn from(set_variable: SetVariable) -> PySetVariable {
265        PySetVariable { set_variable }
266    }
267}
268
269impl TryFrom<PySetVariable> for SetVariable {
270    type Error = PyErr;
271
272    fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
273        Ok(py.set_variable)
274    }
275}
276
277impl LogicalNode for PySetVariable {
278    fn inputs(&self) -> Vec<PyLogicalPlan> {
279        vec![]
280    }
281
282    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
283        self.clone().into_bound_py_any(py)
284    }
285}
286
287#[pymethods]
288impl PySetVariable {
289    #[new]
290    pub fn new(variable: String, value: String) -> Self {
291        PySetVariable {
292            set_variable: SetVariable { variable, value },
293        }
294    }
295
296    pub fn variable(&self) -> String {
297        self.set_variable.variable.clone()
298    }
299
300    pub fn value(&self) -> String {
301        self.set_variable.value.clone()
302    }
303}
304
// Python-facing wrapper around DataFusion's `Prepare`, exported to Python as
// `datafusion.expr.Prepare`. The class is declared `frozen` and `subclass` in
// its `#[pyclass]` options.
#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyPrepare {
    // The wrapped DataFusion value; every accessor reads from it.
    prepare: Prepare,
}
310
311impl From<Prepare> for PyPrepare {
312    fn from(prepare: Prepare) -> PyPrepare {
313        PyPrepare { prepare }
314    }
315}
316
317impl TryFrom<PyPrepare> for Prepare {
318    type Error = PyErr;
319
320    fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
321        Ok(py.prepare)
322    }
323}
324
325impl LogicalNode for PyPrepare {
326    fn inputs(&self) -> Vec<PyLogicalPlan> {
327        vec![PyLogicalPlan::from((*self.prepare.input).clone())]
328    }
329
330    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
331        self.clone().into_bound_py_any(py)
332    }
333}
334
335#[pymethods]
336impl PyPrepare {
337    #[new]
338    pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
339        let input = input.plan().clone();
340        let data_types = data_types
341            .into_iter()
342            .map(|data_type| data_type.into())
343            .collect();
344        PyPrepare {
345            prepare: Prepare {
346                name,
347                data_types,
348                input,
349            },
350        }
351    }
352
353    pub fn name(&self) -> String {
354        self.prepare.name.clone()
355    }
356
357    pub fn data_types(&self) -> Vec<PyDataType> {
358        self.prepare
359            .data_types
360            .clone()
361            .into_iter()
362            .map(|t| t.into())
363            .collect()
364    }
365
366    pub fn input(&self) -> PyLogicalPlan {
367        PyLogicalPlan {
368            plan: self.prepare.input.clone(),
369        }
370    }
371}
372
// Python-facing wrapper around DataFusion's `Execute`, exported to Python as
// `datafusion.expr.Execute`. The class is declared `frozen` and `subclass` in
// its `#[pyclass]` options.
#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyExecute {
    // The wrapped DataFusion value; every accessor reads from it.
    execute: Execute,
}
378
379impl From<Execute> for PyExecute {
380    fn from(execute: Execute) -> PyExecute {
381        PyExecute { execute }
382    }
383}
384
385impl TryFrom<PyExecute> for Execute {
386    type Error = PyErr;
387
388    fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
389        Ok(py.execute)
390    }
391}
392
393impl LogicalNode for PyExecute {
394    fn inputs(&self) -> Vec<PyLogicalPlan> {
395        vec![]
396    }
397
398    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
399        self.clone().into_bound_py_any(py)
400    }
401}
402
403#[pymethods]
404impl PyExecute {
405    #[new]
406    pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
407        let parameters = parameters
408            .into_iter()
409            .map(|parameter| parameter.into())
410            .collect();
411        PyExecute {
412            execute: Execute { name, parameters },
413        }
414    }
415
416    pub fn name(&self) -> String {
417        self.execute.name.clone()
418    }
419
420    pub fn parameters(&self) -> Vec<PyExpr> {
421        self.execute
422            .parameters
423            .clone()
424            .into_iter()
425            .map(|t| t.into())
426            .collect()
427    }
428}
429
// Python-facing wrapper around DataFusion's `Deallocate`, exported to Python
// as `datafusion.expr.Deallocate`. The class is declared `frozen` and
// `subclass` in its `#[pyclass]` options.
#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyDeallocate {
    // The wrapped DataFusion value; every accessor reads from it.
    deallocate: Deallocate,
}
435
436impl From<Deallocate> for PyDeallocate {
437    fn from(deallocate: Deallocate) -> PyDeallocate {
438        PyDeallocate { deallocate }
439    }
440}
441
442impl TryFrom<PyDeallocate> for Deallocate {
443    type Error = PyErr;
444
445    fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
446        Ok(py.deallocate)
447    }
448}
449
450impl LogicalNode for PyDeallocate {
451    fn inputs(&self) -> Vec<PyLogicalPlan> {
452        vec![]
453    }
454
455    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
456        self.clone().into_bound_py_any(py)
457    }
458}
459
460#[pymethods]
461impl PyDeallocate {
462    #[new]
463    pub fn new(name: String) -> Self {
464        PyDeallocate {
465            deallocate: Deallocate { name },
466        }
467    }
468
469    pub fn name(&self) -> String {
470        self.deallocate.name.clone()
471    }
472}