1use datafusion::logical_expr::{
19 Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
20 TransactionEnd, TransactionIsolationLevel, TransactionStart,
21};
22use pyo3::{prelude::*, IntoPyObjectExt};
23
24use crate::{common::data_type::PyDataType, sql::logical::PyLogicalPlan};
25
26use super::{logical_node::LogicalNode, PyExpr};
27
/// Python-facing wrapper around DataFusion's [`TransactionStart`] statement node.
///
/// Registered with Python under the name `TransactionStart` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "TransactionStart", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTransactionStart {
    // The wrapped DataFusion node that every accessor reads from.
    transaction_start: TransactionStart,
}
33
34impl From<TransactionStart> for PyTransactionStart {
35 fn from(transaction_start: TransactionStart) -> PyTransactionStart {
36 PyTransactionStart { transaction_start }
37 }
38}
39
40impl TryFrom<PyTransactionStart> for TransactionStart {
41 type Error = PyErr;
42
43 fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
44 Ok(py.transaction_start)
45 }
46}
47
48impl LogicalNode for PyTransactionStart {
49 fn inputs(&self) -> Vec<PyLogicalPlan> {
50 vec![]
51 }
52
53 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
54 self.clone().into_bound_py_any(py)
55 }
56}
57
/// Python-facing counterpart of DataFusion's [`TransactionAccessMode`].
///
/// Registered with Python as `TransactionAccessMode` in `datafusion.expr`.
/// The `eq` / `eq_int` options ask PyO3 to generate equality comparisons for
/// the enum (see the PyO3 `#[pyclass]` documentation for exact semantics).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(eq, eq_int, name = "TransactionAccessMode", module = "datafusion.expr")]
pub enum PyTransactionAccessMode {
    // Corresponds to `TransactionAccessMode::ReadOnly`.
    ReadOnly,
    // Corresponds to `TransactionAccessMode::ReadWrite`.
    ReadWrite,
}
64
65impl From<TransactionAccessMode> for PyTransactionAccessMode {
66 fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
67 match access_mode {
68 TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
69 TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
70 }
71 }
72}
73
74impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
75 type Error = PyErr;
76
77 fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
78 match py {
79 PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
80 PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
81 }
82 }
83}
84
/// Python-facing counterpart of DataFusion's [`TransactionIsolationLevel`].
///
/// Registered with Python as `TransactionIsolationLevel` in `datafusion.expr`.
/// The `eq` / `eq_int` options ask PyO3 to generate equality comparisons for
/// the enum (see the PyO3 `#[pyclass]` documentation for exact semantics).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    eq,
    eq_int,
    name = "TransactionIsolationLevel",
    module = "datafusion.expr"
)]
pub enum PyTransactionIsolationLevel {
    // Each variant corresponds to the DataFusion variant of the same name.
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
    Snapshot,
}
99
100impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
101 fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
102 match isolation_level {
103 TransactionIsolationLevel::ReadUncommitted => {
104 PyTransactionIsolationLevel::ReadUncommitted
105 }
106 TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
107 TransactionIsolationLevel::RepeatableRead => {
108 PyTransactionIsolationLevel::RepeatableRead
109 }
110 TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
111 TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
112 }
113 }
114}
115
116impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
117 type Error = PyErr;
118
119 fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
120 match value {
121 PyTransactionIsolationLevel::ReadUncommitted => {
122 Ok(TransactionIsolationLevel::ReadUncommitted)
123 }
124 PyTransactionIsolationLevel::ReadCommitted => {
125 Ok(TransactionIsolationLevel::ReadCommitted)
126 }
127 PyTransactionIsolationLevel::RepeatableRead => {
128 Ok(TransactionIsolationLevel::RepeatableRead)
129 }
130 PyTransactionIsolationLevel::Serializable => {
131 Ok(TransactionIsolationLevel::Serializable)
132 }
133 PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
134 }
135 }
136}
137
138#[pymethods]
139impl PyTransactionStart {
140 #[new]
141 pub fn new(
142 access_mode: PyTransactionAccessMode,
143 isolation_level: PyTransactionIsolationLevel,
144 ) -> PyResult<Self> {
145 let access_mode = access_mode.try_into()?;
146 let isolation_level = isolation_level.try_into()?;
147 Ok(PyTransactionStart {
148 transaction_start: TransactionStart {
149 access_mode,
150 isolation_level,
151 },
152 })
153 }
154
155 pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
156 Ok(self.transaction_start.access_mode.clone().into())
157 }
158
159 pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
160 Ok(self.transaction_start.isolation_level.clone().into())
161 }
162}
163
/// Python-facing wrapper around DataFusion's [`TransactionEnd`] statement node.
///
/// Registered with Python under the name `TransactionEnd` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "TransactionEnd", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyTransactionEnd {
    // The wrapped DataFusion node that every accessor reads from.
    transaction_end: TransactionEnd,
}
169
170impl From<TransactionEnd> for PyTransactionEnd {
171 fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
172 PyTransactionEnd { transaction_end }
173 }
174}
175
176impl TryFrom<PyTransactionEnd> for TransactionEnd {
177 type Error = PyErr;
178
179 fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
180 Ok(py.transaction_end)
181 }
182}
183
184impl LogicalNode for PyTransactionEnd {
185 fn inputs(&self) -> Vec<PyLogicalPlan> {
186 vec![]
187 }
188
189 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
190 self.clone().into_bound_py_any(py)
191 }
192}
193
/// Python-facing counterpart of DataFusion's [`TransactionConclusion`].
///
/// Registered with Python as `TransactionConclusion` in `datafusion.expr`.
/// The `eq` / `eq_int` options ask PyO3 to generate equality comparisons for
/// the enum (see the PyO3 `#[pyclass]` documentation for exact semantics).
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(eq, eq_int, name = "TransactionConclusion", module = "datafusion.expr")]
pub enum PyTransactionConclusion {
    // Corresponds to `TransactionConclusion::Commit`.
    Commit,
    // Corresponds to `TransactionConclusion::Rollback`.
    Rollback,
}
200
201impl From<TransactionConclusion> for PyTransactionConclusion {
202 fn from(value: TransactionConclusion) -> Self {
203 match value {
204 TransactionConclusion::Commit => PyTransactionConclusion::Commit,
205 TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
206 }
207 }
208}
209
210impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
211 type Error = PyErr;
212
213 fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
214 match value {
215 PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
216 PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
217 }
218 }
219}
220#[pymethods]
221impl PyTransactionEnd {
222 #[new]
223 pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
224 let conclusion = conclusion.try_into()?;
225 Ok(PyTransactionEnd {
226 transaction_end: TransactionEnd { conclusion, chain },
227 })
228 }
229
230 pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
231 Ok(self.transaction_end.conclusion.clone().into())
232 }
233
234 pub fn chain(&self) -> bool {
235 self.transaction_end.chain
236 }
237}
238
/// Python-facing wrapper around DataFusion's [`SetVariable`] statement node.
///
/// Registered with Python under the name `SetVariable` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "SetVariable", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PySetVariable {
    // The wrapped DataFusion node that every accessor reads from.
    set_variable: SetVariable,
}
244
245impl From<SetVariable> for PySetVariable {
246 fn from(set_variable: SetVariable) -> PySetVariable {
247 PySetVariable { set_variable }
248 }
249}
250
251impl TryFrom<PySetVariable> for SetVariable {
252 type Error = PyErr;
253
254 fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
255 Ok(py.set_variable)
256 }
257}
258
259impl LogicalNode for PySetVariable {
260 fn inputs(&self) -> Vec<PyLogicalPlan> {
261 vec![]
262 }
263
264 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
265 self.clone().into_bound_py_any(py)
266 }
267}
268
269#[pymethods]
270impl PySetVariable {
271 #[new]
272 pub fn new(variable: String, value: String) -> Self {
273 PySetVariable {
274 set_variable: SetVariable { variable, value },
275 }
276 }
277
278 pub fn variable(&self) -> String {
279 self.set_variable.variable.clone()
280 }
281
282 pub fn value(&self) -> String {
283 self.set_variable.value.clone()
284 }
285}
286
/// Python-facing wrapper around DataFusion's [`Prepare`] statement node.
///
/// Registered with Python under the name `Prepare` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "Prepare", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyPrepare {
    // The wrapped DataFusion node that every accessor reads from.
    prepare: Prepare,
}
292
293impl From<Prepare> for PyPrepare {
294 fn from(prepare: Prepare) -> PyPrepare {
295 PyPrepare { prepare }
296 }
297}
298
299impl TryFrom<PyPrepare> for Prepare {
300 type Error = PyErr;
301
302 fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
303 Ok(py.prepare)
304 }
305}
306
307impl LogicalNode for PyPrepare {
308 fn inputs(&self) -> Vec<PyLogicalPlan> {
309 vec![PyLogicalPlan::from((*self.prepare.input).clone())]
310 }
311
312 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
313 self.clone().into_bound_py_any(py)
314 }
315}
316
317#[pymethods]
318impl PyPrepare {
319 #[new]
320 pub fn new(name: String, data_types: Vec<PyDataType>, input: PyLogicalPlan) -> Self {
321 let input = input.plan().clone();
322 let data_types = data_types
323 .into_iter()
324 .map(|data_type| data_type.into())
325 .collect();
326 PyPrepare {
327 prepare: Prepare {
328 name,
329 data_types,
330 input,
331 },
332 }
333 }
334
335 pub fn name(&self) -> String {
336 self.prepare.name.clone()
337 }
338
339 pub fn data_types(&self) -> Vec<PyDataType> {
340 self.prepare
341 .data_types
342 .clone()
343 .into_iter()
344 .map(|t| t.into())
345 .collect()
346 }
347
348 pub fn input(&self) -> PyLogicalPlan {
349 PyLogicalPlan {
350 plan: self.prepare.input.clone(),
351 }
352 }
353}
354
/// Python-facing wrapper around DataFusion's [`Execute`] statement node.
///
/// Registered with Python under the name `Execute` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "Execute", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyExecute {
    // The wrapped DataFusion node that every accessor reads from.
    execute: Execute,
}
360
361impl From<Execute> for PyExecute {
362 fn from(execute: Execute) -> PyExecute {
363 PyExecute { execute }
364 }
365}
366
367impl TryFrom<PyExecute> for Execute {
368 type Error = PyErr;
369
370 fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
371 Ok(py.execute)
372 }
373}
374
375impl LogicalNode for PyExecute {
376 fn inputs(&self) -> Vec<PyLogicalPlan> {
377 vec![]
378 }
379
380 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
381 self.clone().into_bound_py_any(py)
382 }
383}
384
385#[pymethods]
386impl PyExecute {
387 #[new]
388 pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
389 let parameters = parameters
390 .into_iter()
391 .map(|parameter| parameter.into())
392 .collect();
393 PyExecute {
394 execute: Execute { name, parameters },
395 }
396 }
397
398 pub fn name(&self) -> String {
399 self.execute.name.clone()
400 }
401
402 pub fn parameters(&self) -> Vec<PyExpr> {
403 self.execute
404 .parameters
405 .clone()
406 .into_iter()
407 .map(|t| t.into())
408 .collect()
409 }
410}
411
/// Python-facing wrapper around DataFusion's [`Deallocate`] statement node.
///
/// Registered with Python under the name `Deallocate` in the
/// `datafusion.expr` module; the `subclass` option permits Python subclasses.
#[pyclass(name = "Deallocate", module = "datafusion.expr", subclass)]
#[derive(Clone)]
pub struct PyDeallocate {
    // The wrapped DataFusion node that every accessor reads from.
    deallocate: Deallocate,
}
417
418impl From<Deallocate> for PyDeallocate {
419 fn from(deallocate: Deallocate) -> PyDeallocate {
420 PyDeallocate { deallocate }
421 }
422}
423
424impl TryFrom<PyDeallocate> for Deallocate {
425 type Error = PyErr;
426
427 fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
428 Ok(py.deallocate)
429 }
430}
431
432impl LogicalNode for PyDeallocate {
433 fn inputs(&self) -> Vec<PyLogicalPlan> {
434 vec![]
435 }
436
437 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
438 self.clone().into_bound_py_any(py)
439 }
440}
441
442#[pymethods]
443impl PyDeallocate {
444 #[new]
445 pub fn new(name: String) -> Self {
446 PyDeallocate {
447 deallocate: Deallocate { name },
448 }
449 }
450
451 pub fn name(&self) -> String {
452 self.deallocate.name.clone()
453 }
454}