Skip to main content

laminar_sql/parser/
mod.rs

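//! SQL parser with streaming extensions.
//!
//! Streaming statements are detected at the token level and routed to custom
//! parsers built on sqlparser primitives. These include `CREATE SOURCE`/`SINK`/`STREAM`/
//! `CONTINUOUS QUERY`/`MATERIALIZED VIEW`, `DROP SOURCE`/`SINK`/`STREAM`/
//! `MATERIALIZED VIEW`, `SHOW`, `DESCRIBE`, and `EXPLAIN`. All other SQL is parsed
//! by sqlparser using the crate's `LaminarDialect`.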
6
7pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12/// INTERVAL arithmetic rewriter for BIGINT timestamp columns
13pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use statements::{
24    CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy, FormatSpec,
25    LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef, WindowFunction,
26};
27pub use window_rewriter::WindowRewriter;
28
29use dialect::LaminarDialect;
30use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
31
32/// Parses SQL with streaming extensions.
33///
34/// Routes streaming DDL to custom parsers that use sqlparser's `Parser` API
35/// for structured parsing. Standard SQL is delegated to sqlparser directly.
36///
37/// # Errors
38///
39/// Returns `ParseError` if the SQL syntax is invalid.
40pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
41    StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
42}
43
44/// Parser for streaming SQL extensions.
45///
46/// Provides static methods for parsing streaming SQL statements.
47/// Uses sqlparser's `Parser` API internally for structured parsing
48/// of identifiers, data types, expressions, and queries.
49pub struct StreamingParser;
50
51impl StreamingParser {
52    /// Parse a SQL string with streaming extensions.
53    ///
54    /// Tokenizes the input to detect statement type, then routes to the
55    /// appropriate parser:
56    /// - CREATE SOURCE → `source_parser`
57    /// - CREATE SINK → `sink_parser`
58    /// - CREATE CONTINUOUS QUERY → `continuous_query_parser`
59    /// - Everything else → `sqlparser::parser::Parser`
60    ///
61    /// # Errors
62    ///
63    /// Returns `ParserError` if the SQL syntax is invalid.
64    #[allow(clippy::too_many_lines)]
65    pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
66        let sql_trimmed = sql.trim();
67        if sql_trimmed.is_empty() {
68            return Err(sqlparser::parser::ParserError::ParserError(
69                "Empty SQL statement".to_string(),
70            ));
71        }
72
73        let dialect = LaminarDialect::default();
74
75        // Tokenize to detect statement type (with location for better errors)
76        let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
77            .tokenize_with_location()
78            .map_err(|e| {
79                sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
80            })?;
81
82        // Route based on token-level detection
83        match detect_streaming_ddl(&tokens) {
84            StreamingDdlKind::CreateSource { .. } => {
85                let mut parser =
86                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
87                let source = source_parser::parse_create_source(&mut parser)
88                    .map_err(parse_error_to_parser_error)?;
89                Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
90            }
91            StreamingDdlKind::CreateSink { .. } => {
92                let mut parser =
93                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
94                let sink = sink_parser::parse_create_sink(&mut parser)
95                    .map_err(parse_error_to_parser_error)?;
96                Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
97            }
98            StreamingDdlKind::CreateContinuousQuery { .. } => {
99                let mut parser =
100                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
101                let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
102                    .map_err(parse_error_to_parser_error)?;
103                Ok(vec![stmt])
104            }
105            StreamingDdlKind::DropSource { .. } => {
106                let mut parser =
107                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
108                let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
109                Ok(vec![stmt])
110            }
111            StreamingDdlKind::DropSink { .. } => {
112                let mut parser =
113                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
114                let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
115                Ok(vec![stmt])
116            }
117            StreamingDdlKind::DropMaterializedView { .. } => {
118                let mut parser =
119                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
120                let stmt = parse_drop_materialized_view(&mut parser)
121                    .map_err(parse_error_to_parser_error)?;
122                Ok(vec![stmt])
123            }
124            StreamingDdlKind::ShowSources => {
125                Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
126            }
127            StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
128            StreamingDdlKind::ShowQueries => {
129                Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
130            }
131            StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
132                ShowCommand::MaterializedViews,
133            )]),
134            StreamingDdlKind::DescribeSource => {
135                let mut parser =
136                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
137                let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
138                Ok(vec![stmt])
139            }
140            StreamingDdlKind::ExplainStreaming => {
141                let mut parser =
142                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
143                let stmt =
144                    parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
145                Ok(vec![stmt])
146            }
147            StreamingDdlKind::CreateMaterializedView { .. } => {
148                let mut parser =
149                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
150                let stmt = parse_create_materialized_view(&mut parser)
151                    .map_err(parse_error_to_parser_error)?;
152                Ok(vec![stmt])
153            }
154            StreamingDdlKind::CreateStream { .. } => {
155                let mut parser =
156                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
157                let stmt = parse_create_stream(&mut parser, sql_trimmed)
158                    .map_err(parse_error_to_parser_error)?;
159                Ok(vec![stmt])
160            }
161            StreamingDdlKind::DropStream { .. } => {
162                let mut parser =
163                    sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
164                let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
165                Ok(vec![stmt])
166            }
167            StreamingDdlKind::ShowStreams => {
168                Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
169            }
170            StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
171            StreamingDdlKind::None => {
172                // Standard SQL - check for INSERT INTO and convert
173                let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
174                Ok(statements
175                    .into_iter()
176                    .map(convert_standard_statement)
177                    .collect())
178            }
179        }
180    }
181
182    /// Check if an expression contains a window function.
183    #[must_use]
184    pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
185        match expr {
186            sqlparser::ast::Expr::Function(func) => {
187                if let Some(name) = func.name.0.last() {
188                    let func_name = name.to_string().to_uppercase();
189                    matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
190                } else {
191                    false
192                }
193            }
194            _ => false,
195        }
196    }
197
198    /// Parse EMIT clause from SQL string.
199    ///
200    /// # Errors
201    ///
202    /// Returns `ParseError::StreamingError` if the EMIT clause syntax is invalid.
203    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
204        emit_parser::parse_emit_clause_from_sql(sql)
205    }
206
207    /// Parse late data handling clause from SQL string.
208    ///
209    /// # Errors
210    ///
211    /// Returns `ParseError::StreamingError` if the clause syntax is invalid.
212    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
213        late_data_parser::parse_late_data_clause_from_sql(sql)
214    }
215}
216
217/// Convert `ParseError` to `ParserError` for backward compatibility.
218fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
219    match e {
220        ParseError::SqlParseError(pe) => pe,
221        ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
222        ParseError::WindowError(msg) => {
223            sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
224        }
225        ParseError::ValidationError(msg) => {
226            sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
227        }
228    }
229}
230
231/// Convert a standard sqlparser statement to a `StreamingStatement`.
232///
233/// Detects INSERT INTO statements and converts them to the streaming
234/// `InsertInto` variant. All other statements are wrapped as `Standard`.
235fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
236    if let sqlparser::ast::Statement::Insert(insert) = &stmt {
237        // Extract table name from TableObject
238        if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
239            let table_name = name.clone();
240            let columns = insert.columns.clone();
241
242            // Try to extract VALUES rows from source query
243            if let Some(ref source) = insert.source {
244                if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
245                    let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
246                    return StreamingStatement::InsertInto {
247                        table_name,
248                        columns,
249                        values: rows,
250                    };
251                }
252            }
253        }
254    }
255    StreamingStatement::Standard(Box::new(stmt))
256}
257
258/// Parse a DROP SOURCE statement.
259///
260/// Syntax: `DROP SOURCE [IF EXISTS] name`
261///
262/// # Errors
263///
264/// Returns `ParseError` if the statement syntax is invalid.
265fn parse_drop_source(
266    parser: &mut sqlparser::parser::Parser,
267) -> Result<StreamingStatement, ParseError> {
268    parser
269        .expect_keyword(sqlparser::keywords::Keyword::DROP)
270        .map_err(ParseError::SqlParseError)?;
271    tokenizer::expect_custom_keyword(parser, "SOURCE")?;
272    let if_exists = parser.parse_keywords(&[
273        sqlparser::keywords::Keyword::IF,
274        sqlparser::keywords::Keyword::EXISTS,
275    ]);
276    let name = parser
277        .parse_object_name(false)
278        .map_err(ParseError::SqlParseError)?;
279    Ok(StreamingStatement::DropSource { name, if_exists })
280}
281
282/// Parse a DROP SINK statement.
283///
284/// Syntax: `DROP SINK [IF EXISTS] name`
285///
286/// # Errors
287///
288/// Returns `ParseError` if the statement syntax is invalid.
289fn parse_drop_sink(
290    parser: &mut sqlparser::parser::Parser,
291) -> Result<StreamingStatement, ParseError> {
292    parser
293        .expect_keyword(sqlparser::keywords::Keyword::DROP)
294        .map_err(ParseError::SqlParseError)?;
295    tokenizer::expect_custom_keyword(parser, "SINK")?;
296    let if_exists = parser.parse_keywords(&[
297        sqlparser::keywords::Keyword::IF,
298        sqlparser::keywords::Keyword::EXISTS,
299    ]);
300    let name = parser
301        .parse_object_name(false)
302        .map_err(ParseError::SqlParseError)?;
303    Ok(StreamingStatement::DropSink { name, if_exists })
304}
305
306/// Parse a DROP MATERIALIZED VIEW statement.
307///
308/// Syntax: `DROP MATERIALIZED VIEW [IF EXISTS] name [CASCADE]`
309///
310/// # Errors
311///
312/// Returns `ParseError` if the statement syntax is invalid.
313fn parse_drop_materialized_view(
314    parser: &mut sqlparser::parser::Parser,
315) -> Result<StreamingStatement, ParseError> {
316    parser
317        .expect_keyword(sqlparser::keywords::Keyword::DROP)
318        .map_err(ParseError::SqlParseError)?;
319    parser
320        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
321        .map_err(ParseError::SqlParseError)?;
322    parser
323        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
324        .map_err(ParseError::SqlParseError)?;
325    let if_exists = parser.parse_keywords(&[
326        sqlparser::keywords::Keyword::IF,
327        sqlparser::keywords::Keyword::EXISTS,
328    ]);
329    let name = parser
330        .parse_object_name(false)
331        .map_err(ParseError::SqlParseError)?;
332    let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
333    Ok(StreamingStatement::DropMaterializedView {
334        name,
335        if_exists,
336        cascade,
337    })
338}
339
340/// Parse a CREATE STREAM statement.
341///
342/// Syntax: `CREATE [OR REPLACE] STREAM [IF NOT EXISTS] name AS <select_query> [EMIT <strategy>]`
343///
344/// # Errors
345///
346/// Returns `ParseError` if the statement syntax is invalid.
347fn parse_create_stream(
348    parser: &mut sqlparser::parser::Parser,
349    _original_sql: &str,
350) -> Result<StreamingStatement, ParseError> {
351    parser
352        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
353        .map_err(ParseError::SqlParseError)?;
354
355    let or_replace = parser.parse_keywords(&[
356        sqlparser::keywords::Keyword::OR,
357        sqlparser::keywords::Keyword::REPLACE,
358    ]);
359
360    tokenizer::expect_custom_keyword(parser, "STREAM")?;
361
362    let if_not_exists = parser.parse_keywords(&[
363        sqlparser::keywords::Keyword::IF,
364        sqlparser::keywords::Keyword::NOT,
365        sqlparser::keywords::Keyword::EXISTS,
366    ]);
367
368    let name = parser
369        .parse_object_name(false)
370        .map_err(ParseError::SqlParseError)?;
371
372    parser
373        .expect_keyword(sqlparser::keywords::Keyword::AS)
374        .map_err(ParseError::SqlParseError)?;
375
376    // Collect remaining tokens and split at EMIT boundary
377    let remaining = collect_remaining_tokens(parser);
378    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
379
380    let stream_dialect = LaminarDialect::default();
381
382    let query = if query_tokens.is_empty() {
383        return Err(ParseError::StreamingError(
384            "Expected SELECT query after AS".to_string(),
385        ));
386    } else {
387        let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
388            .with_tokens_with_locations(query_tokens);
389        query_parser
390            .parse_query()
391            .map_err(ParseError::SqlParseError)?
392    };
393
394    let query_stmt =
395        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
396
397    let emit_clause = if emit_tokens.is_empty() {
398        None
399    } else {
400        let mut emit_parser =
401            sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
402        emit_parser::parse_emit_clause(&mut emit_parser)?
403    };
404
405    Ok(StreamingStatement::CreateStream {
406        name,
407        query: Box::new(query_stmt),
408        emit_clause,
409        or_replace,
410        if_not_exists,
411    })
412}
413
414/// Parse a DROP STREAM statement.
415///
416/// Syntax: `DROP STREAM [IF EXISTS] name`
417///
418/// # Errors
419///
420/// Returns `ParseError` if the statement syntax is invalid.
421fn parse_drop_stream(
422    parser: &mut sqlparser::parser::Parser,
423) -> Result<StreamingStatement, ParseError> {
424    parser
425        .expect_keyword(sqlparser::keywords::Keyword::DROP)
426        .map_err(ParseError::SqlParseError)?;
427    tokenizer::expect_custom_keyword(parser, "STREAM")?;
428    let if_exists = parser.parse_keywords(&[
429        sqlparser::keywords::Keyword::IF,
430        sqlparser::keywords::Keyword::EXISTS,
431    ]);
432    let name = parser
433        .parse_object_name(false)
434        .map_err(ParseError::SqlParseError)?;
435    Ok(StreamingStatement::DropStream { name, if_exists })
436}
437
438/// Parse a DESCRIBE statement.
439///
440/// Syntax: `DESCRIBE [EXTENDED] name`
441///
442/// # Errors
443///
444/// Returns `ParseError` if the statement syntax is invalid.
445fn parse_describe(
446    parser: &mut sqlparser::parser::Parser,
447) -> Result<StreamingStatement, ParseError> {
448    // Consume DESCRIBE or DESC
449    let token = parser.next_token();
450    match &token.token {
451        sqlparser::tokenizer::Token::Word(w)
452            if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
453                || w.keyword == sqlparser::keywords::Keyword::DESC => {}
454        _ => {
455            return Err(ParseError::StreamingError(
456                "Expected DESCRIBE or DESC".to_string(),
457            ));
458        }
459    }
460    let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
461    let name = parser
462        .parse_object_name(false)
463        .map_err(ParseError::SqlParseError)?;
464    Ok(StreamingStatement::Describe { name, extended })
465}
466
467/// Parse an EXPLAIN statement wrapping a streaming query.
468///
469/// Syntax: `EXPLAIN <streaming_statement>`
470///
471/// # Errors
472///
473/// Returns `ParseError` if the statement syntax is invalid.
474fn parse_explain(
475    parser: &mut sqlparser::parser::Parser,
476    original_sql: &str,
477) -> Result<StreamingStatement, ParseError> {
478    parser
479        .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
480        .map_err(ParseError::SqlParseError)?;
481
482    // Find the position after EXPLAIN in the original SQL
483    let explain_prefix_upper = original_sql.to_uppercase();
484    let inner_start = explain_prefix_upper
485        .find("EXPLAIN")
486        .map_or(0, |pos| pos + "EXPLAIN".len());
487    let inner_sql = original_sql[inner_start..].trim();
488
489    // Parse the inner statement recursively
490    let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
491    let inner = inner_stmts.into_iter().next().ok_or_else(|| {
492        sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
493    })?;
494    Ok(StreamingStatement::Explain {
495        statement: Box::new(inner),
496    })
497}
498
499/// Parse a CREATE MATERIALIZED VIEW statement.
500///
501/// Syntax:
502/// ```sql
503/// CREATE [OR REPLACE] MATERIALIZED VIEW [IF NOT EXISTS] name
504/// AS <select_query>
505/// [EMIT <strategy>]
506/// ```
507///
508/// # Errors
509///
510/// Returns `ParseError` if the statement syntax is invalid.
511fn parse_create_materialized_view(
512    parser: &mut sqlparser::parser::Parser,
513) -> Result<StreamingStatement, ParseError> {
514    parser
515        .expect_keyword(sqlparser::keywords::Keyword::CREATE)
516        .map_err(ParseError::SqlParseError)?;
517
518    let or_replace = parser.parse_keywords(&[
519        sqlparser::keywords::Keyword::OR,
520        sqlparser::keywords::Keyword::REPLACE,
521    ]);
522
523    parser
524        .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
525        .map_err(ParseError::SqlParseError)?;
526    parser
527        .expect_keyword(sqlparser::keywords::Keyword::VIEW)
528        .map_err(ParseError::SqlParseError)?;
529
530    let if_not_exists = parser.parse_keywords(&[
531        sqlparser::keywords::Keyword::IF,
532        sqlparser::keywords::Keyword::NOT,
533        sqlparser::keywords::Keyword::EXISTS,
534    ]);
535
536    let name = parser
537        .parse_object_name(false)
538        .map_err(ParseError::SqlParseError)?;
539
540    parser
541        .expect_keyword(sqlparser::keywords::Keyword::AS)
542        .map_err(ParseError::SqlParseError)?;
543
544    // Collect remaining tokens and split at EMIT boundary (same strategy as continuous query)
545    let remaining = collect_remaining_tokens(parser);
546    let (query_tokens, emit_tokens) = split_at_emit(&remaining);
547
548    let mv_dialect = LaminarDialect::default();
549
550    let query = if query_tokens.is_empty() {
551        return Err(ParseError::StreamingError(
552            "Expected SELECT query after AS".to_string(),
553        ));
554    } else {
555        let mut query_parser =
556            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
557        query_parser
558            .parse_query()
559            .map_err(ParseError::SqlParseError)?
560    };
561
562    let query_stmt =
563        StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
564
565    let emit_clause = if emit_tokens.is_empty() {
566        None
567    } else {
568        let mut emit_parser =
569            sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
570        emit_parser::parse_emit_clause(&mut emit_parser)?
571    };
572
573    Ok(StreamingStatement::CreateMaterializedView {
574        name,
575        query: Box::new(query_stmt),
576        emit_clause,
577        or_replace,
578        if_not_exists,
579    })
580}
581
582/// Collect all remaining tokens from the parser into a Vec.
583fn collect_remaining_tokens(
584    parser: &mut sqlparser::parser::Parser,
585) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
586    let mut tokens = Vec::new();
587    loop {
588        let token = parser.next_token();
589        if token.token == sqlparser::tokenizer::Token::EOF {
590            tokens.push(token);
591            break;
592        }
593        tokens.push(token);
594    }
595    tokens
596}
597
598/// Split tokens at the first standalone EMIT keyword (not inside parentheses).
599///
600/// Returns (query_tokens, emit_tokens) where emit_tokens starts with EMIT
601/// (or is empty if no EMIT found).
602fn split_at_emit(
603    tokens: &[sqlparser::tokenizer::TokenWithSpan],
604) -> (
605    Vec<sqlparser::tokenizer::TokenWithSpan>,
606    Vec<sqlparser::tokenizer::TokenWithSpan>,
607) {
608    let mut depth: i32 = 0;
609    for (i, token) in tokens.iter().enumerate() {
610        match &token.token {
611            sqlparser::tokenizer::Token::LParen => depth += 1,
612            sqlparser::tokenizer::Token::RParen => {
613                depth -= 1;
614            }
615            sqlparser::tokenizer::Token::Word(w)
616                if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
617            {
618                let mut query_tokens = tokens[..i].to_vec();
619                query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
620                    token: sqlparser::tokenizer::Token::EOF,
621                    span: sqlparser::tokenizer::Span::empty(),
622                });
623                let emit_tokens = tokens[i..].to_vec();
624                return (query_tokens, emit_tokens);
625            }
626            _ => {}
627        }
628    }
629    (tokens.to_vec(), vec![])
630}
631
632/// SQL parsing errors
633#[derive(Debug, thiserror::Error)]
634pub enum ParseError {
635    /// Standard SQL parse error
636    #[error("SQL parse error: {0}")]
637    SqlParseError(#[from] sqlparser::parser::ParserError),
638
639    /// Streaming extension parse error
640    #[error("Streaming SQL error: {0}")]
641    StreamingError(String),
642
643    /// Window function error
644    #[error("Window function error: {0}")]
645    WindowError(String),
646
647    /// Validation error (e.g., invalid option values)
648    #[error("Validation error: {0}")]
649    ValidationError(String),
650}
651
652#[cfg(test)]
653mod tests {
654    use super::*;
655
656    /// Helper to parse SQL and return the first statement.
657    fn parse_one(sql: &str) -> StreamingStatement {
658        let stmts = StreamingParser::parse_sql(sql).unwrap();
659        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
660        stmts.into_iter().next().unwrap()
661    }
662
663    #[test]
664    fn test_parse_drop_source() {
665        let stmt = parse_one("DROP SOURCE events");
666        match stmt {
667            StreamingStatement::DropSource { name, if_exists } => {
668                assert_eq!(name.to_string(), "events");
669                assert!(!if_exists);
670            }
671            _ => panic!("Expected DropSource, got {stmt:?}"),
672        }
673    }
674
675    #[test]
676    fn test_parse_drop_source_if_exists() {
677        let stmt = parse_one("DROP SOURCE IF EXISTS events");
678        match stmt {
679            StreamingStatement::DropSource { name, if_exists } => {
680                assert_eq!(name.to_string(), "events");
681                assert!(if_exists);
682            }
683            _ => panic!("Expected DropSource, got {stmt:?}"),
684        }
685    }
686
687    #[test]
688    fn test_parse_drop_sink() {
689        let stmt = parse_one("DROP SINK output");
690        match stmt {
691            StreamingStatement::DropSink { name, if_exists } => {
692                assert_eq!(name.to_string(), "output");
693                assert!(!if_exists);
694            }
695            _ => panic!("Expected DropSink, got {stmt:?}"),
696        }
697    }
698
699    #[test]
700    fn test_parse_drop_sink_if_exists() {
701        let stmt = parse_one("DROP SINK IF EXISTS output");
702        match stmt {
703            StreamingStatement::DropSink { name, if_exists } => {
704                assert_eq!(name.to_string(), "output");
705                assert!(if_exists);
706            }
707            _ => panic!("Expected DropSink, got {stmt:?}"),
708        }
709    }
710
711    #[test]
712    fn test_parse_drop_materialized_view() {
713        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
714        match stmt {
715            StreamingStatement::DropMaterializedView {
716                name,
717                if_exists,
718                cascade,
719            } => {
720                assert_eq!(name.to_string(), "live_stats");
721                assert!(!if_exists);
722                assert!(!cascade);
723            }
724            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
725        }
726    }
727
728    #[test]
729    fn test_parse_drop_materialized_view_if_exists_cascade() {
730        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
731        match stmt {
732            StreamingStatement::DropMaterializedView {
733                name,
734                if_exists,
735                cascade,
736            } => {
737                assert_eq!(name.to_string(), "live_stats");
738                assert!(if_exists);
739                assert!(cascade);
740            }
741            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
742        }
743    }
744
745    #[test]
746    fn test_parse_show_sources() {
747        let stmt = parse_one("SHOW SOURCES");
748        assert!(matches!(
749            stmt,
750            StreamingStatement::Show(ShowCommand::Sources)
751        ));
752    }
753
754    #[test]
755    fn test_parse_show_sinks() {
756        let stmt = parse_one("SHOW SINKS");
757        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
758    }
759
760    #[test]
761    fn test_parse_show_queries() {
762        let stmt = parse_one("SHOW QUERIES");
763        assert!(matches!(
764            stmt,
765            StreamingStatement::Show(ShowCommand::Queries)
766        ));
767    }
768
769    #[test]
770    fn test_parse_show_materialized_views() {
771        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
772        assert!(matches!(
773            stmt,
774            StreamingStatement::Show(ShowCommand::MaterializedViews)
775        ));
776    }
777
778    #[test]
779    fn test_parse_describe() {
780        let stmt = parse_one("DESCRIBE events");
781        match stmt {
782            StreamingStatement::Describe { name, extended } => {
783                assert_eq!(name.to_string(), "events");
784                assert!(!extended);
785            }
786            _ => panic!("Expected Describe, got {stmt:?}"),
787        }
788    }
789
790    #[test]
791    fn test_parse_describe_extended() {
792        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
793        match stmt {
794            StreamingStatement::Describe { name, extended } => {
795                assert_eq!(name.to_string(), "my_schema.events");
796                assert!(extended);
797            }
798            _ => panic!("Expected Describe, got {stmt:?}"),
799        }
800    }
801
802    #[test]
803    fn test_parse_explain_select() {
804        let stmt = parse_one("EXPLAIN SELECT * FROM events");
805        match stmt {
806            StreamingStatement::Explain { statement } => {
807                assert!(matches!(*statement, StreamingStatement::Standard(_)));
808            }
809            _ => panic!("Expected Explain, got {stmt:?}"),
810        }
811    }
812
813    #[test]
814    fn test_parse_explain_create_source() {
815        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
816        match stmt {
817            StreamingStatement::Explain { statement } => {
818                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
819            }
820            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
821        }
822    }
823
824    #[test]
825    fn test_parse_create_materialized_view() {
826        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
827        match stmt {
828            StreamingStatement::CreateMaterializedView {
829                name,
830                emit_clause,
831                or_replace,
832                if_not_exists,
833                ..
834            } => {
835                assert_eq!(name.to_string(), "live_stats");
836                assert!(emit_clause.is_none());
837                assert!(!or_replace);
838                assert!(!if_not_exists);
839            }
840            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
841        }
842    }
843
844    #[test]
845    fn test_parse_create_materialized_view_with_emit() {
846        let stmt = parse_one(
847            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
848        );
849        match stmt {
850            StreamingStatement::CreateMaterializedView {
851                name, emit_clause, ..
852            } => {
853                assert_eq!(name.to_string(), "live_stats");
854                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
855            }
856            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
857        }
858    }
859
860    #[test]
861    fn test_parse_create_or_replace_materialized_view() {
862        let stmt = parse_one(
863            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
864        );
865        match stmt {
866            StreamingStatement::CreateMaterializedView {
867                name,
868                or_replace,
869                if_not_exists,
870                ..
871            } => {
872                assert_eq!(name.to_string(), "live_stats");
873                assert!(or_replace);
874                assert!(!if_not_exists);
875            }
876            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
877        }
878    }
879
880    #[test]
881    fn test_parse_create_materialized_view_if_not_exists() {
882        let stmt = parse_one(
883            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
884        );
885        match stmt {
886            StreamingStatement::CreateMaterializedView {
887                name,
888                or_replace,
889                if_not_exists,
890                ..
891            } => {
892                assert_eq!(name.to_string(), "live_stats");
893                assert!(!or_replace);
894                assert!(if_not_exists);
895            }
896            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
897        }
898    }
899
900    #[test]
901    fn test_parse_insert_into() {
902        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
903        match stmt {
904            StreamingStatement::InsertInto {
905                table_name,
906                columns,
907                values,
908            } => {
909                assert_eq!(table_name.to_string(), "events");
910                assert_eq!(columns.len(), 2);
911                assert_eq!(columns[0].to_string(), "id");
912                assert_eq!(columns[1].to_string(), "name");
913                assert_eq!(values.len(), 1);
914                assert_eq!(values[0].len(), 2);
915            }
916            _ => panic!("Expected InsertInto, got {stmt:?}"),
917        }
918    }
919
920    #[test]
921    fn test_parse_insert_into_multiple_rows() {
922        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
923        match stmt {
924            StreamingStatement::InsertInto {
925                table_name,
926                columns,
927                values,
928            } => {
929                assert_eq!(table_name.to_string(), "events");
930                assert!(columns.is_empty());
931                assert_eq!(values.len(), 3);
932            }
933            _ => panic!("Expected InsertInto, got {stmt:?}"),
934        }
935    }
936
937    // ── CREATE STREAM tests (F-SQL-003) ─────────────────────────────
938
939    #[test]
940    fn test_parse_create_stream() {
941        let stmt = parse_one(
942            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
943        );
944        match stmt {
945            StreamingStatement::CreateStream {
946                name,
947                or_replace,
948                if_not_exists,
949                emit_clause,
950                ..
951            } => {
952                assert_eq!(name.to_string(), "session_activity");
953                assert!(!or_replace);
954                assert!(!if_not_exists);
955                assert!(emit_clause.is_none());
956            }
957            _ => panic!("Expected CreateStream, got {stmt:?}"),
958        }
959    }
960
961    #[test]
962    fn test_parse_create_or_replace_stream() {
963        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
964        match stmt {
965            StreamingStatement::CreateStream { or_replace, .. } => {
966                assert!(or_replace);
967            }
968            _ => panic!("Expected CreateStream, got {stmt:?}"),
969        }
970    }
971
972    #[test]
973    fn test_parse_create_stream_if_not_exists() {
974        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
975        match stmt {
976            StreamingStatement::CreateStream { if_not_exists, .. } => {
977                assert!(if_not_exists);
978            }
979            _ => panic!("Expected CreateStream, got {stmt:?}"),
980        }
981    }
982
983    #[test]
984    fn test_parse_create_stream_with_emit() {
985        let stmt =
986            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
987        match stmt {
988            StreamingStatement::CreateStream { emit_clause, .. } => {
989                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
990            }
991            _ => panic!("Expected CreateStream, got {stmt:?}"),
992        }
993    }
994
995    #[test]
996    fn test_parse_drop_stream() {
997        let stmt = parse_one("DROP STREAM my_stream");
998        match stmt {
999            StreamingStatement::DropStream { name, if_exists } => {
1000                assert_eq!(name.to_string(), "my_stream");
1001                assert!(!if_exists);
1002            }
1003            _ => panic!("Expected DropStream, got {stmt:?}"),
1004        }
1005    }
1006
1007    #[test]
1008    fn test_parse_drop_stream_if_exists() {
1009        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1010        match stmt {
1011            StreamingStatement::DropStream { name, if_exists } => {
1012                assert_eq!(name.to_string(), "my_stream");
1013                assert!(if_exists);
1014            }
1015            _ => panic!("Expected DropStream, got {stmt:?}"),
1016        }
1017    }
1018
1019    #[test]
1020    fn test_parse_show_streams() {
1021        let stmt = parse_one("SHOW STREAMS");
1022        assert!(matches!(
1023            stmt,
1024            StreamingStatement::Show(ShowCommand::Streams)
1025        ));
1026    }
1027}