Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12/// INTERVAL arithmetic rewriter for BIGINT timestamp columns
13pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16/// Parser for CREATE/DROP LOOKUP TABLE DDL statements
17pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27    AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28    FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29    WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36/// Parses SQL with streaming extensions.
37///
38/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
39/// for structured parsing. Standard SQL is delegated to sqlparser directly.
40///
41/// # Errors
42///
43/// Returns `ParseError` if the SQL syntax is invalid.
44pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48/// Parser for streaming SQL extensions.
49///
50/// Provides static methods for parsing streaming SQL statements.
51/// Uses sqlparser's `Parser` API internally for structured parsing
52/// of identifiers, data types, expressions, and queries.
53pub struct StreamingParser;
54
55impl StreamingParser {
56    /// Parse a SQL string with streaming extensions.
57    ///
58    /// Tokenizes the input to detect statement type, then routes to the
59    /// appropriate parser:
60    /// - CREATE SOURCE → `source_parser`
61    /// - CREATE SINK → `sink_parser`
62    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
63    /// - Everything else → `sqlparser::parser::Parser`
64    ///
65    /// # Errors
66    ///
67    /// Returns `ParserError` if the SQL syntax is invalid.
68    #[allow(clippy::too_many_lines)]
69    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70        let sql_trimmed = sql.trim();
71        if sql_trimmed.is_empty() {
72            return Err(sqlparser::parser::ParserError::ParserError(
73                "Empty SQL statement".to_string(),
74            ));
75        }
76
77        let dialect = LaminarDialect::default();
78
79        // Tokenize to detect statement type (with location for better errors)
80        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81            .tokenize_with_location()
82            .map_err(|e| {
83                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84            })?;
85
86        // Route based on token-level detection
87        match detect_streaming_ddl(&tokens) {
88            StreamingDdlKind::CreateSource { .. } => {
89                let mut parser =
90                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91                let source = source_parser::parse_create_source(&mut parser)
92                    .map_err(parse_error_to_parser_error)?;
93                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94            }
95            StreamingDdlKind::CreateSink { .. } => {
96                let mut parser =
97                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98                let sink = sink_parser::parse_create_sink(&mut parser)
99                    .map_err(parse_error_to_parser_error)?;
100                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101            }
102            StreamingDdlKind::CreateContinuousQuery { .. } => {
103                let mut parser =
104                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106                    .map_err(parse_error_to_parser_error)?;
107                Ok(vec![stmt])
108            }
109            StreamingDdlKind::DropSource { .. } => {
110                let mut parser =
111                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113                Ok(vec![stmt])
114            }
115            StreamingDdlKind::DropSink { .. } => {
116                let mut parser =
117                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119                Ok(vec![stmt])
120            }
121            StreamingDdlKind::DropMaterializedView { .. } => {
122                let mut parser =
123                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124                let stmt = parse_drop_materialized_view(&mut parser)
125                    .map_err(parse_error_to_parser_error)?;
126                Ok(vec![stmt])
127            }
128            StreamingDdlKind::ShowSources => {
129                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130            }
131            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132            StreamingDdlKind::ShowQueries => {
133                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134            }
135            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136                ShowCommand::MaterializedViews,
137            )]),
138            StreamingDdlKind::DescribeSource => {
139                let mut parser =
140                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142                Ok(vec![stmt])
143            }
144            StreamingDdlKind::ExplainStreaming => {
145                let mut parser =
146                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147                let stmt =
148                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149                Ok(vec![stmt])
150            }
151            StreamingDdlKind::CreateMaterializedView { .. } => {
152                let mut parser =
153                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154                let stmt = parse_create_materialized_view(&mut parser)
155                    .map_err(parse_error_to_parser_error)?;
156                Ok(vec![stmt])
157            }
158            StreamingDdlKind::CreateStream { .. } => {
159                let mut parser =
160                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161                let stmt = parse_create_stream(&mut parser, sql_trimmed)
162                    .map_err(parse_error_to_parser_error)?;
163                Ok(vec![stmt])
164            }
165            StreamingDdlKind::DropStream { .. } => {
166                let mut parser =
167                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169                Ok(vec![stmt])
170            }
171            StreamingDdlKind::ShowStreams => {
172                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173            }
174            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175            StreamingDdlKind::CreateLookupTable { .. } => {
176                let mut parser =
177                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178                let lt = lookup_table::parse_create_lookup_table(&mut parser)
179                    .map_err(parse_error_to_parser_error)?;
180                Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181            }
182            StreamingDdlKind::DropLookupTable { .. } => {
183                let mut parser =
184                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185                let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186                    .map_err(parse_error_to_parser_error)?;
187                Ok(vec![StreamingStatement::DropLookupTable {
188                    name,
189                    if_exists,
190                }])
191            }
192            StreamingDdlKind::AlterSource => {
193                let mut parser =
194                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195                let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196                Ok(vec![stmt])
197            }
198            StreamingDdlKind::None => {
199                // Standard SQL - check for INSERT INTO and convert
200                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
201                Ok(statements
202                    .into_iter()
203                    .map(convert_standard_statement)
204                    .collect())
205            }
206        }
207    }
208
209    /// Check if an expression contains a window function.
210    #[must_use]
211    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
212        match expr {
213            sqlparser::ast::Expr::Function(func) => {
214                if let Some(name) = func.name.0.last() {
215                    let func_name = name.to_string().to_uppercase();
216                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
217                } else {
218                    false
219                }
220            }
221            _ => false,
222        }
223    }
224
225    /// Parse EMIT clause from SQL string.
226    ///
227    /// # Errors
228    ///
229    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
230    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
231        emit_parser::parse_emit_clause_from_sql(sql)
232    }
233
234    /// Parse late data handling clause from SQL string.
235    ///
236    /// # Errors
237    ///
238    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
239    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
240        late_data_parser::parse_late_data_clause_from_sql(sql)
241    }
242}
243
244/// Convert `ParseError` to `ParserError` for backward compatibility.
245fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
246    match e {
247        ParseError::SqlParseError(pe) => pe,
248        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
249        ParseError::WindowError(msg) => {
250            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
251        }
252        ParseError::ValidationError(msg) => {
253            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
254        }
255    }
256}
257
258/// Convert a standard sqlparser statement to a `StreamingStatement`.
259///
260/// Detects INSERT INTO statements and converts them to the streaming
261/// `InsertInto` variant. All other statements are wrapped as `Standard`.
262fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
263    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
264        // Extract table name from TableObject
265        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
266            let table_name = name.clone();
267            let columns = insert.columns.clone();
268
269            // Try to extract VALUES rows from source query
270            if let Some(ref source) = insert.source {
271                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
272                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
273                    return StreamingStatement::InsertInto {
274                        table_name,
275                        columns,
276                        values: rows,
277                    };
278                }
279            }
280        }
281    }
282    StreamingStatement::Standard(Box::new(stmt))
283}
284
285/// Parse a DROP SOURCE statement.
286///
287/// Syntax: `DROP SOURCE [IF EXISTS] name [CASCADE]`
288///
289/// # Errors
290///
291/// Returns `ParseError` if the statement syntax is invalid.
292fn parse_drop_source(
293    parser: &mut sqlparser::parser::Parser,
294) -> Result<StreamingStatement, ParseError> {
295    parser
296        .expect_keyword(sqlparser::keywords::Keyword::DROP)
297        .map_err(ParseError::SqlParseError)?;
298    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
299    let if_exists = parser.parse_keywords(&[
300        sqlparser::keywords::Keyword::IF,
301        sqlparser::keywords::Keyword::EXISTS,
302    ]);
303    let name = parser
304        .parse_object_name(false)
305        .map_err(ParseError::SqlParseError)?;
306    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
307    Ok(StreamingStatement::DropSource {
308        name,
309        if_exists,
310        cascade,
311    })
312}
313
314/// Parse a DROP SINK statement.
315///
316/// Syntax: `DROP SINK [IF EXISTS] name [CASCADE]`
317///
318/// # Errors
319///
320/// Returns `ParseError` if the statement syntax is invalid.
321fn parse_drop_sink(
322    parser: &mut sqlparser::parser::Parser,
323) -> Result<StreamingStatement, ParseError> {
324    parser
325        .expect_keyword(sqlparser::keywords::Keyword::DROP)
326        .map_err(ParseError::SqlParseError)?;
327    tokenizer::expect_custom_keyword(parser, "SINK")?;
328    let if_exists = parser.parse_keywords(&[
329        sqlparser::keywords::Keyword::IF,
330        sqlparser::keywords::Keyword::EXISTS,
331    ]);
332    let name = parser
333        .parse_object_name(false)
334        .map_err(ParseError::SqlParseError)?;
335    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
336    Ok(StreamingStatement::DropSink {
337        name,
338        if_exists,
339        cascade,
340    })
341}
342
343/// Parse a DROP MATERIALIZED VIEW statement.
344///
345/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
346///
347/// # Errors
348///
349/// Returns `ParseError` if the statement syntax is invalid.
350fn parse_drop_materialized_view(
351    parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353    parser
354        .expect_keyword(sqlparser::keywords::Keyword::DROP)
355        .map_err(ParseError::SqlParseError)?;
356    parser
357        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
358        .map_err(ParseError::SqlParseError)?;
359    parser
360        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
361        .map_err(ParseError::SqlParseError)?;
362    let if_exists = parser.parse_keywords(&[
363        sqlparser::keywords::Keyword::IF,
364        sqlparser::keywords::Keyword::EXISTS,
365    ]);
366    let name = parser
367        .parse_object_name(false)
368        .map_err(ParseError::SqlParseError)?;
369    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
370    Ok(StreamingStatement::DropMaterializedView {
371        name,
372        if_exists,
373        cascade,
374    })
375}
376
377/// Parse a CREATE STREAM statement.
378///
379/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
380///
381/// # Errors
382///
383/// Returns `ParseError` if the statement syntax is invalid.
384fn parse_create_stream(
385    parser: &mut sqlparser::parser::Parser,
386    _original_sql: &str,
387) -> Result<StreamingStatement, ParseError> {
388    parser
389        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
390        .map_err(ParseError::SqlParseError)?;
391
392    let or_replace = parser.parse_keywords(&[
393        sqlparser::keywords::Keyword::OR,
394        sqlparser::keywords::Keyword::REPLACE,
395    ]);
396
397    tokenizer::expect_custom_keyword(parser, "STREAM")?;
398
399    let if_not_exists = parser.parse_keywords(&[
400        sqlparser::keywords::Keyword::IF,
401        sqlparser::keywords::Keyword::NOT,
402        sqlparser::keywords::Keyword::EXISTS,
403    ]);
404
405    let name = parser
406        .parse_object_name(false)
407        .map_err(ParseError::SqlParseError)?;
408
409    parser
410        .expect_keyword(sqlparser::keywords::Keyword::AS)
411        .map_err(ParseError::SqlParseError)?;
412
413    // Collect remaining tokens and split at EMIT boundary
414    let remaining = collect_remaining_tokens(parser);
415    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
416
417    let stream_dialect = LaminarDialect::default();
418
419    let query = if query_tokens.is_empty() {
420        return Err(ParseError::StreamingError(
421            "Expected SELECT query after AS".to_string(),
422        ));
423    } else {
424        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
425            .with_tokens_with_locations(query_tokens);
426        query_parser
427            .parse_query()
428            .map_err(ParseError::SqlParseError)?
429    };
430
431    let query_stmt =
432        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
433
434    let emit_clause = if emit_tokens.is_empty() {
435        None
436    } else {
437        let mut emit_parser =
438            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
439        emit_parser::parse_emit_clause(&mut emit_parser)?
440    };
441
442    Ok(StreamingStatement::CreateStream {
443        name,
444        query: Box::new(query_stmt),
445        emit_clause,
446        or_replace,
447        if_not_exists,
448    })
449}
450
451/// Parse a DROP STREAM statement.
452///
453/// Syntax: `DROP STREAM [IF EXISTS] name [CASCADE]`
454///
455/// # Errors
456///
457/// Returns `ParseError` if the statement syntax is invalid.
458fn parse_drop_stream(
459    parser: &mut sqlparser::parser::Parser,
460) -> Result<StreamingStatement, ParseError> {
461    parser
462        .expect_keyword(sqlparser::keywords::Keyword::DROP)
463        .map_err(ParseError::SqlParseError)?;
464    tokenizer::expect_custom_keyword(parser, "STREAM")?;
465    let if_exists = parser.parse_keywords(&[
466        sqlparser::keywords::Keyword::IF,
467        sqlparser::keywords::Keyword::EXISTS,
468    ]);
469    let name = parser
470        .parse_object_name(false)
471        .map_err(ParseError::SqlParseError)?;
472    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
473    Ok(StreamingStatement::DropStream {
474        name,
475        if_exists,
476        cascade,
477    })
478}
479
480/// Parse a DESCRIBE statement.
481///
482/// Syntax: `DESCRIBE [EXTENDED] name`
483/// Parse an ALTER SOURCE statement.
484///
485/// Syntax:
486/// - `ALTER SOURCE name ADD COLUMN col_name data_type`
487/// - `ALTER SOURCE name SET ('key' = 'value', ...)`
488///
489/// # Errors
490///
491/// Returns `ParseError` if the statement syntax is invalid.
492fn parse_alter_source(
493    parser: &mut sqlparser::parser::Parser,
494) -> Result<StreamingStatement, ParseError> {
495    parser
496        .expect_keyword(sqlparser::keywords::Keyword::ALTER)
497        .map_err(ParseError::SqlParseError)?;
498    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
499    let name = parser
500        .parse_object_name(false)
501        .map_err(ParseError::SqlParseError)?;
502
503    // Determine operation: ADD COLUMN or SET
504    if parser.parse_keywords(&[
505        sqlparser::keywords::Keyword::ADD,
506        sqlparser::keywords::Keyword::COLUMN,
507    ]) {
508        // ALTER SOURCE name ADD COLUMN col_name data_type
509        let col_name = parser
510            .parse_identifier()
511            .map_err(ParseError::SqlParseError)?;
512        let data_type = parser
513            .parse_data_type()
514            .map_err(ParseError::SqlParseError)?;
515        let column_def = sqlparser::ast::ColumnDef {
516            name: col_name,
517            data_type,
518            options: vec![],
519        };
520        Ok(StreamingStatement::AlterSource {
521            name,
522            operation: statements::AlterSourceOperation::AddColumn { column_def },
523        })
524    } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
525        // ALTER SOURCE name SET ('key' = 'value', ...)
526        parser
527            .expect_token(&sqlparser::tokenizer::Token::LParen)
528            .map_err(ParseError::SqlParseError)?;
529        let mut properties = std::collections::HashMap::new();
530        loop {
531            let key = parser
532                .parse_literal_string()
533                .map_err(ParseError::SqlParseError)?;
534            parser
535                .expect_token(&sqlparser::tokenizer::Token::Eq)
536                .map_err(ParseError::SqlParseError)?;
537            let value = parser
538                .parse_literal_string()
539                .map_err(ParseError::SqlParseError)?;
540            properties.insert(key, value);
541            if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
542                break;
543            }
544        }
545        parser
546            .expect_token(&sqlparser::tokenizer::Token::RParen)
547            .map_err(ParseError::SqlParseError)?;
548        Ok(StreamingStatement::AlterSource {
549            name,
550            operation: statements::AlterSourceOperation::SetProperties { properties },
551        })
552    } else {
553        Err(ParseError::StreamingError(
554            "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
555        ))
556    }
557}
558
559/// Parse a DESCRIBE statement.
560///
561/// # Errors
562///
563/// Returns `ParseError` if the statement syntax is invalid.
564fn parse_describe(
565    parser: &mut sqlparser::parser::Parser,
566) -> Result<StreamingStatement, ParseError> {
567    // Consume DESCRIBE or DESC
568    let token = parser.next_token();
569    match &token.token {
570        sqlparser::tokenizer::Token::Word(w)
571            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
572                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
573        _ => {
574            return Err(ParseError::StreamingError(
575                "Expected DESCRIBE or DESC".to_string(),
576            ));
577        }
578    }
579    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
580    let name = parser
581        .parse_object_name(false)
582        .map_err(ParseError::SqlParseError)?;
583    Ok(StreamingStatement::Describe { name, extended })
584}
585
586/// Parse an EXPLAIN statement wrapping a streaming query.
587///
588/// Syntax: `EXPLAIN <streaming_statement>`
589///
590/// # Errors
591///
592/// Returns `ParseError` if the statement syntax is invalid.
593fn parse_explain(
594    parser: &mut sqlparser::parser::Parser,
595    original_sql: &str,
596) -> Result<StreamingStatement, ParseError> {
597    parser
598        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
599        .map_err(ParseError::SqlParseError)?;
600
601    // Find the position after EXPLAIN in the original SQL
602    let explain_prefix_upper = original_sql.to_uppercase();
603    let inner_start = explain_prefix_upper
604        .find("EXPLAIN")
605        .map_or(0, |pos| pos + "EXPLAIN".len());
606    let inner_sql = original_sql[inner_start..].trim();
607
608    // Parse the inner statement recursively
609    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
610    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
611        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
612    })?;
613    Ok(StreamingStatement::Explain {
614        statement: Box::new(inner),
615    })
616}
617
618/// Parse a CREATE MATERIALIZED VIEW statement.
619///
620/// Syntax:
621/// ```sql
622/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
623/// AS <select_query>
624/// [EMIT <strategy>]
625/// ```
626///
627/// # Errors
628///
629/// Returns `ParseError` if the statement syntax is invalid.
630fn parse_create_materialized_view(
631    parser: &mut sqlparser::parser::Parser,
632) -> Result<StreamingStatement, ParseError> {
633    parser
634        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
635        .map_err(ParseError::SqlParseError)?;
636
637    let or_replace = parser.parse_keywords(&[
638        sqlparser::keywords::Keyword::OR,
639        sqlparser::keywords::Keyword::REPLACE,
640    ]);
641
642    parser
643        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
644        .map_err(ParseError::SqlParseError)?;
645    parser
646        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
647        .map_err(ParseError::SqlParseError)?;
648
649    let if_not_exists = parser.parse_keywords(&[
650        sqlparser::keywords::Keyword::IF,
651        sqlparser::keywords::Keyword::NOT,
652        sqlparser::keywords::Keyword::EXISTS,
653    ]);
654
655    let name = parser
656        .parse_object_name(false)
657        .map_err(ParseError::SqlParseError)?;
658
659    parser
660        .expect_keyword(sqlparser::keywords::Keyword::AS)
661        .map_err(ParseError::SqlParseError)?;
662
663    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
664    let remaining = collect_remaining_tokens(parser);
665    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
666
667    let mv_dialect = LaminarDialect::default();
668
669    let query = if query_tokens.is_empty() {
670        return Err(ParseError::StreamingError(
671            "Expected SELECT query after AS".to_string(),
672        ));
673    } else {
674        let mut query_parser =
675            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
676        query_parser
677            .parse_query()
678            .map_err(ParseError::SqlParseError)?
679    };
680
681    let query_stmt =
682        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
683
684    let emit_clause = if emit_tokens.is_empty() {
685        None
686    } else {
687        let mut emit_parser =
688            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
689        emit_parser::parse_emit_clause(&mut emit_parser)?
690    };
691
692    Ok(StreamingStatement::CreateMaterializedView {
693        name,
694        query: Box::new(query_stmt),
695        emit_clause,
696        or_replace,
697        if_not_exists,
698    })
699}
700
701/// Collect all remaining tokens from the parser into a Vec.
702fn collect_remaining_tokens(
703    parser: &mut sqlparser::parser::Parser,
704) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
705    let mut tokens = Vec::new();
706    loop {
707        let token = parser.next_token();
708        if token.token == sqlparser::tokenizer::Token::EOF {
709            tokens.push(token);
710            break;
711        }
712        tokens.push(token);
713    }
714    tokens
715}
716
717/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
718///
719/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
720/// (or is empty if no EMIT found).
721fn split_at_emit(
722    tokens: &[sqlparser::tokenizer::TokenWithSpan],
723) -> (
724    Vec<sqlparser::tokenizer::TokenWithSpan>,
725    Vec<sqlparser::tokenizer::TokenWithSpan>,
726) {
727    let mut depth: i32 = 0;
728    for (i, token) in tokens.iter().enumerate() {
729        match &token.token {
730            sqlparser::tokenizer::Token::LParen => depth += 1,
731            sqlparser::tokenizer::Token::RParen => {
732                depth -= 1;
733            }
734            sqlparser::tokenizer::Token::Word(w)
735                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
736            {
737                let mut query_tokens = tokens[..i].to_vec();
738                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
739                    token: sqlparser::tokenizer::Token::EOF,
740                    span: sqlparser::tokenizer::Span::empty(),
741                });
742                let emit_tokens = tokens[i..].to_vec();
743                return (query_tokens, emit_tokens);
744            }
745            _ => {}
746        }
747    }
748    (tokens.to_vec(), vec![])
749}
750
751/// Errors produced while parsing SQL with streaming extensions.
///
/// `parse_error_to_parser_error` in this module flattens every variant back
/// into a `sqlparser::parser::ParserError` for `StreamingParser::parse_sql`.
752#[derive(Debug, thiserror::Error)]
753pub enum ParseError {
754    /// Error reported by sqlparser; `#[from]` lets `?` convert it automatically.
755    #[error("SQL parse error: {0}")]
756    SqlParseError(#[from] sqlparser::parser::ParserError),
757
758    /// Syntax error in a streaming extension (e.g. a missing query after `AS`).
759    #[error("Streaming SQL error: {0}")]
760    StreamingError(String),
761
762    /// Error related to a window function; the payload is the message text.
763    #[error("Window function error: {0}")]
764    WindowError(String),
765
766    /// Validation error (e.g., invalid option values); the payload is the message text.
767    #[error("Validation error: {0}")]
768    ValidationError(String),
769}
770
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and hands back its single statement.
    ///
    /// Panics if parsing fails or if the input does not produce exactly one
    /// statement.
    fn parse_one(sql: &str) -> StreamingStatement {
        let mut parsed = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(parsed.len(), 1, "Expected exactly 1 statement");
        // Exactly one element is present, so popping yields that statement.
        parsed.pop().unwrap()
    }

    // ── DROP SOURCE / SINK / MATERIALIZED VIEW tests ────

    /// A bare `DROP SOURCE` sets neither `if_exists` nor `cascade`.
    #[test]
    fn test_parse_drop_source() {
        match parse_one("DROP SOURCE events") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("events".to_string(), false, false)
                );
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    /// `IF EXISTS` sets only the `if_exists` flag.
    #[test]
    fn test_parse_drop_source_if_exists() {
        match parse_one("DROP SOURCE IF EXISTS events") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("events".to_string(), true, false)
                );
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    /// `IF EXISTS ... CASCADE` sets both flags.
    #[test]
    fn test_parse_drop_source_cascade() {
        match parse_one("DROP SOURCE IF EXISTS events CASCADE") {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("events".to_string(), true, true)
                );
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    /// A bare `DROP SINK` sets neither `if_exists` nor `cascade`.
    #[test]
    fn test_parse_drop_sink() {
        match parse_one("DROP SINK output") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("output".to_string(), false, false)
                );
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    /// `IF EXISTS` sets only the `if_exists` flag.
    #[test]
    fn test_parse_drop_sink_if_exists() {
        match parse_one("DROP SINK IF EXISTS output") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("output".to_string(), true, false)
                );
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    /// A trailing `CASCADE` sets only the `cascade` flag.
    #[test]
    fn test_parse_drop_sink_cascade() {
        match parse_one("DROP SINK output CASCADE") {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("output".to_string(), false, true)
                );
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    /// A bare `DROP MATERIALIZED VIEW` sets neither flag.
    #[test]
    fn test_parse_drop_materialized_view() {
        match parse_one("DROP MATERIALIZED VIEW live_stats") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("live_stats".to_string(), false, false)
                );
            }
            other => panic!("Expected DropMaterializedView, got {other:?}"),
        }
    }

    /// `IF EXISTS ... CASCADE` sets both flags on a materialized view drop.
    #[test]
    fn test_parse_drop_materialized_view_if_exists_cascade() {
        match parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("live_stats".to_string(), true, true)
                );
            }
            other => panic!("Expected DropMaterializedView, got {other:?}"),
        }
    }

    // ── SHOW / DESCRIBE / EXPLAIN tests ─────────────────

    /// `SHOW SOURCES` maps to `ShowCommand::Sources`.
    #[test]
    fn test_parse_show_sources() {
        assert!(matches!(
            parse_one("SHOW SOURCES"),
            StreamingStatement::Show(ShowCommand::Sources)
        ));
    }

    /// `SHOW SINKS` maps to `ShowCommand::Sinks`.
    #[test]
    fn test_parse_show_sinks() {
        assert!(matches!(
            parse_one("SHOW SINKS"),
            StreamingStatement::Show(ShowCommand::Sinks)
        ));
    }

    /// `SHOW QUERIES` maps to `ShowCommand::Queries`.
    #[test]
    fn test_parse_show_queries() {
        assert!(matches!(
            parse_one("SHOW QUERIES"),
            StreamingStatement::Show(ShowCommand::Queries)
        ));
    }

    /// `SHOW MATERIALIZED VIEWS` maps to `ShowCommand::MaterializedViews`.
    #[test]
    fn test_parse_show_materialized_views() {
        assert!(matches!(
            parse_one("SHOW MATERIALIZED VIEWS"),
            StreamingStatement::Show(ShowCommand::MaterializedViews)
        ));
    }

    /// Plain `DESCRIBE` leaves `extended` unset.
    #[test]
    fn test_parse_describe() {
        match parse_one("DESCRIBE events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(
                    (name.to_string(), extended),
                    ("events".to_string(), false)
                );
            }
            other => panic!("Expected Describe, got {other:?}"),
        }
    }

    /// `DESCRIBE EXTENDED` sets `extended` and keeps the qualified name.
    #[test]
    fn test_parse_describe_extended() {
        match parse_one("DESCRIBE EXTENDED my_schema.events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(
                    (name.to_string(), extended),
                    ("my_schema.events".to_string(), true)
                );
            }
            other => panic!("Expected Describe, got {other:?}"),
        }
    }

    /// `EXPLAIN` over a plain query wraps a `Standard` statement.
    #[test]
    fn test_parse_explain_select() {
        match parse_one("EXPLAIN SELECT * FROM events") {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            other => panic!("Expected Explain, got {other:?}"),
        }
    }

    /// `EXPLAIN` over streaming DDL wraps the streaming statement.
    #[test]
    fn test_parse_explain_create_source() {
        match parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)") {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
            }
            other => panic!("Expected Explain wrapping CreateSource, got {other:?}"),
        }
    }

    // ── CREATE MATERIALIZED VIEW tests ──────────────────

    /// The minimal form has no emit clause and neither creation modifier.
    #[test]
    fn test_parse_create_materialized_view() {
        let sql = "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(emit_clause.is_none());
                assert_eq!((or_replace, if_not_exists), (false, false));
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// A trailing `EMIT ON WINDOW CLOSE` is captured as the emit clause.
    #[test]
    fn test_parse_create_materialized_view_with_emit() {
        let sql =
            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name, emit_clause, ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// `OR REPLACE` sets only `or_replace`.
    #[test]
    fn test_parse_create_or_replace_materialized_view() {
        let sql = "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!((or_replace, if_not_exists), (true, false));
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    /// `IF NOT EXISTS` sets only `if_not_exists`.
    #[test]
    fn test_parse_create_materialized_view_if_not_exists() {
        let sql =
            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!((or_replace, if_not_exists), (false, true));
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    // ── INSERT INTO tests ───────────────────────────────

    /// An explicit column list and a single row are both captured.
    #[test]
    fn test_parse_insert_into() {
        match parse_one("INSERT INTO events (id, name) VALUES (1, 'test')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                let column_names: Vec<String> = columns.iter().map(|c| c.to_string()).collect();
                assert_eq!(column_names, ["id", "name"]);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            other => panic!("Expected InsertInto, got {other:?}"),
        }
    }

    /// Without a column list, `columns` is empty and every row is kept.
    #[test]
    fn test_parse_insert_into_multiple_rows() {
        match parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert!(columns.is_empty());
                assert_eq!(values.len(), 3);
            }
            other => panic!("Expected InsertInto, got {other:?}"),
        }
    }

    // ── CREATE STREAM tests ─────────────────────────────

    /// The minimal form has no emit clause and neither creation modifier.
    #[test]
    fn test_parse_create_stream() {
        let sql = "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id";
        match parse_one(sql) {
            StreamingStatement::CreateStream {
                name,
                or_replace,
                if_not_exists,
                emit_clause,
                ..
            } => {
                assert_eq!(name.to_string(), "session_activity");
                assert_eq!((or_replace, if_not_exists), (false, false));
                assert!(emit_clause.is_none());
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// `OR REPLACE` sets `or_replace`.
    #[test]
    fn test_parse_create_or_replace_stream() {
        match parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events") {
            StreamingStatement::CreateStream { or_replace, .. } => {
                assert!(or_replace);
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// `IF NOT EXISTS` sets `if_not_exists`.
    #[test]
    fn test_parse_create_stream_if_not_exists() {
        match parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events") {
            StreamingStatement::CreateStream { if_not_exists, .. } => {
                assert!(if_not_exists);
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// A trailing `EMIT ON WINDOW CLOSE` is captured as the emit clause.
    #[test]
    fn test_parse_create_stream_with_emit() {
        let sql = "CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateStream { emit_clause, .. } => {
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            other => panic!("Expected CreateStream, got {other:?}"),
        }
    }

    /// A bare `DROP STREAM` sets neither `if_exists` nor `cascade`.
    #[test]
    fn test_parse_drop_stream() {
        match parse_one("DROP STREAM my_stream") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("my_stream".to_string(), false, false)
                );
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    /// `IF EXISTS` sets only the `if_exists` flag.
    #[test]
    fn test_parse_drop_stream_if_exists() {
        match parse_one("DROP STREAM IF EXISTS my_stream") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("my_stream".to_string(), true, false)
                );
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    /// A trailing `CASCADE` sets only the `cascade` flag.
    #[test]
    fn test_parse_drop_stream_cascade() {
        match parse_one("DROP STREAM my_stream CASCADE") {
            StreamingStatement::DropStream {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(
                    (name.to_string(), if_exists, cascade),
                    ("my_stream".to_string(), false, true)
                );
            }
            other => panic!("Expected DropStream, got {other:?}"),
        }
    }

    /// `SHOW STREAMS` maps to `ShowCommand::Streams`.
    #[test]
    fn test_parse_show_streams() {
        assert!(matches!(
            parse_one("SHOW STREAMS"),
            StreamingStatement::Show(ShowCommand::Streams)
        ));
    }

    // ── ALTER SOURCE tests ──────────────────────────────

    /// `ADD COLUMN` yields an `AddColumn` operation carrying the column
    /// name and data type.
    #[test]
    fn test_parse_alter_source_add_column() {
        match parse_one("ALTER SOURCE events ADD COLUMN new_col INT") {
            StreamingStatement::AlterSource { name, operation } => {
                assert_eq!(name.to_string(), "events");
                match operation {
                    AlterSourceOperation::AddColumn { column_def } => {
                        assert_eq!(column_def.name.value, "new_col");
                        assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
                    }
                    AlterSourceOperation::SetProperties { .. } => {
                        panic!("Expected AddColumn")
                    }
                }
            }
            other => panic!("Expected AlterSource, got {other:?}"),
        }
    }

    /// `SET (...)` yields a `SetProperties` operation holding each
    /// key/value pair.
    #[test]
    fn test_parse_alter_source_set_properties() {
        match parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')") {
            StreamingStatement::AlterSource { name, operation } => {
                assert_eq!(name.to_string(), "events");
                match operation {
                    AlterSourceOperation::SetProperties { properties } => {
                        assert_eq!(
                            properties.get("batch.size").map(String::as_str),
                            Some("1000")
                        );
                        assert_eq!(properties.get("timeout").map(String::as_str), Some("5s"));
                    }
                    AlterSourceOperation::AddColumn { .. } => {
                        panic!("Expected SetProperties")
                    }
                }
            }
            other => panic!("Expected AlterSource, got {other:?}"),
        }
    }
}