datafusion_python/expr/
dml.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use datafusion::logical_expr::dml::InsertOp;
19use datafusion::logical_expr::{DmlStatement, WriteOp};
20use pyo3::{prelude::*, IntoPyObjectExt};
21
22use crate::common::schema::PyTableSource;
23use crate::{common::df_schema::PyDFSchema, sql::logical::PyLogicalPlan};
24
25use super::logical_node::LogicalNode;
26
27#[pyclass(name = "DmlStatement", module = "datafusion.expr", subclass)]
28#[derive(Clone)]
29pub struct PyDmlStatement {
30    dml: DmlStatement,
31}
32
33impl From<PyDmlStatement> for DmlStatement {
34    fn from(dml: PyDmlStatement) -> Self {
35        dml.dml
36    }
37}
38
39impl From<DmlStatement> for PyDmlStatement {
40    fn from(dml: DmlStatement) -> PyDmlStatement {
41        PyDmlStatement { dml }
42    }
43}
44
45impl LogicalNode for PyDmlStatement {
46    fn inputs(&self) -> Vec<PyLogicalPlan> {
47        vec![PyLogicalPlan::from((*self.dml.input).clone())]
48    }
49
50    fn to_variant<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
51        self.clone().into_bound_py_any(py)
52    }
53}
54
55#[pymethods]
56impl PyDmlStatement {
57    pub fn table_name(&self) -> PyResult<String> {
58        Ok(self.dml.table_name.to_string())
59    }
60
61    pub fn target(&self) -> PyResult<PyTableSource> {
62        Ok(PyTableSource {
63            table_source: self.dml.target.clone(),
64        })
65    }
66
67    pub fn op(&self) -> PyWriteOp {
68        self.dml.op.clone().into()
69    }
70
71    pub fn input(&self) -> PyLogicalPlan {
72        PyLogicalPlan {
73            plan: self.dml.input.clone(),
74        }
75    }
76
77    pub fn output_schema(&self) -> PyDFSchema {
78        (*self.dml.output_schema).clone().into()
79    }
80
81    fn __repr__(&self) -> PyResult<String> {
82        Ok("DmlStatement".to_string())
83    }
84
85    fn __name__(&self) -> PyResult<String> {
86        Ok("DmlStatement".to_string())
87    }
88}
89
90#[derive(Debug, Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
91#[pyclass(eq, eq_int, name = "WriteOp", module = "datafusion.expr")]
92pub enum PyWriteOp {
93    Append,
94    Overwrite,
95    Replace,
96
97    Update,
98    Delete,
99    Ctas,
100}
101
102impl From<WriteOp> for PyWriteOp {
103    fn from(write_op: WriteOp) -> Self {
104        match write_op {
105            WriteOp::Insert(InsertOp::Append) => PyWriteOp::Append,
106            WriteOp::Insert(InsertOp::Overwrite) => PyWriteOp::Overwrite,
107            WriteOp::Insert(InsertOp::Replace) => PyWriteOp::Replace,
108
109            WriteOp::Update => PyWriteOp::Update,
110            WriteOp::Delete => PyWriteOp::Delete,
111            WriteOp::Ctas => PyWriteOp::Ctas,
112        }
113    }
114}
115
116impl From<PyWriteOp> for WriteOp {
117    fn from(py: PyWriteOp) -> Self {
118        match py {
119            PyWriteOp::Append => WriteOp::Insert(InsertOp::Append),
120            PyWriteOp::Overwrite => WriteOp::Insert(InsertOp::Overwrite),
121            PyWriteOp::Replace => WriteOp::Insert(InsertOp::Replace),
122
123            PyWriteOp::Update => WriteOp::Update,
124            PyWriteOp::Delete => WriteOp::Delete,
125            PyWriteOp::Ctas => WriteOp::Ctas,
126        }
127    }
128}
129
130#[pymethods]
131impl PyWriteOp {
132    fn name(&self) -> String {
133        let write_op: WriteOp = self.clone().into();
134        write_op.name().to_string()
135    }
136}