1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29 WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48pub struct StreamingParser;
54
55impl StreamingParser {
56 #[allow(clippy::too_many_lines)]
69 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70 let sql_trimmed = sql.trim();
71 if sql_trimmed.is_empty() {
72 return Err(sqlparser::parser::ParserError::ParserError(
73 "Empty SQL statement".to_string(),
74 ));
75 }
76
77 let dialect = LaminarDialect::default();
78
79 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81 .tokenize_with_location()
82 .map_err(|e| {
83 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84 })?;
85
86 match detect_streaming_ddl(&tokens) {
88 StreamingDdlKind::CreateSource { .. } => {
89 let mut parser =
90 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91 let source = source_parser::parse_create_source(&mut parser)
92 .map_err(parse_error_to_parser_error)?;
93 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94 }
95 StreamingDdlKind::CreateSink { .. } => {
96 let mut parser =
97 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98 let sink = sink_parser::parse_create_sink(&mut parser)
99 .map_err(parse_error_to_parser_error)?;
100 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101 }
102 StreamingDdlKind::CreateContinuousQuery { .. } => {
103 let mut parser =
104 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106 .map_err(parse_error_to_parser_error)?;
107 Ok(vec![stmt])
108 }
109 StreamingDdlKind::DropSource { .. } => {
110 let mut parser =
111 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113 Ok(vec![stmt])
114 }
115 StreamingDdlKind::DropSink { .. } => {
116 let mut parser =
117 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119 Ok(vec![stmt])
120 }
121 StreamingDdlKind::DropMaterializedView { .. } => {
122 let mut parser =
123 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124 let stmt = parse_drop_materialized_view(&mut parser)
125 .map_err(parse_error_to_parser_error)?;
126 Ok(vec![stmt])
127 }
128 StreamingDdlKind::ShowSources => {
129 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130 }
131 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132 StreamingDdlKind::ShowQueries => {
133 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134 }
135 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136 ShowCommand::MaterializedViews,
137 )]),
138 StreamingDdlKind::DescribeSource => {
139 let mut parser =
140 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142 Ok(vec![stmt])
143 }
144 StreamingDdlKind::ExplainStreaming => {
145 let mut parser =
146 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147 let stmt =
148 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149 Ok(vec![stmt])
150 }
151 StreamingDdlKind::CreateMaterializedView { .. } => {
152 let mut parser =
153 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154 let stmt = parse_create_materialized_view(&mut parser)
155 .map_err(parse_error_to_parser_error)?;
156 Ok(vec![stmt])
157 }
158 StreamingDdlKind::CreateStream { .. } => {
159 let mut parser =
160 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161 let stmt = parse_create_stream(&mut parser, sql_trimmed)
162 .map_err(parse_error_to_parser_error)?;
163 Ok(vec![stmt])
164 }
165 StreamingDdlKind::DropStream { .. } => {
166 let mut parser =
167 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169 Ok(vec![stmt])
170 }
171 StreamingDdlKind::ShowStreams => {
172 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173 }
174 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175 StreamingDdlKind::CreateLookupTable { .. } => {
176 let mut parser =
177 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178 let lt = lookup_table::parse_create_lookup_table(&mut parser)
179 .map_err(parse_error_to_parser_error)?;
180 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181 }
182 StreamingDdlKind::DropLookupTable { .. } => {
183 let mut parser =
184 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186 .map_err(parse_error_to_parser_error)?;
187 Ok(vec![StreamingStatement::DropLookupTable {
188 name,
189 if_exists,
190 }])
191 }
192 StreamingDdlKind::AlterSource => {
193 let mut parser =
194 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196 Ok(vec![stmt])
197 }
198 StreamingDdlKind::None => {
199 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
201 Ok(statements
202 .into_iter()
203 .map(convert_standard_statement)
204 .collect())
205 }
206 }
207 }
208
209 #[must_use]
211 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
212 match expr {
213 sqlparser::ast::Expr::Function(func) => {
214 if let Some(name) = func.name.0.last() {
215 let func_name = name.to_string().to_uppercase();
216 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
217 } else {
218 false
219 }
220 }
221 _ => false,
222 }
223 }
224
225 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
231 emit_parser::parse_emit_clause_from_sql(sql)
232 }
233
234 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
240 late_data_parser::parse_late_data_clause_from_sql(sql)
241 }
242}
243
244fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
246 match e {
247 ParseError::SqlParseError(pe) => pe,
248 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
249 ParseError::WindowError(msg) => {
250 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
251 }
252 ParseError::ValidationError(msg) => {
253 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
254 }
255 }
256}
257
258fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
263 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
264 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
266 let table_name = name.clone();
267 let columns = insert.columns.clone();
268
269 if let Some(ref source) = insert.source {
271 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
272 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
273 return StreamingStatement::InsertInto {
274 table_name,
275 columns,
276 values: rows,
277 };
278 }
279 }
280 }
281 }
282 StreamingStatement::Standard(Box::new(stmt))
283}
284
285fn parse_drop_source(
293 parser: &mut sqlparser::parser::Parser,
294) -> Result<StreamingStatement, ParseError> {
295 parser
296 .expect_keyword(sqlparser::keywords::Keyword::DROP)
297 .map_err(ParseError::SqlParseError)?;
298 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
299 let if_exists = parser.parse_keywords(&[
300 sqlparser::keywords::Keyword::IF,
301 sqlparser::keywords::Keyword::EXISTS,
302 ]);
303 let name = parser
304 .parse_object_name(false)
305 .map_err(ParseError::SqlParseError)?;
306 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
307 Ok(StreamingStatement::DropSource {
308 name,
309 if_exists,
310 cascade,
311 })
312}
313
314fn parse_drop_sink(
322 parser: &mut sqlparser::parser::Parser,
323) -> Result<StreamingStatement, ParseError> {
324 parser
325 .expect_keyword(sqlparser::keywords::Keyword::DROP)
326 .map_err(ParseError::SqlParseError)?;
327 tokenizer::expect_custom_keyword(parser, "SINK")?;
328 let if_exists = parser.parse_keywords(&[
329 sqlparser::keywords::Keyword::IF,
330 sqlparser::keywords::Keyword::EXISTS,
331 ]);
332 let name = parser
333 .parse_object_name(false)
334 .map_err(ParseError::SqlParseError)?;
335 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
336 Ok(StreamingStatement::DropSink {
337 name,
338 if_exists,
339 cascade,
340 })
341}
342
343fn parse_drop_materialized_view(
351 parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353 parser
354 .expect_keyword(sqlparser::keywords::Keyword::DROP)
355 .map_err(ParseError::SqlParseError)?;
356 parser
357 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
358 .map_err(ParseError::SqlParseError)?;
359 parser
360 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
361 .map_err(ParseError::SqlParseError)?;
362 let if_exists = parser.parse_keywords(&[
363 sqlparser::keywords::Keyword::IF,
364 sqlparser::keywords::Keyword::EXISTS,
365 ]);
366 let name = parser
367 .parse_object_name(false)
368 .map_err(ParseError::SqlParseError)?;
369 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
370 Ok(StreamingStatement::DropMaterializedView {
371 name,
372 if_exists,
373 cascade,
374 })
375}
376
377fn parse_create_stream(
385 parser: &mut sqlparser::parser::Parser,
386 _original_sql: &str,
387) -> Result<StreamingStatement, ParseError> {
388 parser
389 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
390 .map_err(ParseError::SqlParseError)?;
391
392 let or_replace = parser.parse_keywords(&[
393 sqlparser::keywords::Keyword::OR,
394 sqlparser::keywords::Keyword::REPLACE,
395 ]);
396
397 tokenizer::expect_custom_keyword(parser, "STREAM")?;
398
399 let if_not_exists = parser.parse_keywords(&[
400 sqlparser::keywords::Keyword::IF,
401 sqlparser::keywords::Keyword::NOT,
402 sqlparser::keywords::Keyword::EXISTS,
403 ]);
404
405 let name = parser
406 .parse_object_name(false)
407 .map_err(ParseError::SqlParseError)?;
408
409 parser
410 .expect_keyword(sqlparser::keywords::Keyword::AS)
411 .map_err(ParseError::SqlParseError)?;
412
413 let remaining = collect_remaining_tokens(parser);
415 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
416
417 let stream_dialect = LaminarDialect::default();
418
419 let query = if query_tokens.is_empty() {
420 return Err(ParseError::StreamingError(
421 "Expected SELECT query after AS".to_string(),
422 ));
423 } else {
424 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
425 .with_tokens_with_locations(query_tokens);
426 query_parser
427 .parse_query()
428 .map_err(ParseError::SqlParseError)?
429 };
430
431 let query_stmt =
432 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
433
434 let emit_clause = if emit_tokens.is_empty() {
435 None
436 } else {
437 let mut emit_parser =
438 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
439 emit_parser::parse_emit_clause(&mut emit_parser)?
440 };
441
442 Ok(StreamingStatement::CreateStream {
443 name,
444 query: Box::new(query_stmt),
445 emit_clause,
446 or_replace,
447 if_not_exists,
448 })
449}
450
451fn parse_drop_stream(
459 parser: &mut sqlparser::parser::Parser,
460) -> Result<StreamingStatement, ParseError> {
461 parser
462 .expect_keyword(sqlparser::keywords::Keyword::DROP)
463 .map_err(ParseError::SqlParseError)?;
464 tokenizer::expect_custom_keyword(parser, "STREAM")?;
465 let if_exists = parser.parse_keywords(&[
466 sqlparser::keywords::Keyword::IF,
467 sqlparser::keywords::Keyword::EXISTS,
468 ]);
469 let name = parser
470 .parse_object_name(false)
471 .map_err(ParseError::SqlParseError)?;
472 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
473 Ok(StreamingStatement::DropStream {
474 name,
475 if_exists,
476 cascade,
477 })
478}
479
480fn parse_alter_source(
493 parser: &mut sqlparser::parser::Parser,
494) -> Result<StreamingStatement, ParseError> {
495 parser
496 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
497 .map_err(ParseError::SqlParseError)?;
498 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
499 let name = parser
500 .parse_object_name(false)
501 .map_err(ParseError::SqlParseError)?;
502
503 if parser.parse_keywords(&[
505 sqlparser::keywords::Keyword::ADD,
506 sqlparser::keywords::Keyword::COLUMN,
507 ]) {
508 let col_name = parser
510 .parse_identifier()
511 .map_err(ParseError::SqlParseError)?;
512 let data_type = parser
513 .parse_data_type()
514 .map_err(ParseError::SqlParseError)?;
515 let column_def = sqlparser::ast::ColumnDef {
516 name: col_name,
517 data_type,
518 options: vec![],
519 };
520 Ok(StreamingStatement::AlterSource {
521 name,
522 operation: statements::AlterSourceOperation::AddColumn { column_def },
523 })
524 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
525 parser
527 .expect_token(&sqlparser::tokenizer::Token::LParen)
528 .map_err(ParseError::SqlParseError)?;
529 let mut properties = std::collections::HashMap::new();
530 loop {
531 let key = parser
532 .parse_literal_string()
533 .map_err(ParseError::SqlParseError)?;
534 parser
535 .expect_token(&sqlparser::tokenizer::Token::Eq)
536 .map_err(ParseError::SqlParseError)?;
537 let value = parser
538 .parse_literal_string()
539 .map_err(ParseError::SqlParseError)?;
540 properties.insert(key, value);
541 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
542 break;
543 }
544 }
545 parser
546 .expect_token(&sqlparser::tokenizer::Token::RParen)
547 .map_err(ParseError::SqlParseError)?;
548 Ok(StreamingStatement::AlterSource {
549 name,
550 operation: statements::AlterSourceOperation::SetProperties { properties },
551 })
552 } else {
553 Err(ParseError::StreamingError(
554 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
555 ))
556 }
557}
558
559fn parse_describe(
565 parser: &mut sqlparser::parser::Parser,
566) -> Result<StreamingStatement, ParseError> {
567 let token = parser.next_token();
569 match &token.token {
570 sqlparser::tokenizer::Token::Word(w)
571 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
572 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
573 _ => {
574 return Err(ParseError::StreamingError(
575 "Expected DESCRIBE or DESC".to_string(),
576 ));
577 }
578 }
579 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
580 let name = parser
581 .parse_object_name(false)
582 .map_err(ParseError::SqlParseError)?;
583 Ok(StreamingStatement::Describe { name, extended })
584}
585
586fn parse_explain(
594 parser: &mut sqlparser::parser::Parser,
595 original_sql: &str,
596) -> Result<StreamingStatement, ParseError> {
597 parser
598 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
599 .map_err(ParseError::SqlParseError)?;
600
601 let explain_prefix_upper = original_sql.to_uppercase();
603 let inner_start = explain_prefix_upper
604 .find("EXPLAIN")
605 .map_or(0, |pos| pos + "EXPLAIN".len());
606 let inner_sql = original_sql[inner_start..].trim();
607
608 let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
610 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
611 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
612 })?;
613 Ok(StreamingStatement::Explain {
614 statement: Box::new(inner),
615 })
616}
617
618fn parse_create_materialized_view(
631 parser: &mut sqlparser::parser::Parser,
632) -> Result<StreamingStatement, ParseError> {
633 parser
634 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
635 .map_err(ParseError::SqlParseError)?;
636
637 let or_replace = parser.parse_keywords(&[
638 sqlparser::keywords::Keyword::OR,
639 sqlparser::keywords::Keyword::REPLACE,
640 ]);
641
642 parser
643 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
644 .map_err(ParseError::SqlParseError)?;
645 parser
646 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
647 .map_err(ParseError::SqlParseError)?;
648
649 let if_not_exists = parser.parse_keywords(&[
650 sqlparser::keywords::Keyword::IF,
651 sqlparser::keywords::Keyword::NOT,
652 sqlparser::keywords::Keyword::EXISTS,
653 ]);
654
655 let name = parser
656 .parse_object_name(false)
657 .map_err(ParseError::SqlParseError)?;
658
659 parser
660 .expect_keyword(sqlparser::keywords::Keyword::AS)
661 .map_err(ParseError::SqlParseError)?;
662
663 let remaining = collect_remaining_tokens(parser);
665 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
666
667 let mv_dialect = LaminarDialect::default();
668
669 let query = if query_tokens.is_empty() {
670 return Err(ParseError::StreamingError(
671 "Expected SELECT query after AS".to_string(),
672 ));
673 } else {
674 let mut query_parser =
675 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
676 query_parser
677 .parse_query()
678 .map_err(ParseError::SqlParseError)?
679 };
680
681 let query_stmt =
682 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
683
684 let emit_clause = if emit_tokens.is_empty() {
685 None
686 } else {
687 let mut emit_parser =
688 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
689 emit_parser::parse_emit_clause(&mut emit_parser)?
690 };
691
692 Ok(StreamingStatement::CreateMaterializedView {
693 name,
694 query: Box::new(query_stmt),
695 emit_clause,
696 or_replace,
697 if_not_exists,
698 })
699}
700
701fn collect_remaining_tokens(
703 parser: &mut sqlparser::parser::Parser,
704) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
705 let mut tokens = Vec::new();
706 loop {
707 let token = parser.next_token();
708 if token.token == sqlparser::tokenizer::Token::EOF {
709 tokens.push(token);
710 break;
711 }
712 tokens.push(token);
713 }
714 tokens
715}
716
717fn split_at_emit(
722 tokens: &[sqlparser::tokenizer::TokenWithSpan],
723) -> (
724 Vec<sqlparser::tokenizer::TokenWithSpan>,
725 Vec<sqlparser::tokenizer::TokenWithSpan>,
726) {
727 let mut depth: i32 = 0;
728 for (i, token) in tokens.iter().enumerate() {
729 match &token.token {
730 sqlparser::tokenizer::Token::LParen => depth += 1,
731 sqlparser::tokenizer::Token::RParen => {
732 depth -= 1;
733 }
734 sqlparser::tokenizer::Token::Word(w)
735 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
736 {
737 let mut query_tokens = tokens[..i].to_vec();
738 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
739 token: sqlparser::tokenizer::Token::EOF,
740 span: sqlparser::tokenizer::Span::empty(),
741 });
742 let emit_tokens = tokens[i..].to_vec();
743 return (query_tokens, emit_tokens);
744 }
745 _ => {}
746 }
747 }
748 (tokens.to_vec(), vec![])
749}
750
751#[derive(Debug, thiserror::Error)]
753pub enum ParseError {
754 #[error("SQL parse error: {0}")]
756 SqlParseError(#[from] sqlparser::parser::ParserError),
757
758 #[error("Streaming SQL error: {0}")]
760 StreamingError(String),
761
762 #[error("Window function error: {0}")]
764 WindowError(String),
765
766 #[error("Validation error: {0}")]
768 ValidationError(String),
769}
770
771#[cfg(test)]
772mod tests {
773 use super::*;
774
775 fn parse_one(sql: &str) -> StreamingStatement {
777 let stmts = StreamingParser::parse_sql(sql).unwrap();
778 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
779 stmts.into_iter().next().unwrap()
780 }
781
782 #[test]
783 fn test_parse_drop_source() {
784 let stmt = parse_one("DROP SOURCE events");
785 match stmt {
786 StreamingStatement::DropSource {
787 name,
788 if_exists,
789 cascade,
790 } => {
791 assert_eq!(name.to_string(), "events");
792 assert!(!if_exists);
793 assert!(!cascade);
794 }
795 _ => panic!("Expected DropSource, got {stmt:?}"),
796 }
797 }
798
799 #[test]
800 fn test_parse_drop_source_if_exists() {
801 let stmt = parse_one("DROP SOURCE IF EXISTS events");
802 match stmt {
803 StreamingStatement::DropSource {
804 name,
805 if_exists,
806 cascade,
807 } => {
808 assert_eq!(name.to_string(), "events");
809 assert!(if_exists);
810 assert!(!cascade);
811 }
812 _ => panic!("Expected DropSource, got {stmt:?}"),
813 }
814 }
815
816 #[test]
817 fn test_parse_drop_source_cascade() {
818 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
819 match stmt {
820 StreamingStatement::DropSource {
821 name,
822 if_exists,
823 cascade,
824 } => {
825 assert_eq!(name.to_string(), "events");
826 assert!(if_exists);
827 assert!(cascade);
828 }
829 _ => panic!("Expected DropSource, got {stmt:?}"),
830 }
831 }
832
833 #[test]
834 fn test_parse_drop_sink() {
835 let stmt = parse_one("DROP SINK output");
836 match stmt {
837 StreamingStatement::DropSink {
838 name,
839 if_exists,
840 cascade,
841 } => {
842 assert_eq!(name.to_string(), "output");
843 assert!(!if_exists);
844 assert!(!cascade);
845 }
846 _ => panic!("Expected DropSink, got {stmt:?}"),
847 }
848 }
849
850 #[test]
851 fn test_parse_drop_sink_if_exists() {
852 let stmt = parse_one("DROP SINK IF EXISTS output");
853 match stmt {
854 StreamingStatement::DropSink {
855 name,
856 if_exists,
857 cascade,
858 } => {
859 assert_eq!(name.to_string(), "output");
860 assert!(if_exists);
861 assert!(!cascade);
862 }
863 _ => panic!("Expected DropSink, got {stmt:?}"),
864 }
865 }
866
867 #[test]
868 fn test_parse_drop_sink_cascade() {
869 let stmt = parse_one("DROP SINK output CASCADE");
870 match stmt {
871 StreamingStatement::DropSink {
872 name,
873 if_exists,
874 cascade,
875 } => {
876 assert_eq!(name.to_string(), "output");
877 assert!(!if_exists);
878 assert!(cascade);
879 }
880 _ => panic!("Expected DropSink, got {stmt:?}"),
881 }
882 }
883
884 #[test]
885 fn test_parse_drop_materialized_view() {
886 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
887 match stmt {
888 StreamingStatement::DropMaterializedView {
889 name,
890 if_exists,
891 cascade,
892 } => {
893 assert_eq!(name.to_string(), "live_stats");
894 assert!(!if_exists);
895 assert!(!cascade);
896 }
897 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
898 }
899 }
900
901 #[test]
902 fn test_parse_drop_materialized_view_if_exists_cascade() {
903 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
904 match stmt {
905 StreamingStatement::DropMaterializedView {
906 name,
907 if_exists,
908 cascade,
909 } => {
910 assert_eq!(name.to_string(), "live_stats");
911 assert!(if_exists);
912 assert!(cascade);
913 }
914 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
915 }
916 }
917
918 #[test]
919 fn test_parse_show_sources() {
920 let stmt = parse_one("SHOW SOURCES");
921 assert!(matches!(
922 stmt,
923 StreamingStatement::Show(ShowCommand::Sources)
924 ));
925 }
926
927 #[test]
928 fn test_parse_show_sinks() {
929 let stmt = parse_one("SHOW SINKS");
930 assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
931 }
932
933 #[test]
934 fn test_parse_show_queries() {
935 let stmt = parse_one("SHOW QUERIES");
936 assert!(matches!(
937 stmt,
938 StreamingStatement::Show(ShowCommand::Queries)
939 ));
940 }
941
942 #[test]
943 fn test_parse_show_materialized_views() {
944 let stmt = parse_one("SHOW MATERIALIZED VIEWS");
945 assert!(matches!(
946 stmt,
947 StreamingStatement::Show(ShowCommand::MaterializedViews)
948 ));
949 }
950
951 #[test]
952 fn test_parse_describe() {
953 let stmt = parse_one("DESCRIBE events");
954 match stmt {
955 StreamingStatement::Describe { name, extended } => {
956 assert_eq!(name.to_string(), "events");
957 assert!(!extended);
958 }
959 _ => panic!("Expected Describe, got {stmt:?}"),
960 }
961 }
962
963 #[test]
964 fn test_parse_describe_extended() {
965 let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
966 match stmt {
967 StreamingStatement::Describe { name, extended } => {
968 assert_eq!(name.to_string(), "my_schema.events");
969 assert!(extended);
970 }
971 _ => panic!("Expected Describe, got {stmt:?}"),
972 }
973 }
974
975 #[test]
976 fn test_parse_explain_select() {
977 let stmt = parse_one("EXPLAIN SELECT * FROM events");
978 match stmt {
979 StreamingStatement::Explain { statement } => {
980 assert!(matches!(*statement, StreamingStatement::Standard(_)));
981 }
982 _ => panic!("Expected Explain, got {stmt:?}"),
983 }
984 }
985
986 #[test]
987 fn test_parse_explain_create_source() {
988 let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
989 match stmt {
990 StreamingStatement::Explain { statement } => {
991 assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
992 }
993 _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
994 }
995 }
996
997 #[test]
998 fn test_parse_create_materialized_view() {
999 let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1000 match stmt {
1001 StreamingStatement::CreateMaterializedView {
1002 name,
1003 emit_clause,
1004 or_replace,
1005 if_not_exists,
1006 ..
1007 } => {
1008 assert_eq!(name.to_string(), "live_stats");
1009 assert!(emit_clause.is_none());
1010 assert!(!or_replace);
1011 assert!(!if_not_exists);
1012 }
1013 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1014 }
1015 }
1016
1017 #[test]
1018 fn test_parse_create_materialized_view_with_emit() {
1019 let stmt = parse_one(
1020 "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1021 );
1022 match stmt {
1023 StreamingStatement::CreateMaterializedView {
1024 name, emit_clause, ..
1025 } => {
1026 assert_eq!(name.to_string(), "live_stats");
1027 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1028 }
1029 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1030 }
1031 }
1032
1033 #[test]
1034 fn test_parse_create_or_replace_materialized_view() {
1035 let stmt = parse_one(
1036 "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1037 );
1038 match stmt {
1039 StreamingStatement::CreateMaterializedView {
1040 name,
1041 or_replace,
1042 if_not_exists,
1043 ..
1044 } => {
1045 assert_eq!(name.to_string(), "live_stats");
1046 assert!(or_replace);
1047 assert!(!if_not_exists);
1048 }
1049 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1050 }
1051 }
1052
1053 #[test]
1054 fn test_parse_create_materialized_view_if_not_exists() {
1055 let stmt = parse_one(
1056 "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1057 );
1058 match stmt {
1059 StreamingStatement::CreateMaterializedView {
1060 name,
1061 or_replace,
1062 if_not_exists,
1063 ..
1064 } => {
1065 assert_eq!(name.to_string(), "live_stats");
1066 assert!(!or_replace);
1067 assert!(if_not_exists);
1068 }
1069 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1070 }
1071 }
1072
1073 #[test]
1074 fn test_parse_insert_into() {
1075 let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1076 match stmt {
1077 StreamingStatement::InsertInto {
1078 table_name,
1079 columns,
1080 values,
1081 } => {
1082 assert_eq!(table_name.to_string(), "events");
1083 assert_eq!(columns.len(), 2);
1084 assert_eq!(columns[0].to_string(), "id");
1085 assert_eq!(columns[1].to_string(), "name");
1086 assert_eq!(values.len(), 1);
1087 assert_eq!(values[0].len(), 2);
1088 }
1089 _ => panic!("Expected InsertInto, got {stmt:?}"),
1090 }
1091 }
1092
1093 #[test]
1094 fn test_parse_insert_into_multiple_rows() {
1095 let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1096 match stmt {
1097 StreamingStatement::InsertInto {
1098 table_name,
1099 columns,
1100 values,
1101 } => {
1102 assert_eq!(table_name.to_string(), "events");
1103 assert!(columns.is_empty());
1104 assert_eq!(values.len(), 3);
1105 }
1106 _ => panic!("Expected InsertInto, got {stmt:?}"),
1107 }
1108 }
1109
1110 #[test]
1113 fn test_parse_create_stream() {
1114 let stmt = parse_one(
1115 "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1116 );
1117 match stmt {
1118 StreamingStatement::CreateStream {
1119 name,
1120 or_replace,
1121 if_not_exists,
1122 emit_clause,
1123 ..
1124 } => {
1125 assert_eq!(name.to_string(), "session_activity");
1126 assert!(!or_replace);
1127 assert!(!if_not_exists);
1128 assert!(emit_clause.is_none());
1129 }
1130 _ => panic!("Expected CreateStream, got {stmt:?}"),
1131 }
1132 }
1133
1134 #[test]
1135 fn test_parse_create_or_replace_stream() {
1136 let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1137 match stmt {
1138 StreamingStatement::CreateStream { or_replace, .. } => {
1139 assert!(or_replace);
1140 }
1141 _ => panic!("Expected CreateStream, got {stmt:?}"),
1142 }
1143 }
1144
1145 #[test]
1146 fn test_parse_create_stream_if_not_exists() {
1147 let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1148 match stmt {
1149 StreamingStatement::CreateStream { if_not_exists, .. } => {
1150 assert!(if_not_exists);
1151 }
1152 _ => panic!("Expected CreateStream, got {stmt:?}"),
1153 }
1154 }
1155
1156 #[test]
1157 fn test_parse_create_stream_with_emit() {
1158 let stmt =
1159 parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1160 match stmt {
1161 StreamingStatement::CreateStream { emit_clause, .. } => {
1162 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1163 }
1164 _ => panic!("Expected CreateStream, got {stmt:?}"),
1165 }
1166 }
1167
1168 #[test]
1169 fn test_parse_drop_stream() {
1170 let stmt = parse_one("DROP STREAM my_stream");
1171 match stmt {
1172 StreamingStatement::DropStream {
1173 name,
1174 if_exists,
1175 cascade,
1176 } => {
1177 assert_eq!(name.to_string(), "my_stream");
1178 assert!(!if_exists);
1179 assert!(!cascade);
1180 }
1181 _ => panic!("Expected DropStream, got {stmt:?}"),
1182 }
1183 }
1184
1185 #[test]
1186 fn test_parse_drop_stream_if_exists() {
1187 let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1188 match stmt {
1189 StreamingStatement::DropStream {
1190 name,
1191 if_exists,
1192 cascade,
1193 } => {
1194 assert_eq!(name.to_string(), "my_stream");
1195 assert!(if_exists);
1196 assert!(!cascade);
1197 }
1198 _ => panic!("Expected DropStream, got {stmt:?}"),
1199 }
1200 }
1201
1202 #[test]
1203 fn test_parse_drop_stream_cascade() {
1204 let stmt = parse_one("DROP STREAM my_stream CASCADE");
1205 match stmt {
1206 StreamingStatement::DropStream {
1207 name,
1208 if_exists,
1209 cascade,
1210 } => {
1211 assert_eq!(name.to_string(), "my_stream");
1212 assert!(!if_exists);
1213 assert!(cascade);
1214 }
1215 _ => panic!("Expected DropStream, got {stmt:?}"),
1216 }
1217 }
1218
1219 #[test]
1220 fn test_parse_show_streams() {
1221 let stmt = parse_one("SHOW STREAMS");
1222 assert!(matches!(
1223 stmt,
1224 StreamingStatement::Show(ShowCommand::Streams)
1225 ));
1226 }
1227
1228 #[test]
1229 fn test_parse_alter_source_add_column() {
1230 let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1231 match stmt {
1232 StreamingStatement::AlterSource { name, operation } => {
1233 assert_eq!(name.to_string(), "events");
1234 match operation {
1235 statements::AlterSourceOperation::AddColumn { column_def } => {
1236 assert_eq!(column_def.name.value, "new_col");
1237 assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1238 }
1239 statements::AlterSourceOperation::SetProperties { .. } => {
1240 panic!("Expected AddColumn")
1241 }
1242 }
1243 }
1244 _ => panic!("Expected AlterSource, got {stmt:?}"),
1245 }
1246 }
1247
1248 #[test]
1249 fn test_parse_alter_source_set_properties() {
1250 let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1251 match stmt {
1252 StreamingStatement::AlterSource { name, operation } => {
1253 assert_eq!(name.to_string(), "events");
1254 match operation {
1255 statements::AlterSourceOperation::SetProperties { properties } => {
1256 assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1257 assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1258 }
1259 statements::AlterSourceOperation::AddColumn { .. } => {
1260 panic!("Expected SetProperties")
1261 }
1262 }
1263 }
1264 _ => panic!("Expected AlterSource, got {stmt:?}"),
1265 }
1266 }
1267}