// datafusion_expr/logical_plan/dml.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::cmp::Ordering;
19use std::collections::HashMap;
20use std::fmt::{self, Debug, Display, Formatter};
21use std::hash::{Hash, Hasher};
22use std::sync::Arc;
23
24use arrow::datatypes::{DataType, Field, Schema};
25use datafusion_common::file_options::file_type::FileType;
26use datafusion_common::{DFSchemaRef, TableReference};
27
28use crate::{LogicalPlan, TableSource};
29
/// Operator that copies the contents of a database to file(s)
#[derive(Clone)]
pub struct CopyTo {
    /// The relation that determines the tuples to write to the output file(s)
    pub input: Arc<LogicalPlan>,
    /// The location to write the file(s), e.g. a URL or filesystem path
    pub output_url: String,
    /// Determines which, if any, columns should be used for hive-style partitioned writes
    pub partition_by: Vec<String>,
    /// File type trait object describing the output format
    pub file_type: Arc<dyn FileType>,
    /// SQL Options that can affect the formats
    pub options: HashMap<String, String>,
    /// The schema of the output: a single column "count"
    /// (see `make_count_schema` at the bottom of this file)
    pub output_schema: DFSchemaRef,
}
46
// Manual implementation needed because `file_type` is a trait object
// with no `Debug`; it is rendered as a `"..."` placeholder.
impl Debug for CopyTo {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("CopyTo")
            .field("input", &self.input)
            .field("output_url", &self.output_url)
            .field("partition_by", &self.partition_by)
            .field("file_type", &"...")
            .field("options", &self.options)
            .field("output_schema", &self.output_schema)
            // `finish_non_exhaustive` signals the elided field with `..`
            .finish_non_exhaustive()
    }
}
59
60// Implement PartialEq manually
61impl PartialEq for CopyTo {
62    fn eq(&self, other: &Self) -> bool {
63        self.input == other.input && self.output_url == other.output_url
64    }
65}
66
// Implement Eq: the `PartialEq` impl above is reflexive on the compared
// fields, so no additional logic is needed.
impl Eq for CopyTo {}
69
70// Manual implementation needed because of `file_type` and `options` fields.
71// Comparison excludes these field.
72impl PartialOrd for CopyTo {
73    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
74        match self.input.partial_cmp(&other.input) {
75            Some(Ordering::Equal) => match self.output_url.partial_cmp(&other.output_url)
76            {
77                Some(Ordering::Equal) => {
78                    self.partition_by.partial_cmp(&other.partition_by)
79                }
80                cmp => cmp,
81            },
82            cmp => cmp,
83        }
84    }
85}
86
// Implement Hash manually: `file_type` and `options` are excluded to
// mirror `PartialEq`. Hashing a subset of the fields required for
// equality keeps the `a == b => hash(a) == hash(b)` contract valid.
impl Hash for CopyTo {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.input.hash(state);
        self.output_url.hash(state);
    }
}
94
impl CopyTo {
    /// Creates a new `CopyTo` operator.
    ///
    /// * `input` - the plan producing the rows to write
    /// * `output_url` - destination for the output file(s)
    /// * `partition_by` - columns for hive-style partitioned writes
    /// * `file_type` - output format implementation
    /// * `options` - format-specific SQL options
    pub fn new(
        input: Arc<LogicalPlan>,
        output_url: String,
        partition_by: Vec<String>,
        file_type: Arc<dyn FileType>,
        options: HashMap<String, String>,
    ) -> Self {
        Self {
            input,
            output_url,
            partition_by,
            file_type,
            options,
            // The output schema is always a single column "count" with the number of rows copied
            output_schema: make_count_schema(),
        }
    }
}
114
/// Modifies the content of a database
///
/// This operator is used to perform DML operations such as INSERT, DELETE,
/// UPDATE, and CTAS (CREATE TABLE AS SELECT).
///
/// * `INSERT` - Appends new rows to the existing table. Calls
///   [`TableProvider::insert_into`]
///
/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
///   [`TableProvider`] trait or builtin sources.
///
/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
///   the [`TableProvider`] trait or builtin sources.
///
/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
///   from a query. This is similar to the `INSERT` operation, but it creates a new
///   table instead of modifying an existing one.
///
/// Note that the structure is adapted from substrait `WriteRel`
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
#[derive(Clone)]
pub struct DmlStatement {
    /// The table name
    pub table_name: TableReference,
    /// The target table to insert into
    pub target: Arc<dyn TableSource>,
    /// The type of operation to perform
    pub op: WriteOp,
    /// The relation that determines the tuples to add/remove/modify;
    /// its schema must match the target table's schema
    pub input: Arc<LogicalPlan>,
    /// The schema of the output relation: a single "count" column
    /// (see `make_count_schema` at the bottom of this file)
    pub output_schema: DFSchemaRef,
}
// Equality (the `PartialEq` impl below) is reflexive, so `Eq` holds.
impl Eq for DmlStatement {}
// Manual implementation needed because `target` is a trait object; it is
// hashed via its schema, matching the `PartialEq` impl below.
impl Hash for DmlStatement {
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.table_name.hash(state);
        self.target.schema().hash(state);
        self.op.hash(state);
        self.input.hash(state);
        self.output_schema.hash(state);
    }
}
160
161impl PartialEq for DmlStatement {
162    fn eq(&self, other: &Self) -> bool {
163        self.table_name == other.table_name
164            && self.target.schema() == other.target.schema()
165            && self.op == other.op
166            && self.input == other.input
167            && self.output_schema == other.output_schema
168    }
169}
170
// Manual implementation needed because `target` is a trait object with no
// `Debug`; it is shown as a `"..."` placeholder plus its schema.
impl Debug for DmlStatement {
    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
        f.debug_struct("DmlStatement")
            .field("table_name", &self.table_name)
            .field("target", &"...")
            .field("target_schema", &self.target.schema())
            .field("op", &self.op)
            .field("input", &self.input)
            .field("output_schema", &self.output_schema)
            // NOTE(review): `CopyTo`'s Debug uses `finish_non_exhaustive()`
            // for its elided field; consider aligning the two impls.
            .finish()
    }
}
183
impl DmlStatement {
    /// Creates a new DML statement with the output schema set to a single `count` column.
    ///
    /// * `table_name` - name of the table being modified
    /// * `target` - the table source written to
    /// * `op` - the kind of write operation
    /// * `input` - the plan producing the rows to add/remove/modify
    pub fn new(
        table_name: TableReference,
        target: Arc<dyn TableSource>,
        op: WriteOp,
        input: Arc<LogicalPlan>,
    ) -> Self {
        Self {
            table_name,
            target,
            op,
            input,

            // The output schema is always a single column with the number of rows affected
            output_schema: make_count_schema(),
        }
    }

    /// Return a descriptive name of this [`DmlStatement`]
    pub fn name(&self) -> &str {
        self.op.name()
    }
}
208
209// Manual implementation needed because of `table_schema` and `output_schema` fields.
210// Comparison excludes these fields.
211impl PartialOrd for DmlStatement {
212    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
213        match self.table_name.partial_cmp(&other.table_name) {
214            Some(Ordering::Equal) => match self.op.partial_cmp(&other.op) {
215                Some(Ordering::Equal) => self.input.partial_cmp(&other.input),
216                cmp => cmp,
217            },
218            cmp => cmp,
219        }
220    }
221}
222
/// The type of DML operation to perform.
///
/// See [`DmlStatement`] for more details.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
pub enum WriteOp {
    /// `INSERT INTO` operation; the exact insert semantics are carried
    /// by the inner [`InsertOp`]
    Insert(InsertOp),
    /// `DELETE` operation
    Delete,
    /// `UPDATE` operation
    Update,
    /// `CREATE TABLE AS SELECT` operation
    Ctas,
}
237
238impl WriteOp {
239    /// Return a descriptive name of this [`WriteOp`]
240    pub fn name(&self) -> &str {
241        match self {
242            WriteOp::Insert(insert) => insert.name(),
243            WriteOp::Delete => "Delete",
244            WriteOp::Update => "Update",
245            WriteOp::Ctas => "Ctas",
246        }
247    }
248}
249
250impl Display for WriteOp {
251    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
252        write!(f, "{}", self.name())
253    }
254}
255
/// The kind of `INSERT` to perform; carried by [`WriteOp::Insert`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Hash)]
pub enum InsertOp {
    /// Appends new rows to the existing table without modifying any
    /// existing rows. This corresponds to the SQL `INSERT INTO` query.
    Append,
    /// Overwrites all existing rows in the table with the new rows.
    /// This corresponds to the SQL `INSERT OVERWRITE` query.
    Overwrite,
    /// If any existing rows collides with the inserted rows (typically based
    /// on a unique key or primary key), those existing rows are replaced.
    /// This corresponds to the SQL `REPLACE INTO` query and its equivalents.
    Replace,
}
269
270impl InsertOp {
271    /// Return a descriptive name of this [`InsertOp`]
272    pub fn name(&self) -> &str {
273        match self {
274            InsertOp::Append => "Insert Into",
275            InsertOp::Overwrite => "Insert Overwrite",
276            InsertOp::Replace => "Replace Into",
277        }
278    }
279}
280
281impl Display for InsertOp {
282    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
283        write!(f, "{}", self.name())
284    }
285}
286
287fn make_count_schema() -> DFSchemaRef {
288    Arc::new(
289        Schema::new(vec![Field::new("count", DataType::UInt64, false)])
290            .try_into()
291            .unwrap(),
292    )
293}