datafusion_sql/
parser.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`DFParser`]: DataFusion SQL Parser based on [`sqlparser`]
19//!
20//! This parser implements DataFusion specific statements such as
21//! `CREATE EXTERNAL TABLE`
22
23use datafusion_common::config::SqlParserOptions;
24use datafusion_common::DataFusionError;
25use datafusion_common::{sql_err, Diagnostic, Span};
26use sqlparser::ast::{ExprWithAlias, OrderByOptions};
27use sqlparser::tokenizer::TokenWithSpan;
28use sqlparser::{
29    ast::{
30        ColumnDef, ColumnOptionDef, ObjectName, OrderByExpr, Query,
31        Statement as SQLStatement, TableConstraint, Value,
32    },
33    dialect::{keywords::Keyword, Dialect, GenericDialect},
34    parser::{Parser, ParserError},
35    tokenizer::{Token, Tokenizer, Word},
36};
37use std::collections::VecDeque;
38use std::fmt;
39
// Evaluates to an `Err(DataFusionError)` built from a
// `ParserError::ParserError` carrying the given message. Two forms:
//
//   parser_err!(msg)
//   parser_err!(msg; diagnostic = diag)
//
// The second form additionally attaches `diag` via `with_diagnostic`.
//
// Use `Parser::expected` instead, if possible
macro_rules! parser_err {
    ($MSG:expr $(; diagnostic = $DIAG:expr)?) => {{

        let err = DataFusionError::from(ParserError::ParserError($MSG.to_string()));
        $(
            let err = err.with_diagnostic($DIAG);
        )?
        Err(err)
    }};
}
51
52fn parse_file_type(s: &str) -> Result<String, DataFusionError> {
53    Ok(s.to_uppercase())
54}
55
/// DataFusion specific `EXPLAIN`
///
/// Syntax:
/// ```sql
/// EXPLAIN [ANALYZE] [VERBOSE] [FORMAT format] statement
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExplainStatement {
    /// `EXPLAIN ANALYZE ..`
    pub analyze: bool,
    /// `EXPLAIN .. VERBOSE ..`
    pub verbose: bool,
    /// `EXPLAIN .. FORMAT <format> ..`: the requested format, if one was given
    pub format: Option<String>,
    /// The statement to explain. Note this is a DataFusion [`Statement`] (not a
    /// [`sqlparser::ast::Statement`]) so that we can use `EXPLAIN`, `COPY`, and other
    /// DataFusion specific statements
    pub statement: Box<Statement>,
}
75
76impl fmt::Display for ExplainStatement {
77    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
78        let Self {
79            analyze,
80            verbose,
81            format,
82            statement,
83        } = self;
84
85        write!(f, "EXPLAIN ")?;
86        if *analyze {
87            write!(f, "ANALYZE ")?;
88        }
89        if *verbose {
90            write!(f, "VERBOSE ")?;
91        }
92        if let Some(format) = format.as_ref() {
93            write!(f, "FORMAT {format} ")?;
94        }
95
96        write!(f, "{statement}")
97    }
98}
99
/// DataFusion extension DDL for `COPY`
///
/// # Syntax:
///
/// ```text
/// COPY <table_name | (<query>)>
/// TO
/// <destination_url>
/// [ STORED AS <file_type> ]
/// [ PARTITIONED BY (<column list>) ]
/// [ OPTIONS (<key_value_list>) ]
/// ```
///
/// # Examples
///
/// ```sql
/// COPY lineitem  TO 'lineitem'
/// STORED AS PARQUET OPTIONS (
///   partitions 16,
///   row_group_limit_rows 100000,
///   row_group_limit_bytes 200000
/// )
///
/// COPY (SELECT l_orderkey from lineitem) to 'lineitem.parquet';
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CopyToStatement {
    /// Where the data comes from
    pub source: CopyToSource,
    /// The URL to where the data is heading
    pub target: String,
    /// Partition keys
    pub partitioned_by: Vec<String>,
    /// File type (Parquet, NDJSON, CSV etc.)
    pub stored_as: Option<String>,
    /// Target specific options
    pub options: Vec<(String, Value)>,
}
136
137impl fmt::Display for CopyToStatement {
138    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
139        let Self {
140            source,
141            target,
142            partitioned_by,
143            stored_as,
144            options,
145            ..
146        } = self;
147
148        write!(f, "COPY {source} TO {target}")?;
149        if let Some(file_type) = stored_as {
150            write!(f, " STORED AS {file_type}")?;
151        }
152        if !partitioned_by.is_empty() {
153            write!(f, " PARTITIONED BY ({})", partitioned_by.join(", "))?;
154        }
155
156        if !options.is_empty() {
157            let opts: Vec<_> =
158                options.iter().map(|(k, v)| format!("'{k}' {v}")).collect();
159            write!(f, " OPTIONS ({})", opts.join(", "))?;
160        }
161
162        Ok(())
163    }
164}
165
/// The source of the data written by a `COPY ... TO` statement:
/// either a named relation or a parenthesized query.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CopyToSource {
    /// `COPY <table> TO ...`
    Relation(ObjectName),
    /// `COPY (...query...) TO ...`
    Query(Box<Query>),
}
173
174impl fmt::Display for CopyToSource {
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        match self {
177            CopyToSource::Relation(r) => write!(f, "{r}"),
178            CopyToSource::Query(q) => write!(f, "({q})"),
179        }
180    }
181}
182
/// This type defines a lexicographical ordering: a list of
/// [`OrderByExpr`]s.
pub(crate) type LexOrdering = Vec<OrderByExpr>;
185
/// DataFusion extension DDL for `CREATE EXTERNAL TABLE`
///
/// Syntax:
///
/// ```text
/// CREATE [ UNBOUNDED ] EXTERNAL [ TEMP | TEMPORARY ] TABLE
/// [ IF NOT EXISTS ]
/// <TABLE_NAME>[ (<column_definition>) ]
/// STORED AS <file_type>
/// [ PARTITIONED BY (<column_definition list> | <column list>) ]
/// [ WITH ORDER (<ordered_column_list>) ]
/// [ OPTIONS (<key_value_list>) ]
/// LOCATION <literal>
///
/// <column_definition> := (<column_name> <data_type>, ...)
///
/// <column_list> := (<column_name>, ...)
///
/// <ordered_column_list> := (<column_name> <sort_clause>, ...)
///
/// <key_value_list> := (<literal> <literal>, <literal> <literal>, ...)
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateExternalTable {
    /// Table name
    pub name: ObjectName,
    /// Optional schema
    pub columns: Vec<ColumnDef>,
    /// File type (Parquet, NDJSON, CSV, etc)
    pub file_type: String,
    /// Path to file
    pub location: String,
    /// Partition Columns
    pub table_partition_cols: Vec<String>,
    /// Ordered expressions, one [`LexOrdering`] per `WITH ORDER` clause
    pub order_exprs: Vec<LexOrdering>,
    /// Option to not error if table already exists
    pub if_not_exists: bool,
    /// Whether the table is a temporary table
    pub temporary: bool,
    /// Infinite streams?
    pub unbounded: bool,
    /// Table(provider) specific options
    pub options: Vec<(String, Value)>,
    /// Table-level constraints
    pub constraints: Vec<TableConstraint>,
}
233
234impl fmt::Display for CreateExternalTable {
235    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
236        write!(f, "CREATE EXTERNAL TABLE ")?;
237        if self.if_not_exists {
238            write!(f, "IF NOT EXISTS ")?;
239        }
240        write!(f, "{} ", self.name)?;
241        write!(f, "STORED AS {} ", self.file_type)?;
242        write!(f, "LOCATION {} ", self.location)
243    }
244}
245
/// DataFusion SQL Statement.
///
/// This can either be a [`Statement`] from [`sqlparser`] from a
/// standard SQL dialect, or a DataFusion extension such as `CREATE
/// EXTERNAL TABLE`. See [`DFParser`] for more information.
///
/// [`Statement`]: sqlparser::ast::Statement
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Statement {
    /// ANSI SQL AST node (from sqlparser-rs)
    Statement(Box<SQLStatement>),
    /// Extension: `CREATE EXTERNAL TABLE`
    CreateExternalTable(CreateExternalTable),
    /// Extension: `COPY TO`
    CopyTo(CopyToStatement),
    /// `EXPLAIN` whose inner statement is itself a DataFusion [`Statement`],
    /// so that extension statements can be explained too
    Explain(ExplainStatement),
}
264
265impl fmt::Display for Statement {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        match self {
268            Statement::Statement(stmt) => write!(f, "{stmt}"),
269            Statement::CreateExternalTable(stmt) => write!(f, "{stmt}"),
270            Statement::CopyTo(stmt) => write!(f, "{stmt}"),
271            Statement::Explain(stmt) => write!(f, "{stmt}"),
272        }
273    }
274}
275
276fn ensure_not_set<T>(field: &Option<T>, name: &str) -> Result<(), DataFusionError> {
277    if field.is_some() {
278        parser_err!(format!("{name} specified more than once",))?
279    }
280    Ok(())
281}
282
/// DataFusion SQL Parser based on [`sqlparser`]
///
/// Parses DataFusion's SQL dialect, often delegating to [`sqlparser`]'s [`Parser`].
///
/// DataFusion mostly follows existing SQL dialects via
/// `sqlparser`. However, certain statements such as `COPY` and
/// `CREATE EXTERNAL TABLE` have special syntax in DataFusion. See
/// [`Statement`] for a list of this special syntax
pub struct DFParser<'a> {
    /// The underlying [`sqlparser`] parser, loaded with the tokenized SQL
    pub parser: Parser<'a>,
    /// Parser options; `recursion_limit` is included in the error reported
    /// when the recursion limit is exceeded
    options: SqlParserOptions,
}
295
/// Recursion limit used by [`DFParserBuilder::new`]. Same as `sqlparser`
const DEFAULT_RECURSION_LIMIT: usize = 50;
/// Dialect used by [`DFParserBuilder::new`]
const DEFAULT_DIALECT: GenericDialect = GenericDialect {};
299
/// Builder for [`DFParser`]
///
/// # Example: Create and Parse SQL statements
/// ```
/// # use datafusion_sql::parser::DFParserBuilder;
/// # use datafusion_common::Result;
/// # fn test() -> Result<()> {
/// let mut parser = DFParserBuilder::new("SELECT * FROM foo; SELECT 1 + 2")
///   .build()?;
/// // parse the SQL into DFStatements
/// let statements = parser.parse_statements()?;
/// assert_eq!(statements.len(), 2);
/// # Ok(())
/// # }
/// ```
///
/// # Example: Create and Parse expression with a different dialect
/// ```
/// # use datafusion_sql::parser::DFParserBuilder;
/// # use datafusion_common::Result;
/// # use datafusion_sql::sqlparser::dialect::MySqlDialect;
/// # use datafusion_sql::sqlparser::ast::Expr;
/// # fn test() -> Result<()> {
/// let dialect = MySqlDialect{}; // Parse using MySQL dialect
/// let mut parser = DFParserBuilder::new("1 + 2")
///   .with_dialect(&dialect)
///   .build()?;
/// // parse 1+2 into an sqlparser::ast::Expr
/// let res = parser.parse_expr()?;
/// assert!(matches!(res.expr, Expr::BinaryOp {..}));
/// # Ok(())
/// # }
/// ```
pub struct DFParserBuilder<'a> {
    /// The SQL string to parse
    sql: &'a str,
    /// The Dialect to use (defaults to [`GenericDialect`])
    dialect: &'a dyn Dialect,
    /// The recursion limit while parsing
    recursion_limit: usize,
}
341
342impl<'a> DFParserBuilder<'a> {
343    /// Create a new parser builder for the specified tokens using the
344    /// [`GenericDialect`].
345    pub fn new(sql: &'a str) -> Self {
346        Self {
347            sql,
348            dialect: &DEFAULT_DIALECT,
349            recursion_limit: DEFAULT_RECURSION_LIMIT,
350        }
351    }
352
353    /// Adjust the parser builder's dialect. Defaults to [`GenericDialect`]
354    pub fn with_dialect(mut self, dialect: &'a dyn Dialect) -> Self {
355        self.dialect = dialect;
356        self
357    }
358
359    /// Adjust the recursion limit of sql parsing.  Defaults to 50
360    pub fn with_recursion_limit(mut self, recursion_limit: usize) -> Self {
361        self.recursion_limit = recursion_limit;
362        self
363    }
364
365    pub fn build(self) -> Result<DFParser<'a>, DataFusionError> {
366        let mut tokenizer = Tokenizer::new(self.dialect, self.sql);
367        // Convert TokenizerError -> ParserError
368        let tokens = tokenizer
369            .tokenize_with_location()
370            .map_err(ParserError::from)?;
371
372        Ok(DFParser {
373            parser: Parser::new(self.dialect)
374                .with_tokens_with_locations(tokens)
375                .with_recursion_limit(self.recursion_limit),
376            options: SqlParserOptions {
377                recursion_limit: self.recursion_limit,
378                ..Default::default()
379            },
380        })
381    }
382}
383
384impl<'a> DFParser<'a> {
385    #[deprecated(since = "46.0.0", note = "DFParserBuilder")]
386    pub fn new(sql: &'a str) -> Result<Self, DataFusionError> {
387        DFParserBuilder::new(sql).build()
388    }
389
390    #[deprecated(since = "46.0.0", note = "DFParserBuilder")]
391    pub fn new_with_dialect(
392        sql: &'a str,
393        dialect: &'a dyn Dialect,
394    ) -> Result<Self, DataFusionError> {
395        DFParserBuilder::new(sql).with_dialect(dialect).build()
396    }
397
398    /// Parse a sql string into one or [`Statement`]s using the
399    /// [`GenericDialect`].
400    pub fn parse_sql(sql: &'a str) -> Result<VecDeque<Statement>, DataFusionError> {
401        let mut parser = DFParserBuilder::new(sql).build()?;
402
403        parser.parse_statements()
404    }
405
406    /// Parse a SQL string and produce one or more [`Statement`]s with
407    /// with the specified dialect.
408    pub fn parse_sql_with_dialect(
409        sql: &str,
410        dialect: &dyn Dialect,
411    ) -> Result<VecDeque<Statement>, DataFusionError> {
412        let mut parser = DFParserBuilder::new(sql).with_dialect(dialect).build()?;
413        parser.parse_statements()
414    }
415
416    pub fn parse_sql_into_expr_with_dialect(
417        sql: &str,
418        dialect: &dyn Dialect,
419    ) -> Result<ExprWithAlias, DataFusionError> {
420        let mut parser = DFParserBuilder::new(sql).with_dialect(dialect).build()?;
421
422        parser.parse_expr()
423    }
424
425    /// Parse a sql string into one or [`Statement`]s
426    pub fn parse_statements(&mut self) -> Result<VecDeque<Statement>, DataFusionError> {
427        let mut stmts = VecDeque::new();
428        let mut expecting_statement_delimiter = false;
429        loop {
430            // ignore empty statements (between successive statement delimiters)
431            while self.parser.consume_token(&Token::SemiColon) {
432                expecting_statement_delimiter = false;
433            }
434
435            if self.parser.peek_token() == Token::EOF {
436                break;
437            }
438            if expecting_statement_delimiter {
439                return self.expected("end of statement", self.parser.peek_token());
440            }
441
442            let statement = self.parse_statement()?;
443            stmts.push_back(statement);
444            expecting_statement_delimiter = true;
445        }
446        Ok(stmts)
447    }
448
449    /// Report an unexpected token
450    fn expected<T>(
451        &self,
452        expected: &str,
453        found: TokenWithSpan,
454    ) -> Result<T, DataFusionError> {
455        let sql_parser_span = found.span;
456        let span = Span::try_from_sqlparser_span(sql_parser_span);
457        let diagnostic = Diagnostic::new_error(
458            format!("Expected: {expected}, found: {found}{}", found.span.start),
459            span,
460        );
461        parser_err!(
462            format!("Expected: {expected}, found: {found}{}", found.span.start);
463            diagnostic=
464            diagnostic
465        )
466    }
467
468    /// Parse a new expression
469    pub fn parse_statement(&mut self) -> Result<Statement, DataFusionError> {
470        match self.parser.peek_token().token {
471            Token::Word(w) => {
472                match w.keyword {
473                    Keyword::CREATE => {
474                        self.parser.next_token(); // CREATE
475                        self.parse_create()
476                    }
477                    Keyword::COPY => {
478                        if let Token::Word(w) = self.parser.peek_nth_token(1).token {
479                            // use native parser for COPY INTO
480                            if w.keyword == Keyword::INTO {
481                                return self.parse_and_handle_statement();
482                            }
483                        }
484                        self.parser.next_token(); // COPY
485                        self.parse_copy()
486                    }
487                    Keyword::EXPLAIN => {
488                        self.parser.next_token(); // EXPLAIN
489                        self.parse_explain()
490                    }
491                    _ => {
492                        // use sqlparser-rs parser
493                        self.parse_and_handle_statement()
494                    }
495                }
496            }
497            _ => {
498                // use the native parser
499                self.parse_and_handle_statement()
500            }
501        }
502    }
503
504    pub fn parse_expr(&mut self) -> Result<ExprWithAlias, DataFusionError> {
505        if let Token::Word(w) = self.parser.peek_token().token {
506            match w.keyword {
507                Keyword::CREATE | Keyword::COPY | Keyword::EXPLAIN => {
508                    return parser_err!("Unsupported command in expression")?;
509                }
510                _ => {}
511            }
512        }
513
514        Ok(self.parser.parse_expr_with_alias()?)
515    }
516
517    /// Helper method to parse a statement and handle errors consistently, especially for recursion limits
518    fn parse_and_handle_statement(&mut self) -> Result<Statement, DataFusionError> {
519        self.parser
520            .parse_statement()
521            .map(|stmt| Statement::Statement(Box::from(stmt)))
522            .map_err(|e| match e {
523                ParserError::RecursionLimitExceeded => DataFusionError::SQL(
524                    ParserError::RecursionLimitExceeded,
525                    Some(format!(
526                        " (current limit: {})",
527                        self.options.recursion_limit
528                    )),
529                ),
530                other => DataFusionError::SQL(other, None),
531            })
532    }
533
534    /// Parse a SQL `COPY TO` statement
535    pub fn parse_copy(&mut self) -> Result<Statement, DataFusionError> {
536        // parse as a query
537        let source = if self.parser.consume_token(&Token::LParen) {
538            let query = self.parser.parse_query()?;
539            self.parser.expect_token(&Token::RParen)?;
540            CopyToSource::Query(query)
541        } else {
542            // parse as table reference
543            let table_name = self.parser.parse_object_name(true)?;
544            CopyToSource::Relation(table_name)
545        };
546
547        #[derive(Default)]
548        struct Builder {
549            stored_as: Option<String>,
550            target: Option<String>,
551            partitioned_by: Option<Vec<String>>,
552            options: Option<Vec<(String, Value)>>,
553        }
554
555        let mut builder = Builder::default();
556
557        loop {
558            if let Some(keyword) = self.parser.parse_one_of_keywords(&[
559                Keyword::STORED,
560                Keyword::TO,
561                Keyword::PARTITIONED,
562                Keyword::OPTIONS,
563                Keyword::WITH,
564            ]) {
565                match keyword {
566                    Keyword::STORED => {
567                        self.parser.expect_keyword(Keyword::AS)?;
568                        ensure_not_set(&builder.stored_as, "STORED AS")?;
569                        builder.stored_as = Some(self.parse_file_format()?);
570                    }
571                    Keyword::TO => {
572                        ensure_not_set(&builder.target, "TO")?;
573                        builder.target = Some(self.parser.parse_literal_string()?);
574                    }
575                    Keyword::WITH => {
576                        self.parser.expect_keyword(Keyword::HEADER)?;
577                        self.parser.expect_keyword(Keyword::ROW)?;
578                        return parser_err!("WITH HEADER ROW clause is no longer in use. Please use the OPTIONS clause with 'format.has_header' set appropriately, e.g., OPTIONS ('format.has_header' 'true')")?;
579                    }
580                    Keyword::PARTITIONED => {
581                        self.parser.expect_keyword(Keyword::BY)?;
582                        ensure_not_set(&builder.partitioned_by, "PARTITIONED BY")?;
583                        builder.partitioned_by = Some(self.parse_partitions()?);
584                    }
585                    Keyword::OPTIONS => {
586                        ensure_not_set(&builder.options, "OPTIONS")?;
587                        builder.options = Some(self.parse_value_options()?);
588                    }
589                    _ => {
590                        unreachable!()
591                    }
592                }
593            } else {
594                let token = self.parser.next_token();
595                if token == Token::EOF || token == Token::SemiColon {
596                    break;
597                } else {
598                    return self.expected("end of statement or ;", token)?;
599                }
600            }
601        }
602
603        let Some(target) = builder.target else {
604            return parser_err!("Missing TO clause in COPY statement")?;
605        };
606
607        Ok(Statement::CopyTo(CopyToStatement {
608            source,
609            target,
610            partitioned_by: builder.partitioned_by.unwrap_or(vec![]),
611            stored_as: builder.stored_as,
612            options: builder.options.unwrap_or(vec![]),
613        }))
614    }
615
616    /// Parse the next token as a key name for an option list
617    ///
618    /// Note this is different than [`parse_literal_string`]
619    /// because it allows keywords as well as other non words
620    ///
621    /// [`parse_literal_string`]: sqlparser::parser::Parser::parse_literal_string
622    pub fn parse_option_key(&mut self) -> Result<String, DataFusionError> {
623        let next_token = self.parser.next_token();
624        match next_token.token {
625            Token::Word(Word { value, .. }) => {
626                let mut parts = vec![value];
627                while self.parser.consume_token(&Token::Period) {
628                    let next_token = self.parser.next_token();
629                    if let Token::Word(Word { value, .. }) = next_token.token {
630                        parts.push(value);
631                    } else {
632                        // Unquoted namespaced keys have to conform to the syntax
633                        // "<WORD>[\.<WORD>]*". If we have a key that breaks this
634                        // pattern, error out:
635                        return self.expected("key name", next_token);
636                    }
637                }
638                Ok(parts.join("."))
639            }
640            Token::SingleQuotedString(s) => Ok(s),
641            Token::DoubleQuotedString(s) => Ok(s),
642            Token::EscapedStringLiteral(s) => Ok(s),
643            _ => self.expected("key name", next_token),
644        }
645    }
646
647    /// Parse the next token as a value for an option list
648    ///
649    /// Note this is different than [`parse_value`] as it allows any
650    /// word or keyword in this location.
651    ///
652    /// [`parse_value`]: sqlparser::parser::Parser::parse_value
653    pub fn parse_option_value(&mut self) -> Result<Value, DataFusionError> {
654        let next_token = self.parser.next_token();
655        match next_token.token {
656            // e.g. things like "snappy" or "gzip" that may be keywords
657            Token::Word(word) => Ok(Value::SingleQuotedString(word.value)),
658            Token::SingleQuotedString(s) => Ok(Value::SingleQuotedString(s)),
659            Token::DoubleQuotedString(s) => Ok(Value::DoubleQuotedString(s)),
660            Token::EscapedStringLiteral(s) => Ok(Value::EscapedStringLiteral(s)),
661            Token::Number(n, l) => Ok(Value::Number(n, l)),
662            _ => self.expected("string or numeric value", next_token),
663        }
664    }
665
666    /// Parse a SQL `EXPLAIN`
667    pub fn parse_explain(&mut self) -> Result<Statement, DataFusionError> {
668        let analyze = self.parser.parse_keyword(Keyword::ANALYZE);
669        let verbose = self.parser.parse_keyword(Keyword::VERBOSE);
670        let format = self.parse_explain_format()?;
671
672        let statement = self.parse_statement()?;
673
674        Ok(Statement::Explain(ExplainStatement {
675            statement: Box::new(statement),
676            analyze,
677            verbose,
678            format,
679        }))
680    }
681
682    pub fn parse_explain_format(&mut self) -> Result<Option<String>, DataFusionError> {
683        if !self.parser.parse_keyword(Keyword::FORMAT) {
684            return Ok(None);
685        }
686
687        let next_token = self.parser.next_token();
688        let format = match next_token.token {
689            Token::Word(w) => Ok(w.value),
690            Token::SingleQuotedString(w) => Ok(w),
691            Token::DoubleQuotedString(w) => Ok(w),
692            _ => self.expected("an explain format such as TREE", next_token),
693        }?;
694        Ok(Some(format))
695    }
696
697    /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE`
698    pub fn parse_create(&mut self) -> Result<Statement, DataFusionError> {
699        if self.parser.parse_keyword(Keyword::EXTERNAL) {
700            self.parse_create_external_table(false)
701        } else if self.parser.parse_keyword(Keyword::UNBOUNDED) {
702            self.parser.expect_keyword(Keyword::EXTERNAL)?;
703            self.parse_create_external_table(true)
704        } else {
705            Ok(Statement::Statement(Box::from(self.parser.parse_create()?)))
706        }
707    }
708
709    fn parse_partitions(&mut self) -> Result<Vec<String>, DataFusionError> {
710        let mut partitions: Vec<String> = vec![];
711        if !self.parser.consume_token(&Token::LParen)
712            || self.parser.consume_token(&Token::RParen)
713        {
714            return Ok(partitions);
715        }
716
717        loop {
718            if let Token::Word(_) = self.parser.peek_token().token {
719                let identifier = self.parser.parse_identifier()?;
720                partitions.push(identifier.to_string());
721            } else {
722                return self.expected("partition name", self.parser.peek_token());
723            }
724            let comma = self.parser.consume_token(&Token::Comma);
725            if self.parser.consume_token(&Token::RParen) {
726                // allow a trailing comma, even though it's not in standard
727                break;
728            } else if !comma {
729                return self.expected(
730                    "',' or ')' after partition definition",
731                    self.parser.peek_token(),
732                );
733            }
734        }
735        Ok(partitions)
736    }
737
738    /// Parse the ordering clause of a `CREATE EXTERNAL TABLE` SQL statement
739    pub fn parse_order_by_exprs(&mut self) -> Result<Vec<OrderByExpr>, DataFusionError> {
740        let mut values = vec![];
741        self.parser.expect_token(&Token::LParen)?;
742        loop {
743            values.push(self.parse_order_by_expr()?);
744            if !self.parser.consume_token(&Token::Comma) {
745                self.parser.expect_token(&Token::RParen)?;
746                return Ok(values);
747            }
748        }
749    }
750
751    /// Parse an ORDER BY sub-expression optionally followed by ASC or DESC.
752    pub fn parse_order_by_expr(&mut self) -> Result<OrderByExpr, DataFusionError> {
753        let expr = self.parser.parse_expr()?;
754
755        let asc = if self.parser.parse_keyword(Keyword::ASC) {
756            Some(true)
757        } else if self.parser.parse_keyword(Keyword::DESC) {
758            Some(false)
759        } else {
760            None
761        };
762
763        let nulls_first = if self
764            .parser
765            .parse_keywords(&[Keyword::NULLS, Keyword::FIRST])
766        {
767            Some(true)
768        } else if self.parser.parse_keywords(&[Keyword::NULLS, Keyword::LAST]) {
769            Some(false)
770        } else {
771            None
772        };
773
774        Ok(OrderByExpr {
775            expr,
776            options: OrderByOptions { asc, nulls_first },
777            with_fill: None,
778        })
779    }
780
781    // This is a copy of the equivalent implementation in sqlparser.
782    fn parse_columns(
783        &mut self,
784    ) -> Result<(Vec<ColumnDef>, Vec<TableConstraint>), DataFusionError> {
785        let mut columns = vec![];
786        let mut constraints = vec![];
787        if !self.parser.consume_token(&Token::LParen)
788            || self.parser.consume_token(&Token::RParen)
789        {
790            return Ok((columns, constraints));
791        }
792
793        loop {
794            if let Some(constraint) = self.parser.parse_optional_table_constraint()? {
795                constraints.push(constraint);
796            } else if let Token::Word(_) = self.parser.peek_token().token {
797                let column_def = self.parse_column_def()?;
798                columns.push(column_def);
799            } else {
800                return self.expected(
801                    "column name or constraint definition",
802                    self.parser.peek_token(),
803                );
804            }
805            let comma = self.parser.consume_token(&Token::Comma);
806            if self.parser.consume_token(&Token::RParen) {
807                // allow a trailing comma, even though it's not in standard
808                break;
809            } else if !comma {
810                return self.expected(
811                    "',' or ')' after column definition",
812                    self.parser.peek_token(),
813                );
814            }
815        }
816
817        Ok((columns, constraints))
818    }
819
820    fn parse_column_def(&mut self) -> Result<ColumnDef, DataFusionError> {
821        let name = self.parser.parse_identifier()?;
822        let data_type = self.parser.parse_data_type()?;
823        let mut options = vec![];
824        loop {
825            if self.parser.parse_keyword(Keyword::CONSTRAINT) {
826                let name = Some(self.parser.parse_identifier()?);
827                if let Some(option) = self.parser.parse_optional_column_option()? {
828                    options.push(ColumnOptionDef { name, option });
829                } else {
830                    return self.expected(
831                        "constraint details after CONSTRAINT <name>",
832                        self.parser.peek_token(),
833                    );
834                }
835            } else if let Some(option) = self.parser.parse_optional_column_option()? {
836                options.push(ColumnOptionDef { name: None, option });
837            } else {
838                break;
839            };
840        }
841        Ok(ColumnDef {
842            name,
843            data_type,
844            options,
845        })
846    }
847
848    fn parse_create_external_table(
849        &mut self,
850        unbounded: bool,
851    ) -> Result<Statement, DataFusionError> {
852        let temporary = self
853            .parser
854            .parse_one_of_keywords(&[Keyword::TEMP, Keyword::TEMPORARY])
855            .is_some();
856        self.parser.expect_keyword(Keyword::TABLE)?;
857        let if_not_exists =
858            self.parser
859                .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]);
860        let table_name = self.parser.parse_object_name(true)?;
861        let (mut columns, constraints) = self.parse_columns()?;
862
863        #[derive(Default)]
864        struct Builder {
865            file_type: Option<String>,
866            location: Option<String>,
867            table_partition_cols: Option<Vec<String>>,
868            order_exprs: Vec<LexOrdering>,
869            options: Option<Vec<(String, Value)>>,
870        }
871        let mut builder = Builder::default();
872
873        loop {
874            if let Some(keyword) = self.parser.parse_one_of_keywords(&[
875                Keyword::STORED,
876                Keyword::LOCATION,
877                Keyword::WITH,
878                Keyword::DELIMITER,
879                Keyword::COMPRESSION,
880                Keyword::PARTITIONED,
881                Keyword::OPTIONS,
882            ]) {
883                match keyword {
884                    Keyword::STORED => {
885                        self.parser.expect_keyword(Keyword::AS)?;
886                        ensure_not_set(&builder.file_type, "STORED AS")?;
887                        builder.file_type = Some(self.parse_file_format()?);
888                    }
889                    Keyword::LOCATION => {
890                        ensure_not_set(&builder.location, "LOCATION")?;
891                        builder.location = Some(self.parser.parse_literal_string()?);
892                    }
893                    Keyword::WITH => {
894                        if self.parser.parse_keyword(Keyword::ORDER) {
895                            builder.order_exprs.push(self.parse_order_by_exprs()?);
896                        } else {
897                            self.parser.expect_keyword(Keyword::HEADER)?;
898                            self.parser.expect_keyword(Keyword::ROW)?;
899                            return parser_err!("WITH HEADER ROW clause is no longer in use. Please use the OPTIONS clause with 'format.has_header' set appropriately, e.g., OPTIONS (format.has_header true)")?;
900                        }
901                    }
902                    Keyword::DELIMITER => {
903                        return parser_err!("DELIMITER clause is no longer in use. Please use the OPTIONS clause with 'format.delimiter' set appropriately, e.g., OPTIONS (format.delimiter ',')")?;
904                    }
905                    Keyword::COMPRESSION => {
906                        self.parser.expect_keyword(Keyword::TYPE)?;
907                        return parser_err!("COMPRESSION TYPE clause is no longer in use. Please use the OPTIONS clause with 'format.compression' set appropriately, e.g., OPTIONS (format.compression gzip)")?;
908                    }
909                    Keyword::PARTITIONED => {
910                        self.parser.expect_keyword(Keyword::BY)?;
911                        ensure_not_set(&builder.table_partition_cols, "PARTITIONED BY")?;
912                        // Expects either list of column names (col_name [, col_name]*)
913                        // or list of column definitions (col_name datatype [, col_name datatype]* )
914                        // use the token after the name to decide which parsing rule to use
915                        // Note that mixing both names and definitions is not allowed
916                        let peeked = self.parser.peek_nth_token(2);
917                        if peeked == Token::Comma || peeked == Token::RParen {
918                            // List of column names
919                            builder.table_partition_cols = Some(self.parse_partitions()?)
920                        } else {
921                            // List of column defs
922                            let (cols, cons) = self.parse_columns()?;
923                            builder.table_partition_cols = Some(
924                                cols.iter().map(|col| col.name.to_string()).collect(),
925                            );
926
927                            columns.extend(cols);
928
929                            if !cons.is_empty() {
930                                return sql_err!(ParserError::ParserError(
931                                    "Constraints on Partition Columns are not supported"
932                                        .to_string(),
933                                ));
934                            }
935                        }
936                    }
937                    Keyword::OPTIONS => {
938                        ensure_not_set(&builder.options, "OPTIONS")?;
939                        builder.options = Some(self.parse_value_options()?);
940                    }
941                    _ => {
942                        unreachable!()
943                    }
944                }
945            } else {
946                let token = self.parser.next_token();
947                if token == Token::EOF || token == Token::SemiColon {
948                    break;
949                } else {
950                    return self.expected("end of statement or ;", token)?;
951                }
952            }
953        }
954
955        // Validations: location and file_type are required
956        if builder.file_type.is_none() {
957            return sql_err!(ParserError::ParserError(
958                "Missing STORED AS clause in CREATE EXTERNAL TABLE statement".into(),
959            ));
960        }
961        if builder.location.is_none() {
962            return sql_err!(ParserError::ParserError(
963                "Missing LOCATION clause in CREATE EXTERNAL TABLE statement".into(),
964            ));
965        }
966
967        let create = CreateExternalTable {
968            name: table_name,
969            columns,
970            file_type: builder.file_type.unwrap(),
971            location: builder.location.unwrap(),
972            table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]),
973            order_exprs: builder.order_exprs,
974            if_not_exists,
975            temporary,
976            unbounded,
977            options: builder.options.unwrap_or(Vec::new()),
978            constraints,
979        };
980        Ok(Statement::CreateExternalTable(create))
981    }
982
983    /// Parses the set of valid formats
984    fn parse_file_format(&mut self) -> Result<String, DataFusionError> {
985        let token = self.parser.next_token();
986        match &token.token {
987            Token::Word(w) => parse_file_type(&w.value),
988            _ => self.expected("one of ARROW, PARQUET, NDJSON, or CSV", token),
989        }
990    }
991
992    /// Parses (key value) style options into a map of String --> [`Value`].
993    ///
994    /// This method supports keywords as key names as well as multiple
995    /// value types such as Numbers as well as Strings.
996    fn parse_value_options(&mut self) -> Result<Vec<(String, Value)>, DataFusionError> {
997        let mut options = vec![];
998        self.parser.expect_token(&Token::LParen)?;
999
1000        loop {
1001            let key = self.parse_option_key()?;
1002            let value = self.parse_option_value()?;
1003            options.push((key, value));
1004            let comma = self.parser.consume_token(&Token::Comma);
1005            if self.parser.consume_token(&Token::RParen) {
1006                // Allow a trailing comma, even though it's not in standard
1007                break;
1008            } else if !comma {
1009                return self.expected(
1010                    "',' or ')' after option definition",
1011                    self.parser.peek_token(),
1012                );
1013            }
1014        }
1015        Ok(options)
1016    }
1017}
1018
1019#[cfg(test)]
1020mod tests {
1021    use super::*;
1022    use datafusion_common::assert_contains;
1023    use sqlparser::ast::Expr::Identifier;
1024    use sqlparser::ast::{BinaryOperator, DataType, Expr, Ident};
1025    use sqlparser::dialect::SnowflakeDialect;
1026    use sqlparser::tokenizer::Span;
1027
1028    fn expect_parse_ok(sql: &str, expected: Statement) -> Result<(), DataFusionError> {
1029        let statements = DFParser::parse_sql(sql)?;
1030        assert_eq!(
1031            statements.len(),
1032            1,
1033            "Expected to parse exactly one statement"
1034        );
1035        assert_eq!(statements[0], expected, "actual:\n{:#?}", statements[0]);
1036        Ok(())
1037    }
1038
1039    /// Parses sql and asserts that the expected error message was found
1040    fn expect_parse_error(sql: &str, expected_error: &str) {
1041        match DFParser::parse_sql(sql) {
1042            Ok(statements) => {
1043                panic!(
1044                    "Expected parse error for '{sql}', but was successful: {statements:?}"
1045                );
1046            }
1047            Err(e) => {
1048                let error_message = e.to_string();
1049                assert!(
1050                    error_message.contains(expected_error),
1051                    "Expected error '{expected_error}' not found in actual error '{error_message}'"
1052                );
1053            }
1054        }
1055    }
1056
1057    fn make_column_def(name: impl Into<String>, data_type: DataType) -> ColumnDef {
1058        ColumnDef {
1059            name: Ident {
1060                value: name.into(),
1061                quote_style: None,
1062                span: Span::empty(),
1063            },
1064            data_type,
1065            options: vec![],
1066        }
1067    }
1068
1069    #[test]
1070    fn create_external_table() -> Result<(), DataFusionError> {
1071        // positive case
1072        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'";
1073        let display = None;
1074        let name = ObjectName::from(vec![Ident::from("t")]);
1075        let expected = Statement::CreateExternalTable(CreateExternalTable {
1076            name: name.clone(),
1077            columns: vec![make_column_def("c1", DataType::Int(display))],
1078            file_type: "CSV".to_string(),
1079            location: "foo.csv".into(),
1080            table_partition_cols: vec![],
1081            order_exprs: vec![],
1082            if_not_exists: false,
1083            temporary: false,
1084            unbounded: false,
1085            options: vec![],
1086            constraints: vec![],
1087        });
1088        expect_parse_ok(sql, expected)?;
1089
1090        // positive case: leading space
1091        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'     ";
1092        let expected = Statement::CreateExternalTable(CreateExternalTable {
1093            name: name.clone(),
1094            columns: vec![make_column_def("c1", DataType::Int(None))],
1095            file_type: "CSV".to_string(),
1096            location: "foo.csv".into(),
1097            table_partition_cols: vec![],
1098            order_exprs: vec![],
1099            if_not_exists: false,
1100            temporary: false,
1101            unbounded: false,
1102            options: vec![],
1103            constraints: vec![],
1104        });
1105        expect_parse_ok(sql, expected)?;
1106
1107        // positive case: leading space + semicolon
1108        let sql =
1109            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv'      ;";
1110        let expected = Statement::CreateExternalTable(CreateExternalTable {
1111            name: name.clone(),
1112            columns: vec![make_column_def("c1", DataType::Int(None))],
1113            file_type: "CSV".to_string(),
1114            location: "foo.csv".into(),
1115            table_partition_cols: vec![],
1116            order_exprs: vec![],
1117            if_not_exists: false,
1118            temporary: false,
1119            unbounded: false,
1120            options: vec![],
1121            constraints: vec![],
1122        });
1123        expect_parse_ok(sql, expected)?;
1124
1125        // positive case with delimiter
1126        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS (format.delimiter '|')";
1127        let display = None;
1128        let expected = Statement::CreateExternalTable(CreateExternalTable {
1129            name: name.clone(),
1130            columns: vec![make_column_def("c1", DataType::Int(display))],
1131            file_type: "CSV".to_string(),
1132            location: "foo.csv".into(),
1133            table_partition_cols: vec![],
1134            order_exprs: vec![],
1135            if_not_exists: false,
1136            temporary: false,
1137            unbounded: false,
1138            options: vec![(
1139                "format.delimiter".into(),
1140                Value::SingleQuotedString("|".into()),
1141            )],
1142            constraints: vec![],
1143        });
1144        expect_parse_ok(sql, expected)?;
1145
1146        // positive case: partitioned by
1147        let sql = "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1, p2) LOCATION 'foo.csv'";
1148        let display = None;
1149        let expected = Statement::CreateExternalTable(CreateExternalTable {
1150            name: name.clone(),
1151            columns: vec![make_column_def("c1", DataType::Int(display))],
1152            file_type: "CSV".to_string(),
1153            location: "foo.csv".into(),
1154            table_partition_cols: vec!["p1".to_string(), "p2".to_string()],
1155            order_exprs: vec![],
1156            if_not_exists: false,
1157            temporary: false,
1158            unbounded: false,
1159            options: vec![],
1160            constraints: vec![],
1161        });
1162        expect_parse_ok(sql, expected)?;
1163
1164        // positive case: it is ok for sql stmt with `COMPRESSION TYPE GZIP` tokens
1165        let sqls =
1166            vec![
1167             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
1168             ('format.compression' 'GZIP')", "GZIP"),
1169             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
1170             ('format.compression' 'BZIP2')", "BZIP2"),
1171             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
1172             ('format.compression' 'XZ')", "XZ"),
1173             ("CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV LOCATION 'foo.csv' OPTIONS
1174             ('format.compression' 'ZSTD')", "ZSTD"),
1175         ];
1176        for (sql, compression) in sqls {
1177            let expected = Statement::CreateExternalTable(CreateExternalTable {
1178                name: name.clone(),
1179                columns: vec![make_column_def("c1", DataType::Int(display))],
1180                file_type: "CSV".to_string(),
1181                location: "foo.csv".into(),
1182                table_partition_cols: vec![],
1183                order_exprs: vec![],
1184                if_not_exists: false,
1185                temporary: false,
1186                unbounded: false,
1187                options: vec![(
1188                    "format.compression".into(),
1189                    Value::SingleQuotedString(compression.into()),
1190                )],
1191                constraints: vec![],
1192            });
1193            expect_parse_ok(sql, expected)?;
1194        }
1195
1196        // positive case: it is ok for parquet files not to have columns specified
1197        let sql = "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'";
1198        let expected = Statement::CreateExternalTable(CreateExternalTable {
1199            name: name.clone(),
1200            columns: vec![],
1201            file_type: "PARQUET".to_string(),
1202            location: "foo.parquet".into(),
1203            table_partition_cols: vec![],
1204            order_exprs: vec![],
1205            if_not_exists: false,
1206            temporary: false,
1207            unbounded: false,
1208            options: vec![],
1209            constraints: vec![],
1210        });
1211        expect_parse_ok(sql, expected)?;
1212
1213        // positive case: it is ok for parquet files to be other than upper case
1214        let sql = "CREATE EXTERNAL TABLE t STORED AS parqueT LOCATION 'foo.parquet'";
1215        let expected = Statement::CreateExternalTable(CreateExternalTable {
1216            name: name.clone(),
1217            columns: vec![],
1218            file_type: "PARQUET".to_string(),
1219            location: "foo.parquet".into(),
1220            table_partition_cols: vec![],
1221            order_exprs: vec![],
1222            if_not_exists: false,
1223            temporary: false,
1224            unbounded: false,
1225            options: vec![],
1226            constraints: vec![],
1227        });
1228        expect_parse_ok(sql, expected)?;
1229
1230        // positive case: it is ok for avro files not to have columns specified
1231        let sql = "CREATE EXTERNAL TABLE t STORED AS AVRO LOCATION 'foo.avro'";
1232        let expected = Statement::CreateExternalTable(CreateExternalTable {
1233            name: name.clone(),
1234            columns: vec![],
1235            file_type: "AVRO".to_string(),
1236            location: "foo.avro".into(),
1237            table_partition_cols: vec![],
1238            order_exprs: vec![],
1239            if_not_exists: false,
1240            temporary: false,
1241            unbounded: false,
1242            options: vec![],
1243            constraints: vec![],
1244        });
1245        expect_parse_ok(sql, expected)?;
1246
1247        // positive case: it is ok for avro files not to have columns specified
1248        let sql =
1249            "CREATE EXTERNAL TABLE IF NOT EXISTS t STORED AS PARQUET LOCATION 'foo.parquet'";
1250        let expected = Statement::CreateExternalTable(CreateExternalTable {
1251            name: name.clone(),
1252            columns: vec![],
1253            file_type: "PARQUET".to_string(),
1254            location: "foo.parquet".into(),
1255            table_partition_cols: vec![],
1256            order_exprs: vec![],
1257            if_not_exists: true,
1258            temporary: false,
1259            unbounded: false,
1260            options: vec![],
1261            constraints: vec![],
1262        });
1263        expect_parse_ok(sql, expected)?;
1264
1265        // positive case: column definition allowed in 'partition by' clause
1266        let sql =
1267            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int) LOCATION 'foo.csv'";
1268        let expected = Statement::CreateExternalTable(CreateExternalTable {
1269            name: name.clone(),
1270            columns: vec![
1271                make_column_def("c1", DataType::Int(None)),
1272                make_column_def("p1", DataType::Int(None)),
1273            ],
1274            file_type: "CSV".to_string(),
1275            location: "foo.csv".into(),
1276            table_partition_cols: vec!["p1".to_string()],
1277            order_exprs: vec![],
1278            if_not_exists: false,
1279            temporary: false,
1280            unbounded: false,
1281            options: vec![],
1282            constraints: vec![],
1283        });
1284        expect_parse_ok(sql, expected)?;
1285
1286        // negative case: mixed column defs and column names in `PARTITIONED BY` clause
1287        let sql =
1288            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (p1 int, c1) LOCATION 'foo.csv'";
1289        expect_parse_error(
1290            sql,
1291            "SQL error: ParserError(\"Expected: a data type name, found: ) at Line: 1, Column: 73\")",
1292        );
1293
1294        // negative case: mixed column defs and column names in `PARTITIONED BY` clause
1295        let sql =
1296            "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV PARTITIONED BY (c1, p1 int) LOCATION 'foo.csv'";
1297        expect_parse_error(sql, "SQL error: ParserError(\"Expected: ',' or ')' after partition definition, found: int at Line: 1, Column: 70\")");
1298
1299        // positive case: additional options (one entry) can be specified
1300        let sql =
1301            "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1') LOCATION 'blahblah'";
1302        let expected = Statement::CreateExternalTable(CreateExternalTable {
1303            name: name.clone(),
1304            columns: vec![],
1305            file_type: "X".to_string(),
1306            location: "blahblah".into(),
1307            table_partition_cols: vec![],
1308            order_exprs: vec![],
1309            if_not_exists: false,
1310            temporary: false,
1311            unbounded: false,
1312            options: vec![("k1".into(), Value::SingleQuotedString("v1".into()))],
1313            constraints: vec![],
1314        });
1315        expect_parse_ok(sql, expected)?;
1316
1317        // positive case: additional options (multiple entries) can be specified
1318        let sql =
1319            "CREATE EXTERNAL TABLE t STORED AS x OPTIONS ('k1' 'v1', k2 v2) LOCATION 'blahblah'";
1320        let expected = Statement::CreateExternalTable(CreateExternalTable {
1321            name: name.clone(),
1322            columns: vec![],
1323            file_type: "X".to_string(),
1324            location: "blahblah".into(),
1325            table_partition_cols: vec![],
1326            order_exprs: vec![],
1327            if_not_exists: false,
1328            temporary: false,
1329            unbounded: false,
1330            options: vec![
1331                ("k1".into(), Value::SingleQuotedString("v1".into())),
1332                ("k2".into(), Value::SingleQuotedString("v2".into())),
1333            ],
1334            constraints: vec![],
1335        });
1336        expect_parse_ok(sql, expected)?;
1337
1338        // Ordered Col
1339        let sqls = ["CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1) LOCATION 'foo.csv'",
1340                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 NULLS FIRST) LOCATION 'foo.csv'",
1341                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 NULLS LAST) LOCATION 'foo.csv'",
1342                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 ASC) LOCATION 'foo.csv'",
1343                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 DESC) LOCATION 'foo.csv'",
1344                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 DESC NULLS FIRST) LOCATION 'foo.csv'",
1345                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 DESC NULLS LAST) LOCATION 'foo.csv'",
1346                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 ASC NULLS FIRST) LOCATION 'foo.csv'",
1347                        "CREATE EXTERNAL TABLE t(c1 int) STORED AS CSV WITH ORDER (c1 ASC NULLS LAST) LOCATION 'foo.csv'"];
1348        let expected = vec![
1349            (None, None),
1350            (None, Some(true)),
1351            (None, Some(false)),
1352            (Some(true), None),
1353            (Some(false), None),
1354            (Some(false), Some(true)),
1355            (Some(false), Some(false)),
1356            (Some(true), Some(true)),
1357            (Some(true), Some(false)),
1358        ];
1359        for (sql, (asc, nulls_first)) in sqls.iter().zip(expected.into_iter()) {
1360            let expected = Statement::CreateExternalTable(CreateExternalTable {
1361                name: name.clone(),
1362                columns: vec![make_column_def("c1", DataType::Int(None))],
1363                file_type: "CSV".to_string(),
1364                location: "foo.csv".into(),
1365                table_partition_cols: vec![],
1366                order_exprs: vec![vec![OrderByExpr {
1367                    expr: Identifier(Ident {
1368                        value: "c1".to_owned(),
1369                        quote_style: None,
1370                        span: Span::empty(),
1371                    }),
1372                    options: OrderByOptions { asc, nulls_first },
1373                    with_fill: None,
1374                }]],
1375                if_not_exists: false,
1376                temporary: false,
1377                unbounded: false,
1378                options: vec![],
1379                constraints: vec![],
1380            });
1381            expect_parse_ok(sql, expected)?;
1382        }
1383
1384        // Ordered Col
1385        let sql = "CREATE EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV WITH ORDER (c1 ASC, c2 DESC NULLS FIRST) LOCATION 'foo.csv'";
1386        let display = None;
1387        let expected = Statement::CreateExternalTable(CreateExternalTable {
1388            name: name.clone(),
1389            columns: vec![
1390                make_column_def("c1", DataType::Int(display)),
1391                make_column_def("c2", DataType::Int(display)),
1392            ],
1393            file_type: "CSV".to_string(),
1394            location: "foo.csv".into(),
1395            table_partition_cols: vec![],
1396            order_exprs: vec![vec![
1397                OrderByExpr {
1398                    expr: Identifier(Ident {
1399                        value: "c1".to_owned(),
1400                        quote_style: None,
1401                        span: Span::empty(),
1402                    }),
1403                    options: OrderByOptions {
1404                        asc: Some(true),
1405                        nulls_first: None,
1406                    },
1407                    with_fill: None,
1408                },
1409                OrderByExpr {
1410                    expr: Identifier(Ident {
1411                        value: "c2".to_owned(),
1412                        quote_style: None,
1413                        span: Span::empty(),
1414                    }),
1415                    options: OrderByOptions {
1416                        asc: Some(false),
1417                        nulls_first: Some(true),
1418                    },
1419                    with_fill: None,
1420                },
1421            ]],
1422            if_not_exists: false,
1423            temporary: false,
1424            unbounded: false,
1425            options: vec![],
1426            constraints: vec![],
1427        });
1428        expect_parse_ok(sql, expected)?;
1429
1430        // Ordered Binary op
1431        let sql = "CREATE EXTERNAL TABLE t(c1 int, c2 int) STORED AS CSV WITH ORDER (c1 - c2 ASC) LOCATION 'foo.csv'";
1432        let display = None;
1433        let expected = Statement::CreateExternalTable(CreateExternalTable {
1434            name: name.clone(),
1435            columns: vec![
1436                make_column_def("c1", DataType::Int(display)),
1437                make_column_def("c2", DataType::Int(display)),
1438            ],
1439            file_type: "CSV".to_string(),
1440            location: "foo.csv".into(),
1441            table_partition_cols: vec![],
1442            order_exprs: vec![vec![OrderByExpr {
1443                expr: Expr::BinaryOp {
1444                    left: Box::new(Identifier(Ident {
1445                        value: "c1".to_owned(),
1446                        quote_style: None,
1447                        span: Span::empty(),
1448                    })),
1449                    op: BinaryOperator::Minus,
1450                    right: Box::new(Identifier(Ident {
1451                        value: "c2".to_owned(),
1452                        quote_style: None,
1453                        span: Span::empty(),
1454                    })),
1455                },
1456                options: OrderByOptions {
1457                    asc: Some(true),
1458                    nulls_first: None,
1459                },
1460                with_fill: None,
1461            }]],
1462            if_not_exists: false,
1463            temporary: false,
1464            unbounded: false,
1465            options: vec![],
1466            constraints: vec![],
1467        });
1468        expect_parse_ok(sql, expected)?;
1469
1470        // Most complete CREATE EXTERNAL TABLE statement possible
1471        let sql = "
1472            CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float)
1473            STORED AS PARQUET
1474            WITH ORDER (c1 - c2 ASC)
1475            PARTITIONED BY (c1)
1476            LOCATION 'foo.parquet'
1477            OPTIONS ('format.compression' 'zstd',
1478                     'format.delimiter' '*',
1479                     'ROW_GROUP_SIZE' '1024',
1480                     'TRUNCATE' 'NO',
1481                     'format.has_header' 'true')";
1482        let expected = Statement::CreateExternalTable(CreateExternalTable {
1483            name: name.clone(),
1484            columns: vec![
1485                make_column_def("c1", DataType::Int(None)),
1486                make_column_def("c2", DataType::Float(None)),
1487            ],
1488            file_type: "PARQUET".to_string(),
1489            location: "foo.parquet".into(),
1490            table_partition_cols: vec!["c1".into()],
1491            order_exprs: vec![vec![OrderByExpr {
1492                expr: Expr::BinaryOp {
1493                    left: Box::new(Identifier(Ident {
1494                        value: "c1".to_owned(),
1495                        quote_style: None,
1496                        span: Span::empty(),
1497                    })),
1498                    op: BinaryOperator::Minus,
1499                    right: Box::new(Identifier(Ident {
1500                        value: "c2".to_owned(),
1501                        quote_style: None,
1502                        span: Span::empty(),
1503                    })),
1504                },
1505                options: OrderByOptions {
1506                    asc: Some(true),
1507                    nulls_first: None,
1508                },
1509                with_fill: None,
1510            }]],
1511            if_not_exists: true,
1512            temporary: false,
1513            unbounded: true,
1514            options: vec![
1515                (
1516                    "format.compression".into(),
1517                    Value::SingleQuotedString("zstd".into()),
1518                ),
1519                (
1520                    "format.delimiter".into(),
1521                    Value::SingleQuotedString("*".into()),
1522                ),
1523                (
1524                    "ROW_GROUP_SIZE".into(),
1525                    Value::SingleQuotedString("1024".into()),
1526                ),
1527                ("TRUNCATE".into(), Value::SingleQuotedString("NO".into())),
1528                (
1529                    "format.has_header".into(),
1530                    Value::SingleQuotedString("true".into()),
1531                ),
1532            ],
1533            constraints: vec![],
1534        });
1535        expect_parse_ok(sql, expected)?;
1536
1537        // For error cases, see: `create_external_table.slt`
1538
1539        Ok(())
1540    }
1541
1542    #[test]
1543    fn copy_to_table_to_table() -> Result<(), DataFusionError> {
1544        // positive case
1545        let sql = "COPY foo TO bar STORED AS CSV";
1546        let expected = Statement::CopyTo(CopyToStatement {
1547            source: object_name("foo"),
1548            target: "bar".to_string(),
1549            partitioned_by: vec![],
1550            stored_as: Some("CSV".to_owned()),
1551            options: vec![],
1552        });
1553
1554        assert_eq!(verified_stmt(sql), expected);
1555        Ok(())
1556    }
1557
1558    #[test]
1559    fn skip_copy_into_snowflake() -> Result<(), DataFusionError> {
1560        let sql = "COPY INTO foo FROM @~/staged FILE_FORMAT = (FORMAT_NAME = 'mycsv');";
1561        let dialect = Box::new(SnowflakeDialect);
1562        let statements = DFParser::parse_sql_with_dialect(sql, dialect.as_ref())?;
1563
1564        assert_eq!(
1565            statements.len(),
1566            1,
1567            "Expected to parse exactly one statement"
1568        );
1569        if let Statement::CopyTo(_) = &statements[0] {
1570            panic!("Expected non COPY TO statement, but was successful: {statements:?}");
1571        }
1572        Ok(())
1573    }
1574
1575    #[test]
1576    fn explain_copy_to_table_to_table() -> Result<(), DataFusionError> {
1577        let cases = vec![
1578            ("EXPLAIN COPY foo TO bar STORED AS PARQUET", false, false),
1579            (
1580                "EXPLAIN ANALYZE COPY foo TO bar STORED AS PARQUET",
1581                true,
1582                false,
1583            ),
1584            (
1585                "EXPLAIN VERBOSE COPY foo TO bar STORED AS PARQUET",
1586                false,
1587                true,
1588            ),
1589            (
1590                "EXPLAIN ANALYZE VERBOSE COPY foo TO bar STORED AS PARQUET",
1591                true,
1592                true,
1593            ),
1594        ];
1595        for (sql, analyze, verbose) in cases {
1596            println!("sql: {sql}, analyze: {analyze}, verbose: {verbose}");
1597
1598            let expected_copy = Statement::CopyTo(CopyToStatement {
1599                source: object_name("foo"),
1600                target: "bar".to_string(),
1601                partitioned_by: vec![],
1602                stored_as: Some("PARQUET".to_owned()),
1603                options: vec![],
1604            });
1605            let expected = Statement::Explain(ExplainStatement {
1606                analyze,
1607                verbose,
1608                format: None,
1609                statement: Box::new(expected_copy),
1610            });
1611            assert_eq!(verified_stmt(sql), expected);
1612        }
1613        Ok(())
1614    }
1615
1616    #[test]
1617    fn copy_to_query_to_table() -> Result<(), DataFusionError> {
1618        let statement = verified_stmt("SELECT 1");
1619
1620        // unwrap the various layers
1621        let statement = if let Statement::Statement(statement) = statement {
1622            *statement
1623        } else {
1624            panic!("Expected statement, got {statement:?}");
1625        };
1626
1627        let query = if let SQLStatement::Query(query) = statement {
1628            query
1629        } else {
1630            panic!("Expected query, got {statement:?}");
1631        };
1632
1633        let sql =
1634            "COPY (SELECT 1) TO bar STORED AS CSV OPTIONS ('format.has_header' 'true')";
1635        let expected = Statement::CopyTo(CopyToStatement {
1636            source: CopyToSource::Query(query),
1637            target: "bar".to_string(),
1638            partitioned_by: vec![],
1639            stored_as: Some("CSV".to_owned()),
1640            options: vec![(
1641                "format.has_header".into(),
1642                Value::SingleQuotedString("true".into()),
1643            )],
1644        });
1645        assert_eq!(verified_stmt(sql), expected);
1646        Ok(())
1647    }
1648
1649    #[test]
1650    fn copy_to_options() -> Result<(), DataFusionError> {
1651        let sql = "COPY foo TO bar STORED AS CSV OPTIONS ('row_group_size' '55')";
1652        let expected = Statement::CopyTo(CopyToStatement {
1653            source: object_name("foo"),
1654            target: "bar".to_string(),
1655            partitioned_by: vec![],
1656            stored_as: Some("CSV".to_owned()),
1657            options: vec![(
1658                "row_group_size".to_string(),
1659                Value::SingleQuotedString("55".to_string()),
1660            )],
1661        });
1662        assert_eq!(verified_stmt(sql), expected);
1663        Ok(())
1664    }
1665
1666    #[test]
1667    fn copy_to_partitioned_by() -> Result<(), DataFusionError> {
1668        let sql = "COPY foo TO bar STORED AS CSV PARTITIONED BY (a) OPTIONS ('row_group_size' '55')";
1669        let expected = Statement::CopyTo(CopyToStatement {
1670            source: object_name("foo"),
1671            target: "bar".to_string(),
1672            partitioned_by: vec!["a".to_string()],
1673            stored_as: Some("CSV".to_owned()),
1674            options: vec![(
1675                "row_group_size".to_string(),
1676                Value::SingleQuotedString("55".to_string()),
1677            )],
1678        });
1679        assert_eq!(verified_stmt(sql), expected);
1680        Ok(())
1681    }
1682
1683    #[test]
1684    fn copy_to_multi_options() -> Result<(), DataFusionError> {
1685        // order of options is preserved
1686        let sql =
1687            "COPY foo TO bar STORED AS parquet OPTIONS ('format.row_group_size' 55, 'format.compression' snappy, 'execution.keep_partition_by_columns' true)";
1688
1689        let expected_options = vec![
1690            (
1691                "format.row_group_size".to_string(),
1692                Value::Number("55".to_string(), false),
1693            ),
1694            (
1695                "format.compression".to_string(),
1696                Value::SingleQuotedString("snappy".to_string()),
1697            ),
1698            (
1699                "execution.keep_partition_by_columns".to_string(),
1700                Value::SingleQuotedString("true".to_string()),
1701            ),
1702        ];
1703
1704        let mut statements = DFParser::parse_sql(sql).unwrap();
1705        assert_eq!(statements.len(), 1);
1706        let only_statement = statements.pop_front().unwrap();
1707
1708        let options = if let Statement::CopyTo(copy_to) = only_statement {
1709            copy_to.options
1710        } else {
1711            panic!("Expected copy");
1712        };
1713
1714        assert_eq!(options, expected_options);
1715
1716        Ok(())
1717    }
1718
1719    // For error cases, see: `copy.slt`
1720
1721    fn object_name(name: &str) -> CopyToSource {
1722        CopyToSource::Relation(ObjectName::from(vec![Ident::new(name)]))
1723    }
1724
1725    // Based on  sqlparser-rs
1726    // https://github.com/sqlparser-rs/sqlparser-rs/blob/ae3b5844c839072c235965fe0d1bddc473dced87/src/test_utils.rs#L104-L116
1727
1728    /// Ensures that `sql` parses as a single [Statement]
1729    ///
1730    /// If `canonical` is non empty,this function additionally asserts
1731    /// that:
1732    ///
1733    /// 1. parsing `sql` results in the same [`Statement`] as parsing
1734    ///    `canonical`.
1735    ///
1736    /// 2. re-serializing the result of parsing `sql` produces the same
1737    ///    `canonical` sql string
1738    fn one_statement_parses_to(sql: &str, canonical: &str) -> Statement {
1739        let mut statements = DFParser::parse_sql(sql).unwrap();
1740        assert_eq!(statements.len(), 1);
1741
1742        if sql != canonical {
1743            assert_eq!(DFParser::parse_sql(canonical).unwrap(), statements);
1744        }
1745
1746        let only_statement = statements.pop_front().unwrap();
1747        assert_eq!(
1748            canonical.to_uppercase(),
1749            only_statement.to_string().to_uppercase()
1750        );
1751        only_statement
1752    }
1753
1754    /// Ensures that `sql` parses as a single [Statement], and that
1755    /// re-serializing the parse result produces the same `sql`
1756    /// string (is not modified after a serialization round-trip).
1757    fn verified_stmt(sql: &str) -> Statement {
1758        one_statement_parses_to(sql, sql)
1759    }
1760
1761    #[test]
1762    /// Checks the recursion limit works for sql queries
1763    /// Recursion can happen easily with binary exprs (i.e, AND or OR)
1764    fn test_recursion_limit() {
1765        let sql = "SELECT 1 OR 2";
1766
1767        // Expect parse to succeed
1768        DFParserBuilder::new(sql)
1769            .build()
1770            .unwrap()
1771            .parse_statements()
1772            .unwrap();
1773
1774        let err = DFParserBuilder::new(sql)
1775            .with_recursion_limit(1)
1776            .build()
1777            .unwrap()
1778            .parse_statements()
1779            .unwrap_err();
1780
1781        assert_contains!(
1782            err.to_string(),
1783            "SQL error: RecursionLimitExceeded (current limit: 1)"
1784        );
1785    }
1786}