Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12/// INTERVAL arithmetic rewriter for BIGINT timestamp columns
13pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
17pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29    WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36/// Parses SQL with streaming extensions.
37///
38/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
39/// for structured parsing. Standard SQL is delegated to sqlparser directly.
40///
41/// # Errors
42///
43/// Returns `ParseError` if the SQL syntax is invalid.
44pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48/// Parser for streaming SQL extensions.
49///
50/// Provides static methods for parsing streaming SQL statements.
51/// Uses sqlparser's `Parser` API internally for structured parsing
52/// of identifiers, data types, expressions, and queries.
53pub struct StreamingParser;
54
55impl StreamingParser {
56    /// Parse a SQL string with streaming extensions.
57    ///
58    /// Tokenizes the input to detect statement type, then routes to the
59    /// appropriate parser:
60    /// - CREATE SOURCE → `source_parser`
61    /// - CREATE SINK → `sink_parser`
62    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
63    /// - Everything else → `sqlparser::parser::Parser`
64    ///
65    /// # Errors
66    ///
67    /// Returns `ParserError` if the SQL syntax is invalid.
68    #[allow(clippy::too_many_lines)]
69    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70        let sql_trimmed = sql.trim();
71        if sql_trimmed.is_empty() {
72            return Err(sqlparser::parser::ParserError::ParserError(
73                "Empty SQL statement".to_string(),
74            ));
75        }
76
77        let dialect = LaminarDialect::default();
78
79        // Tokenize to detect statement type (with location for better errors)
80        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81            .tokenize_with_location()
82            .map_err(|e| {
83                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84            })?;
85
86        // Route based on token-level detection
87        match detect_streaming_ddl(&tokens) {
88            StreamingDdlKind::CreateSource { .. } => {
89                let mut parser =
90                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91                let source = source_parser::parse_create_source(&mut parser)
92                    .map_err(parse_error_to_parser_error)?;
93                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94            }
95            StreamingDdlKind::CreateSink { .. } => {
96                let mut parser =
97                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98                let sink = sink_parser::parse_create_sink(&mut parser)
99                    .map_err(parse_error_to_parser_error)?;
100                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101            }
102            StreamingDdlKind::CreateContinuousQuery { .. } => {
103                let mut parser =
104                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106                    .map_err(parse_error_to_parser_error)?;
107                Ok(vec![stmt])
108            }
109            StreamingDdlKind::DropSource { .. } => {
110                let mut parser =
111                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113                Ok(vec![stmt])
114            }
115            StreamingDdlKind::DropSink { .. } => {
116                let mut parser =
117                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119                Ok(vec![stmt])
120            }
121            StreamingDdlKind::DropMaterializedView { .. } => {
122                let mut parser =
123                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124                let stmt = parse_drop_materialized_view(&mut parser)
125                    .map_err(parse_error_to_parser_error)?;
126                Ok(vec![stmt])
127            }
128            StreamingDdlKind::ShowSources => {
129                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130            }
131            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132            StreamingDdlKind::ShowQueries => {
133                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134            }
135            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136                ShowCommand::MaterializedViews,
137            )]),
138            StreamingDdlKind::DescribeSource => {
139                let mut parser =
140                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142                Ok(vec![stmt])
143            }
144            StreamingDdlKind::ExplainStreaming => {
145                let mut parser =
146                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147                let stmt =
148                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149                Ok(vec![stmt])
150            }
151            StreamingDdlKind::CreateMaterializedView { .. } => {
152                let mut parser =
153                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154                let stmt = parse_create_materialized_view(&mut parser)
155                    .map_err(parse_error_to_parser_error)?;
156                Ok(vec![stmt])
157            }
158            StreamingDdlKind::CreateStream { .. } => {
159                let mut parser =
160                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161                let stmt = parse_create_stream(&mut parser, sql_trimmed)
162                    .map_err(parse_error_to_parser_error)?;
163                Ok(vec![stmt])
164            }
165            StreamingDdlKind::DropStream { .. } => {
166                let mut parser =
167                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169                Ok(vec![stmt])
170            }
171            StreamingDdlKind::ShowStreams => {
172                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173            }
174            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175            StreamingDdlKind::CreateLookupTable { .. } => {
176                let mut parser =
177                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178                let lt = lookup_table::parse_create_lookup_table(&mut parser)
179                    .map_err(parse_error_to_parser_error)?;
180                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181            }
182            StreamingDdlKind::DropLookupTable { .. } => {
183                let mut parser =
184                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186                    .map_err(parse_error_to_parser_error)?;
187                Ok(vec![StreamingStatement::DropLookupTable {
188                    name,
189                    if_exists,
190                }])
191            }
192            StreamingDdlKind::AlterSource => {
193                let mut parser =
194                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196                Ok(vec![stmt])
197            }
198            StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199                ShowCommand::CheckpointStatus,
200            )]),
201            StreamingDdlKind::ShowCreateSource => {
202                let mut parser =
203                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204                let stmt =
205                    parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206                Ok(vec![stmt])
207            }
208            StreamingDdlKind::ShowCreateSink => {
209                let mut parser =
210                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211                let stmt =
212                    parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213                Ok(vec![stmt])
214            }
215            StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216            StreamingDdlKind::RestoreCheckpoint => {
217                let mut parser =
218                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219                let stmt =
220                    parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
221                Ok(vec![stmt])
222            }
223            StreamingDdlKind::None => {
224                // Standard SQL - check for INSERT INTO and convert
225                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
226                Ok(statements
227                    .into_iter()
228                    .map(convert_standard_statement)
229                    .collect())
230            }
231        }
232    }
233
234    /// Check if an expression contains a window function.
235    #[must_use]
236    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
237        match expr {
238            sqlparser::ast::Expr::Function(func) => {
239                if let Some(name) = func.name.0.last() {
240                    let func_name = name.to_string().to_uppercase();
241                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
242                } else {
243                    false
244                }
245            }
246            _ => false,
247        }
248    }
249
250    /// Parse EMIT clause from SQL string.
251    ///
252    /// # Errors
253    ///
254    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
255    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
256        emit_parser::parse_emit_clause_from_sql(sql)
257    }
258
259    /// Parse late data handling clause from SQL string.
260    ///
261    /// # Errors
262    ///
263    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
264    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
265        late_data_parser::parse_late_data_clause_from_sql(sql)
266    }
267}
268
269/// Convert `ParseError` to `ParserError` for backward compatibility.
270fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
271    match e {
272        ParseError::SqlParseError(pe) => pe,
273        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
274        ParseError::WindowError(msg) => {
275            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
276        }
277        ParseError::ValidationError(msg) => {
278            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
279        }
280    }
281}
282
283/// Parse a RESTORE FROM CHECKPOINT statement.
284///
285/// Syntax: `RESTORE FROM CHECKPOINT <id>`
286///
287/// # Errors
288///
289/// Returns `ParseError` if the statement syntax is invalid.
290fn parse_restore_checkpoint(
291    parser: &mut sqlparser::parser::Parser,
292) -> Result<StreamingStatement, ParseError> {
293    // Consume RESTORE
294    tokenizer::expect_custom_keyword(parser, "RESTORE")?;
295    // Consume FROM
296    if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
297        return Err(ParseError::StreamingError(
298            "Expected FROM after RESTORE".to_string(),
299        ));
300    }
301    // Consume CHECKPOINT
302    tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
303    // Parse checkpoint ID (numeric literal)
304    let token = parser.next_token();
305    match &token.token {
306        sqlparser::tokenizer::Token::Number(n, _) => {
307            let id: u64 = n
308                .parse()
309                .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
310            Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
311        }
312        other => Err(ParseError::StreamingError(format!(
313            "Expected checkpoint ID (number), found {other}"
314        ))),
315    }
316}
317
318/// Convert a standard sqlparser statement to a `StreamingStatement`.
319///
320/// Detects INSERT INTO statements and converts them to the streaming
321/// `InsertInto` variant. All other statements are wrapped as `Standard`.
322fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
323    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
324        // Extract table name from TableObject
325        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
326            let table_name = name.clone();
327            let columns = insert.columns.clone();
328
329            // Try to extract VALUES rows from source query
330            if let Some(ref source) = insert.source {
331                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
332                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
333                    return StreamingStatement::InsertInto {
334                        table_name,
335                        columns,
336                        values: rows,
337                    };
338                }
339            }
340        }
341    }
342    StreamingStatement::Standard(Box::new(stmt))
343}
344
345/// Parse a DROP SOURCE statement.
346///
347/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
348///
349/// # Errors
350///
351/// Returns `ParseError` if the statement syntax is invalid.
352fn parse_drop_source(
353    parser: &mut sqlparser::parser::Parser,
354) -> Result<StreamingStatement, ParseError> {
355    parser
356        .expect_keyword(sqlparser::keywords::Keyword::DROP)
357        .map_err(ParseError::SqlParseError)?;
358    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
359    let if_exists = parser.parse_keywords(&[
360        sqlparser::keywords::Keyword::IF,
361        sqlparser::keywords::Keyword::EXISTS,
362    ]);
363    let name = parser
364        .parse_object_name(false)
365        .map_err(ParseError::SqlParseError)?;
366    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
367    Ok(StreamingStatement::DropSource {
368        name,
369        if_exists,
370        cascade,
371    })
372}
373
374/// Parse a DROP SINK statement.
375///
376/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
377///
378/// # Errors
379///
380/// Returns `ParseError` if the statement syntax is invalid.
381fn parse_drop_sink(
382    parser: &mut sqlparser::parser::Parser,
383) -> Result<StreamingStatement, ParseError> {
384    parser
385        .expect_keyword(sqlparser::keywords::Keyword::DROP)
386        .map_err(ParseError::SqlParseError)?;
387    tokenizer::expect_custom_keyword(parser, "SINK")?;
388    let if_exists = parser.parse_keywords(&[
389        sqlparser::keywords::Keyword::IF,
390        sqlparser::keywords::Keyword::EXISTS,
391    ]);
392    let name = parser
393        .parse_object_name(false)
394        .map_err(ParseError::SqlParseError)?;
395    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
396    Ok(StreamingStatement::DropSink {
397        name,
398        if_exists,
399        cascade,
400    })
401}
402
403/// Parse a DROP MATERIALIZED VIEW statement.
404///
405/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
406///
407/// # Errors
408///
409/// Returns `ParseError` if the statement syntax is invalid.
410fn parse_drop_materialized_view(
411    parser: &mut sqlparser::parser::Parser,
412) -> Result<StreamingStatement, ParseError> {
413    parser
414        .expect_keyword(sqlparser::keywords::Keyword::DROP)
415        .map_err(ParseError::SqlParseError)?;
416    parser
417        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
418        .map_err(ParseError::SqlParseError)?;
419    parser
420        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
421        .map_err(ParseError::SqlParseError)?;
422    let if_exists = parser.parse_keywords(&[
423        sqlparser::keywords::Keyword::IF,
424        sqlparser::keywords::Keyword::EXISTS,
425    ]);
426    let name = parser
427        .parse_object_name(false)
428        .map_err(ParseError::SqlParseError)?;
429    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
430    Ok(StreamingStatement::DropMaterializedView {
431        name,
432        if_exists,
433        cascade,
434    })
435}
436
437/// Parse a CREATE STREAM statement.
438///
439/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
440///
441/// # Errors
442///
443/// Returns `ParseError` if the statement syntax is invalid.
444fn parse_create_stream(
445    parser: &mut sqlparser::parser::Parser,
446    _original_sql: &str,
447) -> Result<StreamingStatement, ParseError> {
448    parser
449        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
450        .map_err(ParseError::SqlParseError)?;
451
452    let or_replace = parser.parse_keywords(&[
453        sqlparser::keywords::Keyword::OR,
454        sqlparser::keywords::Keyword::REPLACE,
455    ]);
456
457    tokenizer::expect_custom_keyword(parser, "STREAM")?;
458
459    let if_not_exists = parser.parse_keywords(&[
460        sqlparser::keywords::Keyword::IF,
461        sqlparser::keywords::Keyword::NOT,
462        sqlparser::keywords::Keyword::EXISTS,
463    ]);
464
465    let name = parser
466        .parse_object_name(false)
467        .map_err(ParseError::SqlParseError)?;
468
469    parser
470        .expect_keyword(sqlparser::keywords::Keyword::AS)
471        .map_err(ParseError::SqlParseError)?;
472
473    // Collect remaining tokens and split at EMIT boundary
474    let remaining = collect_remaining_tokens(parser);
475    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
476
477    let stream_dialect = LaminarDialect::default();
478
479    let query = if query_tokens.is_empty() {
480        return Err(ParseError::StreamingError(
481            "Expected SELECT query after AS".to_string(),
482        ));
483    } else {
484        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485            .with_tokens_with_locations(query_tokens);
486        query_parser
487            .parse_query()
488            .map_err(ParseError::SqlParseError)?
489    };
490
491    let query_stmt =
492        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494    let emit_clause = if emit_tokens.is_empty() {
495        None
496    } else {
497        let mut emit_parser =
498            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499        emit_parser::parse_emit_clause(&mut emit_parser)?
500    };
501
502    Ok(StreamingStatement::CreateStream {
503        name,
504        query: Box::new(query_stmt),
505        emit_clause,
506        or_replace,
507        if_not_exists,
508    })
509}
510
511/// Parse a DROP STREAM statement.
512///
513/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
514///
515/// # Errors
516///
517/// Returns `ParseError` if the statement syntax is invalid.
518fn parse_drop_stream(
519    parser: &mut sqlparser::parser::Parser,
520) -> Result<StreamingStatement, ParseError> {
521    parser
522        .expect_keyword(sqlparser::keywords::Keyword::DROP)
523        .map_err(ParseError::SqlParseError)?;
524    tokenizer::expect_custom_keyword(parser, "STREAM")?;
525    let if_exists = parser.parse_keywords(&[
526        sqlparser::keywords::Keyword::IF,
527        sqlparser::keywords::Keyword::EXISTS,
528    ]);
529    let name = parser
530        .parse_object_name(false)
531        .map_err(ParseError::SqlParseError)?;
532    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
533    Ok(StreamingStatement::DropStream {
534        name,
535        if_exists,
536        cascade,
537    })
538}
539
540/// Parse a DESCRIBE statement.
541///
542/// Syntax: `DESCRIBE [EXTENDED] name`
543/// Parse an ALTER SOURCE statement.
544///
545/// Syntax:
546/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
547/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
548///
549/// # Errors
550///
551/// Returns `ParseError` if the statement syntax is invalid.
552fn parse_alter_source(
553    parser: &mut sqlparser::parser::Parser,
554) -> Result<StreamingStatement, ParseError> {
555    parser
556        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
557        .map_err(ParseError::SqlParseError)?;
558    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
559    let name = parser
560        .parse_object_name(false)
561        .map_err(ParseError::SqlParseError)?;
562
563    // Determine operation: ADD COLUMN or SET
564    if parser.parse_keywords(&[
565        sqlparser::keywords::Keyword::ADD,
566        sqlparser::keywords::Keyword::COLUMN,
567    ]) {
568        // ALTER SOURCE name ADD COLUMN col_name data_type
569        let col_name = parser
570            .parse_identifier()
571            .map_err(ParseError::SqlParseError)?;
572        let data_type = parser
573            .parse_data_type()
574            .map_err(ParseError::SqlParseError)?;
575        let column_def = sqlparser::ast::ColumnDef {
576            name: col_name,
577            data_type,
578            options: vec![],
579        };
580        Ok(StreamingStatement::AlterSource {
581            name,
582            operation: statements::AlterSourceOperation::AddColumn { column_def },
583        })
584    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
585        // ALTER SOURCE name SET ('key' = 'value', ...)
586        parser
587            .expect_token(&sqlparser::tokenizer::Token::LParen)
588            .map_err(ParseError::SqlParseError)?;
589        #[allow(clippy::disallowed_types)] // cold path: SQL parsing
590        let mut properties = std::collections::HashMap::new();
591        loop {
592            let key = parser
593                .parse_literal_string()
594                .map_err(ParseError::SqlParseError)?;
595            parser
596                .expect_token(&sqlparser::tokenizer::Token::Eq)
597                .map_err(ParseError::SqlParseError)?;
598            let value = parser
599                .parse_literal_string()
600                .map_err(ParseError::SqlParseError)?;
601            properties.insert(key, value);
602            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
603                break;
604            }
605        }
606        parser
607            .expect_token(&sqlparser::tokenizer::Token::RParen)
608            .map_err(ParseError::SqlParseError)?;
609        Ok(StreamingStatement::AlterSource {
610            name,
611            operation: statements::AlterSourceOperation::SetProperties { properties },
612        })
613    } else {
614        Err(ParseError::StreamingError(
615            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
616        ))
617    }
618}
619
620/// Parse a DESCRIBE statement.
621///
622/// # Errors
623///
624/// Returns `ParseError` if the statement syntax is invalid.
625fn parse_describe(
626    parser: &mut sqlparser::parser::Parser,
627) -> Result<StreamingStatement, ParseError> {
628    // Consume DESCRIBE or DESC
629    let token = parser.next_token();
630    match &token.token {
631        sqlparser::tokenizer::Token::Word(w)
632            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
633                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
634        _ => {
635            return Err(ParseError::StreamingError(
636                "Expected DESCRIBE or DESC".to_string(),
637            ));
638        }
639    }
640    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
641    let name = parser
642        .parse_object_name(false)
643        .map_err(ParseError::SqlParseError)?;
644    Ok(StreamingStatement::Describe { name, extended })
645}
646
647/// Parse `SHOW CREATE SOURCE <name>`.
648fn parse_show_create_source(
649    parser: &mut sqlparser::parser::Parser,
650) -> Result<StreamingStatement, ParseError> {
651    // Consume SHOW CREATE SOURCE
652    parser
653        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
654        .map_err(ParseError::SqlParseError)?;
655    parser
656        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
657        .map_err(ParseError::SqlParseError)?;
658    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
659    let name = parser
660        .parse_object_name(false)
661        .map_err(ParseError::SqlParseError)?;
662    Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
663}
664
665/// Parse `SHOW CREATE SINK <name>`.
666fn parse_show_create_sink(
667    parser: &mut sqlparser::parser::Parser,
668) -> Result<StreamingStatement, ParseError> {
669    // Consume SHOW CREATE SINK
670    parser
671        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
672        .map_err(ParseError::SqlParseError)?;
673    parser
674        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
675        .map_err(ParseError::SqlParseError)?;
676    tokenizer::expect_custom_keyword(parser, "SINK")?;
677    let name = parser
678        .parse_object_name(false)
679        .map_err(ParseError::SqlParseError)?;
680    Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
681}
682
683/// Parse an EXPLAIN [ANALYZE] statement wrapping a streaming query.
684///
685/// Syntax: `EXPLAIN [ANALYZE] <streaming_statement>`
686///
687/// # Errors
688///
689/// Returns `ParseError` if the statement syntax is invalid.
690fn parse_explain(
691    parser: &mut sqlparser::parser::Parser,
692    original_sql: &str,
693) -> Result<StreamingStatement, ParseError> {
694    parser
695        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
696        .map_err(ParseError::SqlParseError)?;
697
698    // Check for optional ANALYZE keyword
699    let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
700
701    // Find the position after EXPLAIN [ANALYZE] in the original SQL
702    let explain_prefix_upper = original_sql.to_uppercase();
703    let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
704    let inner_start = if analyze {
705        explain_prefix_upper
706            .find("ANALYZE")
707            .map_or(0, |pos| pos + "ANALYZE".len())
708    } else {
709        explain_prefix_upper
710            .find("EXPLAIN")
711            .map_or(0, |pos| pos + "EXPLAIN".len())
712    };
713    let inner_sql = original_sql[inner_start..].trim();
714    let _ = skip_keyword; // suppress unused warning
715
716    // Parse the inner statement recursively
717    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
718    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
719        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
720    })?;
721    Ok(StreamingStatement::Explain {
722        statement: Box::new(inner),
723        analyze,
724    })
725}
726
727/// Parse a CREATE MATERIALIZED VIEW statement.
728///
729/// Syntax:
730/// ```sql
731/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
732/// AS <select_query>
733/// [EMIT <strategy>]
734/// ```
735///
736/// # Errors
737///
738/// Returns `ParseError` if the statement syntax is invalid.
739fn parse_create_materialized_view(
740    parser: &mut sqlparser::parser::Parser,
741) -> Result<StreamingStatement, ParseError> {
742    parser
743        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
744        .map_err(ParseError::SqlParseError)?;
745
746    let or_replace = parser.parse_keywords(&[
747        sqlparser::keywords::Keyword::OR,
748        sqlparser::keywords::Keyword::REPLACE,
749    ]);
750
751    parser
752        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
753        .map_err(ParseError::SqlParseError)?;
754    parser
755        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
756        .map_err(ParseError::SqlParseError)?;
757
758    let if_not_exists = parser.parse_keywords(&[
759        sqlparser::keywords::Keyword::IF,
760        sqlparser::keywords::Keyword::NOT,
761        sqlparser::keywords::Keyword::EXISTS,
762    ]);
763
764    let name = parser
765        .parse_object_name(false)
766        .map_err(ParseError::SqlParseError)?;
767
768    parser
769        .expect_keyword(sqlparser::keywords::Keyword::AS)
770        .map_err(ParseError::SqlParseError)?;
771
772    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
773    let remaining = collect_remaining_tokens(parser);
774    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
775
776    let mv_dialect = LaminarDialect::default();
777
778    let query = if query_tokens.is_empty() {
779        return Err(ParseError::StreamingError(
780            "Expected SELECT query after AS".to_string(),
781        ));
782    } else {
783        let mut query_parser =
784            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
785        query_parser
786            .parse_query()
787            .map_err(ParseError::SqlParseError)?
788    };
789
790    let query_stmt =
791        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
792
793    let emit_clause = if emit_tokens.is_empty() {
794        None
795    } else {
796        let mut emit_parser =
797            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
798        emit_parser::parse_emit_clause(&mut emit_parser)?
799    };
800
801    Ok(StreamingStatement::CreateMaterializedView {
802        name,
803        query: Box::new(query_stmt),
804        emit_clause,
805        or_replace,
806        if_not_exists,
807    })
808}
809
810/// Collect all remaining tokens from the parser into a Vec.
811fn collect_remaining_tokens(
812    parser: &mut sqlparser::parser::Parser,
813) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
814    let mut tokens = Vec::new();
815    loop {
816        let token = parser.next_token();
817        if token.token == sqlparser::tokenizer::Token::EOF {
818            tokens.push(token);
819            break;
820        }
821        tokens.push(token);
822    }
823    tokens
824}
825
826/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
827///
828/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
829/// (or is empty if no EMIT found).
830fn split_at_emit(
831    tokens: &[sqlparser::tokenizer::TokenWithSpan],
832) -> (
833    Vec<sqlparser::tokenizer::TokenWithSpan>,
834    Vec<sqlparser::tokenizer::TokenWithSpan>,
835) {
836    let mut depth: i32 = 0;
837    for (i, token) in tokens.iter().enumerate() {
838        match &token.token {
839            sqlparser::tokenizer::Token::LParen => depth += 1,
840            sqlparser::tokenizer::Token::RParen => {
841                depth -= 1;
842            }
843            sqlparser::tokenizer::Token::Word(w)
844                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
845            {
846                let mut query_tokens = tokens[..i].to_vec();
847                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
848                    token: sqlparser::tokenizer::Token::EOF,
849                    span: sqlparser::tokenizer::Span::empty(),
850                });
851                let emit_tokens = tokens[i..].to_vec();
852                return (query_tokens, emit_tokens);
853            }
854            _ => {}
855        }
856    }
857    (tokens.to_vec(), vec![])
858}
859
/// SQL parsing errors
///
/// Error type returned by the parsing entry points in this module. Each
/// variant's `#[error(...)]` attribute defines its `Display` text: a fixed
/// prefix followed by the wrapped value.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// Standard SQL parse error
    ///
    /// Wraps the error produced by sqlparser. The `#[from]` attribute also
    /// provides a `From<ParserError>` conversion, so `?` can be used on
    /// sqlparser results directly.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// Streaming extension parse error
    ///
    /// Carries a human-readable message, e.g. when no query follows `AS` in
    /// `CREATE MATERIALIZED VIEW`.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// Window function error
    ///
    /// Carries a human-readable message describing the problem.
    #[error("Window function error: {0}")]
    WindowError(String),

    /// Validation error (e.g., invalid option values)
    ///
    /// Carries a human-readable message describing the rejected input.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
879
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and hands back its single statement.
    ///
    /// Panics when parsing fails or when the input does not produce exactly
    /// one statement.
    fn parse_one(sql: &str) -> StreamingStatement {
        let mut parsed = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(parsed.len(), 1, "Expected exactly 1 statement");
        parsed.pop().unwrap()
    }

    // ── DROP SOURCE ─────────────────────────────────────

    /// Plain DROP SOURCE leaves both modifier flags unset.
    #[test]
    fn test_parse_drop_source() {
        match parse_one("DROP SOURCE events") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(!if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    /// IF EXISTS sets only `if_exists`.
    #[test]
    fn test_parse_drop_source_if_exists() {
        match parse_one("DROP SOURCE IF EXISTS events") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    /// IF EXISTS combined with CASCADE sets both flags.
    #[test]
    fn test_parse_drop_source_cascade() {
        match parse_one("DROP SOURCE IF EXISTS events CASCADE") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
                assert!(cascade);
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    // ── DROP SINK ───────────────────────────────────────

    /// Plain DROP SINK leaves both modifier flags unset.
    #[test]
    fn test_parse_drop_sink() {
        match parse_one("DROP SINK output") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    /// IF EXISTS sets only `if_exists`.
    #[test]
    fn test_parse_drop_sink_if_exists() {
        match parse_one("DROP SINK IF EXISTS output") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    /// CASCADE without IF EXISTS sets only `cascade`.
    #[test]
    fn test_parse_drop_sink_cascade() {
        match parse_one("DROP SINK output CASCADE") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
                assert!(cascade);
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    // ── DROP MATERIALIZED VIEW ──────────────────────────

    /// Plain DROP MATERIALIZED VIEW leaves both modifier flags unset.
    #[test]
    fn test_parse_drop_materialized_view() {
        match parse_one("DROP MATERIALIZED VIEW live_stats") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropMaterializedView, got {other:?}"),
        }
    }

    /// IF EXISTS combined with CASCADE sets both flags.
    #[test]
    fn test_parse_drop_materialized_view_if_exists_cascade() {
        match parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            other => panic!("Expected DropMaterializedView, got {other:?}"),
        }
    }

    // ── SHOW ────────────────────────────────────────────

    #[test]
    fn test_parse_show_sources() {
        let parsed = parse_one("SHOW SOURCES");
        assert!(matches!(
            parsed,
            StreamingStatement::Show(ShowCommand::Sources)
        ));
    }

    #[test]
    fn test_parse_show_sinks() {
        let parsed = parse_one("SHOW SINKS");
        assert!(matches!(
            parsed,
            StreamingStatement::Show(ShowCommand::Sinks)
        ));
    }

    #[test]
    fn test_parse_show_queries() {
        let parsed = parse_one("SHOW QUERIES");
        assert!(matches!(
            parsed,
            StreamingStatement::Show(ShowCommand::Queries)
        ));
    }

    #[test]
    fn test_parse_show_materialized_views() {
        let parsed = parse_one("SHOW MATERIALIZED VIEWS");
        assert!(matches!(
            parsed,
            StreamingStatement::Show(ShowCommand::MaterializedViews)
        ));
    }

    // ── DESCRIBE ────────────────────────────────────────

    /// DESCRIBE without EXTENDED reports `extended == false`.
    #[test]
    fn test_parse_describe() {
        match parse_one("DESCRIBE events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(!extended);
            }
            other => panic!("Expected Describe, got {other:?}"),
        }
    }

    /// DESCRIBE EXTENDED keeps the schema-qualified name and sets `extended`.
    #[test]
    fn test_parse_describe_extended() {
        match parse_one("DESCRIBE EXTENDED my_schema.events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "my_schema.events");
                assert!(extended);
            }
            other => panic!("Expected Describe, got {other:?}"),
        }
    }

    // ── EXPLAIN ─────────────────────────────────────────

    /// EXPLAIN over a SELECT wraps a standard statement and is not ANALYZE.
    #[test]
    fn test_parse_explain_select() {
        match parse_one("EXPLAIN SELECT * FROM events") {
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
                assert!(!analyze);
            }
            other => panic!("Expected Explain, got {other:?}"),
        }
    }

    /// EXPLAIN over streaming DDL wraps the streaming statement.
    #[test]
    fn test_parse_explain_create_source() {
        match parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)") {
            StreamingStatement::Explain { statement, .. } => {
                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
            }
            other => panic!("Expected Explain wrapping CreateSource, got {other:?}"),
        }
    }

    /// EXPLAIN ANALYZE sets the `analyze` flag.
    #[test]
    fn test_parse_explain_analyze_select() {
        match parse_one("EXPLAIN ANALYZE SELECT * FROM events") {
            StreamingStatement::Explain {
                statement, analyze, ..
            } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
                assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
            }
            other => panic!("Expected Explain, got {other:?}"),
        }
    }

    // ── CREATE MATERIALIZED VIEW ────────────────────────

    /// The bare form has no EMIT clause and neither modifier flag set.
    #[test]
    fn test_parse_create_materialized_view() {
        match parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events") {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(emit_clause.is_none());
                assert!(!or_replace);
                assert!(!if_not_exists);
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// A trailing EMIT ON WINDOW CLOSE is captured as the emit clause.
    #[test]
    fn test_parse_create_materialized_view_with_emit() {
        let sql =
            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name, emit_clause, ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// OR REPLACE sets only `or_replace`.
    #[test]
    fn test_parse_create_or_replace_materialized_view() {
        let sql = "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(or_replace);
                assert!(!if_not_exists);
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// IF NOT EXISTS sets only `if_not_exists`.
    #[test]
    fn test_parse_create_materialized_view_if_not_exists() {
        let sql =
            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    // ── INSERT INTO ─────────────────────────────────────

    /// An explicit column list and a single row are both captured.
    #[test]
    fn test_parse_insert_into() {
        match parse_one("INSERT INTO events (id, name) VALUES (1, 'test')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(columns[0].to_string(), "id");
                assert_eq!(columns[1].to_string(), "name");
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            other => panic!("Expected InsertInto, got {other:?}"),
        }
    }

    /// Without a column list, `columns` is empty and every row is kept.
    #[test]
    fn test_parse_insert_into_multiple_rows() {
        match parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert!(columns.is_empty());
                assert_eq!(values.len(), 3);
            }
            other => panic!("Expected InsertInto, got {other:?}"),
        }
    }

    // ── CREATE STREAM ───────────────────────────────────

    /// The bare form has no EMIT clause and neither modifier flag set.
    #[test]
    fn test_parse_create_stream() {
        let sql = "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id";
        match parse_one(sql) {
            StreamingStatement::CreateStream {
                name,
                or_replace,
                if_not_exists,
                emit_clause,
                ..
            } => {
                assert_eq!(name.to_string(), "session_activity");
                assert!(!or_replace);
                assert!(!if_not_exists);
                assert!(emit_clause.is_none());
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// OR REPLACE sets `or_replace`.
    #[test]
    fn test_parse_create_or_replace_stream() {
        match parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events") {
            StreamingStatement::CreateStream { or_replace, .. } => {
                assert!(or_replace);
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// IF NOT EXISTS sets `if_not_exists`.
    #[test]
    fn test_parse_create_stream_if_not_exists() {
        match parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events") {
            StreamingStatement::CreateStream { if_not_exists, .. } => {
                assert!(if_not_exists);
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// A trailing EMIT ON WINDOW CLOSE is captured as the emit clause.
    #[test]
    fn test_parse_create_stream_with_emit() {
        let sql = "CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateStream { emit_clause, .. } => {
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    // ── DROP STREAM / SHOW STREAMS ──────────────────────

    /// Plain DROP STREAM leaves both modifier flags unset.
    #[test]
    fn test_parse_drop_stream() {
        match parse_one("DROP STREAM my_stream") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(!if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    /// IF EXISTS sets only `if_exists`.
    #[test]
    fn test_parse_drop_stream_if_exists() {
        match parse_one("DROP STREAM IF EXISTS my_stream") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    /// CASCADE without IF EXISTS sets only `cascade`.
    #[test]
    fn test_parse_drop_stream_cascade() {
        match parse_one("DROP STREAM my_stream CASCADE") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(!if_exists);
                assert!(cascade);
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    #[test]
    fn test_parse_show_streams() {
        let parsed = parse_one("SHOW STREAMS");
        assert!(matches!(
            parsed,
            StreamingStatement::Show(ShowCommand::Streams)
        ));
    }

    // ── ALTER SOURCE ────────────────────────────────────

    /// ADD COLUMN yields the new column's name and data type.
    #[test]
    fn test_parse_alter_source_add_column() {
        match parse_one("ALTER SOURCE events ADD COLUMN new_col INT") {
            StreamingStatement::AlterSource { name, operation } => {
                assert_eq!(name.to_string(), "events");
                match operation {
                    statements::AlterSourceOperation::SetProperties { .. } => {
                        panic!("Expected AddColumn")
                    }
                    statements::AlterSourceOperation::AddColumn { column_def } => {
                        assert_eq!(column_def.name.value, "new_col");
                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
                    }
                }
            }
            other => panic!("Expected AlterSource, got {other:?}"),
        }
    }

    /// SET (...) yields every key/value pair as strings.
    #[test]
    fn test_parse_alter_source_set_properties() {
        match parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')") {
            StreamingStatement::AlterSource { name, operation } => {
                assert_eq!(name.to_string(), "events");
                match operation {
                    statements::AlterSourceOperation::AddColumn { .. } => {
                        panic!("Expected SetProperties")
                    }
                    statements::AlterSourceOperation::SetProperties { properties } => {
                        assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
                        assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
                    }
                }
            }
            other => panic!("Expected AlterSource, got {other:?}"),
        }
    }

    // ── CHECKPOINT / RESTORE ────────────────────────────

    #[test]
    fn test_parse_checkpoint() {
        let parsed = parse_one("CHECKPOINT");
        assert!(
            matches!(parsed, StreamingStatement::Checkpoint),
            "Expected Checkpoint, got {parsed:?}"
        );
    }

    #[test]
    fn test_parse_show_checkpoint_status() {
        let parsed = parse_one("SHOW CHECKPOINT STATUS");
        assert!(
            matches!(
                parsed,
                StreamingStatement::Show(ShowCommand::CheckpointStatus)
            ),
            "Expected Show(CheckpointStatus), got {parsed:?}"
        );
    }

    /// The numeric id after CHECKPOINT is parsed into `checkpoint_id`.
    #[test]
    fn test_parse_restore_checkpoint() {
        match parse_one("RESTORE FROM CHECKPOINT 42") {
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                assert_eq!(checkpoint_id, 42);
            }
            other => panic!("Expected RestoreCheckpoint, got {other:?}"),
        }
    }

    /// A multi-digit id is parsed in full.
    #[test]
    fn test_parse_restore_checkpoint_large_id() {
        match parse_one("RESTORE FROM CHECKPOINT 123456") {
            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
                assert_eq!(checkpoint_id, 123_456);
            }
            other => panic!("Expected RestoreCheckpoint, got {other:?}"),
        }
    }

    // ── SHOW CREATE ─────────────────────────────────────

    #[test]
    fn test_parse_show_create_source() {
        match parse_one("SHOW CREATE SOURCE events") {
            StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
                assert_eq!(name.to_string(), "events");
            }
            other => panic!("Expected Show(CreateSource), got {other:?}"),
        }
    }

    #[test]
    fn test_parse_show_create_sink() {
        match parse_one("SHOW CREATE SINK output") {
            StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
                assert_eq!(name.to_string(), "output");
            }
            other => panic!("Expected Show(CreateSink), got {other:?}"),
        }
    }
}