datafusion_expr/logical_plan/dml.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
30/// Operator that copies the contents of a database to file(s)
31#[derive(Clone)]
32pub struct CopyTo {
33 /// The relation that determines the tuples to write to the output file(s)
34 pub input: Arc<LogicalPlan>,
35 /// The location to write the file(s)
36 pub output_url: String,
37 /// Determines which, if any, columns should be used for hive-style partitioned writes
38 pub partition_by: Vec<String>,
39 /// File type trait
40 pub file_type: Arc<dyn FileType>,
41 /// SQL Options that can affect the formats
42 pub options: HashMap<String, String>,
43}
44
45impl Debug for CopyTo {
46 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
47 f.debug_struct("CopyTo")
48 .field("input", &self.input)
49 .field("output_url", &self.output_url)
50 .field("partition_by", &self.partition_by)
51 .field("file_type", &"...")
52 .field("options", &self.options)
53 .finish_non_exhaustive()
54 }
55}
56
57// Implement PartialEq manually
58impl PartialEq for CopyTo {
59 fn eq(&self, other: &Self) -> bool {
60 self.input == other.input && self.output_url == other.output_url
61 }
62}
63
64// Implement Eq (no need for additional logic over PartialEq)
65impl Eq for CopyTo {}
66
67// Manual implementation needed because of `file_type` and `options` fields.
68// Comparison excludes these field.
69impl PartialOrd for CopyTo {
70 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
71 match self.input.partial_cmp(&other.input) {
72 Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
73 {
74 Some(Ordering::Equal) => {
75 self.partition_by.partial_cmp(&other.partition_by)
76 }
77 cmp => cmp,
78 },
79 cmp => cmp,
80 }
81 }
82}
83
84// Implement Hash manually
85impl Hash for CopyTo {
86 fn hash<H: Hasher>(&self, state: &mut H) {
87 self.input.hash(state);
88 self.output_url.hash(state);
89 }
90}
91
92/// Modifies the content of a database
93///
94/// This operator is used to perform DML operations such as INSERT, DELETE,
95/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
96///
97/// * `INSERT` - Appends new rows to the existing table. Calls
98/// [`TableProvider::insert_into`]
99///
100/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
101/// [`TableProvider`] trait or builtin sources.
102///
103/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
104/// the [`TableProvider`] trait or builtin sources.
105///
106/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
107/// from a query. This is similar to the `INSERT` operation, but it creates a new
108/// table instead of modifying an existing one.
109///
110/// Note that the structure is adapted from substrait WriteRel)
111///
112/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
113/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
114#[derive(Clone)]
115pub struct DmlStatement {
116 /// The table name
117 pub table_name: TableReference,
118 /// this is target table to insert into
119 pub target: Arc<dyn TableSource>,
120 /// The type of operation to perform
121 pub op: WriteOp,
122 /// The relation that determines the tuples to add/remove/modify the schema must match with table_schema
123 pub input: Arc<LogicalPlan>,
124 /// The schema of the output relation
125 pub output_schema: DFSchemaRef,
126}
127impl Eq for DmlStatement {}
128impl Hash for DmlStatement {
129 fn hash<H: Hasher>(&self, state: &mut H) {
130 self.table_name.hash(state);
131 self.target.schema().hash(state);
132 self.op.hash(state);
133 self.input.hash(state);
134 self.output_schema.hash(state);
135 }
136}
137
138impl PartialEq for DmlStatement {
139 fn eq(&self, other: &Self) -> bool {
140 self.table_name == other.table_name
141 && self.target.schema() == other.target.schema()
142 && self.op == other.op
143 && self.input == other.input
144 && self.output_schema == other.output_schema
145 }
146}
147
148impl Debug for DmlStatement {
149 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
150 f.debug_struct("DmlStatement")
151 .field("table_name", &self.table_name)
152 .field("target", &"...")
153 .field("target_schema", &self.target.schema())
154 .field("op", &self.op)
155 .field("input", &self.input)
156 .field("output_schema", &self.output_schema)
157 .finish()
158 }
159}
160
161impl DmlStatement {
162 /// Creates a new DML statement with the output schema set to a single `count` column.
163 pub fn new(
164 table_name: TableReference,
165 target: Arc<dyn TableSource>,
166 op: WriteOp,
167 input: Arc<LogicalPlan>,
168 ) -> Self {
169 Self {
170 table_name,
171 target,
172 op,
173 input,
174
175 // The output schema is always a single column with the number of rows affected
176 output_schema: make_count_schema(),
177 }
178 }
179
180 /// Return a descriptive name of this [`DmlStatement`]
181 pub fn name(&self) -> &str {
182 self.op.name()
183 }
184}
185
186// Manual implementation needed because of `table_schema` and `output_schema` fields.
187// Comparison excludes these fields.
188impl PartialOrd for DmlStatement {
189 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
190 match self.table_name.partial_cmp(&other.table_name) {
191 Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
192 Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
193 cmp => cmp,
194 },
195 cmp => cmp,
196 }
197 }
198}
199
200/// The type of DML operation to perform.
201///
202/// See [`DmlStatement`] for more details.
203#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
204pub enum WriteOp {
205 /// `INSERT INTO` operation
206 Insert(InsertOp),
207 /// `DELETE` operation
208 Delete,
209 /// `UPDATE` operation
210 Update,
211 /// `CREATE TABLE AS SELECT` operation
212 Ctas,
213}
214
215impl WriteOp {
216 /// Return a descriptive name of this [`WriteOp`]
217 pub fn name(&self) -> &str {
218 match self {
219 WriteOp::Insert(insert) => insert.name(),
220 WriteOp::Delete => "Delete",
221 WriteOp::Update => "Update",
222 WriteOp::Ctas => "Ctas",
223 }
224 }
225}
226
227impl Display for WriteOp {
228 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
229 write!(f, "{}", self.name())
230 }
231}
232
/// How an insert ([`WriteOp::Insert`]) treats rows already present in the table.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Adds the new rows and leaves every existing row untouched.
    /// This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Replaces all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// Replaces existing rows that collide with an inserted row (typically on
    /// a unique key or primary key).
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}

impl InsertOp {
    /// Returns a short, human-readable label for this insert mode.
    pub fn name(&self) -> &str {
        match *self {
            Self::Append => "Insert Into",
            Self::Overwrite => "Insert Overwrite",
            Self::Replace => "Replace Into",
        }
    }
}

impl Display for InsertOp {
    /// Writes the label returned by [`InsertOp::name`]; formatting flags such
    /// as width are not applied.
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.write_str(self.name())
    }
}
263
264fn make_count_schema() -> DFSchemaRef {
265 Arc::new(
266 Schema::new(vec![Field::new("count", DataType::UInt64, false)])
267 .try_into()
268 .unwrap(),
269 )
270}