Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12/// INTERVAL arithmetic rewriter for BIGINT timestamp columns
13pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
17pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29    WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36/// Parses SQL with streaming extensions.
37///
38/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
39/// for structured parsing. Standard SQL is delegated to sqlparser directly.
40///
41/// # Errors
42///
43/// Returns `ParseError` if the SQL syntax is invalid.
44pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48/// Parser for streaming SQL extensions.
49///
50/// Provides static methods for parsing streaming SQL statements.
51/// Uses sqlparser's `Parser` API internally for structured parsing
52/// of identifiers, data types, expressions, and queries.
53pub struct StreamingParser;
54
55impl StreamingParser {
56    /// Parse a SQL string with streaming extensions.
57    ///
58    /// Tokenizes the input to detect statement type, then routes to the
59    /// appropriate parser:
60    /// - CREATE SOURCE → `source_parser`
61    /// - CREATE SINK → `sink_parser`
62    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
63    /// - Everything else → `sqlparser::parser::Parser`
64    ///
65    /// # Errors
66    ///
67    /// Returns `ParserError` if the SQL syntax is invalid.
68    #[allow(clippy::too_many_lines)]
69    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70        let sql_trimmed = sql.trim();
71        if sql_trimmed.is_empty() {
72            return Err(sqlparser::parser::ParserError::ParserError(
73                "Empty SQL statement".to_string(),
74            ));
75        }
76
77        let dialect = LaminarDialect::default();
78
79        // Tokenize to detect statement type (with location for better errors)
80        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81            .tokenize_with_location()
82            .map_err(|e| {
83                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84            })?;
85
86        // Route based on token-level detection
87        match detect_streaming_ddl(&tokens) {
88            StreamingDdlKind::CreateSource { .. } => {
89                let mut parser =
90                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91                let source = source_parser::parse_create_source(&mut parser)
92                    .map_err(parse_error_to_parser_error)?;
93                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94            }
95            StreamingDdlKind::CreateSink { .. } => {
96                let mut parser =
97                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98                let sink = sink_parser::parse_create_sink(&mut parser)
99                    .map_err(parse_error_to_parser_error)?;
100                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101            }
102            StreamingDdlKind::CreateContinuousQuery { .. } => {
103                let mut parser =
104                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106                    .map_err(parse_error_to_parser_error)?;
107                Ok(vec![stmt])
108            }
109            StreamingDdlKind::DropSource { .. } => {
110                let mut parser =
111                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113                Ok(vec![stmt])
114            }
115            StreamingDdlKind::DropSink { .. } => {
116                let mut parser =
117                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119                Ok(vec![stmt])
120            }
121            StreamingDdlKind::DropMaterializedView { .. } => {
122                let mut parser =
123                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124                let stmt = parse_drop_materialized_view(&mut parser)
125                    .map_err(parse_error_to_parser_error)?;
126                Ok(vec![stmt])
127            }
128            StreamingDdlKind::ShowSources => {
129                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130            }
131            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132            StreamingDdlKind::ShowQueries => {
133                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134            }
135            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136                ShowCommand::MaterializedViews,
137            )]),
138            StreamingDdlKind::DescribeSource => {
139                let mut parser =
140                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142                Ok(vec![stmt])
143            }
144            StreamingDdlKind::ExplainStreaming => {
145                let mut parser =
146                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147                let stmt =
148                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149                Ok(vec![stmt])
150            }
151            StreamingDdlKind::CreateMaterializedView { .. } => {
152                let mut parser =
153                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154                let stmt = parse_create_materialized_view(&mut parser)
155                    .map_err(parse_error_to_parser_error)?;
156                Ok(vec![stmt])
157            }
158            StreamingDdlKind::CreateStream { .. } => {
159                let mut parser =
160                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161                let stmt = parse_create_stream(&mut parser, sql_trimmed)
162                    .map_err(parse_error_to_parser_error)?;
163                Ok(vec![stmt])
164            }
165            StreamingDdlKind::DropStream { .. } => {
166                let mut parser =
167                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169                Ok(vec![stmt])
170            }
171            StreamingDdlKind::ShowStreams => {
172                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173            }
174            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175            StreamingDdlKind::CreateLookupTable { .. } => {
176                let mut parser =
177                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178                let lt = lookup_table::parse_create_lookup_table(&mut parser)
179                    .map_err(parse_error_to_parser_error)?;
180                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181            }
182            StreamingDdlKind::DropLookupTable { .. } => {
183                let mut parser =
184                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186                    .map_err(parse_error_to_parser_error)?;
187                Ok(vec![StreamingStatement::DropLookupTable {
188                    name,
189                    if_exists,
190                }])
191            }
192            StreamingDdlKind::AlterSource => {
193                let mut parser =
194                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196                Ok(vec![stmt])
197            }
198            StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199                ShowCommand::CheckpointStatus,
200            )]),
201            StreamingDdlKind::ShowCreateSource => {
202                let mut parser =
203                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204                let stmt =
205                    parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206                Ok(vec![stmt])
207            }
208            StreamingDdlKind::ShowCreateSink => {
209                let mut parser =
210                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211                let stmt =
212                    parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213                Ok(vec![stmt])
214            }
215            StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216            StreamingDdlKind::RestoreCheckpoint => {
217                let mut parser =
218                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219                let stmt =
220                    parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
221                Ok(vec![stmt])
222            }
223            StreamingDdlKind::None => {
224                // Standard SQL - check for INSERT INTO and convert
225                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
226                Ok(statements
227                    .into_iter()
228                    .map(convert_standard_statement)
229                    .collect())
230            }
231        }
232    }
233
234    /// Check if an expression contains a window function.
235    #[must_use]
236    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
237        match expr {
238            sqlparser::ast::Expr::Function(func) => {
239                if let Some(name) = func.name.0.last() {
240                    let func_name = name.to_string().to_uppercase();
241                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
242                } else {
243                    false
244                }
245            }
246            _ => false,
247        }
248    }
249
250    /// Parse EMIT clause from SQL string.
251    ///
252    /// # Errors
253    ///
254    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
255    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
256        emit_parser::parse_emit_clause_from_sql(sql)
257    }
258
259    /// Parse late data handling clause from SQL string.
260    ///
261    /// # Errors
262    ///
263    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
264    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
265        late_data_parser::parse_late_data_clause_from_sql(sql)
266    }
267}
268
269/// Convert `ParseError` to `ParserError` for backward compatibility.
270fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
271    match e {
272        ParseError::SqlParseError(pe) => pe,
273        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
274        ParseError::WindowError(msg) => {
275            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
276        }
277        ParseError::ValidationError(msg) => {
278            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
279        }
280    }
281}
282
283/// Parse a RESTORE FROM CHECKPOINT statement.
284///
285/// Syntax: `RESTORE FROM CHECKPOINT <id>`
286///
287/// # Errors
288///
289/// Returns `ParseError` if the statement syntax is invalid.
290fn parse_restore_checkpoint(
291    parser: &mut sqlparser::parser::Parser,
292) -> Result<StreamingStatement, ParseError> {
293    // Consume RESTORE
294    tokenizer::expect_custom_keyword(parser, "RESTORE")?;
295    // Consume FROM
296    if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
297        return Err(ParseError::StreamingError(
298            "Expected FROM after RESTORE".to_string(),
299        ));
300    }
301    // Consume CHECKPOINT
302    tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
303    // Parse checkpoint ID (numeric literal)
304    let token = parser.next_token();
305    match &token.token {
306        sqlparser::tokenizer::Token::Number(n, _) => {
307            let id: u64 = n
308                .parse()
309                .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
310            Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
311        }
312        other => Err(ParseError::StreamingError(format!(
313            "Expected checkpoint ID (number), found {other}"
314        ))),
315    }
316}
317
318/// Convert a standard sqlparser statement to a `StreamingStatement`.
319///
320/// Detects INSERT INTO statements and converts them to the streaming
321/// `InsertInto` variant. All other statements are wrapped as `Standard`.
322fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
323    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
324        // Extract table name from TableObject
325        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
326            let table_name = name.clone();
327            let columns = insert.columns.clone();
328
329            // Try to extract VALUES rows from source query
330            if let Some(ref source) = insert.source {
331                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
332                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
333                    return StreamingStatement::InsertInto {
334                        table_name,
335                        columns,
336                        values: rows,
337                    };
338                }
339            }
340        }
341    }
342    StreamingStatement::Standard(Box::new(stmt))
343}
344
345/// Parse a DROP SOURCE statement.
346///
347/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
348///
349/// # Errors
350///
351/// Returns `ParseError` if the statement syntax is invalid.
352fn parse_drop_source(
353    parser: &mut sqlparser::parser::Parser,
354) -> Result<StreamingStatement, ParseError> {
355    parser
356        .expect_keyword(sqlparser::keywords::Keyword::DROP)
357        .map_err(ParseError::SqlParseError)?;
358    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
359    let if_exists = parser.parse_keywords(&[
360        sqlparser::keywords::Keyword::IF,
361        sqlparser::keywords::Keyword::EXISTS,
362    ]);
363    let name = parser
364        .parse_object_name(false)
365        .map_err(ParseError::SqlParseError)?;
366    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
367    Ok(StreamingStatement::DropSource {
368        name,
369        if_exists,
370        cascade,
371    })
372}
373
374/// Parse a DROP SINK statement.
375///
376/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
377///
378/// # Errors
379///
380/// Returns `ParseError` if the statement syntax is invalid.
381fn parse_drop_sink(
382    parser: &mut sqlparser::parser::Parser,
383) -> Result<StreamingStatement, ParseError> {
384    parser
385        .expect_keyword(sqlparser::keywords::Keyword::DROP)
386        .map_err(ParseError::SqlParseError)?;
387    tokenizer::expect_custom_keyword(parser, "SINK")?;
388    let if_exists = parser.parse_keywords(&[
389        sqlparser::keywords::Keyword::IF,
390        sqlparser::keywords::Keyword::EXISTS,
391    ]);
392    let name = parser
393        .parse_object_name(false)
394        .map_err(ParseError::SqlParseError)?;
395    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
396    Ok(StreamingStatement::DropSink {
397        name,
398        if_exists,
399        cascade,
400    })
401}
402
403/// Parse a DROP MATERIALIZED VIEW statement.
404///
405/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
406///
407/// # Errors
408///
409/// Returns `ParseError` if the statement syntax is invalid.
410fn parse_drop_materialized_view(
411    parser: &mut sqlparser::parser::Parser,
412) -> Result<StreamingStatement, ParseError> {
413    parser
414        .expect_keyword(sqlparser::keywords::Keyword::DROP)
415        .map_err(ParseError::SqlParseError)?;
416    parser
417        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
418        .map_err(ParseError::SqlParseError)?;
419    parser
420        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
421        .map_err(ParseError::SqlParseError)?;
422    let if_exists = parser.parse_keywords(&[
423        sqlparser::keywords::Keyword::IF,
424        sqlparser::keywords::Keyword::EXISTS,
425    ]);
426    let name = parser
427        .parse_object_name(false)
428        .map_err(ParseError::SqlParseError)?;
429    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
430    Ok(StreamingStatement::DropMaterializedView {
431        name,
432        if_exists,
433        cascade,
434    })
435}
436
437/// Parse a CREATE STREAM statement.
438///
439/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
440///
441/// # Errors
442///
443/// Returns `ParseError` if the statement syntax is invalid.
444fn parse_create_stream(
445    parser: &mut sqlparser::parser::Parser,
446    _original_sql: &str,
447) -> Result<StreamingStatement, ParseError> {
448    parser
449        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
450        .map_err(ParseError::SqlParseError)?;
451
452    let or_replace = parser.parse_keywords(&[
453        sqlparser::keywords::Keyword::OR,
454        sqlparser::keywords::Keyword::REPLACE,
455    ]);
456
457    tokenizer::expect_custom_keyword(parser, "STREAM")?;
458
459    let if_not_exists = parser.parse_keywords(&[
460        sqlparser::keywords::Keyword::IF,
461        sqlparser::keywords::Keyword::NOT,
462        sqlparser::keywords::Keyword::EXISTS,
463    ]);
464
465    let name = parser
466        .parse_object_name(false)
467        .map_err(ParseError::SqlParseError)?;
468
469    parser
470        .expect_keyword(sqlparser::keywords::Keyword::AS)
471        .map_err(ParseError::SqlParseError)?;
472
473    // Collect remaining tokens and split at EMIT boundary
474    let remaining = collect_remaining_tokens(parser);
475    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
476
477    let stream_dialect = LaminarDialect::default();
478
479    let query = if query_tokens.is_empty() {
480        return Err(ParseError::StreamingError(
481            "Expected SELECT query after AS".to_string(),
482        ));
483    } else {
484        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485            .with_tokens_with_locations(query_tokens);
486        query_parser
487            .parse_query()
488            .map_err(ParseError::SqlParseError)?
489    };
490
491    let query_stmt =
492        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494    let emit_clause = if emit_tokens.is_empty() {
495        None
496    } else {
497        let mut emit_parser =
498            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499        emit_parser::parse_emit_clause(&mut emit_parser)?
500    };
501
502    Ok(StreamingStatement::CreateStream {
503        name,
504        query: Box::new(query_stmt),
505        emit_clause,
506        or_replace,
507        if_not_exists,
508    })
509}
510
511/// Parse a DROP STREAM statement.
512///
513/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
514///
515/// # Errors
516///
517/// Returns `ParseError` if the statement syntax is invalid.
518fn parse_drop_stream(
519    parser: &mut sqlparser::parser::Parser,
520) -> Result<StreamingStatement, ParseError> {
521    parser
522        .expect_keyword(sqlparser::keywords::Keyword::DROP)
523        .map_err(ParseError::SqlParseError)?;
524    tokenizer::expect_custom_keyword(parser, "STREAM")?;
525    let if_exists = parser.parse_keywords(&[
526        sqlparser::keywords::Keyword::IF,
527        sqlparser::keywords::Keyword::EXISTS,
528    ]);
529    let name = parser
530        .parse_object_name(false)
531        .map_err(ParseError::SqlParseError)?;
532    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
533    Ok(StreamingStatement::DropStream {
534        name,
535        if_exists,
536        cascade,
537    })
538}
539
540/// Parse a DESCRIBE statement.
541///
542/// Syntax: `DESCRIBE [EXTENDED] name`
543/// Parse an ALTER SOURCE statement.
544///
545/// Syntax:
546/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
547/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
548///
549/// # Errors
550///
551/// Returns `ParseError` if the statement syntax is invalid.
552fn parse_alter_source(
553    parser: &mut sqlparser::parser::Parser,
554) -> Result<StreamingStatement, ParseError> {
555    parser
556        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
557        .map_err(ParseError::SqlParseError)?;
558    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
559    let name = parser
560        .parse_object_name(false)
561        .map_err(ParseError::SqlParseError)?;
562
563    // Determine operation: ADD COLUMN or SET
564    if parser.parse_keywords(&[
565        sqlparser::keywords::Keyword::ADD,
566        sqlparser::keywords::Keyword::COLUMN,
567    ]) {
568        // ALTER SOURCE name ADD COLUMN col_name data_type
569        let col_name = parser
570            .parse_identifier()
571            .map_err(ParseError::SqlParseError)?;
572        let data_type = parser
573            .parse_data_type()
574            .map_err(ParseError::SqlParseError)?;
575        let column_def = sqlparser::ast::ColumnDef {
576            name: col_name,
577            data_type,
578            options: vec![],
579        };
580        Ok(StreamingStatement::AlterSource {
581            name,
582            operation: statements::AlterSourceOperation::AddColumn { column_def },
583        })
584    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
585        // ALTER SOURCE name SET ('key' = 'value', ...)
586        parser
587            .expect_token(&sqlparser::tokenizer::Token::LParen)
588            .map_err(ParseError::SqlParseError)?;
589        let mut properties = std::collections::HashMap::new();
590        loop {
591            let key = parser
592                .parse_literal_string()
593                .map_err(ParseError::SqlParseError)?;
594            parser
595                .expect_token(&sqlparser::tokenizer::Token::Eq)
596                .map_err(ParseError::SqlParseError)?;
597            let value = parser
598                .parse_literal_string()
599                .map_err(ParseError::SqlParseError)?;
600            properties.insert(key, value);
601            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
602                break;
603            }
604        }
605        parser
606            .expect_token(&sqlparser::tokenizer::Token::RParen)
607            .map_err(ParseError::SqlParseError)?;
608        Ok(StreamingStatement::AlterSource {
609            name,
610            operation: statements::AlterSourceOperation::SetProperties { properties },
611        })
612    } else {
613        Err(ParseError::StreamingError(
614            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
615        ))
616    }
617}
618
619/// Parse a DESCRIBE statement.
620///
621/// # Errors
622///
623/// Returns `ParseError` if the statement syntax is invalid.
624fn parse_describe(
625    parser: &mut sqlparser::parser::Parser,
626) -> Result<StreamingStatement, ParseError> {
627    // Consume DESCRIBE or DESC
628    let token = parser.next_token();
629    match &token.token {
630        sqlparser::tokenizer::Token::Word(w)
631            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
632                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
633        _ => {
634            return Err(ParseError::StreamingError(
635                "Expected DESCRIBE or DESC".to_string(),
636            ));
637        }
638    }
639    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
640    let name = parser
641        .parse_object_name(false)
642        .map_err(ParseError::SqlParseError)?;
643    Ok(StreamingStatement::Describe { name, extended })
644}
645
646/// Parse `SHOW CREATE SOURCE <name>`.
647fn parse_show_create_source(
648    parser: &mut sqlparser::parser::Parser,
649) -> Result<StreamingStatement, ParseError> {
650    // Consume SHOW CREATE SOURCE
651    parser
652        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
653        .map_err(ParseError::SqlParseError)?;
654    parser
655        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
656        .map_err(ParseError::SqlParseError)?;
657    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
658    let name = parser
659        .parse_object_name(false)
660        .map_err(ParseError::SqlParseError)?;
661    Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
662}
663
664/// Parse `SHOW CREATE SINK <name>`.
665fn parse_show_create_sink(
666    parser: &mut sqlparser::parser::Parser,
667) -> Result<StreamingStatement, ParseError> {
668    // Consume SHOW CREATE SINK
669    parser
670        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
671        .map_err(ParseError::SqlParseError)?;
672    parser
673        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
674        .map_err(ParseError::SqlParseError)?;
675    tokenizer::expect_custom_keyword(parser, "SINK")?;
676    let name = parser
677        .parse_object_name(false)
678        .map_err(ParseError::SqlParseError)?;
679    Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
680}
681
682/// Parse an EXPLAIN [ANALYZE] statement wrapping a streaming query.
683///
684/// Syntax: `EXPLAIN [ANALYZE] <streaming_statement>`
685///
686/// # Errors
687///
688/// Returns `ParseError` if the statement syntax is invalid.
689fn parse_explain(
690    parser: &mut sqlparser::parser::Parser,
691    original_sql: &str,
692) -> Result<StreamingStatement, ParseError> {
693    parser
694        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
695        .map_err(ParseError::SqlParseError)?;
696
697    // Check for optional ANALYZE keyword
698    let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
699
700    // Find the position after EXPLAIN [ANALYZE] in the original SQL
701    let explain_prefix_upper = original_sql.to_uppercase();
702    let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
703    let inner_start = if analyze {
704        explain_prefix_upper
705            .find("ANALYZE")
706            .map_or(0, |pos| pos + "ANALYZE".len())
707    } else {
708        explain_prefix_upper
709            .find("EXPLAIN")
710            .map_or(0, |pos| pos + "EXPLAIN".len())
711    };
712    let inner_sql = original_sql[inner_start..].trim();
713    let _ = skip_keyword; // suppress unused warning
714
715    // Parse the inner statement recursively
716    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
717    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
718        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
719    })?;
720    Ok(StreamingStatement::Explain {
721        statement: Box::new(inner),
722        analyze,
723    })
724}
725
726/// Parse a CREATE MATERIALIZED VIEW statement.
727///
728/// Syntax:
729/// ```sql
730/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
731/// AS <select_query>
732/// [EMIT <strategy>]
733/// ```
734///
735/// # Errors
736///
737/// Returns `ParseError` if the statement syntax is invalid.
738fn parse_create_materialized_view(
739    parser: &mut sqlparser::parser::Parser,
740) -> Result<StreamingStatement, ParseError> {
741    parser
742        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
743        .map_err(ParseError::SqlParseError)?;
744
745    let or_replace = parser.parse_keywords(&[
746        sqlparser::keywords::Keyword::OR,
747        sqlparser::keywords::Keyword::REPLACE,
748    ]);
749
750    parser
751        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
752        .map_err(ParseError::SqlParseError)?;
753    parser
754        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
755        .map_err(ParseError::SqlParseError)?;
756
757    let if_not_exists = parser.parse_keywords(&[
758        sqlparser::keywords::Keyword::IF,
759        sqlparser::keywords::Keyword::NOT,
760        sqlparser::keywords::Keyword::EXISTS,
761    ]);
762
763    let name = parser
764        .parse_object_name(false)
765        .map_err(ParseError::SqlParseError)?;
766
767    parser
768        .expect_keyword(sqlparser::keywords::Keyword::AS)
769        .map_err(ParseError::SqlParseError)?;
770
771    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
772    let remaining = collect_remaining_tokens(parser);
773    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
774
775    let mv_dialect = LaminarDialect::default();
776
777    let query = if query_tokens.is_empty() {
778        return Err(ParseError::StreamingError(
779            "Expected SELECT query after AS".to_string(),
780        ));
781    } else {
782        let mut query_parser =
783            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
784        query_parser
785            .parse_query()
786            .map_err(ParseError::SqlParseError)?
787    };
788
789    let query_stmt =
790        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
791
792    let emit_clause = if emit_tokens.is_empty() {
793        None
794    } else {
795        let mut emit_parser =
796            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
797        emit_parser::parse_emit_clause(&mut emit_parser)?
798    };
799
800    Ok(StreamingStatement::CreateMaterializedView {
801        name,
802        query: Box::new(query_stmt),
803        emit_clause,
804        or_replace,
805        if_not_exists,
806    })
807}
808
809/// Collect all remaining tokens from the parser into a Vec.
810fn collect_remaining_tokens(
811    parser: &mut sqlparser::parser::Parser,
812) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
813    let mut tokens = Vec::new();
814    loop {
815        let token = parser.next_token();
816        if token.token == sqlparser::tokenizer::Token::EOF {
817            tokens.push(token);
818            break;
819        }
820        tokens.push(token);
821    }
822    tokens
823}
824
825/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
826///
827/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
828/// (or is empty if no EMIT found).
829fn split_at_emit(
830    tokens: &[sqlparser::tokenizer::TokenWithSpan],
831) -> (
832    Vec<sqlparser::tokenizer::TokenWithSpan>,
833    Vec<sqlparser::tokenizer::TokenWithSpan>,
834) {
835    let mut depth: i32 = 0;
836    for (i, token) in tokens.iter().enumerate() {
837        match &token.token {
838            sqlparser::tokenizer::Token::LParen => depth += 1,
839            sqlparser::tokenizer::Token::RParen => {
840                depth -= 1;
841            }
842            sqlparser::tokenizer::Token::Word(w)
843                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
844            {
845                let mut query_tokens = tokens[..i].to_vec();
846                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
847                    token: sqlparser::tokenizer::Token::EOF,
848                    span: sqlparser::tokenizer::Span::empty(),
849                });
850                let emit_tokens = tokens[i..].to_vec();
851                return (query_tokens, emit_tokens);
852            }
853            _ => {}
854        }
855    }
856    (tokens.to_vec(), vec![])
857}
858
859/// SQL parsing errors
860#[derive(Debug, thiserror::Error)]
861pub enum ParseError {
862    /// Standard SQL parse error
863    #[error("SQL parse error: {0}")]
864    SqlParseError(#[from] sqlparser::parser::ParserError),
865
866    /// Streaming extension parse error
867    #[error("Streaming SQL error: {0}")]
868    StreamingError(String),
869
870    /// Window function error
871    #[error("Window function error: {0}")]
872    WindowError(String),
873
874    /// Validation error (e.g., invalid option values)
875    #[error("Validation error: {0}")]
876    ValidationError(String),
877}
878
879#[cfg(test)]
880mod tests {
881    use super::*;
882
883    /// Helper to parse SQL and return the first statement.
884    fn parse_one(sql: &str) -> StreamingStatement {
885        let stmts = StreamingParser::parse_sql(sql).unwrap();
886        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
887        stmts.into_iter().next().unwrap()
888    }
889
890    #[test]
891    fn test_parse_drop_source() {
892        let stmt = parse_one("DROP SOURCE events");
893        match stmt {
894            StreamingStatement::DropSource {
895                name,
896                if_exists,
897                cascade,
898            } => {
899                assert_eq!(name.to_string(), "events");
900                assert!(!if_exists);
901                assert!(!cascade);
902            }
903            _ => panic!("Expected DropSource, got {stmt:?}"),
904        }
905    }
906
907    #[test]
908    fn test_parse_drop_source_if_exists() {
909        let stmt = parse_one("DROP SOURCE IF EXISTS events");
910        match stmt {
911            StreamingStatement::DropSource {
912                name,
913                if_exists,
914                cascade,
915            } => {
916                assert_eq!(name.to_string(), "events");
917                assert!(if_exists);
918                assert!(!cascade);
919            }
920            _ => panic!("Expected DropSource, got {stmt:?}"),
921        }
922    }
923
924    #[test]
925    fn test_parse_drop_source_cascade() {
926        let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
927        match stmt {
928            StreamingStatement::DropSource {
929                name,
930                if_exists,
931                cascade,
932            } => {
933                assert_eq!(name.to_string(), "events");
934                assert!(if_exists);
935                assert!(cascade);
936            }
937            _ => panic!("Expected DropSource, got {stmt:?}"),
938        }
939    }
940
941    #[test]
942    fn test_parse_drop_sink() {
943        let stmt = parse_one("DROP SINK output");
944        match stmt {
945            StreamingStatement::DropSink {
946                name,
947                if_exists,
948                cascade,
949            } => {
950                assert_eq!(name.to_string(), "output");
951                assert!(!if_exists);
952                assert!(!cascade);
953            }
954            _ => panic!("Expected DropSink, got {stmt:?}"),
955        }
956    }
957
958    #[test]
959    fn test_parse_drop_sink_if_exists() {
960        let stmt = parse_one("DROP SINK IF EXISTS output");
961        match stmt {
962            StreamingStatement::DropSink {
963                name,
964                if_exists,
965                cascade,
966            } => {
967                assert_eq!(name.to_string(), "output");
968                assert!(if_exists);
969                assert!(!cascade);
970            }
971            _ => panic!("Expected DropSink, got {stmt:?}"),
972        }
973    }
974
975    #[test]
976    fn test_parse_drop_sink_cascade() {
977        let stmt = parse_one("DROP SINK output CASCADE");
978        match stmt {
979            StreamingStatement::DropSink {
980                name,
981                if_exists,
982                cascade,
983            } => {
984                assert_eq!(name.to_string(), "output");
985                assert!(!if_exists);
986                assert!(cascade);
987            }
988            _ => panic!("Expected DropSink, got {stmt:?}"),
989        }
990    }
991
992    #[test]
993    fn test_parse_drop_materialized_view() {
994        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
995        match stmt {
996            StreamingStatement::DropMaterializedView {
997                name,
998                if_exists,
999                cascade,
1000            } => {
1001                assert_eq!(name.to_string(), "live_stats");
1002                assert!(!if_exists);
1003                assert!(!cascade);
1004            }
1005            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1006        }
1007    }
1008
1009    #[test]
1010    fn test_parse_drop_materialized_view_if_exists_cascade() {
1011        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1012        match stmt {
1013            StreamingStatement::DropMaterializedView {
1014                name,
1015                if_exists,
1016                cascade,
1017            } => {
1018                assert_eq!(name.to_string(), "live_stats");
1019                assert!(if_exists);
1020                assert!(cascade);
1021            }
1022            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1023        }
1024    }
1025
1026    #[test]
1027    fn test_parse_show_sources() {
1028        let stmt = parse_one("SHOW SOURCES");
1029        assert!(matches!(
1030            stmt,
1031            StreamingStatement::Show(ShowCommand::Sources)
1032        ));
1033    }
1034
1035    #[test]
1036    fn test_parse_show_sinks() {
1037        let stmt = parse_one("SHOW SINKS");
1038        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1039    }
1040
1041    #[test]
1042    fn test_parse_show_queries() {
1043        let stmt = parse_one("SHOW QUERIES");
1044        assert!(matches!(
1045            stmt,
1046            StreamingStatement::Show(ShowCommand::Queries)
1047        ));
1048    }
1049
1050    #[test]
1051    fn test_parse_show_materialized_views() {
1052        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1053        assert!(matches!(
1054            stmt,
1055            StreamingStatement::Show(ShowCommand::MaterializedViews)
1056        ));
1057    }
1058
1059    #[test]
1060    fn test_parse_describe() {
1061        let stmt = parse_one("DESCRIBE events");
1062        match stmt {
1063            StreamingStatement::Describe { name, extended } => {
1064                assert_eq!(name.to_string(), "events");
1065                assert!(!extended);
1066            }
1067            _ => panic!("Expected Describe, got {stmt:?}"),
1068        }
1069    }
1070
1071    #[test]
1072    fn test_parse_describe_extended() {
1073        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1074        match stmt {
1075            StreamingStatement::Describe { name, extended } => {
1076                assert_eq!(name.to_string(), "my_schema.events");
1077                assert!(extended);
1078            }
1079            _ => panic!("Expected Describe, got {stmt:?}"),
1080        }
1081    }
1082
1083    #[test]
1084    fn test_parse_explain_select() {
1085        let stmt = parse_one("EXPLAIN SELECT * FROM events");
1086        match stmt {
1087            StreamingStatement::Explain {
1088                statement, analyze, ..
1089            } => {
1090                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1091                assert!(!analyze);
1092            }
1093            _ => panic!("Expected Explain, got {stmt:?}"),
1094        }
1095    }
1096
1097    #[test]
1098    fn test_parse_explain_create_source() {
1099        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1100        match stmt {
1101            StreamingStatement::Explain { statement, .. } => {
1102                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1103            }
1104            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1105        }
1106    }
1107
1108    #[test]
1109    fn test_parse_explain_analyze_select() {
1110        let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1111        match stmt {
1112            StreamingStatement::Explain {
1113                statement, analyze, ..
1114            } => {
1115                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1116                assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1117            }
1118            _ => panic!("Expected Explain, got {stmt:?}"),
1119        }
1120    }
1121
1122    #[test]
1123    fn test_parse_create_materialized_view() {
1124        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1125        match stmt {
1126            StreamingStatement::CreateMaterializedView {
1127                name,
1128                emit_clause,
1129                or_replace,
1130                if_not_exists,
1131                ..
1132            } => {
1133                assert_eq!(name.to_string(), "live_stats");
1134                assert!(emit_clause.is_none());
1135                assert!(!or_replace);
1136                assert!(!if_not_exists);
1137            }
1138            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1139        }
1140    }
1141
1142    #[test]
1143    fn test_parse_create_materialized_view_with_emit() {
1144        let stmt = parse_one(
1145            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1146        );
1147        match stmt {
1148            StreamingStatement::CreateMaterializedView {
1149                name, emit_clause, ..
1150            } => {
1151                assert_eq!(name.to_string(), "live_stats");
1152                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1153            }
1154            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1155        }
1156    }
1157
1158    #[test]
1159    fn test_parse_create_or_replace_materialized_view() {
1160        let stmt = parse_one(
1161            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1162        );
1163        match stmt {
1164            StreamingStatement::CreateMaterializedView {
1165                name,
1166                or_replace,
1167                if_not_exists,
1168                ..
1169            } => {
1170                assert_eq!(name.to_string(), "live_stats");
1171                assert!(or_replace);
1172                assert!(!if_not_exists);
1173            }
1174            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1175        }
1176    }
1177
1178    #[test]
1179    fn test_parse_create_materialized_view_if_not_exists() {
1180        let stmt = parse_one(
1181            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1182        );
1183        match stmt {
1184            StreamingStatement::CreateMaterializedView {
1185                name,
1186                or_replace,
1187                if_not_exists,
1188                ..
1189            } => {
1190                assert_eq!(name.to_string(), "live_stats");
1191                assert!(!or_replace);
1192                assert!(if_not_exists);
1193            }
1194            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1195        }
1196    }
1197
1198    #[test]
1199    fn test_parse_insert_into() {
1200        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1201        match stmt {
1202            StreamingStatement::InsertInto {
1203                table_name,
1204                columns,
1205                values,
1206            } => {
1207                assert_eq!(table_name.to_string(), "events");
1208                assert_eq!(columns.len(), 2);
1209                assert_eq!(columns[0].to_string(), "id");
1210                assert_eq!(columns[1].to_string(), "name");
1211                assert_eq!(values.len(), 1);
1212                assert_eq!(values[0].len(), 2);
1213            }
1214            _ => panic!("Expected InsertInto, got {stmt:?}"),
1215        }
1216    }
1217
1218    #[test]
1219    fn test_parse_insert_into_multiple_rows() {
1220        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1221        match stmt {
1222            StreamingStatement::InsertInto {
1223                table_name,
1224                columns,
1225                values,
1226            } => {
1227                assert_eq!(table_name.to_string(), "events");
1228                assert!(columns.is_empty());
1229                assert_eq!(values.len(), 3);
1230            }
1231            _ => panic!("Expected InsertInto, got {stmt:?}"),
1232        }
1233    }
1234
1235    // ── CREATE STREAM tests ─────────────────────────────
1236
1237    #[test]
1238    fn test_parse_create_stream() {
1239        let stmt = parse_one(
1240            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1241        );
1242        match stmt {
1243            StreamingStatement::CreateStream {
1244                name,
1245                or_replace,
1246                if_not_exists,
1247                emit_clause,
1248                ..
1249            } => {
1250                assert_eq!(name.to_string(), "session_activity");
1251                assert!(!or_replace);
1252                assert!(!if_not_exists);
1253                assert!(emit_clause.is_none());
1254            }
1255            _ => panic!("Expected CreateStream, got {stmt:?}"),
1256        }
1257    }
1258
1259    #[test]
1260    fn test_parse_create_or_replace_stream() {
1261        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1262        match stmt {
1263            StreamingStatement::CreateStream { or_replace, .. } => {
1264                assert!(or_replace);
1265            }
1266            _ => panic!("Expected CreateStream, got {stmt:?}"),
1267        }
1268    }
1269
1270    #[test]
1271    fn test_parse_create_stream_if_not_exists() {
1272        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1273        match stmt {
1274            StreamingStatement::CreateStream { if_not_exists, .. } => {
1275                assert!(if_not_exists);
1276            }
1277            _ => panic!("Expected CreateStream, got {stmt:?}"),
1278        }
1279    }
1280
1281    #[test]
1282    fn test_parse_create_stream_with_emit() {
1283        let stmt =
1284            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1285        match stmt {
1286            StreamingStatement::CreateStream { emit_clause, .. } => {
1287                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1288            }
1289            _ => panic!("Expected CreateStream, got {stmt:?}"),
1290        }
1291    }
1292
1293    #[test]
1294    fn test_parse_drop_stream() {
1295        let stmt = parse_one("DROP STREAM my_stream");
1296        match stmt {
1297            StreamingStatement::DropStream {
1298                name,
1299                if_exists,
1300                cascade,
1301            } => {
1302                assert_eq!(name.to_string(), "my_stream");
1303                assert!(!if_exists);
1304                assert!(!cascade);
1305            }
1306            _ => panic!("Expected DropStream, got {stmt:?}"),
1307        }
1308    }
1309
1310    #[test]
1311    fn test_parse_drop_stream_if_exists() {
1312        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1313        match stmt {
1314            StreamingStatement::DropStream {
1315                name,
1316                if_exists,
1317                cascade,
1318            } => {
1319                assert_eq!(name.to_string(), "my_stream");
1320                assert!(if_exists);
1321                assert!(!cascade);
1322            }
1323            _ => panic!("Expected DropStream, got {stmt:?}"),
1324        }
1325    }
1326
1327    #[test]
1328    fn test_parse_drop_stream_cascade() {
1329        let stmt = parse_one("DROP STREAM my_stream CASCADE");
1330        match stmt {
1331            StreamingStatement::DropStream {
1332                name,
1333                if_exists,
1334                cascade,
1335            } => {
1336                assert_eq!(name.to_string(), "my_stream");
1337                assert!(!if_exists);
1338                assert!(cascade);
1339            }
1340            _ => panic!("Expected DropStream, got {stmt:?}"),
1341        }
1342    }
1343
1344    #[test]
1345    fn test_parse_show_streams() {
1346        let stmt = parse_one("SHOW STREAMS");
1347        assert!(matches!(
1348            stmt,
1349            StreamingStatement::Show(ShowCommand::Streams)
1350        ));
1351    }
1352
1353    #[test]
1354    fn test_parse_alter_source_add_column() {
1355        let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1356        match stmt {
1357            StreamingStatement::AlterSource { name, operation } => {
1358                assert_eq!(name.to_string(), "events");
1359                match operation {
1360                    statements::AlterSourceOperation::AddColumn { column_def } => {
1361                        assert_eq!(column_def.name.value, "new_col");
1362                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1363                    }
1364                    statements::AlterSourceOperation::SetProperties { .. } => {
1365                        panic!("Expected AddColumn")
1366                    }
1367                }
1368            }
1369            _ => panic!("Expected AlterSource, got {stmt:?}"),
1370        }
1371    }
1372
1373    #[test]
1374    fn test_parse_alter_source_set_properties() {
1375        let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1376        match stmt {
1377            StreamingStatement::AlterSource { name, operation } => {
1378                assert_eq!(name.to_string(), "events");
1379                match operation {
1380                    statements::AlterSourceOperation::SetProperties { properties } => {
1381                        assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1382                        assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1383                    }
1384                    statements::AlterSourceOperation::AddColumn { .. } => {
1385                        panic!("Expected SetProperties")
1386                    }
1387                }
1388            }
1389            _ => panic!("Expected AlterSource, got {stmt:?}"),
1390        }
1391    }
1392
1393    #[test]
1394    fn test_parse_checkpoint() {
1395        let stmt = parse_one("CHECKPOINT");
1396        assert!(
1397            matches!(stmt, StreamingStatement::Checkpoint),
1398            "Expected Checkpoint, got {stmt:?}"
1399        );
1400    }
1401
1402    #[test]
1403    fn test_parse_show_checkpoint_status() {
1404        let stmt = parse_one("SHOW CHECKPOINT STATUS");
1405        assert!(
1406            matches!(
1407                stmt,
1408                StreamingStatement::Show(ShowCommand::CheckpointStatus)
1409            ),
1410            "Expected Show(CheckpointStatus), got {stmt:?}"
1411        );
1412    }
1413
1414    #[test]
1415    fn test_parse_restore_checkpoint() {
1416        let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1417        match stmt {
1418            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1419                assert_eq!(checkpoint_id, 42);
1420            }
1421            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1422        }
1423    }
1424
1425    #[test]
1426    fn test_parse_restore_checkpoint_large_id() {
1427        let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1428        match stmt {
1429            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1430                assert_eq!(checkpoint_id, 123_456);
1431            }
1432            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1433        }
1434    }
1435
1436    #[test]
1437    fn test_parse_show_create_source() {
1438        let stmt = parse_one("SHOW CREATE SOURCE events");
1439        match stmt {
1440            StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1441                assert_eq!(name.to_string(), "events");
1442            }
1443            _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1444        }
1445    }
1446
1447    #[test]
1448    fn test_parse_show_create_sink() {
1449        let stmt = parse_one("SHOW CREATE SINK output");
1450        match stmt {
1451            StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1452                assert_eq!(name.to_string(), "output");
1453            }
1454            _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1455        }
1456    }
1457}