1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use statements::{
24 CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy, FormatSpec,
25 LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef, WindowFunction,
26};
27pub use window_rewriter::WindowRewriter;
28
29use dialect::LaminarDialect;
30use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
31
32pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
41 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
42}
43
44pub struct StreamingParser;
50
51impl StreamingParser {
52 #[allow(clippy::too_many_lines)]
65 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
66 let sql_trimmed = sql.trim();
67 if sql_trimmed.is_empty() {
68 return Err(sqlparser::parser::ParserError::ParserError(
69 "Empty SQL statement".to_string(),
70 ));
71 }
72
73 let dialect = LaminarDialect::default();
74
75 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
77 .tokenize_with_location()
78 .map_err(|e| {
79 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
80 })?;
81
82 match detect_streaming_ddl(&tokens) {
84 StreamingDdlKind::CreateSource { .. } => {
85 let mut parser =
86 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
87 let source = source_parser::parse_create_source(&mut parser)
88 .map_err(parse_error_to_parser_error)?;
89 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
90 }
91 StreamingDdlKind::CreateSink { .. } => {
92 let mut parser =
93 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
94 let sink = sink_parser::parse_create_sink(&mut parser)
95 .map_err(parse_error_to_parser_error)?;
96 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
97 }
98 StreamingDdlKind::CreateContinuousQuery { .. } => {
99 let mut parser =
100 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
101 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
102 .map_err(parse_error_to_parser_error)?;
103 Ok(vec![stmt])
104 }
105 StreamingDdlKind::DropSource { .. } => {
106 let mut parser =
107 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
108 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
109 Ok(vec![stmt])
110 }
111 StreamingDdlKind::DropSink { .. } => {
112 let mut parser =
113 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
114 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
115 Ok(vec![stmt])
116 }
117 StreamingDdlKind::DropMaterializedView { .. } => {
118 let mut parser =
119 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
120 let stmt = parse_drop_materialized_view(&mut parser)
121 .map_err(parse_error_to_parser_error)?;
122 Ok(vec![stmt])
123 }
124 StreamingDdlKind::ShowSources => {
125 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
126 }
127 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
128 StreamingDdlKind::ShowQueries => {
129 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
130 }
131 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
132 ShowCommand::MaterializedViews,
133 )]),
134 StreamingDdlKind::DescribeSource => {
135 let mut parser =
136 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
137 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
138 Ok(vec![stmt])
139 }
140 StreamingDdlKind::ExplainStreaming => {
141 let mut parser =
142 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
143 let stmt =
144 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
145 Ok(vec![stmt])
146 }
147 StreamingDdlKind::CreateMaterializedView { .. } => {
148 let mut parser =
149 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
150 let stmt = parse_create_materialized_view(&mut parser)
151 .map_err(parse_error_to_parser_error)?;
152 Ok(vec![stmt])
153 }
154 StreamingDdlKind::CreateStream { .. } => {
155 let mut parser =
156 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
157 let stmt = parse_create_stream(&mut parser, sql_trimmed)
158 .map_err(parse_error_to_parser_error)?;
159 Ok(vec![stmt])
160 }
161 StreamingDdlKind::DropStream { .. } => {
162 let mut parser =
163 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
164 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
165 Ok(vec![stmt])
166 }
167 StreamingDdlKind::ShowStreams => {
168 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
169 }
170 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
171 StreamingDdlKind::None => {
172 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
174 Ok(statements
175 .into_iter()
176 .map(convert_standard_statement)
177 .collect())
178 }
179 }
180 }
181
182 #[must_use]
184 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
185 match expr {
186 sqlparser::ast::Expr::Function(func) => {
187 if let Some(name) = func.name.0.last() {
188 let func_name = name.to_string().to_uppercase();
189 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
190 } else {
191 false
192 }
193 }
194 _ => false,
195 }
196 }
197
198 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
204 emit_parser::parse_emit_clause_from_sql(sql)
205 }
206
207 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
213 late_data_parser::parse_late_data_clause_from_sql(sql)
214 }
215}
216
217fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
219 match e {
220 ParseError::SqlParseError(pe) => pe,
221 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
222 ParseError::WindowError(msg) => {
223 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
224 }
225 ParseError::ValidationError(msg) => {
226 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
227 }
228 }
229}
230
231fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
236 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
237 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
239 let table_name = name.clone();
240 let columns = insert.columns.clone();
241
242 if let Some(ref source) = insert.source {
244 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
245 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
246 return StreamingStatement::InsertInto {
247 table_name,
248 columns,
249 values: rows,
250 };
251 }
252 }
253 }
254 }
255 StreamingStatement::Standard(Box::new(stmt))
256}
257
258fn parse_drop_source(
266 parser: &mut sqlparser::parser::Parser,
267) -> Result<StreamingStatement, ParseError> {
268 parser
269 .expect_keyword(sqlparser::keywords::Keyword::DROP)
270 .map_err(ParseError::SqlParseError)?;
271 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
272 let if_exists = parser.parse_keywords(&[
273 sqlparser::keywords::Keyword::IF,
274 sqlparser::keywords::Keyword::EXISTS,
275 ]);
276 let name = parser
277 .parse_object_name(false)
278 .map_err(ParseError::SqlParseError)?;
279 Ok(StreamingStatement::DropSource { name, if_exists })
280}
281
282fn parse_drop_sink(
290 parser: &mut sqlparser::parser::Parser,
291) -> Result<StreamingStatement, ParseError> {
292 parser
293 .expect_keyword(sqlparser::keywords::Keyword::DROP)
294 .map_err(ParseError::SqlParseError)?;
295 tokenizer::expect_custom_keyword(parser, "SINK")?;
296 let if_exists = parser.parse_keywords(&[
297 sqlparser::keywords::Keyword::IF,
298 sqlparser::keywords::Keyword::EXISTS,
299 ]);
300 let name = parser
301 .parse_object_name(false)
302 .map_err(ParseError::SqlParseError)?;
303 Ok(StreamingStatement::DropSink { name, if_exists })
304}
305
306fn parse_drop_materialized_view(
314 parser: &mut sqlparser::parser::Parser,
315) -> Result<StreamingStatement, ParseError> {
316 parser
317 .expect_keyword(sqlparser::keywords::Keyword::DROP)
318 .map_err(ParseError::SqlParseError)?;
319 parser
320 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
321 .map_err(ParseError::SqlParseError)?;
322 parser
323 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
324 .map_err(ParseError::SqlParseError)?;
325 let if_exists = parser.parse_keywords(&[
326 sqlparser::keywords::Keyword::IF,
327 sqlparser::keywords::Keyword::EXISTS,
328 ]);
329 let name = parser
330 .parse_object_name(false)
331 .map_err(ParseError::SqlParseError)?;
332 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
333 Ok(StreamingStatement::DropMaterializedView {
334 name,
335 if_exists,
336 cascade,
337 })
338}
339
340fn parse_create_stream(
348 parser: &mut sqlparser::parser::Parser,
349 _original_sql: &str,
350) -> Result<StreamingStatement, ParseError> {
351 parser
352 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
353 .map_err(ParseError::SqlParseError)?;
354
355 let or_replace = parser.parse_keywords(&[
356 sqlparser::keywords::Keyword::OR,
357 sqlparser::keywords::Keyword::REPLACE,
358 ]);
359
360 tokenizer::expect_custom_keyword(parser, "STREAM")?;
361
362 let if_not_exists = parser.parse_keywords(&[
363 sqlparser::keywords::Keyword::IF,
364 sqlparser::keywords::Keyword::NOT,
365 sqlparser::keywords::Keyword::EXISTS,
366 ]);
367
368 let name = parser
369 .parse_object_name(false)
370 .map_err(ParseError::SqlParseError)?;
371
372 parser
373 .expect_keyword(sqlparser::keywords::Keyword::AS)
374 .map_err(ParseError::SqlParseError)?;
375
376 let remaining = collect_remaining_tokens(parser);
378 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
379
380 let stream_dialect = LaminarDialect::default();
381
382 let query = if query_tokens.is_empty() {
383 return Err(ParseError::StreamingError(
384 "Expected SELECT query after AS".to_string(),
385 ));
386 } else {
387 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
388 .with_tokens_with_locations(query_tokens);
389 query_parser
390 .parse_query()
391 .map_err(ParseError::SqlParseError)?
392 };
393
394 let query_stmt =
395 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
396
397 let emit_clause = if emit_tokens.is_empty() {
398 None
399 } else {
400 let mut emit_parser =
401 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
402 emit_parser::parse_emit_clause(&mut emit_parser)?
403 };
404
405 Ok(StreamingStatement::CreateStream {
406 name,
407 query: Box::new(query_stmt),
408 emit_clause,
409 or_replace,
410 if_not_exists,
411 })
412}
413
414fn parse_drop_stream(
422 parser: &mut sqlparser::parser::Parser,
423) -> Result<StreamingStatement, ParseError> {
424 parser
425 .expect_keyword(sqlparser::keywords::Keyword::DROP)
426 .map_err(ParseError::SqlParseError)?;
427 tokenizer::expect_custom_keyword(parser, "STREAM")?;
428 let if_exists = parser.parse_keywords(&[
429 sqlparser::keywords::Keyword::IF,
430 sqlparser::keywords::Keyword::EXISTS,
431 ]);
432 let name = parser
433 .parse_object_name(false)
434 .map_err(ParseError::SqlParseError)?;
435 Ok(StreamingStatement::DropStream { name, if_exists })
436}
437
438fn parse_describe(
446 parser: &mut sqlparser::parser::Parser,
447) -> Result<StreamingStatement, ParseError> {
448 let token = parser.next_token();
450 match &token.token {
451 sqlparser::tokenizer::Token::Word(w)
452 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
453 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
454 _ => {
455 return Err(ParseError::StreamingError(
456 "Expected DESCRIBE or DESC".to_string(),
457 ));
458 }
459 }
460 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
461 let name = parser
462 .parse_object_name(false)
463 .map_err(ParseError::SqlParseError)?;
464 Ok(StreamingStatement::Describe { name, extended })
465}
466
467fn parse_explain(
475 parser: &mut sqlparser::parser::Parser,
476 original_sql: &str,
477) -> Result<StreamingStatement, ParseError> {
478 parser
479 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
480 .map_err(ParseError::SqlParseError)?;
481
482 let explain_prefix_upper = original_sql.to_uppercase();
484 let inner_start = explain_prefix_upper
485 .find("EXPLAIN")
486 .map_or(0, |pos| pos + "EXPLAIN".len());
487 let inner_sql = original_sql[inner_start..].trim();
488
489 let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
491 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
492 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
493 })?;
494 Ok(StreamingStatement::Explain {
495 statement: Box::new(inner),
496 })
497}
498
499fn parse_create_materialized_view(
512 parser: &mut sqlparser::parser::Parser,
513) -> Result<StreamingStatement, ParseError> {
514 parser
515 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
516 .map_err(ParseError::SqlParseError)?;
517
518 let or_replace = parser.parse_keywords(&[
519 sqlparser::keywords::Keyword::OR,
520 sqlparser::keywords::Keyword::REPLACE,
521 ]);
522
523 parser
524 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
525 .map_err(ParseError::SqlParseError)?;
526 parser
527 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
528 .map_err(ParseError::SqlParseError)?;
529
530 let if_not_exists = parser.parse_keywords(&[
531 sqlparser::keywords::Keyword::IF,
532 sqlparser::keywords::Keyword::NOT,
533 sqlparser::keywords::Keyword::EXISTS,
534 ]);
535
536 let name = parser
537 .parse_object_name(false)
538 .map_err(ParseError::SqlParseError)?;
539
540 parser
541 .expect_keyword(sqlparser::keywords::Keyword::AS)
542 .map_err(ParseError::SqlParseError)?;
543
544 let remaining = collect_remaining_tokens(parser);
546 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
547
548 let mv_dialect = LaminarDialect::default();
549
550 let query = if query_tokens.is_empty() {
551 return Err(ParseError::StreamingError(
552 "Expected SELECT query after AS".to_string(),
553 ));
554 } else {
555 let mut query_parser =
556 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
557 query_parser
558 .parse_query()
559 .map_err(ParseError::SqlParseError)?
560 };
561
562 let query_stmt =
563 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
564
565 let emit_clause = if emit_tokens.is_empty() {
566 None
567 } else {
568 let mut emit_parser =
569 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
570 emit_parser::parse_emit_clause(&mut emit_parser)?
571 };
572
573 Ok(StreamingStatement::CreateMaterializedView {
574 name,
575 query: Box::new(query_stmt),
576 emit_clause,
577 or_replace,
578 if_not_exists,
579 })
580}
581
582fn collect_remaining_tokens(
584 parser: &mut sqlparser::parser::Parser,
585) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
586 let mut tokens = Vec::new();
587 loop {
588 let token = parser.next_token();
589 if token.token == sqlparser::tokenizer::Token::EOF {
590 tokens.push(token);
591 break;
592 }
593 tokens.push(token);
594 }
595 tokens
596}
597
598fn split_at_emit(
603 tokens: &[sqlparser::tokenizer::TokenWithSpan],
604) -> (
605 Vec<sqlparser::tokenizer::TokenWithSpan>,
606 Vec<sqlparser::tokenizer::TokenWithSpan>,
607) {
608 let mut depth: i32 = 0;
609 for (i, token) in tokens.iter().enumerate() {
610 match &token.token {
611 sqlparser::tokenizer::Token::LParen => depth += 1,
612 sqlparser::tokenizer::Token::RParen => {
613 depth -= 1;
614 }
615 sqlparser::tokenizer::Token::Word(w)
616 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
617 {
618 let mut query_tokens = tokens[..i].to_vec();
619 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
620 token: sqlparser::tokenizer::Token::EOF,
621 span: sqlparser::tokenizer::Span::empty(),
622 });
623 let emit_tokens = tokens[i..].to_vec();
624 return (query_tokens, emit_tokens);
625 }
626 _ => {}
627 }
628 }
629 (tokens.to_vec(), vec![])
630}
631
632#[derive(Debug, thiserror::Error)]
634pub enum ParseError {
635 #[error("SQL parse error: {0}")]
637 SqlParseError(#[from] sqlparser::parser::ParserError),
638
639 #[error("Streaming SQL error: {0}")]
641 StreamingError(String),
642
643 #[error("Window function error: {0}")]
645 WindowError(String),
646
647 #[error("Validation error: {0}")]
649 ValidationError(String),
650}
651
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql` and returns its only statement.
    ///
    /// Panics when parsing fails or the statement count is not exactly one.
    fn parse_one(sql: &str) -> StreamingStatement {
        let mut stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.pop().unwrap()
    }

    // ---- DROP SOURCE / SINK / MATERIALIZED VIEW ----

    #[test]
    fn test_parse_drop_source() {
        match parse_one("DROP SOURCE events") {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(!if_exists);
            }
            stmt => panic!("Expected DropSource, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_source_if_exists() {
        match parse_one("DROP SOURCE IF EXISTS events") {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
            }
            stmt => panic!("Expected DropSource, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_sink() {
        match parse_one("DROP SINK output") {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
            }
            stmt => panic!("Expected DropSink, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_sink_if_exists() {
        match parse_one("DROP SINK IF EXISTS output") {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(if_exists);
            }
            stmt => panic!("Expected DropSink, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_materialized_view() {
        match parse_one("DROP MATERIALIZED VIEW live_stats") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!if_exists);
                assert!(!cascade);
            }
            stmt => panic!("Expected DropMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_materialized_view_if_exists_cascade() {
        match parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE") {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            stmt => panic!("Expected DropMaterializedView, got {stmt:?}"),
        }
    }

    // ---- SHOW ----

    #[test]
    fn test_parse_show_sources() {
        assert!(matches!(
            parse_one("SHOW SOURCES"),
            StreamingStatement::Show(ShowCommand::Sources)
        ));
    }

    #[test]
    fn test_parse_show_sinks() {
        assert!(matches!(
            parse_one("SHOW SINKS"),
            StreamingStatement::Show(ShowCommand::Sinks)
        ));
    }

    #[test]
    fn test_parse_show_queries() {
        assert!(matches!(
            parse_one("SHOW QUERIES"),
            StreamingStatement::Show(ShowCommand::Queries)
        ));
    }

    #[test]
    fn test_parse_show_materialized_views() {
        assert!(matches!(
            parse_one("SHOW MATERIALIZED VIEWS"),
            StreamingStatement::Show(ShowCommand::MaterializedViews)
        ));
    }

    // ---- DESCRIBE / EXPLAIN ----

    #[test]
    fn test_parse_describe() {
        match parse_one("DESCRIBE events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(!extended);
            }
            stmt => panic!("Expected Describe, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_describe_extended() {
        match parse_one("DESCRIBE EXTENDED my_schema.events") {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "my_schema.events");
                assert!(extended);
            }
            stmt => panic!("Expected Describe, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_explain_select() {
        match parse_one("EXPLAIN SELECT * FROM events") {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            stmt => panic!("Expected Explain, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_explain_create_source() {
        match parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)") {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
            }
            stmt => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
        }
    }

    // ---- CREATE MATERIALIZED VIEW ----

    #[test]
    fn test_parse_create_materialized_view() {
        match parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events") {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(emit_clause.is_none());
                assert!(!or_replace);
                assert!(!if_not_exists);
            }
            stmt => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_materialized_view_with_emit() {
        let sql =
            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name, emit_clause, ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            stmt => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_or_replace_materialized_view() {
        let sql = "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(or_replace);
                assert!(!if_not_exists);
            }
            stmt => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_materialized_view_if_not_exists() {
        let sql =
            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events";
        match parse_one(sql) {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            stmt => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    // ---- INSERT INTO ----

    #[test]
    fn test_parse_insert_into() {
        match parse_one("INSERT INTO events (id, name) VALUES (1, 'test')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(columns[0].to_string(), "id");
                assert_eq!(columns[1].to_string(), "name");
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            stmt => panic!("Expected InsertInto, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_insert_into_multiple_rows() {
        match parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')") {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert!(columns.is_empty());
                assert_eq!(values.len(), 3);
            }
            stmt => panic!("Expected InsertInto, got {stmt:?}"),
        }
    }

    // ---- CREATE / DROP / SHOW STREAM ----

    #[test]
    fn test_parse_create_stream() {
        let sql = "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id";
        match parse_one(sql) {
            StreamingStatement::CreateStream {
                name,
                or_replace,
                if_not_exists,
                emit_clause,
                ..
            } => {
                assert_eq!(name.to_string(), "session_activity");
                assert!(!or_replace);
                assert!(!if_not_exists);
                assert!(emit_clause.is_none());
            }
            stmt => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_or_replace_stream() {
        match parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events") {
            StreamingStatement::CreateStream { or_replace, .. } => {
                assert!(or_replace);
            }
            stmt => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_stream_if_not_exists() {
        match parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events") {
            StreamingStatement::CreateStream { if_not_exists, .. } => {
                assert!(if_not_exists);
            }
            stmt => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_stream_with_emit() {
        let sql = "CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
        match parse_one(sql) {
            StreamingStatement::CreateStream { emit_clause, .. } => {
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            stmt => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_stream() {
        match parse_one("DROP STREAM my_stream") {
            StreamingStatement::DropStream { name, if_exists } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(!if_exists);
            }
            stmt => panic!("Expected DropStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_stream_if_exists() {
        match parse_one("DROP STREAM IF EXISTS my_stream") {
            StreamingStatement::DropStream { name, if_exists } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(if_exists);
            }
            stmt => panic!("Expected DropStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_show_streams() {
        assert!(matches!(
            parse_one("SHOW STREAMS"),
            StreamingStatement::Show(ShowCommand::Streams)
        ));
    }
}