1use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23 Deallocate, Execute, Prepare, ResetVariable, SetVariable, TransactionAccessMode,
24 TransactionConclusion, TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::IntoPyObjectExt;
27use pyo3::prelude::*;
28
29use super::PyExpr;
30use super::logical_node::LogicalNode;
31use crate::sql::logical::PyLogicalPlan;
32
/// Python-facing wrapper around DataFusion's [`TransactionStart`] statement.
///
/// Registered with pyo3 under the name `TransactionStart` in the
/// `datafusion.expr` module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "TransactionStart",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyTransactionStart {
    // The wrapped DataFusion statement; all accessors read from it.
    transaction_start: TransactionStart,
}
44
45impl From<TransactionStart> for PyTransactionStart {
46 fn from(transaction_start: TransactionStart) -> PyTransactionStart {
47 PyTransactionStart { transaction_start }
48 }
49}
50
51impl TryFrom<PyTransactionStart> for TransactionStart {
52 type Error = PyErr;
53
54 fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
55 Ok(py.transaction_start)
56 }
57}
58
59impl LogicalNode for PyTransactionStart {
60 fn inputs(&self) -> Vec<PyLogicalPlan> {
61 vec![]
62 }
63
64 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
65 self.clone().into_bound_py_any(py)
66 }
67}
68
/// Python-facing counterpart of DataFusion's [`TransactionAccessMode`].
///
/// Registered with pyo3 under the name `TransactionAccessMode` in the
/// `datafusion.expr` module, with the `eq` and `eq_int` pyclass options
/// enabled. Variant names match the DataFusion enum one-to-one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    from_py_object,
    frozen,
    eq,
    eq_int,
    name = "TransactionAccessMode",
    module = "datafusion.expr"
)]
pub enum PyTransactionAccessMode {
    ReadOnly,
    ReadWrite,
}
82
83impl From<TransactionAccessMode> for PyTransactionAccessMode {
84 fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
85 match access_mode {
86 TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
87 TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
88 }
89 }
90}
91
92impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
93 type Error = PyErr;
94
95 fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
96 match py {
97 PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
98 PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
99 }
100 }
101}
102
/// Python-facing counterpart of DataFusion's [`TransactionIsolationLevel`].
///
/// Registered with pyo3 under the name `TransactionIsolationLevel` in the
/// `datafusion.expr` module, with the `eq` and `eq_int` pyclass options
/// enabled. Variant names match the DataFusion enum one-to-one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    from_py_object,
    frozen,
    eq,
    eq_int,
    name = "TransactionIsolationLevel",
    module = "datafusion.expr"
)]
pub enum PyTransactionIsolationLevel {
    ReadUncommitted,
    ReadCommitted,
    RepeatableRead,
    Serializable,
    Snapshot,
}
119
120impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
121 fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
122 match isolation_level {
123 TransactionIsolationLevel::ReadUncommitted => {
124 PyTransactionIsolationLevel::ReadUncommitted
125 }
126 TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
127 TransactionIsolationLevel::RepeatableRead => {
128 PyTransactionIsolationLevel::RepeatableRead
129 }
130 TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
131 TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
132 }
133 }
134}
135
136impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
137 type Error = PyErr;
138
139 fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
140 match value {
141 PyTransactionIsolationLevel::ReadUncommitted => {
142 Ok(TransactionIsolationLevel::ReadUncommitted)
143 }
144 PyTransactionIsolationLevel::ReadCommitted => {
145 Ok(TransactionIsolationLevel::ReadCommitted)
146 }
147 PyTransactionIsolationLevel::RepeatableRead => {
148 Ok(TransactionIsolationLevel::RepeatableRead)
149 }
150 PyTransactionIsolationLevel::Serializable => {
151 Ok(TransactionIsolationLevel::Serializable)
152 }
153 PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
154 }
155 }
156}
157
158#[pymethods]
159impl PyTransactionStart {
160 #[new]
161 pub fn new(
162 access_mode: PyTransactionAccessMode,
163 isolation_level: PyTransactionIsolationLevel,
164 ) -> PyResult<Self> {
165 let access_mode = access_mode.try_into()?;
166 let isolation_level = isolation_level.try_into()?;
167 Ok(PyTransactionStart {
168 transaction_start: TransactionStart {
169 access_mode,
170 isolation_level,
171 },
172 })
173 }
174
175 pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
176 Ok(self.transaction_start.access_mode.clone().into())
177 }
178
179 pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
180 Ok(self.transaction_start.isolation_level.clone().into())
181 }
182}
183
/// Python-facing wrapper around DataFusion's [`TransactionEnd`] statement.
///
/// Registered with pyo3 under the name `TransactionEnd` in the
/// `datafusion.expr` module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "TransactionEnd",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyTransactionEnd {
    // The wrapped DataFusion statement; all accessors read from it.
    transaction_end: TransactionEnd,
}
195
196impl From<TransactionEnd> for PyTransactionEnd {
197 fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
198 PyTransactionEnd { transaction_end }
199 }
200}
201
202impl TryFrom<PyTransactionEnd> for TransactionEnd {
203 type Error = PyErr;
204
205 fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
206 Ok(py.transaction_end)
207 }
208}
209
210impl LogicalNode for PyTransactionEnd {
211 fn inputs(&self) -> Vec<PyLogicalPlan> {
212 vec![]
213 }
214
215 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
216 self.clone().into_bound_py_any(py)
217 }
218}
219
/// Python-facing counterpart of DataFusion's [`TransactionConclusion`].
///
/// Registered with pyo3 under the name `TransactionConclusion` in the
/// `datafusion.expr` module, with the `eq` and `eq_int` pyclass options
/// enabled. Variant names match the DataFusion enum one-to-one.
#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[pyclass(
    from_py_object,
    frozen,
    eq,
    eq_int,
    name = "TransactionConclusion",
    module = "datafusion.expr"
)]
pub enum PyTransactionConclusion {
    Commit,
    Rollback,
}
233
234impl From<TransactionConclusion> for PyTransactionConclusion {
235 fn from(value: TransactionConclusion) -> Self {
236 match value {
237 TransactionConclusion::Commit => PyTransactionConclusion::Commit,
238 TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
239 }
240 }
241}
242
243impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
244 type Error = PyErr;
245
246 fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
247 match value {
248 PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
249 PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
250 }
251 }
252}
253#[pymethods]
254impl PyTransactionEnd {
255 #[new]
256 pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
257 let conclusion = conclusion.try_into()?;
258 Ok(PyTransactionEnd {
259 transaction_end: TransactionEnd { conclusion, chain },
260 })
261 }
262
263 pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
264 Ok(self.transaction_end.conclusion.clone().into())
265 }
266
267 pub fn chain(&self) -> bool {
268 self.transaction_end.chain
269 }
270}
271
/// Python-facing wrapper around DataFusion's [`ResetVariable`] statement.
///
/// Registered with pyo3 under the name `ResetVariable` in the
/// `datafusion.expr` module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "ResetVariable",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyResetVariable {
    // The wrapped DataFusion statement; all accessors read from it.
    reset_variable: ResetVariable,
}
283
284impl From<ResetVariable> for PyResetVariable {
285 fn from(reset_variable: ResetVariable) -> PyResetVariable {
286 PyResetVariable { reset_variable }
287 }
288}
289
290impl TryFrom<PyResetVariable> for ResetVariable {
291 type Error = PyErr;
292
293 fn try_from(py: PyResetVariable) -> Result<Self, Self::Error> {
294 Ok(py.reset_variable)
295 }
296}
297
298impl LogicalNode for PyResetVariable {
299 fn inputs(&self) -> Vec<PyLogicalPlan> {
300 vec![]
301 }
302
303 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
304 self.clone().into_bound_py_any(py)
305 }
306}
307
308#[pymethods]
309impl PyResetVariable {
310 #[new]
311 pub fn new(variable: String) -> Self {
312 PyResetVariable {
313 reset_variable: ResetVariable { variable },
314 }
315 }
316
317 pub fn variable(&self) -> String {
318 self.reset_variable.variable.clone()
319 }
320}
321
/// Python-facing wrapper around DataFusion's [`SetVariable`] statement.
///
/// Registered with pyo3 under the name `SetVariable` in the
/// `datafusion.expr` module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "SetVariable",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PySetVariable {
    // The wrapped DataFusion statement; all accessors read from it.
    set_variable: SetVariable,
}
333
334impl From<SetVariable> for PySetVariable {
335 fn from(set_variable: SetVariable) -> PySetVariable {
336 PySetVariable { set_variable }
337 }
338}
339
340impl TryFrom<PySetVariable> for SetVariable {
341 type Error = PyErr;
342
343 fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
344 Ok(py.set_variable)
345 }
346}
347
348impl LogicalNode for PySetVariable {
349 fn inputs(&self) -> Vec<PyLogicalPlan> {
350 vec![]
351 }
352
353 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
354 self.clone().into_bound_py_any(py)
355 }
356}
357
358#[pymethods]
359impl PySetVariable {
360 #[new]
361 pub fn new(variable: String, value: String) -> Self {
362 PySetVariable {
363 set_variable: SetVariable { variable, value },
364 }
365 }
366
367 pub fn variable(&self) -> String {
368 self.set_variable.variable.clone()
369 }
370
371 pub fn value(&self) -> String {
372 self.set_variable.value.clone()
373 }
374}
375
/// Python-facing wrapper around DataFusion's [`Prepare`] statement.
///
/// Registered with pyo3 under the name `Prepare` in the `datafusion.expr`
/// module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "Prepare",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyPrepare {
    // The wrapped DataFusion statement; all accessors read from it.
    prepare: Prepare,
}
387
388impl From<Prepare> for PyPrepare {
389 fn from(prepare: Prepare) -> PyPrepare {
390 PyPrepare { prepare }
391 }
392}
393
394impl TryFrom<PyPrepare> for Prepare {
395 type Error = PyErr;
396
397 fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
398 Ok(py.prepare)
399 }
400}
401
402impl LogicalNode for PyPrepare {
403 fn inputs(&self) -> Vec<PyLogicalPlan> {
404 vec![PyLogicalPlan::from((*self.prepare.input).clone())]
405 }
406
407 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
408 self.clone().into_bound_py_any(py)
409 }
410}
411
412#[pymethods]
413impl PyPrepare {
414 #[new]
415 pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
416 let input = input.plan().clone();
417 let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
418 PyPrepare {
419 prepare: Prepare {
420 name,
421 fields,
422 input,
423 },
424 }
425 }
426
427 pub fn name(&self) -> String {
428 self.prepare.name.clone()
429 }
430
431 pub fn fields(&self) -> Vec<PyArrowType<Field>> {
432 self.prepare
433 .fields
434 .clone()
435 .into_iter()
436 .map(|f| f.as_ref().clone().into())
437 .collect()
438 }
439
440 pub fn input(&self) -> PyLogicalPlan {
441 PyLogicalPlan {
442 plan: self.prepare.input.clone(),
443 }
444 }
445}
446
/// Python-facing wrapper around DataFusion's [`Execute`] statement.
///
/// Registered with pyo3 under the name `Execute` in the `datafusion.expr`
/// module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "Execute",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyExecute {
    // The wrapped DataFusion statement; all accessors read from it.
    execute: Execute,
}
458
459impl From<Execute> for PyExecute {
460 fn from(execute: Execute) -> PyExecute {
461 PyExecute { execute }
462 }
463}
464
465impl TryFrom<PyExecute> for Execute {
466 type Error = PyErr;
467
468 fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
469 Ok(py.execute)
470 }
471}
472
473impl LogicalNode for PyExecute {
474 fn inputs(&self) -> Vec<PyLogicalPlan> {
475 vec![]
476 }
477
478 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
479 self.clone().into_bound_py_any(py)
480 }
481}
482
483#[pymethods]
484impl PyExecute {
485 #[new]
486 pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
487 let parameters = parameters
488 .into_iter()
489 .map(|parameter| parameter.into())
490 .collect();
491 PyExecute {
492 execute: Execute { name, parameters },
493 }
494 }
495
496 pub fn name(&self) -> String {
497 self.execute.name.clone()
498 }
499
500 pub fn parameters(&self) -> Vec<PyExpr> {
501 self.execute
502 .parameters
503 .clone()
504 .into_iter()
505 .map(|t| t.into())
506 .collect()
507 }
508}
509
/// Python-facing wrapper around DataFusion's [`Deallocate`] statement.
///
/// Registered with pyo3 under the name `Deallocate` in the `datafusion.expr`
/// module. The class is declared `frozen` and `subclass`.
#[pyclass(
    from_py_object,
    frozen,
    name = "Deallocate",
    module = "datafusion.expr",
    subclass
)]
#[derive(Clone)]
pub struct PyDeallocate {
    // The wrapped DataFusion statement; all accessors read from it.
    deallocate: Deallocate,
}
521
522impl From<Deallocate> for PyDeallocate {
523 fn from(deallocate: Deallocate) -> PyDeallocate {
524 PyDeallocate { deallocate }
525 }
526}
527
528impl TryFrom<PyDeallocate> for Deallocate {
529 type Error = PyErr;
530
531 fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
532 Ok(py.deallocate)
533 }
534}
535
536impl LogicalNode for PyDeallocate {
537 fn inputs(&self) -> Vec<PyLogicalPlan> {
538 vec![]
539 }
540
541 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
542 self.clone().into_bound_py_any(py)
543 }
544}
545
546#[pymethods]
547impl PyDeallocate {
548 #[new]
549 pub fn new(name: String) -> Self {
550 PyDeallocate {
551 deallocate: Deallocate { name },
552 }
553 }
554
555 pub fn name(&self) -> String {
556 self.deallocate.name.clone()
557 }
558}