Skip to main content

laminar_sql/parser/
mod.rs

1//! SQL parser with streaming extensions.
2//!
3//! Routes streaming DDL (CREATE SOURCE/SINK/CONTINUOUS QUERY) to custom
4//! parsers that use sqlparser primitives. Routes standard SQL to sqlparser
5//! with `GenericDialect`.
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14pub mod order_analyzer;
15mod sink_parser;
16mod source_parser;
17mod statements;
18mod tokenizer;
19mod window_rewriter;
20
21pub use statements::{
22    CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy, FormatSpec,
23    LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef, WindowFunction,
24};
25pub use window_rewriter::WindowRewriter;
26
27use dialect::LaminarDialect;
28use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
29
30/// Parses SQL with streaming extensions.
31///
32/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
33/// for structured parsing. Standard SQL is delegated to sqlparser directly.
34///
35/// # Errors
36///
37/// Returns `ParseError` if the SQL syntax is invalid.
38pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
39    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
40}
41
42/// Parser for streaming SQL extensions.
43///
44/// Provides static methods for parsing streaming SQL statements.
45/// Uses sqlparser's `Parser` API internally for structured parsing
46/// of identifiers, data types, expressions, and queries.
47pub struct StreamingParser;
48
49impl StreamingParser {
50    /// Parse a SQL string with streaming extensions.
51    ///
52    /// Tokenizes the input to detect statement type, then routes to the
53    /// appropriate parser:
54    /// - CREATE SOURCE → `source_parser`
55    /// - CREATE SINK → `sink_parser`
56    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
57    /// - Everything else → `sqlparser::parser::Parser`
58    ///
59    /// # Errors
60    ///
61    /// Returns `ParserError` if the SQL syntax is invalid.
62    #[allow(clippy::too_many_lines)]
63    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
64        let sql_trimmed = sql.trim();
65        if sql_trimmed.is_empty() {
66            return Err(sqlparser::parser::ParserError::ParserError(
67                "Empty SQL statement".to_string(),
68            ));
69        }
70
71        let dialect = LaminarDialect::default();
72
73        // Tokenize to detect statement type (with location for better errors)
74        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
75            .tokenize_with_location()
76            .map_err(|e| {
77                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
78            })?;
79
80        // Route based on token-level detection
81        match detect_streaming_ddl(&tokens) {
82            StreamingDdlKind::CreateSource { .. } => {
83                let mut parser =
84                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
85                let source = source_parser::parse_create_source(&mut parser)
86                    .map_err(parse_error_to_parser_error)?;
87                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
88            }
89            StreamingDdlKind::CreateSink { .. } => {
90                let mut parser =
91                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
92                let sink = sink_parser::parse_create_sink(&mut parser)
93                    .map_err(parse_error_to_parser_error)?;
94                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
95            }
96            StreamingDdlKind::CreateContinuousQuery { .. } => {
97                let mut parser =
98                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
99                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
100                    .map_err(parse_error_to_parser_error)?;
101                Ok(vec![stmt])
102            }
103            StreamingDdlKind::DropSource { .. } => {
104                let mut parser =
105                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
106                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
107                Ok(vec![stmt])
108            }
109            StreamingDdlKind::DropSink { .. } => {
110                let mut parser =
111                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
113                Ok(vec![stmt])
114            }
115            StreamingDdlKind::DropMaterializedView { .. } => {
116                let mut parser =
117                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118                let stmt = parse_drop_materialized_view(&mut parser)
119                    .map_err(parse_error_to_parser_error)?;
120                Ok(vec![stmt])
121            }
122            StreamingDdlKind::ShowSources => {
123                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
124            }
125            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
126            StreamingDdlKind::ShowQueries => {
127                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
128            }
129            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
130                ShowCommand::MaterializedViews,
131            )]),
132            StreamingDdlKind::DescribeSource => {
133                let mut parser =
134                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
135                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
136                Ok(vec![stmt])
137            }
138            StreamingDdlKind::ExplainStreaming => {
139                let mut parser =
140                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141                let stmt =
142                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
143                Ok(vec![stmt])
144            }
145            StreamingDdlKind::CreateMaterializedView { .. } => {
146                let mut parser =
147                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
148                let stmt = parse_create_materialized_view(&mut parser)
149                    .map_err(parse_error_to_parser_error)?;
150                Ok(vec![stmt])
151            }
152            StreamingDdlKind::CreateStream { .. } => {
153                let mut parser =
154                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
155                let stmt = parse_create_stream(&mut parser, sql_trimmed)
156                    .map_err(parse_error_to_parser_error)?;
157                Ok(vec![stmt])
158            }
159            StreamingDdlKind::DropStream { .. } => {
160                let mut parser =
161                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
162                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
163                Ok(vec![stmt])
164            }
165            StreamingDdlKind::ShowStreams => {
166                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
167            }
168            StreamingDdlKind::None => {
169                // Standard SQL - check for INSERT INTO and convert
170                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
171                Ok(statements
172                    .into_iter()
173                    .map(convert_standard_statement)
174                    .collect())
175            }
176        }
177    }
178
179    /// Check if an expression contains a window function.
180    #[must_use]
181    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
182        match expr {
183            sqlparser::ast::Expr::Function(func) => {
184                if let Some(name) = func.name.0.last() {
185                    let func_name = name.to_string().to_uppercase();
186                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
187                } else {
188                    false
189                }
190            }
191            _ => false,
192        }
193    }
194
195    /// Parse EMIT clause from SQL string.
196    ///
197    /// # Errors
198    ///
199    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
200    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
201        emit_parser::parse_emit_clause_from_sql(sql)
202    }
203
204    /// Parse late data handling clause from SQL string.
205    ///
206    /// # Errors
207    ///
208    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
209    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
210        late_data_parser::parse_late_data_clause_from_sql(sql)
211    }
212}
213
214/// Convert `ParseError` to `ParserError` for backward compatibility.
215fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
216    match e {
217        ParseError::SqlParseError(pe) => pe,
218        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
219        ParseError::WindowError(msg) => {
220            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
221        }
222        ParseError::ValidationError(msg) => {
223            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
224        }
225    }
226}
227
228/// Convert a standard sqlparser statement to a `StreamingStatement`.
229///
230/// Detects INSERT INTO statements and converts them to the streaming
231/// `InsertInto` variant. All other statements are wrapped as `Standard`.
232fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
233    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
234        // Extract table name from TableObject
235        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
236            let table_name = name.clone();
237            let columns = insert.columns.clone();
238
239            // Try to extract VALUES rows from source query
240            if let Some(ref source) = insert.source {
241                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
242                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
243                    return StreamingStatement::InsertInto {
244                        table_name,
245                        columns,
246                        values: rows,
247                    };
248                }
249            }
250        }
251    }
252    StreamingStatement::Standard(Box::new(stmt))
253}
254
255/// Parse a DROP SOURCE statement.
256///
257/// Syntax: `DROP SOURCE [IF EXISTS] name`
258///
259/// # Errors
260///
261/// Returns `ParseError` if the statement syntax is invalid.
262fn parse_drop_source(
263    parser: &mut sqlparser::parser::Parser,
264) -> Result<StreamingStatement, ParseError> {
265    parser
266        .expect_keyword(sqlparser::keywords::Keyword::DROP)
267        .map_err(ParseError::SqlParseError)?;
268    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
269    let if_exists = parser.parse_keywords(&[
270        sqlparser::keywords::Keyword::IF,
271        sqlparser::keywords::Keyword::EXISTS,
272    ]);
273    let name = parser
274        .parse_object_name(false)
275        .map_err(ParseError::SqlParseError)?;
276    Ok(StreamingStatement::DropSource { name, if_exists })
277}
278
279/// Parse a DROP SINK statement.
280///
281/// Syntax: `DROP SINK [IF EXISTS] name`
282///
283/// # Errors
284///
285/// Returns `ParseError` if the statement syntax is invalid.
286fn parse_drop_sink(
287    parser: &mut sqlparser::parser::Parser,
288) -> Result<StreamingStatement, ParseError> {
289    parser
290        .expect_keyword(sqlparser::keywords::Keyword::DROP)
291        .map_err(ParseError::SqlParseError)?;
292    tokenizer::expect_custom_keyword(parser, "SINK")?;
293    let if_exists = parser.parse_keywords(&[
294        sqlparser::keywords::Keyword::IF,
295        sqlparser::keywords::Keyword::EXISTS,
296    ]);
297    let name = parser
298        .parse_object_name(false)
299        .map_err(ParseError::SqlParseError)?;
300    Ok(StreamingStatement::DropSink { name, if_exists })
301}
302
303/// Parse a DROP MATERIALIZED VIEW statement.
304///
305/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
306///
307/// # Errors
308///
309/// Returns `ParseError` if the statement syntax is invalid.
310fn parse_drop_materialized_view(
311    parser: &mut sqlparser::parser::Parser,
312) -> Result<StreamingStatement, ParseError> {
313    parser
314        .expect_keyword(sqlparser::keywords::Keyword::DROP)
315        .map_err(ParseError::SqlParseError)?;
316    parser
317        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
318        .map_err(ParseError::SqlParseError)?;
319    parser
320        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
321        .map_err(ParseError::SqlParseError)?;
322    let if_exists = parser.parse_keywords(&[
323        sqlparser::keywords::Keyword::IF,
324        sqlparser::keywords::Keyword::EXISTS,
325    ]);
326    let name = parser
327        .parse_object_name(false)
328        .map_err(ParseError::SqlParseError)?;
329    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
330    Ok(StreamingStatement::DropMaterializedView {
331        name,
332        if_exists,
333        cascade,
334    })
335}
336
337/// Parse a CREATE STREAM statement.
338///
339/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
340///
341/// # Errors
342///
343/// Returns `ParseError` if the statement syntax is invalid.
344fn parse_create_stream(
345    parser: &mut sqlparser::parser::Parser,
346    _original_sql: &str,
347) -> Result<StreamingStatement, ParseError> {
348    parser
349        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
350        .map_err(ParseError::SqlParseError)?;
351
352    let or_replace = parser.parse_keywords(&[
353        sqlparser::keywords::Keyword::OR,
354        sqlparser::keywords::Keyword::REPLACE,
355    ]);
356
357    tokenizer::expect_custom_keyword(parser, "STREAM")?;
358
359    let if_not_exists = parser.parse_keywords(&[
360        sqlparser::keywords::Keyword::IF,
361        sqlparser::keywords::Keyword::NOT,
362        sqlparser::keywords::Keyword::EXISTS,
363    ]);
364
365    let name = parser
366        .parse_object_name(false)
367        .map_err(ParseError::SqlParseError)?;
368
369    parser
370        .expect_keyword(sqlparser::keywords::Keyword::AS)
371        .map_err(ParseError::SqlParseError)?;
372
373    // Collect remaining tokens and split at EMIT boundary
374    let remaining = collect_remaining_tokens(parser);
375    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
376
377    let stream_dialect = LaminarDialect::default();
378
379    let query = if query_tokens.is_empty() {
380        return Err(ParseError::StreamingError(
381            "Expected SELECT query after AS".to_string(),
382        ));
383    } else {
384        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
385            .with_tokens_with_locations(query_tokens);
386        query_parser
387            .parse_query()
388            .map_err(ParseError::SqlParseError)?
389    };
390
391    let query_stmt =
392        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
393
394    let emit_clause = if emit_tokens.is_empty() {
395        None
396    } else {
397        let mut emit_parser =
398            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
399        emit_parser::parse_emit_clause(&mut emit_parser)?
400    };
401
402    Ok(StreamingStatement::CreateStream {
403        name,
404        query: Box::new(query_stmt),
405        emit_clause,
406        or_replace,
407        if_not_exists,
408    })
409}
410
411/// Parse a DROP STREAM statement.
412///
413/// Syntax: `DROP STREAM [IF EXISTS] name`
414///
415/// # Errors
416///
417/// Returns `ParseError` if the statement syntax is invalid.
418fn parse_drop_stream(
419    parser: &mut sqlparser::parser::Parser,
420) -> Result<StreamingStatement, ParseError> {
421    parser
422        .expect_keyword(sqlparser::keywords::Keyword::DROP)
423        .map_err(ParseError::SqlParseError)?;
424    tokenizer::expect_custom_keyword(parser, "STREAM")?;
425    let if_exists = parser.parse_keywords(&[
426        sqlparser::keywords::Keyword::IF,
427        sqlparser::keywords::Keyword::EXISTS,
428    ]);
429    let name = parser
430        .parse_object_name(false)
431        .map_err(ParseError::SqlParseError)?;
432    Ok(StreamingStatement::DropStream { name, if_exists })
433}
434
435/// Parse a DESCRIBE statement.
436///
437/// Syntax: `DESCRIBE [EXTENDED] name`
438///
439/// # Errors
440///
441/// Returns `ParseError` if the statement syntax is invalid.
442fn parse_describe(
443    parser: &mut sqlparser::parser::Parser,
444) -> Result<StreamingStatement, ParseError> {
445    // Consume DESCRIBE or DESC
446    let token = parser.next_token();
447    match &token.token {
448        sqlparser::tokenizer::Token::Word(w)
449            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
450                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
451        _ => {
452            return Err(ParseError::StreamingError(
453                "Expected DESCRIBE or DESC".to_string(),
454            ));
455        }
456    }
457    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
458    let name = parser
459        .parse_object_name(false)
460        .map_err(ParseError::SqlParseError)?;
461    Ok(StreamingStatement::Describe { name, extended })
462}
463
464/// Parse an EXPLAIN statement wrapping a streaming query.
465///
466/// Syntax: `EXPLAIN <streaming_statement>`
467///
468/// # Errors
469///
470/// Returns `ParseError` if the statement syntax is invalid.
471fn parse_explain(
472    parser: &mut sqlparser::parser::Parser,
473    original_sql: &str,
474) -> Result<StreamingStatement, ParseError> {
475    parser
476        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
477        .map_err(ParseError::SqlParseError)?;
478
479    // Find the position after EXPLAIN in the original SQL
480    let explain_prefix_upper = original_sql.to_uppercase();
481    let inner_start = explain_prefix_upper
482        .find("EXPLAIN")
483        .map_or(0, |pos| pos + "EXPLAIN".len());
484    let inner_sql = original_sql[inner_start..].trim();
485
486    // Parse the inner statement recursively
487    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
488    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
489        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
490    })?;
491    Ok(StreamingStatement::Explain {
492        statement: Box::new(inner),
493    })
494}
495
496/// Parse a CREATE MATERIALIZED VIEW statement.
497///
498/// Syntax:
499/// ```sql
500/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
501/// AS <select_query>
502/// [EMIT <strategy>]
503/// ```
504///
505/// # Errors
506///
507/// Returns `ParseError` if the statement syntax is invalid.
508fn parse_create_materialized_view(
509    parser: &mut sqlparser::parser::Parser,
510) -> Result<StreamingStatement, ParseError> {
511    parser
512        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
513        .map_err(ParseError::SqlParseError)?;
514
515    let or_replace = parser.parse_keywords(&[
516        sqlparser::keywords::Keyword::OR,
517        sqlparser::keywords::Keyword::REPLACE,
518    ]);
519
520    parser
521        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
522        .map_err(ParseError::SqlParseError)?;
523    parser
524        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
525        .map_err(ParseError::SqlParseError)?;
526
527    let if_not_exists = parser.parse_keywords(&[
528        sqlparser::keywords::Keyword::IF,
529        sqlparser::keywords::Keyword::NOT,
530        sqlparser::keywords::Keyword::EXISTS,
531    ]);
532
533    let name = parser
534        .parse_object_name(false)
535        .map_err(ParseError::SqlParseError)?;
536
537    parser
538        .expect_keyword(sqlparser::keywords::Keyword::AS)
539        .map_err(ParseError::SqlParseError)?;
540
541    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
542    let remaining = collect_remaining_tokens(parser);
543    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
544
545    let mv_dialect = LaminarDialect::default();
546
547    let query = if query_tokens.is_empty() {
548        return Err(ParseError::StreamingError(
549            "Expected SELECT query after AS".to_string(),
550        ));
551    } else {
552        let mut query_parser =
553            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
554        query_parser
555            .parse_query()
556            .map_err(ParseError::SqlParseError)?
557    };
558
559    let query_stmt =
560        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
561
562    let emit_clause = if emit_tokens.is_empty() {
563        None
564    } else {
565        let mut emit_parser =
566            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
567        emit_parser::parse_emit_clause(&mut emit_parser)?
568    };
569
570    Ok(StreamingStatement::CreateMaterializedView {
571        name,
572        query: Box::new(query_stmt),
573        emit_clause,
574        or_replace,
575        if_not_exists,
576    })
577}
578
579/// Collect all remaining tokens from the parser into a Vec.
580fn collect_remaining_tokens(
581    parser: &mut sqlparser::parser::Parser,
582) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
583    let mut tokens = Vec::new();
584    loop {
585        let token = parser.next_token();
586        if token.token == sqlparser::tokenizer::Token::EOF {
587            tokens.push(token);
588            break;
589        }
590        tokens.push(token);
591    }
592    tokens
593}
594
595/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
596///
597/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
598/// (or is empty if no EMIT found).
599fn split_at_emit(
600    tokens: &[sqlparser::tokenizer::TokenWithSpan],
601) -> (
602    Vec<sqlparser::tokenizer::TokenWithSpan>,
603    Vec<sqlparser::tokenizer::TokenWithSpan>,
604) {
605    let mut depth: i32 = 0;
606    for (i, token) in tokens.iter().enumerate() {
607        match &token.token {
608            sqlparser::tokenizer::Token::LParen => depth += 1,
609            sqlparser::tokenizer::Token::RParen => {
610                depth -= 1;
611            }
612            sqlparser::tokenizer::Token::Word(w)
613                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
614            {
615                let mut query_tokens = tokens[..i].to_vec();
616                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
617                    token: sqlparser::tokenizer::Token::EOF,
618                    span: sqlparser::tokenizer::Span::empty(),
619                });
620                let emit_tokens = tokens[i..].to_vec();
621                return (query_tokens, emit_tokens);
622            }
623            _ => {}
624        }
625    }
626    (tokens.to_vec(), vec![])
627}
628
629/// SQL parsing errors
630#[derive(Debug, thiserror::Error)]
631pub enum ParseError {
632    /// Standard SQL parse error
633    #[error("SQL parse error: {0}")]
634    SqlParseError(#[from] sqlparser::parser::ParserError),
635
636    /// Streaming extension parse error
637    #[error("Streaming SQL error: {0}")]
638    StreamingError(String),
639
640    /// Window function error
641    #[error("Window function error: {0}")]
642    WindowError(String),
643
644    /// Validation error (e.g., invalid option values)
645    #[error("Validation error: {0}")]
646    ValidationError(String),
647}
648
649#[cfg(test)]
650mod tests {
651    use super::*;
652
653    /// Helper to parse SQL and return the first statement.
654    fn parse_one(sql: &str) -> StreamingStatement {
655        let stmts = StreamingParser::parse_sql(sql).unwrap();
656        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
657        stmts.into_iter().next().unwrap()
658    }
659
660    #[test]
661    fn test_parse_drop_source() {
662        let stmt = parse_one("DROP SOURCE events");
663        match stmt {
664            StreamingStatement::DropSource { name, if_exists } => {
665                assert_eq!(name.to_string(), "events");
666                assert!(!if_exists);
667            }
668            _ => panic!("Expected DropSource, got {stmt:?}"),
669        }
670    }
671
672    #[test]
673    fn test_parse_drop_source_if_exists() {
674        let stmt = parse_one("DROP SOURCE IF EXISTS events");
675        match stmt {
676            StreamingStatement::DropSource { name, if_exists } => {
677                assert_eq!(name.to_string(), "events");
678                assert!(if_exists);
679            }
680            _ => panic!("Expected DropSource, got {stmt:?}"),
681        }
682    }
683
684    #[test]
685    fn test_parse_drop_sink() {
686        let stmt = parse_one("DROP SINK output");
687        match stmt {
688            StreamingStatement::DropSink { name, if_exists } => {
689                assert_eq!(name.to_string(), "output");
690                assert!(!if_exists);
691            }
692            _ => panic!("Expected DropSink, got {stmt:?}"),
693        }
694    }
695
696    #[test]
697    fn test_parse_drop_sink_if_exists() {
698        let stmt = parse_one("DROP SINK IF EXISTS output");
699        match stmt {
700            StreamingStatement::DropSink { name, if_exists } => {
701                assert_eq!(name.to_string(), "output");
702                assert!(if_exists);
703            }
704            _ => panic!("Expected DropSink, got {stmt:?}"),
705        }
706    }
707
708    #[test]
709    fn test_parse_drop_materialized_view() {
710        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
711        match stmt {
712            StreamingStatement::DropMaterializedView {
713                name,
714                if_exists,
715                cascade,
716            } => {
717                assert_eq!(name.to_string(), "live_stats");
718                assert!(!if_exists);
719                assert!(!cascade);
720            }
721            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
722        }
723    }
724
725    #[test]
726    fn test_parse_drop_materialized_view_if_exists_cascade() {
727        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
728        match stmt {
729            StreamingStatement::DropMaterializedView {
730                name,
731                if_exists,
732                cascade,
733            } => {
734                assert_eq!(name.to_string(), "live_stats");
735                assert!(if_exists);
736                assert!(cascade);
737            }
738            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
739        }
740    }
741
742    #[test]
743    fn test_parse_show_sources() {
744        let stmt = parse_one("SHOW SOURCES");
745        assert!(matches!(
746            stmt,
747            StreamingStatement::Show(ShowCommand::Sources)
748        ));
749    }
750
751    #[test]
752    fn test_parse_show_sinks() {
753        let stmt = parse_one("SHOW SINKS");
754        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
755    }
756
757    #[test]
758    fn test_parse_show_queries() {
759        let stmt = parse_one("SHOW QUERIES");
760        assert!(matches!(
761            stmt,
762            StreamingStatement::Show(ShowCommand::Queries)
763        ));
764    }
765
766    #[test]
767    fn test_parse_show_materialized_views() {
768        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
769        assert!(matches!(
770            stmt,
771            StreamingStatement::Show(ShowCommand::MaterializedViews)
772        ));
773    }
774
775    #[test]
776    fn test_parse_describe() {
777        let stmt = parse_one("DESCRIBE events");
778        match stmt {
779            StreamingStatement::Describe { name, extended } => {
780                assert_eq!(name.to_string(), "events");
781                assert!(!extended);
782            }
783            _ => panic!("Expected Describe, got {stmt:?}"),
784        }
785    }
786
787    #[test]
788    fn test_parse_describe_extended() {
789        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
790        match stmt {
791            StreamingStatement::Describe { name, extended } => {
792                assert_eq!(name.to_string(), "my_schema.events");
793                assert!(extended);
794            }
795            _ => panic!("Expected Describe, got {stmt:?}"),
796        }
797    }
798
799    #[test]
800    fn test_parse_explain_select() {
801        let stmt = parse_one("EXPLAIN SELECT * FROM events");
802        match stmt {
803            StreamingStatement::Explain { statement } => {
804                assert!(matches!(*statement, StreamingStatement::Standard(_)));
805            }
806            _ => panic!("Expected Explain, got {stmt:?}"),
807        }
808    }
809
810    #[test]
811    fn test_parse_explain_create_source() {
812        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
813        match stmt {
814            StreamingStatement::Explain { statement } => {
815                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
816            }
817            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
818        }
819    }
820
821    #[test]
822    fn test_parse_create_materialized_view() {
823        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
824        match stmt {
825            StreamingStatement::CreateMaterializedView {
826                name,
827                emit_clause,
828                or_replace,
829                if_not_exists,
830                ..
831            } => {
832                assert_eq!(name.to_string(), "live_stats");
833                assert!(emit_clause.is_none());
834                assert!(!or_replace);
835                assert!(!if_not_exists);
836            }
837            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
838        }
839    }
840
841    #[test]
842    fn test_parse_create_materialized_view_with_emit() {
843        let stmt = parse_one(
844            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
845        );
846        match stmt {
847            StreamingStatement::CreateMaterializedView {
848                name, emit_clause, ..
849            } => {
850                assert_eq!(name.to_string(), "live_stats");
851                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
852            }
853            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
854        }
855    }
856
857    #[test]
858    fn test_parse_create_or_replace_materialized_view() {
859        let stmt = parse_one(
860            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
861        );
862        match stmt {
863            StreamingStatement::CreateMaterializedView {
864                name,
865                or_replace,
866                if_not_exists,
867                ..
868            } => {
869                assert_eq!(name.to_string(), "live_stats");
870                assert!(or_replace);
871                assert!(!if_not_exists);
872            }
873            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
874        }
875    }
876
877    #[test]
878    fn test_parse_create_materialized_view_if_not_exists() {
879        let stmt = parse_one(
880            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
881        );
882        match stmt {
883            StreamingStatement::CreateMaterializedView {
884                name,
885                or_replace,
886                if_not_exists,
887                ..
888            } => {
889                assert_eq!(name.to_string(), "live_stats");
890                assert!(!or_replace);
891                assert!(if_not_exists);
892            }
893            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
894        }
895    }
896
897    #[test]
898    fn test_parse_insert_into() {
899        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
900        match stmt {
901            StreamingStatement::InsertInto {
902                table_name,
903                columns,
904                values,
905            } => {
906                assert_eq!(table_name.to_string(), "events");
907                assert_eq!(columns.len(), 2);
908                assert_eq!(columns[0].to_string(), "id");
909                assert_eq!(columns[1].to_string(), "name");
910                assert_eq!(values.len(), 1);
911                assert_eq!(values[0].len(), 2);
912            }
913            _ => panic!("Expected InsertInto, got {stmt:?}"),
914        }
915    }
916
917    #[test]
918    fn test_parse_insert_into_multiple_rows() {
919        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
920        match stmt {
921            StreamingStatement::InsertInto {
922                table_name,
923                columns,
924                values,
925            } => {
926                assert_eq!(table_name.to_string(), "events");
927                assert!(columns.is_empty());
928                assert_eq!(values.len(), 3);
929            }
930            _ => panic!("Expected InsertInto, got {stmt:?}"),
931        }
932    }
933
934    // ── CREATE STREAM tests (F-SQL-003) ─────────────────────────────
935
936    #[test]
937    fn test_parse_create_stream() {
938        let stmt = parse_one(
939            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
940        );
941        match stmt {
942            StreamingStatement::CreateStream {
943                name,
944                or_replace,
945                if_not_exists,
946                emit_clause,
947                ..
948            } => {
949                assert_eq!(name.to_string(), "session_activity");
950                assert!(!or_replace);
951                assert!(!if_not_exists);
952                assert!(emit_clause.is_none());
953            }
954            _ => panic!("Expected CreateStream, got {stmt:?}"),
955        }
956    }
957
958    #[test]
959    fn test_parse_create_or_replace_stream() {
960        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
961        match stmt {
962            StreamingStatement::CreateStream { or_replace, .. } => {
963                assert!(or_replace);
964            }
965            _ => panic!("Expected CreateStream, got {stmt:?}"),
966        }
967    }
968
969    #[test]
970    fn test_parse_create_stream_if_not_exists() {
971        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
972        match stmt {
973            StreamingStatement::CreateStream { if_not_exists, .. } => {
974                assert!(if_not_exists);
975            }
976            _ => panic!("Expected CreateStream, got {stmt:?}"),
977        }
978    }
979
980    #[test]
981    fn test_parse_create_stream_with_emit() {
982        let stmt =
983            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
984        match stmt {
985            StreamingStatement::CreateStream { emit_clause, .. } => {
986                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
987            }
988            _ => panic!("Expected CreateStream, got {stmt:?}"),
989        }
990    }
991
992    #[test]
993    fn test_parse_drop_stream() {
994        let stmt = parse_one("DROP STREAM my_stream");
995        match stmt {
996            StreamingStatement::DropStream { name, if_exists } => {
997                assert_eq!(name.to_string(), "my_stream");
998                assert!(!if_exists);
999            }
1000            _ => panic!("Expected DropStream, got {stmt:?}"),
1001        }
1002    }
1003
1004    #[test]
1005    fn test_parse_drop_stream_if_exists() {
1006        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1007        match stmt {
1008            StreamingStatement::DropStream { name, if_exists } => {
1009                assert_eq!(name.to_string(), "my_stream");
1010                assert!(if_exists);
1011            }
1012            _ => panic!("Expected DropStream, got {stmt:?}"),
1013        }
1014    }
1015
1016    #[test]
1017    fn test_parse_show_streams() {
1018        let stmt = parse_one("SHOW STREAMS");
1019        assert!(matches!(
1020            stmt,
1021            StreamingStatement::Show(ShowCommand::Streams)
1022        ));
1023    }
1024}