1use std::sync::Arc;
19
20use arrow::datatypes::Field;
21use arrow::pyarrow::PyArrowType;
22use datafusion::logical_expr::{
23 Deallocate, Execute, Prepare, SetVariable, TransactionAccessMode, TransactionConclusion,
24 TransactionEnd, TransactionIsolationLevel, TransactionStart,
25};
26use pyo3::prelude::*;
27use pyo3::IntoPyObjectExt;
28
29use super::logical_node::LogicalNode;
30use super::PyExpr;
31use crate::sql::logical::PyLogicalPlan;
32
33#[pyclass(
34 frozen,
35 name = "TransactionStart",
36 module = "datafusion.expr",
37 subclass
38)]
39#[derive(Clone)]
40pub struct PyTransactionStart {
41 transaction_start: TransactionStart,
42}
43
44impl From<TransactionStart> for PyTransactionStart {
45 fn from(transaction_start: TransactionStart) -> PyTransactionStart {
46 PyTransactionStart { transaction_start }
47 }
48}
49
50impl TryFrom<PyTransactionStart> for TransactionStart {
51 type Error = PyErr;
52
53 fn try_from(py: PyTransactionStart) -> Result<Self, Self::Error> {
54 Ok(py.transaction_start)
55 }
56}
57
58impl LogicalNode for PyTransactionStart {
59 fn inputs(&self) -> Vec<PyLogicalPlan> {
60 vec![]
61 }
62
63 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
64 self.clone().into_bound_py_any(py)
65 }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
69#[pyclass(
70 frozen,
71 eq,
72 eq_int,
73 name = "TransactionAccessMode",
74 module = "datafusion.expr"
75)]
76pub enum PyTransactionAccessMode {
77 ReadOnly,
78 ReadWrite,
79}
80
81impl From<TransactionAccessMode> for PyTransactionAccessMode {
82 fn from(access_mode: TransactionAccessMode) -> PyTransactionAccessMode {
83 match access_mode {
84 TransactionAccessMode::ReadOnly => PyTransactionAccessMode::ReadOnly,
85 TransactionAccessMode::ReadWrite => PyTransactionAccessMode::ReadWrite,
86 }
87 }
88}
89
90impl TryFrom<PyTransactionAccessMode> for TransactionAccessMode {
91 type Error = PyErr;
92
93 fn try_from(py: PyTransactionAccessMode) -> Result<Self, Self::Error> {
94 match py {
95 PyTransactionAccessMode::ReadOnly => Ok(TransactionAccessMode::ReadOnly),
96 PyTransactionAccessMode::ReadWrite => Ok(TransactionAccessMode::ReadWrite),
97 }
98 }
99}
100
101#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
102#[pyclass(
103 frozen,
104 eq,
105 eq_int,
106 name = "TransactionIsolationLevel",
107 module = "datafusion.expr"
108)]
109pub enum PyTransactionIsolationLevel {
110 ReadUncommitted,
111 ReadCommitted,
112 RepeatableRead,
113 Serializable,
114 Snapshot,
115}
116
117impl From<TransactionIsolationLevel> for PyTransactionIsolationLevel {
118 fn from(isolation_level: TransactionIsolationLevel) -> PyTransactionIsolationLevel {
119 match isolation_level {
120 TransactionIsolationLevel::ReadUncommitted => {
121 PyTransactionIsolationLevel::ReadUncommitted
122 }
123 TransactionIsolationLevel::ReadCommitted => PyTransactionIsolationLevel::ReadCommitted,
124 TransactionIsolationLevel::RepeatableRead => {
125 PyTransactionIsolationLevel::RepeatableRead
126 }
127 TransactionIsolationLevel::Serializable => PyTransactionIsolationLevel::Serializable,
128 TransactionIsolationLevel::Snapshot => PyTransactionIsolationLevel::Snapshot,
129 }
130 }
131}
132
133impl TryFrom<PyTransactionIsolationLevel> for TransactionIsolationLevel {
134 type Error = PyErr;
135
136 fn try_from(value: PyTransactionIsolationLevel) -> Result<Self, Self::Error> {
137 match value {
138 PyTransactionIsolationLevel::ReadUncommitted => {
139 Ok(TransactionIsolationLevel::ReadUncommitted)
140 }
141 PyTransactionIsolationLevel::ReadCommitted => {
142 Ok(TransactionIsolationLevel::ReadCommitted)
143 }
144 PyTransactionIsolationLevel::RepeatableRead => {
145 Ok(TransactionIsolationLevel::RepeatableRead)
146 }
147 PyTransactionIsolationLevel::Serializable => {
148 Ok(TransactionIsolationLevel::Serializable)
149 }
150 PyTransactionIsolationLevel::Snapshot => Ok(TransactionIsolationLevel::Snapshot),
151 }
152 }
153}
154
155#[pymethods]
156impl PyTransactionStart {
157 #[new]
158 pub fn new(
159 access_mode: PyTransactionAccessMode,
160 isolation_level: PyTransactionIsolationLevel,
161 ) -> PyResult<Self> {
162 let access_mode = access_mode.try_into()?;
163 let isolation_level = isolation_level.try_into()?;
164 Ok(PyTransactionStart {
165 transaction_start: TransactionStart {
166 access_mode,
167 isolation_level,
168 },
169 })
170 }
171
172 pub fn access_mode(&self) -> PyResult<PyTransactionAccessMode> {
173 Ok(self.transaction_start.access_mode.clone().into())
174 }
175
176 pub fn isolation_level(&self) -> PyResult<PyTransactionIsolationLevel> {
177 Ok(self.transaction_start.isolation_level.clone().into())
178 }
179}
180
181#[pyclass(frozen, name = "TransactionEnd", module = "datafusion.expr", subclass)]
182#[derive(Clone)]
183pub struct PyTransactionEnd {
184 transaction_end: TransactionEnd,
185}
186
187impl From<TransactionEnd> for PyTransactionEnd {
188 fn from(transaction_end: TransactionEnd) -> PyTransactionEnd {
189 PyTransactionEnd { transaction_end }
190 }
191}
192
193impl TryFrom<PyTransactionEnd> for TransactionEnd {
194 type Error = PyErr;
195
196 fn try_from(py: PyTransactionEnd) -> Result<Self, Self::Error> {
197 Ok(py.transaction_end)
198 }
199}
200
201impl LogicalNode for PyTransactionEnd {
202 fn inputs(&self) -> Vec<PyLogicalPlan> {
203 vec![]
204 }
205
206 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
207 self.clone().into_bound_py_any(py)
208 }
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
212#[pyclass(
213 frozen,
214 eq,
215 eq_int,
216 name = "TransactionConclusion",
217 module = "datafusion.expr"
218)]
219pub enum PyTransactionConclusion {
220 Commit,
221 Rollback,
222}
223
224impl From<TransactionConclusion> for PyTransactionConclusion {
225 fn from(value: TransactionConclusion) -> Self {
226 match value {
227 TransactionConclusion::Commit => PyTransactionConclusion::Commit,
228 TransactionConclusion::Rollback => PyTransactionConclusion::Rollback,
229 }
230 }
231}
232
233impl TryFrom<PyTransactionConclusion> for TransactionConclusion {
234 type Error = PyErr;
235
236 fn try_from(value: PyTransactionConclusion) -> Result<Self, Self::Error> {
237 match value {
238 PyTransactionConclusion::Commit => Ok(TransactionConclusion::Commit),
239 PyTransactionConclusion::Rollback => Ok(TransactionConclusion::Rollback),
240 }
241 }
242}
243#[pymethods]
244impl PyTransactionEnd {
245 #[new]
246 pub fn new(conclusion: PyTransactionConclusion, chain: bool) -> PyResult<Self> {
247 let conclusion = conclusion.try_into()?;
248 Ok(PyTransactionEnd {
249 transaction_end: TransactionEnd { conclusion, chain },
250 })
251 }
252
253 pub fn conclusion(&self) -> PyResult<PyTransactionConclusion> {
254 Ok(self.transaction_end.conclusion.clone().into())
255 }
256
257 pub fn chain(&self) -> bool {
258 self.transaction_end.chain
259 }
260}
261
262#[pyclass(frozen, name = "SetVariable", module = "datafusion.expr", subclass)]
263#[derive(Clone)]
264pub struct PySetVariable {
265 set_variable: SetVariable,
266}
267
268impl From<SetVariable> for PySetVariable {
269 fn from(set_variable: SetVariable) -> PySetVariable {
270 PySetVariable { set_variable }
271 }
272}
273
274impl TryFrom<PySetVariable> for SetVariable {
275 type Error = PyErr;
276
277 fn try_from(py: PySetVariable) -> Result<Self, Self::Error> {
278 Ok(py.set_variable)
279 }
280}
281
282impl LogicalNode for PySetVariable {
283 fn inputs(&self) -> Vec<PyLogicalPlan> {
284 vec![]
285 }
286
287 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
288 self.clone().into_bound_py_any(py)
289 }
290}
291
292#[pymethods]
293impl PySetVariable {
294 #[new]
295 pub fn new(variable: String, value: String) -> Self {
296 PySetVariable {
297 set_variable: SetVariable { variable, value },
298 }
299 }
300
301 pub fn variable(&self) -> String {
302 self.set_variable.variable.clone()
303 }
304
305 pub fn value(&self) -> String {
306 self.set_variable.value.clone()
307 }
308}
309
310#[pyclass(frozen, name = "Prepare", module = "datafusion.expr", subclass)]
311#[derive(Clone)]
312pub struct PyPrepare {
313 prepare: Prepare,
314}
315
316impl From<Prepare> for PyPrepare {
317 fn from(prepare: Prepare) -> PyPrepare {
318 PyPrepare { prepare }
319 }
320}
321
322impl TryFrom<PyPrepare> for Prepare {
323 type Error = PyErr;
324
325 fn try_from(py: PyPrepare) -> Result<Self, Self::Error> {
326 Ok(py.prepare)
327 }
328}
329
330impl LogicalNode for PyPrepare {
331 fn inputs(&self) -> Vec<PyLogicalPlan> {
332 vec![PyLogicalPlan::from((*self.prepare.input).clone())]
333 }
334
335 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
336 self.clone().into_bound_py_any(py)
337 }
338}
339
340#[pymethods]
341impl PyPrepare {
342 #[new]
343 pub fn new(name: String, fields: Vec<PyArrowType<Field>>, input: PyLogicalPlan) -> Self {
344 let input = input.plan().clone();
345 let fields = fields.into_iter().map(|field| Arc::new(field.0)).collect();
346 PyPrepare {
347 prepare: Prepare {
348 name,
349 fields,
350 input,
351 },
352 }
353 }
354
355 pub fn name(&self) -> String {
356 self.prepare.name.clone()
357 }
358
359 pub fn fields(&self) -> Vec<PyArrowType<Field>> {
360 self.prepare
361 .fields
362 .clone()
363 .into_iter()
364 .map(|f| f.as_ref().clone().into())
365 .collect()
366 }
367
368 pub fn input(&self) -> PyLogicalPlan {
369 PyLogicalPlan {
370 plan: self.prepare.input.clone(),
371 }
372 }
373}
374
375#[pyclass(frozen, name = "Execute", module = "datafusion.expr", subclass)]
376#[derive(Clone)]
377pub struct PyExecute {
378 execute: Execute,
379}
380
381impl From<Execute> for PyExecute {
382 fn from(execute: Execute) -> PyExecute {
383 PyExecute { execute }
384 }
385}
386
387impl TryFrom<PyExecute> for Execute {
388 type Error = PyErr;
389
390 fn try_from(py: PyExecute) -> Result<Self, Self::Error> {
391 Ok(py.execute)
392 }
393}
394
395impl LogicalNode for PyExecute {
396 fn inputs(&self) -> Vec<PyLogicalPlan> {
397 vec![]
398 }
399
400 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
401 self.clone().into_bound_py_any(py)
402 }
403}
404
405#[pymethods]
406impl PyExecute {
407 #[new]
408 pub fn new(name: String, parameters: Vec<PyExpr>) -> Self {
409 let parameters = parameters
410 .into_iter()
411 .map(|parameter| parameter.into())
412 .collect();
413 PyExecute {
414 execute: Execute { name, parameters },
415 }
416 }
417
418 pub fn name(&self) -> String {
419 self.execute.name.clone()
420 }
421
422 pub fn parameters(&self) -> Vec<PyExpr> {
423 self.execute
424 .parameters
425 .clone()
426 .into_iter()
427 .map(|t| t.into())
428 .collect()
429 }
430}
431
432#[pyclass(frozen, name = "Deallocate", module = "datafusion.expr", subclass)]
433#[derive(Clone)]
434pub struct PyDeallocate {
435 deallocate: Deallocate,
436}
437
438impl From<Deallocate> for PyDeallocate {
439 fn from(deallocate: Deallocate) -> PyDeallocate {
440 PyDeallocate { deallocate }
441 }
442}
443
444impl TryFrom<PyDeallocate> for Deallocate {
445 type Error = PyErr;
446
447 fn try_from(py: PyDeallocate) -> Result<Self, Self::Error> {
448 Ok(py.deallocate)
449 }
450}
451
452impl LogicalNode for PyDeallocate {
453 fn inputs(&self) -> Vec<PyLogicalPlan> {
454 vec![]
455 }
456
457 fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
458 self.clone().into_bound_py_any(py)
459 }
460}
461
462#[pymethods]
463impl PyDeallocate {
464 #[new]
465 pub fn new(name: String) -> Self {
466 PyDeallocate {
467 deallocate: Deallocate { name },
468 }
469 }
470
471 pub fn name(&self) -> String {
472 self.deallocate.name.clone()
473 }
474}