datafusion_expr/logical_plan/dml.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

use std::cmp::Ordering;
use std::collections::HashMap;
use std::fmt::{self, Debug, Display, Formatter};
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use datafusion_common::file_options::file_type::FileType;
use datafusion_common::{DFSchemaRef, TableReference};

use crate::{LogicalPlan, TableSource};

/// Operator that copies the contents of a table or query result to file(s)
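///
/// # Example (sketch)
///
/// A `COPY` statement is planned into a [`CopyTo`] node. The snippet below is
/// illustrative only and is not compiled here: `SessionContext` and
/// `CsvReadOptions` live in the top-level `datafusion` crate, not in this
/// crate, and the file paths are placeholders.
///
/// ```ignore
/// use datafusion::prelude::*;
///
/// async fn copy_example() -> datafusion::error::Result<()> {
///     let ctx = SessionContext::new();
///     ctx.register_csv("t", "input.csv", CsvReadOptions::new()).await?;
///     // Planning this statement produces a `CopyTo` node whose `input` is
///     // the `SELECT` plan and whose `output_url` is "out.parquet"
///     ctx.sql("COPY (SELECT * FROM t) TO 'out.parquet'")
///         .await?
///         .collect()
///         .await?;
///     Ok(())
/// }
/// ```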
#[derive(Clone)]
pub struct CopyTo {
    /// The relation that determines the tuples to write to the output file(s)
    pub input: Arc<LogicalPlan>,
    /// The location to write the file(s)
    pub output_url: String,
    /// Determines which, if any, columns should be used for hive-style partitioned writes
    pub partition_by: Vec<String>,
    /// The file format to write (an implementation of the [`FileType`] trait)
    pub file_type: Arc<dyn FileType>,
    /// SQL options that can affect how the file(s) are written
    pub options: HashMap<String, String>,
    /// The schema of the output (a single column "count")
    pub output_schema: DFSchemaRef,
}

impl Debug for CopyTo {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("CopyTo")
            .field("input", &self.input)
            .field("output_url", &self.output_url)
            .field("partition_by", &self.partition_by)
            .field("file_type", &"...")
            .field("options", &self.options)
            .field("output_schema", &self.output_schema)
            .finish_non_exhaustive()
    }
}

// Implement PartialEq manually: `dyn FileType` does not implement `PartialEq`,
// so equality is based on `input` and `output_url` only.
impl PartialEq for CopyTo {
    fn eq(&self, other: &Self) -> bool {
        self.input == other.input && self.output_url == other.output_url
    }
}

// Implement Eq (no additional logic is needed over PartialEq)
impl Eq for CopyTo {}

// Manual implementation needed because of the `file_type` and `options` fields.
// Comparison excludes these fields.
impl PartialOrd for CopyTo {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.input.partial_cmp(&other.input) {
            Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
            {
                Some(Ordering::Equal) => {
                    self.partition_by.partial_cmp(&other.partition_by)
                }
                cmp => cmp,
            },
            cmp => cmp,
        }
        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
        .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}

// Implement Hash manually, hashing only `input` and `output_url`
// (consistent with the manual `PartialEq`)
impl Hash for CopyTo {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.output_url.hash(state);
    }
}

impl CopyTo {
    /// Creates a new [`CopyTo`] operator, with the output schema set to the
    /// single `count` column produced by the write.
    pub fn new(
        input: Arc<LogicalPlan>,
        output_url: String,
        partition_by: Vec<String>,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
    ) -> Self {
        Self {
            input,
            output_url,
            partition_by,
            file_type,
            options,
            // The output schema is always a single column "count" with the number of rows copied
            output_schema: make_count_schema(),
        }
    }
}

/// Modifies the content of a database
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
///
/// * `INSERT` - Appends new rows to the existing table. Calls
///   [`TableProvider::insert_into`]
///
/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
///   [`TableProvider`] trait or builtin sources.
///
/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
///   the [`TableProvider`] trait or builtin sources.
///
/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
///   from a query. This is similar to the `INSERT` operation, but it creates a
///   new table instead of modifying an existing one.
///
/// Note that this structure is adapted from Substrait's `WriteRel`.
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
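///
/// # Example (sketch)
///
/// An `INSERT` statement is planned into a [`DmlStatement`]. The snippet below
/// is illustrative only and is not compiled here: `SessionContext` lives in
/// the top-level `datafusion` crate, not in this crate.
///
/// ```ignore
/// use datafusion::prelude::*;
///
/// async fn insert_example() -> datafusion::error::Result<()> {
///     let ctx = SessionContext::new();
///     ctx.sql("CREATE TABLE t (a INT)").await?.collect().await?;
///     // Planning this statement produces a `DmlStatement` with
///     // `op = WriteOp::Insert(InsertOp::Append)` and the `VALUES` plan as input
///     ctx.sql("INSERT INTO t VALUES (1), (2)").await?.collect().await?;
///     Ok(())
/// }
/// ```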
#[derive(Clone)]
pub struct DmlStatement {
    /// The table name
    pub table_name: TableReference,
    /// The target table to write to
    pub target: Arc<dyn TableSource>,
    /// The type of operation to perform
    pub op: WriteOp,
    /// The relation that determines the tuples to add/remove/modify; its
    /// schema must match the target table's schema
    pub input: Arc<LogicalPlan>,
    /// The schema of the output relation
    pub output_schema: DFSchemaRef,
}
impl Eq for DmlStatement {}
impl Hash for DmlStatement {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.table_name.hash(state);
        self.target.schema().hash(state);
        self.op.hash(state);
        self.input.hash(state);
        self.output_schema.hash(state);
    }
}

impl PartialEq for DmlStatement {
    fn eq(&self, other: &Self) -> bool {
        self.table_name == other.table_name
            && self.target.schema() == other.target.schema()
            && self.op == other.op
            && self.input == other.input
            && self.output_schema == other.output_schema
    }
}

impl Debug for DmlStatement {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("DmlStatement")
            .field("table_name", &self.table_name)
            .field("target", &"...")
            .field("target_schema", &self.target.schema())
            .field("op", &self.op)
            .field("input", &self.input)
            .field("output_schema", &self.output_schema)
            .finish()
    }
}

impl DmlStatement {
    /// Creates a new DML statement with the output schema set to a single `count` column.
    pub fn new(
        table_name: TableReference,
        target: Arc<dyn TableSource>,
        op: WriteOp,
        input: Arc<LogicalPlan>,
    ) -> Self {
        Self {
            table_name,
            target,
            op,
            input,

            // The output schema is always a single column with the number of rows affected
            output_schema: make_count_schema(),
        }
    }

    /// Return a descriptive name of this [`DmlStatement`]
    pub fn name(&self) -> &str {
        self.op.name()
    }
}

// Manual implementation needed because of the `target` and `output_schema` fields.
// Comparison excludes these fields.
impl PartialOrd for DmlStatement {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        match self.table_name.partial_cmp(&other.table_name) {
            Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
                cmp => cmp,
            },
            cmp => cmp,
        }
        // TODO (https://github.com/apache/datafusion/issues/17477) avoid recomparing all fields
        .filter(|cmp| *cmp != Ordering::Equal || self == other)
    }
}

/// The type of DML operation to perform.
///
/// See [`DmlStatement`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// `INSERT INTO` operation
    Insert(InsertOp),
    /// `DELETE` operation
    Delete,
    /// `UPDATE` operation
    Update,
    /// `CREATE TABLE AS SELECT` operation
    Ctas,
}

impl WriteOp {
    /// Return a descriptive name of this [`WriteOp`]
    pub fn name(&self) -> &str {
        match self {
            WriteOp::Insert(insert) => insert.name(),
            WriteOp::Delete => "Delete",
            WriteOp::Update => "Update",
            WriteOp::Ctas => "Ctas",
        }
    }
}

impl Display for WriteOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name())
    }
}

/// The kind of `INSERT` operation to perform.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Appends new rows to the existing table without modifying any
    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Overwrites all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// If any existing rows collide with the inserted rows (typically based
    /// on a unique key or primary key), those existing rows are replaced.
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}

impl InsertOp {
    /// Return a descriptive name of this [`InsertOp`]
    pub fn name(&self) -> &str {
        match self {
            InsertOp::Append => "Insert Into",
            InsertOp::Overwrite => "Insert Overwrite",
            InsertOp::Replace => "Replace Into",
        }
    }
}

impl Display for InsertOp {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.name())
    }
}

/// Builds the output schema shared by [`CopyTo`] and [`DmlStatement`]:
/// a single non-nullable `UInt64` column named `"count"`.
fn make_count_schema() -> DFSchemaRef {
    Arc::new(
        Schema::new(vec![Field::new("count", DataType::UInt64, false)])
            .try_into()
            .unwrap(),
    )
}
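
// A small sanity-check module added as a sketch: it exercises only items
// defined in this file (the `WriteOp`/`InsertOp` naming and the shared
// `count` output schema). The assertions simply reflect the implementations
// above.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow::datatypes::DataType;

    #[test]
    fn write_op_and_insert_op_names() {
        // `Display` for both enums forwards to `name()`
        assert_eq!(WriteOp::Insert(InsertOp::Append).to_string(), "Insert Into");
        assert_eq!(WriteOp::Insert(InsertOp::Overwrite).name(), "Insert Overwrite");
        assert_eq!(InsertOp::Replace.to_string(), "Replace Into");
        assert_eq!(WriteOp::Delete.name(), "Delete");
        assert_eq!(WriteOp::Update.name(), "Update");
        assert_eq!(WriteOp::Ctas.name(), "Ctas");
    }

    #[test]
    fn count_schema_shape() {
        // Both `CopyTo` and `DmlStatement` report a single non-nullable
        // UInt64 column named "count"
        let schema = make_count_schema();
        assert_eq!(schema.fields().len(), 1);
        assert_eq!(schema.field(0).name(), "count");
        assert_eq!(schema.field(0).data_type(), &DataType::UInt64);
        assert!(!schema.field(0).is_nullable());
    }
}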