1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14pub mod order_analyzer;
15mod sink_parser;
16mod source_parser;
17mod statements;
18mod tokenizer;
19mod window_rewriter;
20
21pub use statements::{
22 CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy, FormatSpec,
23 LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef, WindowFunction,
24};
25pub use window_rewriter::WindowRewriter;
26
27use dialect::LaminarDialect;
28use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
29
30pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
39 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
40}
41
/// Stateless entry point for parsing streaming SQL.
///
/// This is a unit struct: it carries no data and serves only as a namespace
/// for its associated functions.
pub struct StreamingParser;
48
49impl StreamingParser {
50 #[allow(clippy::too_many_lines)]
63 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
64 let sql_trimmed = sql.trim();
65 if sql_trimmed.is_empty() {
66 return Err(sqlparser::parser::ParserError::ParserError(
67 "Empty SQL statement".to_string(),
68 ));
69 }
70
71 let dialect = LaminarDialect::default();
72
73 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
75 .tokenize_with_location()
76 .map_err(|e| {
77 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
78 })?;
79
80 match detect_streaming_ddl(&tokens) {
82 StreamingDdlKind::CreateSource { .. } => {
83 let mut parser =
84 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
85 let source = source_parser::parse_create_source(&mut parser)
86 .map_err(parse_error_to_parser_error)?;
87 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
88 }
89 StreamingDdlKind::CreateSink { .. } => {
90 let mut parser =
91 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
92 let sink = sink_parser::parse_create_sink(&mut parser)
93 .map_err(parse_error_to_parser_error)?;
94 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
95 }
96 StreamingDdlKind::CreateContinuousQuery { .. } => {
97 let mut parser =
98 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
99 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
100 .map_err(parse_error_to_parser_error)?;
101 Ok(vec![stmt])
102 }
103 StreamingDdlKind::DropSource { .. } => {
104 let mut parser =
105 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
106 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
107 Ok(vec![stmt])
108 }
109 StreamingDdlKind::DropSink { .. } => {
110 let mut parser =
111 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
113 Ok(vec![stmt])
114 }
115 StreamingDdlKind::DropMaterializedView { .. } => {
116 let mut parser =
117 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118 let stmt = parse_drop_materialized_view(&mut parser)
119 .map_err(parse_error_to_parser_error)?;
120 Ok(vec![stmt])
121 }
122 StreamingDdlKind::ShowSources => {
123 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
124 }
125 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
126 StreamingDdlKind::ShowQueries => {
127 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
128 }
129 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
130 ShowCommand::MaterializedViews,
131 )]),
132 StreamingDdlKind::DescribeSource => {
133 let mut parser =
134 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
135 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
136 Ok(vec![stmt])
137 }
138 StreamingDdlKind::ExplainStreaming => {
139 let mut parser =
140 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141 let stmt =
142 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
143 Ok(vec![stmt])
144 }
145 StreamingDdlKind::CreateMaterializedView { .. } => {
146 let mut parser =
147 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
148 let stmt = parse_create_materialized_view(&mut parser)
149 .map_err(parse_error_to_parser_error)?;
150 Ok(vec![stmt])
151 }
152 StreamingDdlKind::CreateStream { .. } => {
153 let mut parser =
154 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
155 let stmt = parse_create_stream(&mut parser, sql_trimmed)
156 .map_err(parse_error_to_parser_error)?;
157 Ok(vec![stmt])
158 }
159 StreamingDdlKind::DropStream { .. } => {
160 let mut parser =
161 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
162 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
163 Ok(vec![stmt])
164 }
165 StreamingDdlKind::ShowStreams => {
166 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
167 }
168 StreamingDdlKind::None => {
169 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
171 Ok(statements
172 .into_iter()
173 .map(convert_standard_statement)
174 .collect())
175 }
176 }
177 }
178
179 #[must_use]
181 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
182 match expr {
183 sqlparser::ast::Expr::Function(func) => {
184 if let Some(name) = func.name.0.last() {
185 let func_name = name.to_string().to_uppercase();
186 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
187 } else {
188 false
189 }
190 }
191 _ => false,
192 }
193 }
194
195 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
201 emit_parser::parse_emit_clause_from_sql(sql)
202 }
203
204 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
210 late_data_parser::parse_late_data_clause_from_sql(sql)
211 }
212}
213
214fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
216 match e {
217 ParseError::SqlParseError(pe) => pe,
218 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
219 ParseError::WindowError(msg) => {
220 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
221 }
222 ParseError::ValidationError(msg) => {
223 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
224 }
225 }
226}
227
228fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
233 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
234 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
236 let table_name = name.clone();
237 let columns = insert.columns.clone();
238
239 if let Some(ref source) = insert.source {
241 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
242 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
243 return StreamingStatement::InsertInto {
244 table_name,
245 columns,
246 values: rows,
247 };
248 }
249 }
250 }
251 }
252 StreamingStatement::Standard(Box::new(stmt))
253}
254
255fn parse_drop_source(
263 parser: &mut sqlparser::parser::Parser,
264) -> Result<StreamingStatement, ParseError> {
265 parser
266 .expect_keyword(sqlparser::keywords::Keyword::DROP)
267 .map_err(ParseError::SqlParseError)?;
268 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
269 let if_exists = parser.parse_keywords(&[
270 sqlparser::keywords::Keyword::IF,
271 sqlparser::keywords::Keyword::EXISTS,
272 ]);
273 let name = parser
274 .parse_object_name(false)
275 .map_err(ParseError::SqlParseError)?;
276 Ok(StreamingStatement::DropSource { name, if_exists })
277}
278
279fn parse_drop_sink(
287 parser: &mut sqlparser::parser::Parser,
288) -> Result<StreamingStatement, ParseError> {
289 parser
290 .expect_keyword(sqlparser::keywords::Keyword::DROP)
291 .map_err(ParseError::SqlParseError)?;
292 tokenizer::expect_custom_keyword(parser, "SINK")?;
293 let if_exists = parser.parse_keywords(&[
294 sqlparser::keywords::Keyword::IF,
295 sqlparser::keywords::Keyword::EXISTS,
296 ]);
297 let name = parser
298 .parse_object_name(false)
299 .map_err(ParseError::SqlParseError)?;
300 Ok(StreamingStatement::DropSink { name, if_exists })
301}
302
303fn parse_drop_materialized_view(
311 parser: &mut sqlparser::parser::Parser,
312) -> Result<StreamingStatement, ParseError> {
313 parser
314 .expect_keyword(sqlparser::keywords::Keyword::DROP)
315 .map_err(ParseError::SqlParseError)?;
316 parser
317 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
318 .map_err(ParseError::SqlParseError)?;
319 parser
320 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
321 .map_err(ParseError::SqlParseError)?;
322 let if_exists = parser.parse_keywords(&[
323 sqlparser::keywords::Keyword::IF,
324 sqlparser::keywords::Keyword::EXISTS,
325 ]);
326 let name = parser
327 .parse_object_name(false)
328 .map_err(ParseError::SqlParseError)?;
329 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
330 Ok(StreamingStatement::DropMaterializedView {
331 name,
332 if_exists,
333 cascade,
334 })
335}
336
337fn parse_create_stream(
345 parser: &mut sqlparser::parser::Parser,
346 _original_sql: &str,
347) -> Result<StreamingStatement, ParseError> {
348 parser
349 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
350 .map_err(ParseError::SqlParseError)?;
351
352 let or_replace = parser.parse_keywords(&[
353 sqlparser::keywords::Keyword::OR,
354 sqlparser::keywords::Keyword::REPLACE,
355 ]);
356
357 tokenizer::expect_custom_keyword(parser, "STREAM")?;
358
359 let if_not_exists = parser.parse_keywords(&[
360 sqlparser::keywords::Keyword::IF,
361 sqlparser::keywords::Keyword::NOT,
362 sqlparser::keywords::Keyword::EXISTS,
363 ]);
364
365 let name = parser
366 .parse_object_name(false)
367 .map_err(ParseError::SqlParseError)?;
368
369 parser
370 .expect_keyword(sqlparser::keywords::Keyword::AS)
371 .map_err(ParseError::SqlParseError)?;
372
373 let remaining = collect_remaining_tokens(parser);
375 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
376
377 let stream_dialect = LaminarDialect::default();
378
379 let query = if query_tokens.is_empty() {
380 return Err(ParseError::StreamingError(
381 "Expected SELECT query after AS".to_string(),
382 ));
383 } else {
384 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
385 .with_tokens_with_locations(query_tokens);
386 query_parser
387 .parse_query()
388 .map_err(ParseError::SqlParseError)?
389 };
390
391 let query_stmt =
392 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
393
394 let emit_clause = if emit_tokens.is_empty() {
395 None
396 } else {
397 let mut emit_parser =
398 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
399 emit_parser::parse_emit_clause(&mut emit_parser)?
400 };
401
402 Ok(StreamingStatement::CreateStream {
403 name,
404 query: Box::new(query_stmt),
405 emit_clause,
406 or_replace,
407 if_not_exists,
408 })
409}
410
411fn parse_drop_stream(
419 parser: &mut sqlparser::parser::Parser,
420) -> Result<StreamingStatement, ParseError> {
421 parser
422 .expect_keyword(sqlparser::keywords::Keyword::DROP)
423 .map_err(ParseError::SqlParseError)?;
424 tokenizer::expect_custom_keyword(parser, "STREAM")?;
425 let if_exists = parser.parse_keywords(&[
426 sqlparser::keywords::Keyword::IF,
427 sqlparser::keywords::Keyword::EXISTS,
428 ]);
429 let name = parser
430 .parse_object_name(false)
431 .map_err(ParseError::SqlParseError)?;
432 Ok(StreamingStatement::DropStream { name, if_exists })
433}
434
435fn parse_describe(
443 parser: &mut sqlparser::parser::Parser,
444) -> Result<StreamingStatement, ParseError> {
445 let token = parser.next_token();
447 match &token.token {
448 sqlparser::tokenizer::Token::Word(w)
449 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
450 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
451 _ => {
452 return Err(ParseError::StreamingError(
453 "Expected DESCRIBE or DESC".to_string(),
454 ));
455 }
456 }
457 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
458 let name = parser
459 .parse_object_name(false)
460 .map_err(ParseError::SqlParseError)?;
461 Ok(StreamingStatement::Describe { name, extended })
462}
463
464fn parse_explain(
472 parser: &mut sqlparser::parser::Parser,
473 original_sql: &str,
474) -> Result<StreamingStatement, ParseError> {
475 parser
476 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
477 .map_err(ParseError::SqlParseError)?;
478
479 let explain_prefix_upper = original_sql.to_uppercase();
481 let inner_start = explain_prefix_upper
482 .find("EXPLAIN")
483 .map_or(0, |pos| pos + "EXPLAIN".len());
484 let inner_sql = original_sql[inner_start..].trim();
485
486 let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
488 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
489 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
490 })?;
491 Ok(StreamingStatement::Explain {
492 statement: Box::new(inner),
493 })
494}
495
496fn parse_create_materialized_view(
509 parser: &mut sqlparser::parser::Parser,
510) -> Result<StreamingStatement, ParseError> {
511 parser
512 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
513 .map_err(ParseError::SqlParseError)?;
514
515 let or_replace = parser.parse_keywords(&[
516 sqlparser::keywords::Keyword::OR,
517 sqlparser::keywords::Keyword::REPLACE,
518 ]);
519
520 parser
521 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
522 .map_err(ParseError::SqlParseError)?;
523 parser
524 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
525 .map_err(ParseError::SqlParseError)?;
526
527 let if_not_exists = parser.parse_keywords(&[
528 sqlparser::keywords::Keyword::IF,
529 sqlparser::keywords::Keyword::NOT,
530 sqlparser::keywords::Keyword::EXISTS,
531 ]);
532
533 let name = parser
534 .parse_object_name(false)
535 .map_err(ParseError::SqlParseError)?;
536
537 parser
538 .expect_keyword(sqlparser::keywords::Keyword::AS)
539 .map_err(ParseError::SqlParseError)?;
540
541 let remaining = collect_remaining_tokens(parser);
543 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
544
545 let mv_dialect = LaminarDialect::default();
546
547 let query = if query_tokens.is_empty() {
548 return Err(ParseError::StreamingError(
549 "Expected SELECT query after AS".to_string(),
550 ));
551 } else {
552 let mut query_parser =
553 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
554 query_parser
555 .parse_query()
556 .map_err(ParseError::SqlParseError)?
557 };
558
559 let query_stmt =
560 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
561
562 let emit_clause = if emit_tokens.is_empty() {
563 None
564 } else {
565 let mut emit_parser =
566 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
567 emit_parser::parse_emit_clause(&mut emit_parser)?
568 };
569
570 Ok(StreamingStatement::CreateMaterializedView {
571 name,
572 query: Box::new(query_stmt),
573 emit_clause,
574 or_replace,
575 if_not_exists,
576 })
577}
578
579fn collect_remaining_tokens(
581 parser: &mut sqlparser::parser::Parser,
582) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
583 let mut tokens = Vec::new();
584 loop {
585 let token = parser.next_token();
586 if token.token == sqlparser::tokenizer::Token::EOF {
587 tokens.push(token);
588 break;
589 }
590 tokens.push(token);
591 }
592 tokens
593}
594
595fn split_at_emit(
600 tokens: &[sqlparser::tokenizer::TokenWithSpan],
601) -> (
602 Vec<sqlparser::tokenizer::TokenWithSpan>,
603 Vec<sqlparser::tokenizer::TokenWithSpan>,
604) {
605 let mut depth: i32 = 0;
606 for (i, token) in tokens.iter().enumerate() {
607 match &token.token {
608 sqlparser::tokenizer::Token::LParen => depth += 1,
609 sqlparser::tokenizer::Token::RParen => {
610 depth -= 1;
611 }
612 sqlparser::tokenizer::Token::Word(w)
613 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
614 {
615 let mut query_tokens = tokens[..i].to_vec();
616 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
617 token: sqlparser::tokenizer::Token::EOF,
618 span: sqlparser::tokenizer::Span::empty(),
619 });
620 let emit_tokens = tokens[i..].to_vec();
621 return (query_tokens, emit_tokens);
622 }
623 _ => {}
624 }
625 }
626 (tokens.to_vec(), vec![])
627}
628
/// Errors produced while parsing streaming SQL.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// An error reported by the underlying `sqlparser` parser. The `#[from]`
    /// attribute lets `?` convert a `ParserError` into this variant.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// A streaming-specific parse failure, described by the message.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// A window-function failure, described by the message.
    #[error("Window function error: {0}")]
    WindowError(String),

    /// A validation failure, described by the message.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
648
// Unit tests covering the statement kinds handled by `StreamingParser::parse_sql`.
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `sql`, asserts that exactly one statement came back, and returns it.
    /// Panics (failing the test) if parsing fails.
    fn parse_one(sql: &str) -> StreamingStatement {
        let stmts = StreamingParser::parse_sql(sql).unwrap();
        assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
        stmts.into_iter().next().unwrap()
    }

    // --- DROP SOURCE / DROP SINK ---

    #[test]
    fn test_parse_drop_source() {
        let stmt = parse_one("DROP SOURCE events");
        match stmt {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropSource, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_source_if_exists() {
        let stmt = parse_one("DROP SOURCE IF EXISTS events");
        match stmt {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
            }
            _ => panic!("Expected DropSource, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_sink() {
        let stmt = parse_one("DROP SINK output");
        match stmt {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropSink, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_sink_if_exists() {
        let stmt = parse_one("DROP SINK IF EXISTS output");
        match stmt {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(if_exists);
            }
            _ => panic!("Expected DropSink, got {stmt:?}"),
        }
    }

    // --- DROP MATERIALIZED VIEW ---

    #[test]
    fn test_parse_drop_materialized_view() {
        let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!if_exists);
                assert!(!cascade);
            }
            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_materialized_view_if_exists_cascade() {
        let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
        }
    }

    // --- SHOW commands ---

    #[test]
    fn test_parse_show_sources() {
        let stmt = parse_one("SHOW SOURCES");
        assert!(matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::Sources)
        ));
    }

    #[test]
    fn test_parse_show_sinks() {
        let stmt = parse_one("SHOW SINKS");
        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
    }

    #[test]
    fn test_parse_show_queries() {
        let stmt = parse_one("SHOW QUERIES");
        assert!(matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::Queries)
        ));
    }

    #[test]
    fn test_parse_show_materialized_views() {
        let stmt = parse_one("SHOW MATERIALIZED VIEWS");
        assert!(matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::MaterializedViews)
        ));
    }

    // --- DESCRIBE ---

    #[test]
    fn test_parse_describe() {
        let stmt = parse_one("DESCRIBE events");
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(!extended);
            }
            _ => panic!("Expected Describe, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_describe_extended() {
        let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "my_schema.events");
                assert!(extended);
            }
            _ => panic!("Expected Describe, got {stmt:?}"),
        }
    }

    // --- EXPLAIN: the wrapped statement is parsed recursively ---

    #[test]
    fn test_parse_explain_select() {
        let stmt = parse_one("EXPLAIN SELECT * FROM events");
        match stmt {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            _ => panic!("Expected Explain, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_explain_create_source() {
        let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
        match stmt {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
            }
            _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
        }
    }

    // --- CREATE MATERIALIZED VIEW ---

    #[test]
    fn test_parse_create_materialized_view() {
        let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(emit_clause.is_none());
                assert!(!or_replace);
                assert!(!if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_materialized_view_with_emit() {
        let stmt = parse_one(
            "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
        );
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name, emit_clause, ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_or_replace_materialized_view() {
        let stmt = parse_one(
            "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
        );
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(or_replace);
                assert!(!if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_materialized_view_if_not_exists() {
        let stmt = parse_one(
            "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
        );
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
        }
    }

    // --- INSERT INTO ... VALUES is lowered to `InsertInto` ---

    #[test]
    fn test_parse_insert_into() {
        let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(columns[0].to_string(), "id");
                assert_eq!(columns[1].to_string(), "name");
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            _ => panic!("Expected InsertInto, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_insert_into_multiple_rows() {
        let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert!(columns.is_empty());
                assert_eq!(values.len(), 3);
            }
            _ => panic!("Expected InsertInto, got {stmt:?}"),
        }
    }

    // --- CREATE STREAM / DROP STREAM / SHOW STREAMS ---

    #[test]
    fn test_parse_create_stream() {
        let stmt = parse_one(
            "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
        );
        match stmt {
            StreamingStatement::CreateStream {
                name,
                or_replace,
                if_not_exists,
                emit_clause,
                ..
            } => {
                assert_eq!(name.to_string(), "session_activity");
                assert!(!or_replace);
                assert!(!if_not_exists);
                assert!(emit_clause.is_none());
            }
            _ => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_or_replace_stream() {
        let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
        match stmt {
            StreamingStatement::CreateStream { or_replace, .. } => {
                assert!(or_replace);
            }
            _ => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_stream_if_not_exists() {
        let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
        match stmt {
            StreamingStatement::CreateStream { if_not_exists, .. } => {
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_create_stream_with_emit() {
        let stmt =
            parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
        match stmt {
            StreamingStatement::CreateStream { emit_clause, .. } => {
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            }
            _ => panic!("Expected CreateStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_stream() {
        let stmt = parse_one("DROP STREAM my_stream");
        match stmt {
            StreamingStatement::DropStream { name, if_exists } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_drop_stream_if_exists() {
        let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
        match stmt {
            StreamingStatement::DropStream { name, if_exists } => {
                assert_eq!(name.to_string(), "my_stream");
                assert!(if_exists);
            }
            _ => panic!("Expected DropStream, got {stmt:?}"),
        }
    }

    #[test]
    fn test_parse_show_streams() {
        let stmt = parse_one("SHOW STREAMS");
        assert!(matches!(
            stmt,
            StreamingStatement::Show(ShowCommand::Streams)
        ));
    }
}