Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
15pub mod lookup_table;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use lookup_table::CreateLookupTableStatement;
24pub use statements::{
25    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
26    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
27    WindowFunction,
28};
29pub use window_rewriter::WindowRewriter;
30
31use dialect::LaminarDialect;
32use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
33
34/// Parses SQL with streaming extensions.
35///
36/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
37/// for structured parsing. Standard SQL is delegated to sqlparser directly.
38///
39/// # Errors
40///
41/// Returns `ParseError` if the SQL syntax is invalid.
42pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
43    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
44}
45
46/// Parser for streaming SQL extensions.
47///
48/// Provides static methods for parsing streaming SQL statements.
49/// Uses sqlparser's `Parser` API internally for structured parsing
50/// of identifiers, data types, expressions, and queries.
51pub struct StreamingParser;
52
53impl StreamingParser {
54    /// Parse a SQL string with streaming extensions.
55    ///
56    /// Tokenizes the input to detect statement type, then routes to the
57    /// appropriate parser:
58    /// - CREATE SOURCE → `source_parser`
59    /// - CREATE SINK → `sink_parser`
60    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
61    /// - Everything else → `sqlparser::parser::Parser`
62    ///
63    /// # Errors
64    ///
65    /// Returns `ParserError` if the SQL syntax is invalid.
66    #[allow(clippy::too_many_lines)]
67    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
68        let sql_trimmed = sql.trim();
69        if sql_trimmed.is_empty() {
70            return Err(sqlparser::parser::ParserError::ParserError(
71                "Empty SQL statement".to_string(),
72            ));
73        }
74
75        let dialect = LaminarDialect::default();
76
77        // Tokenize to detect statement type (with location for better errors)
78        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
79            .tokenize_with_location()
80            .map_err(|e| {
81                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
82            })?;
83
84        // Route based on token-level detection
85        match detect_streaming_ddl(&tokens) {
86            StreamingDdlKind::CreateSource { .. } => {
87                let mut parser =
88                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
89                let source = source_parser::parse_create_source(&mut parser)
90                    .map_err(parse_error_to_parser_error)?;
91                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
92            }
93            StreamingDdlKind::CreateSink { .. } => {
94                let mut parser =
95                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
96                let sink = sink_parser::parse_create_sink(&mut parser)
97                    .map_err(parse_error_to_parser_error)?;
98                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
99            }
100            StreamingDdlKind::CreateContinuousQuery { .. } => {
101                let mut parser =
102                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
103                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
104                    .map_err(parse_error_to_parser_error)?;
105                Ok(vec![stmt])
106            }
107            StreamingDdlKind::DropSource { .. } => {
108                let mut parser =
109                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
110                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
111                Ok(vec![stmt])
112            }
113            StreamingDdlKind::DropSink { .. } => {
114                let mut parser =
115                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
116                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
117                Ok(vec![stmt])
118            }
119            StreamingDdlKind::DropMaterializedView { .. } => {
120                let mut parser =
121                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
122                let stmt = parse_drop_materialized_view(&mut parser)
123                    .map_err(parse_error_to_parser_error)?;
124                Ok(vec![stmt])
125            }
126            StreamingDdlKind::ShowSources => {
127                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
128            }
129            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
130            StreamingDdlKind::ShowQueries => {
131                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
132            }
133            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
134                ShowCommand::MaterializedViews,
135            )]),
136            StreamingDdlKind::DescribeSource => {
137                let mut parser =
138                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
139                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
140                Ok(vec![stmt])
141            }
142            StreamingDdlKind::ExplainStreaming => {
143                let mut parser =
144                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
145                let stmt =
146                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
147                Ok(vec![stmt])
148            }
149            StreamingDdlKind::CreateMaterializedView { .. } => {
150                let mut parser =
151                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
152                let stmt = parse_create_materialized_view(&mut parser)
153                    .map_err(parse_error_to_parser_error)?;
154                Ok(vec![stmt])
155            }
156            StreamingDdlKind::CreateStream { .. } => {
157                let mut parser =
158                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
159                let stmt = parse_create_stream(&mut parser, sql_trimmed)
160                    .map_err(parse_error_to_parser_error)?;
161                Ok(vec![stmt])
162            }
163            StreamingDdlKind::DropStream { .. } => {
164                let mut parser =
165                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
166                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
167                Ok(vec![stmt])
168            }
169            StreamingDdlKind::ShowStreams => {
170                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
171            }
172            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
173            StreamingDdlKind::CreateLookupTable { .. } => {
174                let mut parser =
175                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
176                let lt = lookup_table::parse_create_lookup_table(&mut parser)
177                    .map_err(parse_error_to_parser_error)?;
178                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
179            }
180            StreamingDdlKind::DropLookupTable { .. } => {
181                let mut parser =
182                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
183                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
184                    .map_err(parse_error_to_parser_error)?;
185                Ok(vec![StreamingStatement::DropLookupTable {
186                    name,
187                    if_exists,
188                }])
189            }
190            StreamingDdlKind::AlterSource => {
191                let mut parser =
192                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
193                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
194                Ok(vec![stmt])
195            }
196            StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
197                ShowCommand::CheckpointStatus,
198            )]),
199            StreamingDdlKind::ShowCreateSource => {
200                let mut parser =
201                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
202                let stmt =
203                    parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
204                Ok(vec![stmt])
205            }
206            StreamingDdlKind::ShowCreateSink => {
207                let mut parser =
208                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
209                let stmt =
210                    parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
211                Ok(vec![stmt])
212            }
213            StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
214            StreamingDdlKind::RestoreCheckpoint => {
215                let mut parser =
216                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
217                let stmt =
218                    parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
219                Ok(vec![stmt])
220            }
221            StreamingDdlKind::None => {
222                // Standard SQL - check for INSERT INTO and convert
223                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
224                Ok(statements
225                    .into_iter()
226                    .map(convert_standard_statement)
227                    .collect())
228            }
229        }
230    }
231
232    /// Check if an expression contains a window function.
233    #[must_use]
234    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
235        match expr {
236            sqlparser::ast::Expr::Function(func) => {
237                if let Some(name) = func.name.0.last() {
238                    let func_name = name.to_string().to_uppercase();
239                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
240                } else {
241                    false
242                }
243            }
244            _ => false,
245        }
246    }
247
248    /// Parse EMIT clause from SQL string.
249    ///
250    /// # Errors
251    ///
252    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
253    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
254        emit_parser::parse_emit_clause_from_sql(sql)
255    }
256
257    /// Parse late data handling clause from SQL string.
258    ///
259    /// # Errors
260    ///
261    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
262    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
263        late_data_parser::parse_late_data_clause_from_sql(sql)
264    }
265}
266
267/// Convert `ParseError` to `ParserError` for backward compatibility.
268fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
269    match e {
270        ParseError::SqlParseError(pe) => pe,
271        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
272        ParseError::WindowError(msg) => {
273            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
274        }
275        ParseError::ValidationError(msg) => {
276            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
277        }
278    }
279}
280
281/// Parse a RESTORE FROM CHECKPOINT statement.
282///
283/// Syntax: `RESTORE FROM CHECKPOINT <id>`
284///
285/// # Errors
286///
287/// Returns `ParseError` if the statement syntax is invalid.
288fn parse_restore_checkpoint(
289    parser: &mut sqlparser::parser::Parser,
290) -> Result<StreamingStatement, ParseError> {
291    // Consume RESTORE
292    tokenizer::expect_custom_keyword(parser, "RESTORE")?;
293    // Consume FROM
294    if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
295        return Err(ParseError::StreamingError(
296            "Expected FROM after RESTORE".to_string(),
297        ));
298    }
299    // Consume CHECKPOINT
300    tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
301    // Parse checkpoint ID (numeric literal)
302    let token = parser.next_token();
303    match &token.token {
304        sqlparser::tokenizer::Token::Number(n, _) => {
305            let id: u64 = n
306                .parse()
307                .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
308            Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
309        }
310        other => Err(ParseError::StreamingError(format!(
311            "Expected checkpoint ID (number), found {other}"
312        ))),
313    }
314}
315
316/// Convert a standard sqlparser statement to a `StreamingStatement`.
317///
318/// Detects INSERT INTO statements and converts them to the streaming
319/// `InsertInto` variant. All other statements are wrapped as `Standard`.
320fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
321    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
322        // Extract table name from TableObject
323        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
324            let table_name = name.clone();
325            let columns = insert.columns.clone();
326
327            // Try to extract VALUES rows from source query
328            if let Some(ref source) = insert.source {
329                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
330                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
331                    return StreamingStatement::InsertInto {
332                        table_name,
333                        columns,
334                        values: rows,
335                    };
336                }
337            }
338        }
339    }
340    StreamingStatement::Standard(Box::new(stmt))
341}
342
343/// Parse a DROP SOURCE statement.
344///
345/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
346///
347/// # Errors
348///
349/// Returns `ParseError` if the statement syntax is invalid.
350fn parse_drop_source(
351    parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353    parser
354        .expect_keyword(sqlparser::keywords::Keyword::DROP)
355        .map_err(ParseError::SqlParseError)?;
356    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
357    let if_exists = parser.parse_keywords(&[
358        sqlparser::keywords::Keyword::IF,
359        sqlparser::keywords::Keyword::EXISTS,
360    ]);
361    let name = parser
362        .parse_object_name(false)
363        .map_err(ParseError::SqlParseError)?;
364    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
365    Ok(StreamingStatement::DropSource {
366        name,
367        if_exists,
368        cascade,
369    })
370}
371
372/// Parse a DROP SINK statement.
373///
374/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
375///
376/// # Errors
377///
378/// Returns `ParseError` if the statement syntax is invalid.
379fn parse_drop_sink(
380    parser: &mut sqlparser::parser::Parser,
381) -> Result<StreamingStatement, ParseError> {
382    parser
383        .expect_keyword(sqlparser::keywords::Keyword::DROP)
384        .map_err(ParseError::SqlParseError)?;
385    tokenizer::expect_custom_keyword(parser, "SINK")?;
386    let if_exists = parser.parse_keywords(&[
387        sqlparser::keywords::Keyword::IF,
388        sqlparser::keywords::Keyword::EXISTS,
389    ]);
390    let name = parser
391        .parse_object_name(false)
392        .map_err(ParseError::SqlParseError)?;
393    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
394    Ok(StreamingStatement::DropSink {
395        name,
396        if_exists,
397        cascade,
398    })
399}
400
401/// Parse a DROP MATERIALIZED VIEW statement.
402///
403/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
404///
405/// # Errors
406///
407/// Returns `ParseError` if the statement syntax is invalid.
408fn parse_drop_materialized_view(
409    parser: &mut sqlparser::parser::Parser,
410) -> Result<StreamingStatement, ParseError> {
411    parser
412        .expect_keyword(sqlparser::keywords::Keyword::DROP)
413        .map_err(ParseError::SqlParseError)?;
414    parser
415        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
416        .map_err(ParseError::SqlParseError)?;
417    parser
418        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
419        .map_err(ParseError::SqlParseError)?;
420    let if_exists = parser.parse_keywords(&[
421        sqlparser::keywords::Keyword::IF,
422        sqlparser::keywords::Keyword::EXISTS,
423    ]);
424    let name = parser
425        .parse_object_name(false)
426        .map_err(ParseError::SqlParseError)?;
427    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
428    Ok(StreamingStatement::DropMaterializedView {
429        name,
430        if_exists,
431        cascade,
432    })
433}
434
435/// Parse a CREATE STREAM statement.
436///
437/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
438///
439/// # Errors
440///
441/// Returns `ParseError` if the statement syntax is invalid.
442fn parse_create_stream(
443    parser: &mut sqlparser::parser::Parser,
444    _original_sql: &str,
445) -> Result<StreamingStatement, ParseError> {
446    parser
447        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
448        .map_err(ParseError::SqlParseError)?;
449
450    let or_replace = parser.parse_keywords(&[
451        sqlparser::keywords::Keyword::OR,
452        sqlparser::keywords::Keyword::REPLACE,
453    ]);
454
455    tokenizer::expect_custom_keyword(parser, "STREAM")?;
456
457    let if_not_exists = parser.parse_keywords(&[
458        sqlparser::keywords::Keyword::IF,
459        sqlparser::keywords::Keyword::NOT,
460        sqlparser::keywords::Keyword::EXISTS,
461    ]);
462
463    let name = parser
464        .parse_object_name(false)
465        .map_err(ParseError::SqlParseError)?;
466
467    parser
468        .expect_keyword(sqlparser::keywords::Keyword::AS)
469        .map_err(ParseError::SqlParseError)?;
470
471    // Collect remaining tokens and split at EMIT boundary
472    let remaining = collect_remaining_tokens(parser);
473    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
474
475    let stream_dialect = LaminarDialect::default();
476
477    let query = if query_tokens.is_empty() {
478        return Err(ParseError::StreamingError(
479            "Expected SELECT query after AS".to_string(),
480        ));
481    } else {
482        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
483            .with_tokens_with_locations(query_tokens);
484        query_parser
485            .parse_query()
486            .map_err(ParseError::SqlParseError)?
487    };
488
489    let query_stmt =
490        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
491
492    let emit_clause = if emit_tokens.is_empty() {
493        None
494    } else {
495        let mut emit_parser =
496            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
497        emit_parser::parse_emit_clause(&mut emit_parser)?
498    };
499
500    Ok(StreamingStatement::CreateStream {
501        name,
502        query: Box::new(query_stmt),
503        emit_clause,
504        or_replace,
505        if_not_exists,
506    })
507}
508
509/// Parse a DROP STREAM statement.
510///
511/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
512///
513/// # Errors
514///
515/// Returns `ParseError` if the statement syntax is invalid.
516fn parse_drop_stream(
517    parser: &mut sqlparser::parser::Parser,
518) -> Result<StreamingStatement, ParseError> {
519    parser
520        .expect_keyword(sqlparser::keywords::Keyword::DROP)
521        .map_err(ParseError::SqlParseError)?;
522    tokenizer::expect_custom_keyword(parser, "STREAM")?;
523    let if_exists = parser.parse_keywords(&[
524        sqlparser::keywords::Keyword::IF,
525        sqlparser::keywords::Keyword::EXISTS,
526    ]);
527    let name = parser
528        .parse_object_name(false)
529        .map_err(ParseError::SqlParseError)?;
530    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
531    Ok(StreamingStatement::DropStream {
532        name,
533        if_exists,
534        cascade,
535    })
536}
537
538/// Parse a DESCRIBE statement.
539///
540/// Syntax: `DESCRIBE [EXTENDED] name`
541/// Parse an ALTER SOURCE statement.
542///
543/// Syntax:
544/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
545/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
546///
547/// # Errors
548///
549/// Returns `ParseError` if the statement syntax is invalid.
550fn parse_alter_source(
551    parser: &mut sqlparser::parser::Parser,
552) -> Result<StreamingStatement, ParseError> {
553    parser
554        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
555        .map_err(ParseError::SqlParseError)?;
556    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
557    let name = parser
558        .parse_object_name(false)
559        .map_err(ParseError::SqlParseError)?;
560
561    // Determine operation: ADD COLUMN or SET
562    if parser.parse_keywords(&[
563        sqlparser::keywords::Keyword::ADD,
564        sqlparser::keywords::Keyword::COLUMN,
565    ]) {
566        // ALTER SOURCE name ADD COLUMN col_name data_type
567        let col_name = parser
568            .parse_identifier()
569            .map_err(ParseError::SqlParseError)?;
570        let data_type = parser
571            .parse_data_type()
572            .map_err(ParseError::SqlParseError)?;
573        let column_def = sqlparser::ast::ColumnDef {
574            name: col_name,
575            data_type,
576            options: vec![],
577        };
578        Ok(StreamingStatement::AlterSource {
579            name,
580            operation: statements::AlterSourceOperation::AddColumn { column_def },
581        })
582    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
583        // ALTER SOURCE name SET ('key' = 'value', ...)
584        parser
585            .expect_token(&sqlparser::tokenizer::Token::LParen)
586            .map_err(ParseError::SqlParseError)?;
587        #[allow(clippy::disallowed_types)] // cold path: SQL parsing
588        let mut properties = std::collections::HashMap::new();
589        loop {
590            let key = parser
591                .parse_literal_string()
592                .map_err(ParseError::SqlParseError)?;
593            parser
594                .expect_token(&sqlparser::tokenizer::Token::Eq)
595                .map_err(ParseError::SqlParseError)?;
596            let value = parser
597                .parse_literal_string()
598                .map_err(ParseError::SqlParseError)?;
599            properties.insert(key, value);
600            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
601                break;
602            }
603        }
604        parser
605            .expect_token(&sqlparser::tokenizer::Token::RParen)
606            .map_err(ParseError::SqlParseError)?;
607        Ok(StreamingStatement::AlterSource {
608            name,
609            operation: statements::AlterSourceOperation::SetProperties { properties },
610        })
611    } else {
612        Err(ParseError::StreamingError(
613            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
614        ))
615    }
616}
617
618/// Parse a DESCRIBE statement.
619///
620/// # Errors
621///
622/// Returns `ParseError` if the statement syntax is invalid.
623fn parse_describe(
624    parser: &mut sqlparser::parser::Parser,
625) -> Result<StreamingStatement, ParseError> {
626    // Consume DESCRIBE or DESC
627    let token = parser.next_token();
628    match &token.token {
629        sqlparser::tokenizer::Token::Word(w)
630            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
631                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
632        _ => {
633            return Err(ParseError::StreamingError(
634                "Expected DESCRIBE or DESC".to_string(),
635            ));
636        }
637    }
638    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
639    let name = parser
640        .parse_object_name(false)
641        .map_err(ParseError::SqlParseError)?;
642    Ok(StreamingStatement::Describe { name, extended })
643}
644
645/// Parse `SHOW CREATE SOURCE <name>`.
646fn parse_show_create_source(
647    parser: &mut sqlparser::parser::Parser,
648) -> Result<StreamingStatement, ParseError> {
649    // Consume SHOW CREATE SOURCE
650    parser
651        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
652        .map_err(ParseError::SqlParseError)?;
653    parser
654        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
655        .map_err(ParseError::SqlParseError)?;
656    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
657    let name = parser
658        .parse_object_name(false)
659        .map_err(ParseError::SqlParseError)?;
660    Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
661}
662
663/// Parse `SHOW CREATE SINK <name>`.
664fn parse_show_create_sink(
665    parser: &mut sqlparser::parser::Parser,
666) -> Result<StreamingStatement, ParseError> {
667    // Consume SHOW CREATE SINK
668    parser
669        .expect_keyword(sqlparser::keywords::Keyword::SHOW)
670        .map_err(ParseError::SqlParseError)?;
671    parser
672        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
673        .map_err(ParseError::SqlParseError)?;
674    tokenizer::expect_custom_keyword(parser, "SINK")?;
675    let name = parser
676        .parse_object_name(false)
677        .map_err(ParseError::SqlParseError)?;
678    Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
679}
680
681/// Parse an EXPLAIN [ANALYZE] statement wrapping a streaming query.
682///
683/// Syntax: `EXPLAIN [ANALYZE] <streaming_statement>`
684///
685/// # Errors
686///
687/// Returns `ParseError` if the statement syntax is invalid.
688fn parse_explain(
689    parser: &mut sqlparser::parser::Parser,
690    original_sql: &str,
691) -> Result<StreamingStatement, ParseError> {
692    parser
693        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
694        .map_err(ParseError::SqlParseError)?;
695
696    // Check for optional ANALYZE keyword
697    let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
698
699    // Find the position after EXPLAIN [ANALYZE] in the original SQL
700    let explain_prefix_upper = original_sql.to_uppercase();
701    let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
702    let inner_start = if analyze {
703        explain_prefix_upper
704            .find("ANALYZE")
705            .map_or(0, |pos| pos + "ANALYZE".len())
706    } else {
707        explain_prefix_upper
708            .find("EXPLAIN")
709            .map_or(0, |pos| pos + "EXPLAIN".len())
710    };
711    let inner_sql = original_sql[inner_start..].trim();
712    let _ = skip_keyword; // suppress unused warning
713
714    // Parse the inner statement recursively
715    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
716    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
717        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
718    })?;
719    Ok(StreamingStatement::Explain {
720        statement: Box::new(inner),
721        analyze,
722    })
723}
724
725/// Parse a CREATE MATERIALIZED VIEW statement.
726///
727/// Syntax:
728/// ```sql
729/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
730/// AS <select_query>
731/// [EMIT <strategy>]
732/// ```
733///
734/// # Errors
735///
736/// Returns `ParseError` if the statement syntax is invalid.
737fn parse_create_materialized_view(
738    parser: &mut sqlparser::parser::Parser,
739) -> Result<StreamingStatement, ParseError> {
740    parser
741        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
742        .map_err(ParseError::SqlParseError)?;
743
744    let or_replace = parser.parse_keywords(&[
745        sqlparser::keywords::Keyword::OR,
746        sqlparser::keywords::Keyword::REPLACE,
747    ]);
748
749    parser
750        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
751        .map_err(ParseError::SqlParseError)?;
752    parser
753        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
754        .map_err(ParseError::SqlParseError)?;
755
756    let if_not_exists = parser.parse_keywords(&[
757        sqlparser::keywords::Keyword::IF,
758        sqlparser::keywords::Keyword::NOT,
759        sqlparser::keywords::Keyword::EXISTS,
760    ]);
761
762    let name = parser
763        .parse_object_name(false)
764        .map_err(ParseError::SqlParseError)?;
765
766    parser
767        .expect_keyword(sqlparser::keywords::Keyword::AS)
768        .map_err(ParseError::SqlParseError)?;
769
770    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
771    let remaining = collect_remaining_tokens(parser);
772    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
773
774    let mv_dialect = LaminarDialect::default();
775
776    let query = if query_tokens.is_empty() {
777        return Err(ParseError::StreamingError(
778            "Expected SELECT query after AS".to_string(),
779        ));
780    } else {
781        let mut query_parser =
782            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
783        query_parser
784            .parse_query()
785            .map_err(ParseError::SqlParseError)?
786    };
787
788    let query_stmt =
789        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
790
791    let emit_clause = if emit_tokens.is_empty() {
792        None
793    } else {
794        let mut emit_parser =
795            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
796        emit_parser::parse_emit_clause(&mut emit_parser)?
797    };
798
799    Ok(StreamingStatement::CreateMaterializedView {
800        name,
801        query: Box::new(query_stmt),
802        emit_clause,
803        or_replace,
804        if_not_exists,
805    })
806}
807
808/// Collect all remaining tokens from the parser into a Vec.
809fn collect_remaining_tokens(
810    parser: &mut sqlparser::parser::Parser,
811) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
812    let mut tokens = Vec::new();
813    loop {
814        let token = parser.next_token();
815        if token.token == sqlparser::tokenizer::Token::EOF {
816            tokens.push(token);
817            break;
818        }
819        tokens.push(token);
820    }
821    tokens
822}
823
824/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
825///
826/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
827/// (or is empty if no EMIT found).
828fn split_at_emit(
829    tokens: &[sqlparser::tokenizer::TokenWithSpan],
830) -> (
831    Vec<sqlparser::tokenizer::TokenWithSpan>,
832    Vec<sqlparser::tokenizer::TokenWithSpan>,
833) {
834    let mut depth: i32 = 0;
835    for (i, token) in tokens.iter().enumerate() {
836        match &token.token {
837            sqlparser::tokenizer::Token::LParen => depth += 1,
838            sqlparser::tokenizer::Token::RParen => {
839                depth -= 1;
840            }
841            sqlparser::tokenizer::Token::Word(w)
842                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
843            {
844                let mut query_tokens = tokens[..i].to_vec();
845                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
846                    token: sqlparser::tokenizer::Token::EOF,
847                    span: sqlparser::tokenizer::Span::empty(),
848                });
849                let emit_tokens = tokens[i..].to_vec();
850                return (query_tokens, emit_tokens);
851            }
852            _ => {}
853        }
854    }
855    (tokens.to_vec(), vec![])
856}
857
858/// SQL parsing errors
859#[derive(Debug, thiserror::Error)]
860pub enum ParseError {
861    /// Standard SQL parse error
862    #[error("SQL parse error: {0}")]
863    SqlParseError(#[from] sqlparser::parser::ParserError),
864
865    /// Streaming extension parse error
866    #[error("Streaming SQL error: {0}")]
867    StreamingError(String),
868
869    /// Window function error
870    #[error("Window function error: {0}")]
871    WindowError(String),
872
873    /// Validation error (e.g., invalid option values)
874    #[error("Validation error: {0}")]
875    ValidationError(String),
876}
877
878#[cfg(test)]
879mod tests {
880    use super::*;
881
882    /// Helper to parse SQL and return the first statement.
883    fn parse_one(sql: &str) -> StreamingStatement {
884        let stmts = StreamingParser::parse_sql(sql).unwrap();
885        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
886        stmts.into_iter().next().unwrap()
887    }
888
889    #[test]
890    fn test_parse_drop_source() {
891        let stmt = parse_one("DROP SOURCE events");
892        match stmt {
893            StreamingStatement::DropSource {
894                name,
895                if_exists,
896                cascade,
897            } => {
898                assert_eq!(name.to_string(), "events");
899                assert!(!if_exists);
900                assert!(!cascade);
901            }
902            _ => panic!("Expected DropSource, got {stmt:?}"),
903        }
904    }
905
906    #[test]
907    fn test_parse_drop_source_if_exists() {
908        let stmt = parse_one("DROP SOURCE IF EXISTS events");
909        match stmt {
910            StreamingStatement::DropSource {
911                name,
912                if_exists,
913                cascade,
914            } => {
915                assert_eq!(name.to_string(), "events");
916                assert!(if_exists);
917                assert!(!cascade);
918            }
919            _ => panic!("Expected DropSource, got {stmt:?}"),
920        }
921    }
922
923    #[test]
924    fn test_parse_drop_source_cascade() {
925        let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
926        match stmt {
927            StreamingStatement::DropSource {
928                name,
929                if_exists,
930                cascade,
931            } => {
932                assert_eq!(name.to_string(), "events");
933                assert!(if_exists);
934                assert!(cascade);
935            }
936            _ => panic!("Expected DropSource, got {stmt:?}"),
937        }
938    }
939
940    #[test]
941    fn test_parse_drop_sink() {
942        let stmt = parse_one("DROP SINK output");
943        match stmt {
944            StreamingStatement::DropSink {
945                name,
946                if_exists,
947                cascade,
948            } => {
949                assert_eq!(name.to_string(), "output");
950                assert!(!if_exists);
951                assert!(!cascade);
952            }
953            _ => panic!("Expected DropSink, got {stmt:?}"),
954        }
955    }
956
957    #[test]
958    fn test_parse_drop_sink_if_exists() {
959        let stmt = parse_one("DROP SINK IF EXISTS output");
960        match stmt {
961            StreamingStatement::DropSink {
962                name,
963                if_exists,
964                cascade,
965            } => {
966                assert_eq!(name.to_string(), "output");
967                assert!(if_exists);
968                assert!(!cascade);
969            }
970            _ => panic!("Expected DropSink, got {stmt:?}"),
971        }
972    }
973
974    #[test]
975    fn test_parse_drop_sink_cascade() {
976        let stmt = parse_one("DROP SINK output CASCADE");
977        match stmt {
978            StreamingStatement::DropSink {
979                name,
980                if_exists,
981                cascade,
982            } => {
983                assert_eq!(name.to_string(), "output");
984                assert!(!if_exists);
985                assert!(cascade);
986            }
987            _ => panic!("Expected DropSink, got {stmt:?}"),
988        }
989    }
990
991    #[test]
992    fn test_parse_drop_materialized_view() {
993        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
994        match stmt {
995            StreamingStatement::DropMaterializedView {
996                name,
997                if_exists,
998                cascade,
999            } => {
1000                assert_eq!(name.to_string(), "live_stats");
1001                assert!(!if_exists);
1002                assert!(!cascade);
1003            }
1004            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1005        }
1006    }
1007
1008    #[test]
1009    fn test_parse_drop_materialized_view_if_exists_cascade() {
1010        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1011        match stmt {
1012            StreamingStatement::DropMaterializedView {
1013                name,
1014                if_exists,
1015                cascade,
1016            } => {
1017                assert_eq!(name.to_string(), "live_stats");
1018                assert!(if_exists);
1019                assert!(cascade);
1020            }
1021            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1022        }
1023    }
1024
1025    #[test]
1026    fn test_parse_show_sources() {
1027        let stmt = parse_one("SHOW SOURCES");
1028        assert!(matches!(
1029            stmt,
1030            StreamingStatement::Show(ShowCommand::Sources)
1031        ));
1032    }
1033
1034    #[test]
1035    fn test_parse_show_sinks() {
1036        let stmt = parse_one("SHOW SINKS");
1037        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1038    }
1039
1040    #[test]
1041    fn test_parse_show_queries() {
1042        let stmt = parse_one("SHOW QUERIES");
1043        assert!(matches!(
1044            stmt,
1045            StreamingStatement::Show(ShowCommand::Queries)
1046        ));
1047    }
1048
1049    #[test]
1050    fn test_parse_show_materialized_views() {
1051        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1052        assert!(matches!(
1053            stmt,
1054            StreamingStatement::Show(ShowCommand::MaterializedViews)
1055        ));
1056    }
1057
1058    #[test]
1059    fn test_parse_describe() {
1060        let stmt = parse_one("DESCRIBE events");
1061        match stmt {
1062            StreamingStatement::Describe { name, extended } => {
1063                assert_eq!(name.to_string(), "events");
1064                assert!(!extended);
1065            }
1066            _ => panic!("Expected Describe, got {stmt:?}"),
1067        }
1068    }
1069
1070    #[test]
1071    fn test_parse_describe_extended() {
1072        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1073        match stmt {
1074            StreamingStatement::Describe { name, extended } => {
1075                assert_eq!(name.to_string(), "my_schema.events");
1076                assert!(extended);
1077            }
1078            _ => panic!("Expected Describe, got {stmt:?}"),
1079        }
1080    }
1081
1082    #[test]
1083    fn test_parse_explain_select() {
1084        let stmt = parse_one("EXPLAIN SELECT * FROM events");
1085        match stmt {
1086            StreamingStatement::Explain {
1087                statement, analyze, ..
1088            } => {
1089                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1090                assert!(!analyze);
1091            }
1092            _ => panic!("Expected Explain, got {stmt:?}"),
1093        }
1094    }
1095
1096    #[test]
1097    fn test_parse_explain_create_source() {
1098        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1099        match stmt {
1100            StreamingStatement::Explain { statement, .. } => {
1101                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1102            }
1103            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1104        }
1105    }
1106
1107    #[test]
1108    fn test_parse_explain_analyze_select() {
1109        let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1110        match stmt {
1111            StreamingStatement::Explain {
1112                statement, analyze, ..
1113            } => {
1114                assert!(matches!(*statement, StreamingStatement::Standard(_)));
1115                assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1116            }
1117            _ => panic!("Expected Explain, got {stmt:?}"),
1118        }
1119    }
1120
1121    #[test]
1122    fn test_parse_create_materialized_view() {
1123        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1124        match stmt {
1125            StreamingStatement::CreateMaterializedView {
1126                name,
1127                emit_clause,
1128                or_replace,
1129                if_not_exists,
1130                ..
1131            } => {
1132                assert_eq!(name.to_string(), "live_stats");
1133                assert!(emit_clause.is_none());
1134                assert!(!or_replace);
1135                assert!(!if_not_exists);
1136            }
1137            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1138        }
1139    }
1140
1141    #[test]
1142    fn test_parse_create_materialized_view_with_emit() {
1143        let stmt = parse_one(
1144            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1145        );
1146        match stmt {
1147            StreamingStatement::CreateMaterializedView {
1148                name, emit_clause, ..
1149            } => {
1150                assert_eq!(name.to_string(), "live_stats");
1151                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1152            }
1153            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1154        }
1155    }
1156
1157    #[test]
1158    fn test_parse_create_or_replace_materialized_view() {
1159        let stmt = parse_one(
1160            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1161        );
1162        match stmt {
1163            StreamingStatement::CreateMaterializedView {
1164                name,
1165                or_replace,
1166                if_not_exists,
1167                ..
1168            } => {
1169                assert_eq!(name.to_string(), "live_stats");
1170                assert!(or_replace);
1171                assert!(!if_not_exists);
1172            }
1173            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1174        }
1175    }
1176
1177    #[test]
1178    fn test_parse_create_materialized_view_if_not_exists() {
1179        let stmt = parse_one(
1180            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1181        );
1182        match stmt {
1183            StreamingStatement::CreateMaterializedView {
1184                name,
1185                or_replace,
1186                if_not_exists,
1187                ..
1188            } => {
1189                assert_eq!(name.to_string(), "live_stats");
1190                assert!(!or_replace);
1191                assert!(if_not_exists);
1192            }
1193            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1194        }
1195    }
1196
1197    #[test]
1198    fn test_parse_insert_into() {
1199        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1200        match stmt {
1201            StreamingStatement::InsertInto {
1202                table_name,
1203                columns,
1204                values,
1205            } => {
1206                assert_eq!(table_name.to_string(), "events");
1207                assert_eq!(columns.len(), 2);
1208                assert_eq!(columns[0].to_string(), "id");
1209                assert_eq!(columns[1].to_string(), "name");
1210                assert_eq!(values.len(), 1);
1211                assert_eq!(values[0].len(), 2);
1212            }
1213            _ => panic!("Expected InsertInto, got {stmt:?}"),
1214        }
1215    }
1216
1217    #[test]
1218    fn test_parse_insert_into_multiple_rows() {
1219        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1220        match stmt {
1221            StreamingStatement::InsertInto {
1222                table_name,
1223                columns,
1224                values,
1225            } => {
1226                assert_eq!(table_name.to_string(), "events");
1227                assert!(columns.is_empty());
1228                assert_eq!(values.len(), 3);
1229            }
1230            _ => panic!("Expected InsertInto, got {stmt:?}"),
1231        }
1232    }
1233
1234    // ── CREATE STREAM tests ─────────────────────────────
1235
1236    #[test]
1237    fn test_parse_create_stream() {
1238        let stmt = parse_one(
1239            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1240        );
1241        match stmt {
1242            StreamingStatement::CreateStream {
1243                name,
1244                or_replace,
1245                if_not_exists,
1246                emit_clause,
1247                ..
1248            } => {
1249                assert_eq!(name.to_string(), "session_activity");
1250                assert!(!or_replace);
1251                assert!(!if_not_exists);
1252                assert!(emit_clause.is_none());
1253            }
1254            _ => panic!("Expected CreateStream, got {stmt:?}"),
1255        }
1256    }
1257
1258    #[test]
1259    fn test_parse_create_or_replace_stream() {
1260        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1261        match stmt {
1262            StreamingStatement::CreateStream { or_replace, .. } => {
1263                assert!(or_replace);
1264            }
1265            _ => panic!("Expected CreateStream, got {stmt:?}"),
1266        }
1267    }
1268
1269    #[test]
1270    fn test_parse_create_stream_if_not_exists() {
1271        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1272        match stmt {
1273            StreamingStatement::CreateStream { if_not_exists, .. } => {
1274                assert!(if_not_exists);
1275            }
1276            _ => panic!("Expected CreateStream, got {stmt:?}"),
1277        }
1278    }
1279
1280    #[test]
1281    fn test_parse_create_stream_with_emit() {
1282        let stmt =
1283            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1284        match stmt {
1285            StreamingStatement::CreateStream { emit_clause, .. } => {
1286                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1287            }
1288            _ => panic!("Expected CreateStream, got {stmt:?}"),
1289        }
1290    }
1291
1292    #[test]
1293    fn test_parse_drop_stream() {
1294        let stmt = parse_one("DROP STREAM my_stream");
1295        match stmt {
1296            StreamingStatement::DropStream {
1297                name,
1298                if_exists,
1299                cascade,
1300            } => {
1301                assert_eq!(name.to_string(), "my_stream");
1302                assert!(!if_exists);
1303                assert!(!cascade);
1304            }
1305            _ => panic!("Expected DropStream, got {stmt:?}"),
1306        }
1307    }
1308
1309    #[test]
1310    fn test_parse_drop_stream_if_exists() {
1311        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1312        match stmt {
1313            StreamingStatement::DropStream {
1314                name,
1315                if_exists,
1316                cascade,
1317            } => {
1318                assert_eq!(name.to_string(), "my_stream");
1319                assert!(if_exists);
1320                assert!(!cascade);
1321            }
1322            _ => panic!("Expected DropStream, got {stmt:?}"),
1323        }
1324    }
1325
1326    #[test]
1327    fn test_parse_drop_stream_cascade() {
1328        let stmt = parse_one("DROP STREAM my_stream CASCADE");
1329        match stmt {
1330            StreamingStatement::DropStream {
1331                name,
1332                if_exists,
1333                cascade,
1334            } => {
1335                assert_eq!(name.to_string(), "my_stream");
1336                assert!(!if_exists);
1337                assert!(cascade);
1338            }
1339            _ => panic!("Expected DropStream, got {stmt:?}"),
1340        }
1341    }
1342
1343    #[test]
1344    fn test_parse_show_streams() {
1345        let stmt = parse_one("SHOW STREAMS");
1346        assert!(matches!(
1347            stmt,
1348            StreamingStatement::Show(ShowCommand::Streams)
1349        ));
1350    }
1351
1352    #[test]
1353    fn test_parse_alter_source_add_column() {
1354        let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1355        match stmt {
1356            StreamingStatement::AlterSource { name, operation } => {
1357                assert_eq!(name.to_string(), "events");
1358                match operation {
1359                    statements::AlterSourceOperation::AddColumn { column_def } => {
1360                        assert_eq!(column_def.name.value, "new_col");
1361                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1362                    }
1363                    statements::AlterSourceOperation::SetProperties { .. } => {
1364                        panic!("Expected AddColumn")
1365                    }
1366                }
1367            }
1368            _ => panic!("Expected AlterSource, got {stmt:?}"),
1369        }
1370    }
1371
1372    #[test]
1373    fn test_parse_alter_source_set_properties() {
1374        let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1375        match stmt {
1376            StreamingStatement::AlterSource { name, operation } => {
1377                assert_eq!(name.to_string(), "events");
1378                match operation {
1379                    statements::AlterSourceOperation::SetProperties { properties } => {
1380                        assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1381                        assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1382                    }
1383                    statements::AlterSourceOperation::AddColumn { .. } => {
1384                        panic!("Expected SetProperties")
1385                    }
1386                }
1387            }
1388            _ => panic!("Expected AlterSource, got {stmt:?}"),
1389        }
1390    }
1391
1392    #[test]
1393    fn test_parse_checkpoint() {
1394        let stmt = parse_one("CHECKPOINT");
1395        assert!(
1396            matches!(stmt, StreamingStatement::Checkpoint),
1397            "Expected Checkpoint, got {stmt:?}"
1398        );
1399    }
1400
1401    #[test]
1402    fn test_parse_show_checkpoint_status() {
1403        let stmt = parse_one("SHOW CHECKPOINT STATUS");
1404        assert!(
1405            matches!(
1406                stmt,
1407                StreamingStatement::Show(ShowCommand::CheckpointStatus)
1408            ),
1409            "Expected Show(CheckpointStatus), got {stmt:?}"
1410        );
1411    }
1412
1413    #[test]
1414    fn test_parse_restore_checkpoint() {
1415        let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1416        match stmt {
1417            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1418                assert_eq!(checkpoint_id, 42);
1419            }
1420            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1421        }
1422    }
1423
1424    #[test]
1425    fn test_parse_restore_checkpoint_large_id() {
1426        let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1427        match stmt {
1428            StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1429                assert_eq!(checkpoint_id, 123_456);
1430            }
1431            _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1432        }
1433    }
1434
1435    #[test]
1436    fn test_parse_show_create_source() {
1437        let stmt = parse_one("SHOW CREATE SOURCE events");
1438        match stmt {
1439            StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1440                assert_eq!(name.to_string(), "events");
1441            }
1442            _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1443        }
1444    }
1445
1446    #[test]
1447    fn test_parse_show_create_sink() {
1448        let stmt = parse_one("SHOW CREATE SINK output");
1449        match stmt {
1450            StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1451                assert_eq!(name.to_string(), "output");
1452            }
1453            _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1454        }
1455    }
1456}