1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29 WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48pub struct StreamingParser;
54
55impl StreamingParser {
56 #[allow(clippy::too_many_lines)]
69 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70 let sql_trimmed = sql.trim();
71 if sql_trimmed.is_empty() {
72 return Err(sqlparser::parser::ParserError::ParserError(
73 "Empty SQL statement".to_string(),
74 ));
75 }
76
77 let dialect = LaminarDialect::default();
78
79 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81 .tokenize_with_location()
82 .map_err(|e| {
83 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84 })?;
85
86 match detect_streaming_ddl(&tokens) {
88 StreamingDdlKind::CreateSource { .. } => {
89 let mut parser =
90 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91 let source = source_parser::parse_create_source(&mut parser)
92 .map_err(parse_error_to_parser_error)?;
93 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94 }
95 StreamingDdlKind::CreateSink { .. } => {
96 let mut parser =
97 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98 let sink = sink_parser::parse_create_sink(&mut parser)
99 .map_err(parse_error_to_parser_error)?;
100 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101 }
102 StreamingDdlKind::CreateContinuousQuery { .. } => {
103 let mut parser =
104 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106 .map_err(parse_error_to_parser_error)?;
107 Ok(vec![stmt])
108 }
109 StreamingDdlKind::DropSource { .. } => {
110 let mut parser =
111 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113 Ok(vec![stmt])
114 }
115 StreamingDdlKind::DropSink { .. } => {
116 let mut parser =
117 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119 Ok(vec![stmt])
120 }
121 StreamingDdlKind::DropMaterializedView { .. } => {
122 let mut parser =
123 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124 let stmt = parse_drop_materialized_view(&mut parser)
125 .map_err(parse_error_to_parser_error)?;
126 Ok(vec![stmt])
127 }
128 StreamingDdlKind::ShowSources => {
129 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130 }
131 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132 StreamingDdlKind::ShowQueries => {
133 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134 }
135 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136 ShowCommand::MaterializedViews,
137 )]),
138 StreamingDdlKind::DescribeSource => {
139 let mut parser =
140 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142 Ok(vec![stmt])
143 }
144 StreamingDdlKind::ExplainStreaming => {
145 let mut parser =
146 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147 let stmt =
148 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149 Ok(vec![stmt])
150 }
151 StreamingDdlKind::CreateMaterializedView { .. } => {
152 let mut parser =
153 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154 let stmt = parse_create_materialized_view(&mut parser)
155 .map_err(parse_error_to_parser_error)?;
156 Ok(vec![stmt])
157 }
158 StreamingDdlKind::CreateStream { .. } => {
159 let mut parser =
160 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161 let stmt = parse_create_stream(&mut parser, sql_trimmed)
162 .map_err(parse_error_to_parser_error)?;
163 Ok(vec![stmt])
164 }
165 StreamingDdlKind::DropStream { .. } => {
166 let mut parser =
167 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169 Ok(vec![stmt])
170 }
171 StreamingDdlKind::ShowStreams => {
172 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173 }
174 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175 StreamingDdlKind::CreateLookupTable { .. } => {
176 let mut parser =
177 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178 let lt = lookup_table::parse_create_lookup_table(&mut parser)
179 .map_err(parse_error_to_parser_error)?;
180 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181 }
182 StreamingDdlKind::DropLookupTable { .. } => {
183 let mut parser =
184 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186 .map_err(parse_error_to_parser_error)?;
187 Ok(vec![StreamingStatement::DropLookupTable {
188 name,
189 if_exists,
190 }])
191 }
192 StreamingDdlKind::AlterSource => {
193 let mut parser =
194 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196 Ok(vec![stmt])
197 }
198 StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199 ShowCommand::CheckpointStatus,
200 )]),
201 StreamingDdlKind::ShowCreateSource => {
202 let mut parser =
203 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204 let stmt =
205 parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206 Ok(vec![stmt])
207 }
208 StreamingDdlKind::ShowCreateSink => {
209 let mut parser =
210 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211 let stmt =
212 parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213 Ok(vec![stmt])
214 }
215 StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216 StreamingDdlKind::RestoreCheckpoint => {
217 let mut parser =
218 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219 let stmt =
220 parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
221 Ok(vec![stmt])
222 }
223 StreamingDdlKind::None => {
224 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
226 Ok(statements
227 .into_iter()
228 .map(convert_standard_statement)
229 .collect())
230 }
231 }
232 }
233
234 #[must_use]
236 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
237 match expr {
238 sqlparser::ast::Expr::Function(func) => {
239 if let Some(name) = func.name.0.last() {
240 let func_name = name.to_string().to_uppercase();
241 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
242 } else {
243 false
244 }
245 }
246 _ => false,
247 }
248 }
249
250 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
256 emit_parser::parse_emit_clause_from_sql(sql)
257 }
258
259 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
265 late_data_parser::parse_late_data_clause_from_sql(sql)
266 }
267}
268
269fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
271 match e {
272 ParseError::SqlParseError(pe) => pe,
273 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
274 ParseError::WindowError(msg) => {
275 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
276 }
277 ParseError::ValidationError(msg) => {
278 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
279 }
280 }
281}
282
283fn parse_restore_checkpoint(
291 parser: &mut sqlparser::parser::Parser,
292) -> Result<StreamingStatement, ParseError> {
293 tokenizer::expect_custom_keyword(parser, "RESTORE")?;
295 if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
297 return Err(ParseError::StreamingError(
298 "Expected FROM after RESTORE".to_string(),
299 ));
300 }
301 tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
303 let token = parser.next_token();
305 match &token.token {
306 sqlparser::tokenizer::Token::Number(n, _) => {
307 let id: u64 = n
308 .parse()
309 .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
310 Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
311 }
312 other => Err(ParseError::StreamingError(format!(
313 "Expected checkpoint ID (number), found {other}"
314 ))),
315 }
316}
317
318fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
323 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
324 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
326 let table_name = name.clone();
327 let columns = insert.columns.clone();
328
329 if let Some(ref source) = insert.source {
331 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
332 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
333 return StreamingStatement::InsertInto {
334 table_name,
335 columns,
336 values: rows,
337 };
338 }
339 }
340 }
341 }
342 StreamingStatement::Standard(Box::new(stmt))
343}
344
345fn parse_drop_source(
353 parser: &mut sqlparser::parser::Parser,
354) -> Result<StreamingStatement, ParseError> {
355 parser
356 .expect_keyword(sqlparser::keywords::Keyword::DROP)
357 .map_err(ParseError::SqlParseError)?;
358 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
359 let if_exists = parser.parse_keywords(&[
360 sqlparser::keywords::Keyword::IF,
361 sqlparser::keywords::Keyword::EXISTS,
362 ]);
363 let name = parser
364 .parse_object_name(false)
365 .map_err(ParseError::SqlParseError)?;
366 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
367 Ok(StreamingStatement::DropSource {
368 name,
369 if_exists,
370 cascade,
371 })
372}
373
374fn parse_drop_sink(
382 parser: &mut sqlparser::parser::Parser,
383) -> Result<StreamingStatement, ParseError> {
384 parser
385 .expect_keyword(sqlparser::keywords::Keyword::DROP)
386 .map_err(ParseError::SqlParseError)?;
387 tokenizer::expect_custom_keyword(parser, "SINK")?;
388 let if_exists = parser.parse_keywords(&[
389 sqlparser::keywords::Keyword::IF,
390 sqlparser::keywords::Keyword::EXISTS,
391 ]);
392 let name = parser
393 .parse_object_name(false)
394 .map_err(ParseError::SqlParseError)?;
395 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
396 Ok(StreamingStatement::DropSink {
397 name,
398 if_exists,
399 cascade,
400 })
401}
402
403fn parse_drop_materialized_view(
411 parser: &mut sqlparser::parser::Parser,
412) -> Result<StreamingStatement, ParseError> {
413 parser
414 .expect_keyword(sqlparser::keywords::Keyword::DROP)
415 .map_err(ParseError::SqlParseError)?;
416 parser
417 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
418 .map_err(ParseError::SqlParseError)?;
419 parser
420 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
421 .map_err(ParseError::SqlParseError)?;
422 let if_exists = parser.parse_keywords(&[
423 sqlparser::keywords::Keyword::IF,
424 sqlparser::keywords::Keyword::EXISTS,
425 ]);
426 let name = parser
427 .parse_object_name(false)
428 .map_err(ParseError::SqlParseError)?;
429 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
430 Ok(StreamingStatement::DropMaterializedView {
431 name,
432 if_exists,
433 cascade,
434 })
435}
436
437fn parse_create_stream(
445 parser: &mut sqlparser::parser::Parser,
446 _original_sql: &str,
447) -> Result<StreamingStatement, ParseError> {
448 parser
449 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
450 .map_err(ParseError::SqlParseError)?;
451
452 let or_replace = parser.parse_keywords(&[
453 sqlparser::keywords::Keyword::OR,
454 sqlparser::keywords::Keyword::REPLACE,
455 ]);
456
457 tokenizer::expect_custom_keyword(parser, "STREAM")?;
458
459 let if_not_exists = parser.parse_keywords(&[
460 sqlparser::keywords::Keyword::IF,
461 sqlparser::keywords::Keyword::NOT,
462 sqlparser::keywords::Keyword::EXISTS,
463 ]);
464
465 let name = parser
466 .parse_object_name(false)
467 .map_err(ParseError::SqlParseError)?;
468
469 parser
470 .expect_keyword(sqlparser::keywords::Keyword::AS)
471 .map_err(ParseError::SqlParseError)?;
472
473 let remaining = collect_remaining_tokens(parser);
475 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
476
477 let stream_dialect = LaminarDialect::default();
478
479 let query = if query_tokens.is_empty() {
480 return Err(ParseError::StreamingError(
481 "Expected SELECT query after AS".to_string(),
482 ));
483 } else {
484 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485 .with_tokens_with_locations(query_tokens);
486 query_parser
487 .parse_query()
488 .map_err(ParseError::SqlParseError)?
489 };
490
491 let query_stmt =
492 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494 let emit_clause = if emit_tokens.is_empty() {
495 None
496 } else {
497 let mut emit_parser =
498 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499 emit_parser::parse_emit_clause(&mut emit_parser)?
500 };
501
502 Ok(StreamingStatement::CreateStream {
503 name,
504 query: Box::new(query_stmt),
505 emit_clause,
506 or_replace,
507 if_not_exists,
508 })
509}
510
511fn parse_drop_stream(
519 parser: &mut sqlparser::parser::Parser,
520) -> Result<StreamingStatement, ParseError> {
521 parser
522 .expect_keyword(sqlparser::keywords::Keyword::DROP)
523 .map_err(ParseError::SqlParseError)?;
524 tokenizer::expect_custom_keyword(parser, "STREAM")?;
525 let if_exists = parser.parse_keywords(&[
526 sqlparser::keywords::Keyword::IF,
527 sqlparser::keywords::Keyword::EXISTS,
528 ]);
529 let name = parser
530 .parse_object_name(false)
531 .map_err(ParseError::SqlParseError)?;
532 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
533 Ok(StreamingStatement::DropStream {
534 name,
535 if_exists,
536 cascade,
537 })
538}
539
540fn parse_alter_source(
553 parser: &mut sqlparser::parser::Parser,
554) -> Result<StreamingStatement, ParseError> {
555 parser
556 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
557 .map_err(ParseError::SqlParseError)?;
558 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
559 let name = parser
560 .parse_object_name(false)
561 .map_err(ParseError::SqlParseError)?;
562
563 if parser.parse_keywords(&[
565 sqlparser::keywords::Keyword::ADD,
566 sqlparser::keywords::Keyword::COLUMN,
567 ]) {
568 let col_name = parser
570 .parse_identifier()
571 .map_err(ParseError::SqlParseError)?;
572 let data_type = parser
573 .parse_data_type()
574 .map_err(ParseError::SqlParseError)?;
575 let column_def = sqlparser::ast::ColumnDef {
576 name: col_name,
577 data_type,
578 options: vec![],
579 };
580 Ok(StreamingStatement::AlterSource {
581 name,
582 operation: statements::AlterSourceOperation::AddColumn { column_def },
583 })
584 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
585 parser
587 .expect_token(&sqlparser::tokenizer::Token::LParen)
588 .map_err(ParseError::SqlParseError)?;
589 let mut properties = std::collections::HashMap::new();
590 loop {
591 let key = parser
592 .parse_literal_string()
593 .map_err(ParseError::SqlParseError)?;
594 parser
595 .expect_token(&sqlparser::tokenizer::Token::Eq)
596 .map_err(ParseError::SqlParseError)?;
597 let value = parser
598 .parse_literal_string()
599 .map_err(ParseError::SqlParseError)?;
600 properties.insert(key, value);
601 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
602 break;
603 }
604 }
605 parser
606 .expect_token(&sqlparser::tokenizer::Token::RParen)
607 .map_err(ParseError::SqlParseError)?;
608 Ok(StreamingStatement::AlterSource {
609 name,
610 operation: statements::AlterSourceOperation::SetProperties { properties },
611 })
612 } else {
613 Err(ParseError::StreamingError(
614 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
615 ))
616 }
617}
618
619fn parse_describe(
625 parser: &mut sqlparser::parser::Parser,
626) -> Result<StreamingStatement, ParseError> {
627 let token = parser.next_token();
629 match &token.token {
630 sqlparser::tokenizer::Token::Word(w)
631 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
632 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
633 _ => {
634 return Err(ParseError::StreamingError(
635 "Expected DESCRIBE or DESC".to_string(),
636 ));
637 }
638 }
639 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
640 let name = parser
641 .parse_object_name(false)
642 .map_err(ParseError::SqlParseError)?;
643 Ok(StreamingStatement::Describe { name, extended })
644}
645
646fn parse_show_create_source(
648 parser: &mut sqlparser::parser::Parser,
649) -> Result<StreamingStatement, ParseError> {
650 parser
652 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
653 .map_err(ParseError::SqlParseError)?;
654 parser
655 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
656 .map_err(ParseError::SqlParseError)?;
657 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
658 let name = parser
659 .parse_object_name(false)
660 .map_err(ParseError::SqlParseError)?;
661 Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
662}
663
664fn parse_show_create_sink(
666 parser: &mut sqlparser::parser::Parser,
667) -> Result<StreamingStatement, ParseError> {
668 parser
670 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
671 .map_err(ParseError::SqlParseError)?;
672 parser
673 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
674 .map_err(ParseError::SqlParseError)?;
675 tokenizer::expect_custom_keyword(parser, "SINK")?;
676 let name = parser
677 .parse_object_name(false)
678 .map_err(ParseError::SqlParseError)?;
679 Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
680}
681
682fn parse_explain(
690 parser: &mut sqlparser::parser::Parser,
691 original_sql: &str,
692) -> Result<StreamingStatement, ParseError> {
693 parser
694 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
695 .map_err(ParseError::SqlParseError)?;
696
697 let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
699
700 let explain_prefix_upper = original_sql.to_uppercase();
702 let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
703 let inner_start = if analyze {
704 explain_prefix_upper
705 .find("ANALYZE")
706 .map_or(0, |pos| pos + "ANALYZE".len())
707 } else {
708 explain_prefix_upper
709 .find("EXPLAIN")
710 .map_or(0, |pos| pos + "EXPLAIN".len())
711 };
712 let inner_sql = original_sql[inner_start..].trim();
713 let _ = skip_keyword; let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
717 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
718 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
719 })?;
720 Ok(StreamingStatement::Explain {
721 statement: Box::new(inner),
722 analyze,
723 })
724}
725
726fn parse_create_materialized_view(
739 parser: &mut sqlparser::parser::Parser,
740) -> Result<StreamingStatement, ParseError> {
741 parser
742 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
743 .map_err(ParseError::SqlParseError)?;
744
745 let or_replace = parser.parse_keywords(&[
746 sqlparser::keywords::Keyword::OR,
747 sqlparser::keywords::Keyword::REPLACE,
748 ]);
749
750 parser
751 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
752 .map_err(ParseError::SqlParseError)?;
753 parser
754 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
755 .map_err(ParseError::SqlParseError)?;
756
757 let if_not_exists = parser.parse_keywords(&[
758 sqlparser::keywords::Keyword::IF,
759 sqlparser::keywords::Keyword::NOT,
760 sqlparser::keywords::Keyword::EXISTS,
761 ]);
762
763 let name = parser
764 .parse_object_name(false)
765 .map_err(ParseError::SqlParseError)?;
766
767 parser
768 .expect_keyword(sqlparser::keywords::Keyword::AS)
769 .map_err(ParseError::SqlParseError)?;
770
771 let remaining = collect_remaining_tokens(parser);
773 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
774
775 let mv_dialect = LaminarDialect::default();
776
777 let query = if query_tokens.is_empty() {
778 return Err(ParseError::StreamingError(
779 "Expected SELECT query after AS".to_string(),
780 ));
781 } else {
782 let mut query_parser =
783 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
784 query_parser
785 .parse_query()
786 .map_err(ParseError::SqlParseError)?
787 };
788
789 let query_stmt =
790 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
791
792 let emit_clause = if emit_tokens.is_empty() {
793 None
794 } else {
795 let mut emit_parser =
796 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
797 emit_parser::parse_emit_clause(&mut emit_parser)?
798 };
799
800 Ok(StreamingStatement::CreateMaterializedView {
801 name,
802 query: Box::new(query_stmt),
803 emit_clause,
804 or_replace,
805 if_not_exists,
806 })
807}
808
809fn collect_remaining_tokens(
811 parser: &mut sqlparser::parser::Parser,
812) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
813 let mut tokens = Vec::new();
814 loop {
815 let token = parser.next_token();
816 if token.token == sqlparser::tokenizer::Token::EOF {
817 tokens.push(token);
818 break;
819 }
820 tokens.push(token);
821 }
822 tokens
823}
824
825fn split_at_emit(
830 tokens: &[sqlparser::tokenizer::TokenWithSpan],
831) -> (
832 Vec<sqlparser::tokenizer::TokenWithSpan>,
833 Vec<sqlparser::tokenizer::TokenWithSpan>,
834) {
835 let mut depth: i32 = 0;
836 for (i, token) in tokens.iter().enumerate() {
837 match &token.token {
838 sqlparser::tokenizer::Token::LParen => depth += 1,
839 sqlparser::tokenizer::Token::RParen => {
840 depth -= 1;
841 }
842 sqlparser::tokenizer::Token::Word(w)
843 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
844 {
845 let mut query_tokens = tokens[..i].to_vec();
846 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
847 token: sqlparser::tokenizer::Token::EOF,
848 span: sqlparser::tokenizer::Span::empty(),
849 });
850 let emit_tokens = tokens[i..].to_vec();
851 return (query_tokens, emit_tokens);
852 }
853 _ => {}
854 }
855 }
856 (tokens.to_vec(), vec![])
857}
858
/// Errors produced by the streaming SQL parser.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// The underlying `sqlparser` tokenizer/parser rejected the input.
    ///
    /// `#[from]` derives `From<sqlparser::parser::ParserError>`, so `?` converts
    /// `sqlparser` errors into this variant automatically.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// A streaming-specific construct was malformed. Examples: a missing `FROM`
    /// after `RESTORE`, a non-numeric checkpoint id, or an unknown `ALTER SOURCE`
    /// operation.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// Window-function error. Not constructed in this file; presumably raised by
    /// the window-handling submodules — confirm at the producers.
    #[error("Window function error: {0}")]
    WindowError(String),

    /// Validation error. Not constructed in this file; presumably raised by
    /// semantic checks in submodules — confirm at the producers.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
878
879#[cfg(test)]
880mod tests {
881 use super::*;
882
883 fn parse_one(sql: &str) -> StreamingStatement {
885 let stmts = StreamingParser::parse_sql(sql).unwrap();
886 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
887 stmts.into_iter().next().unwrap()
888 }
889
890 #[test]
891 fn test_parse_drop_source() {
892 let stmt = parse_one("DROP SOURCE events");
893 match stmt {
894 StreamingStatement::DropSource {
895 name,
896 if_exists,
897 cascade,
898 } => {
899 assert_eq!(name.to_string(), "events");
900 assert!(!if_exists);
901 assert!(!cascade);
902 }
903 _ => panic!("Expected DropSource, got {stmt:?}"),
904 }
905 }
906
907 #[test]
908 fn test_parse_drop_source_if_exists() {
909 let stmt = parse_one("DROP SOURCE IF EXISTS events");
910 match stmt {
911 StreamingStatement::DropSource {
912 name,
913 if_exists,
914 cascade,
915 } => {
916 assert_eq!(name.to_string(), "events");
917 assert!(if_exists);
918 assert!(!cascade);
919 }
920 _ => panic!("Expected DropSource, got {stmt:?}"),
921 }
922 }
923
924 #[test]
925 fn test_parse_drop_source_cascade() {
926 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
927 match stmt {
928 StreamingStatement::DropSource {
929 name,
930 if_exists,
931 cascade,
932 } => {
933 assert_eq!(name.to_string(), "events");
934 assert!(if_exists);
935 assert!(cascade);
936 }
937 _ => panic!("Expected DropSource, got {stmt:?}"),
938 }
939 }
940
941 #[test]
942 fn test_parse_drop_sink() {
943 let stmt = parse_one("DROP SINK output");
944 match stmt {
945 StreamingStatement::DropSink {
946 name,
947 if_exists,
948 cascade,
949 } => {
950 assert_eq!(name.to_string(), "output");
951 assert!(!if_exists);
952 assert!(!cascade);
953 }
954 _ => panic!("Expected DropSink, got {stmt:?}"),
955 }
956 }
957
958 #[test]
959 fn test_parse_drop_sink_if_exists() {
960 let stmt = parse_one("DROP SINK IF EXISTS output");
961 match stmt {
962 StreamingStatement::DropSink {
963 name,
964 if_exists,
965 cascade,
966 } => {
967 assert_eq!(name.to_string(), "output");
968 assert!(if_exists);
969 assert!(!cascade);
970 }
971 _ => panic!("Expected DropSink, got {stmt:?}"),
972 }
973 }
974
975 #[test]
976 fn test_parse_drop_sink_cascade() {
977 let stmt = parse_one("DROP SINK output CASCADE");
978 match stmt {
979 StreamingStatement::DropSink {
980 name,
981 if_exists,
982 cascade,
983 } => {
984 assert_eq!(name.to_string(), "output");
985 assert!(!if_exists);
986 assert!(cascade);
987 }
988 _ => panic!("Expected DropSink, got {stmt:?}"),
989 }
990 }
991
992 #[test]
993 fn test_parse_drop_materialized_view() {
994 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
995 match stmt {
996 StreamingStatement::DropMaterializedView {
997 name,
998 if_exists,
999 cascade,
1000 } => {
1001 assert_eq!(name.to_string(), "live_stats");
1002 assert!(!if_exists);
1003 assert!(!cascade);
1004 }
1005 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1006 }
1007 }
1008
1009 #[test]
1010 fn test_parse_drop_materialized_view_if_exists_cascade() {
1011 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1012 match stmt {
1013 StreamingStatement::DropMaterializedView {
1014 name,
1015 if_exists,
1016 cascade,
1017 } => {
1018 assert_eq!(name.to_string(), "live_stats");
1019 assert!(if_exists);
1020 assert!(cascade);
1021 }
1022 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1023 }
1024 }
1025
1026 #[test]
1027 fn test_parse_show_sources() {
1028 let stmt = parse_one("SHOW SOURCES");
1029 assert!(matches!(
1030 stmt,
1031 StreamingStatement::Show(ShowCommand::Sources)
1032 ));
1033 }
1034
1035 #[test]
1036 fn test_parse_show_sinks() {
1037 let stmt = parse_one("SHOW SINKS");
1038 assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1039 }
1040
1041 #[test]
1042 fn test_parse_show_queries() {
1043 let stmt = parse_one("SHOW QUERIES");
1044 assert!(matches!(
1045 stmt,
1046 StreamingStatement::Show(ShowCommand::Queries)
1047 ));
1048 }
1049
1050 #[test]
1051 fn test_parse_show_materialized_views() {
1052 let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1053 assert!(matches!(
1054 stmt,
1055 StreamingStatement::Show(ShowCommand::MaterializedViews)
1056 ));
1057 }
1058
1059 #[test]
1060 fn test_parse_describe() {
1061 let stmt = parse_one("DESCRIBE events");
1062 match stmt {
1063 StreamingStatement::Describe { name, extended } => {
1064 assert_eq!(name.to_string(), "events");
1065 assert!(!extended);
1066 }
1067 _ => panic!("Expected Describe, got {stmt:?}"),
1068 }
1069 }
1070
1071 #[test]
1072 fn test_parse_describe_extended() {
1073 let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1074 match stmt {
1075 StreamingStatement::Describe { name, extended } => {
1076 assert_eq!(name.to_string(), "my_schema.events");
1077 assert!(extended);
1078 }
1079 _ => panic!("Expected Describe, got {stmt:?}"),
1080 }
1081 }
1082
1083 #[test]
1084 fn test_parse_explain_select() {
1085 let stmt = parse_one("EXPLAIN SELECT * FROM events");
1086 match stmt {
1087 StreamingStatement::Explain {
1088 statement, analyze, ..
1089 } => {
1090 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1091 assert!(!analyze);
1092 }
1093 _ => panic!("Expected Explain, got {stmt:?}"),
1094 }
1095 }
1096
1097 #[test]
1098 fn test_parse_explain_create_source() {
1099 let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1100 match stmt {
1101 StreamingStatement::Explain { statement, .. } => {
1102 assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1103 }
1104 _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1105 }
1106 }
1107
1108 #[test]
1109 fn test_parse_explain_analyze_select() {
1110 let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1111 match stmt {
1112 StreamingStatement::Explain {
1113 statement, analyze, ..
1114 } => {
1115 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1116 assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1117 }
1118 _ => panic!("Expected Explain, got {stmt:?}"),
1119 }
1120 }
1121
1122 #[test]
1123 fn test_parse_create_materialized_view() {
1124 let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1125 match stmt {
1126 StreamingStatement::CreateMaterializedView {
1127 name,
1128 emit_clause,
1129 or_replace,
1130 if_not_exists,
1131 ..
1132 } => {
1133 assert_eq!(name.to_string(), "live_stats");
1134 assert!(emit_clause.is_none());
1135 assert!(!or_replace);
1136 assert!(!if_not_exists);
1137 }
1138 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1139 }
1140 }
1141
1142 #[test]
1143 fn test_parse_create_materialized_view_with_emit() {
1144 let stmt = parse_one(
1145 "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1146 );
1147 match stmt {
1148 StreamingStatement::CreateMaterializedView {
1149 name, emit_clause, ..
1150 } => {
1151 assert_eq!(name.to_string(), "live_stats");
1152 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1153 }
1154 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1155 }
1156 }
1157
1158 #[test]
1159 fn test_parse_create_or_replace_materialized_view() {
1160 let stmt = parse_one(
1161 "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1162 );
1163 match stmt {
1164 StreamingStatement::CreateMaterializedView {
1165 name,
1166 or_replace,
1167 if_not_exists,
1168 ..
1169 } => {
1170 assert_eq!(name.to_string(), "live_stats");
1171 assert!(or_replace);
1172 assert!(!if_not_exists);
1173 }
1174 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1175 }
1176 }
1177
1178 #[test]
1179 fn test_parse_create_materialized_view_if_not_exists() {
1180 let stmt = parse_one(
1181 "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1182 );
1183 match stmt {
1184 StreamingStatement::CreateMaterializedView {
1185 name,
1186 or_replace,
1187 if_not_exists,
1188 ..
1189 } => {
1190 assert_eq!(name.to_string(), "live_stats");
1191 assert!(!or_replace);
1192 assert!(if_not_exists);
1193 }
1194 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1195 }
1196 }
1197
1198 #[test]
1199 fn test_parse_insert_into() {
1200 let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1201 match stmt {
1202 StreamingStatement::InsertInto {
1203 table_name,
1204 columns,
1205 values,
1206 } => {
1207 assert_eq!(table_name.to_string(), "events");
1208 assert_eq!(columns.len(), 2);
1209 assert_eq!(columns[0].to_string(), "id");
1210 assert_eq!(columns[1].to_string(), "name");
1211 assert_eq!(values.len(), 1);
1212 assert_eq!(values[0].len(), 2);
1213 }
1214 _ => panic!("Expected InsertInto, got {stmt:?}"),
1215 }
1216 }
1217
1218 #[test]
1219 fn test_parse_insert_into_multiple_rows() {
1220 let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1221 match stmt {
1222 StreamingStatement::InsertInto {
1223 table_name,
1224 columns,
1225 values,
1226 } => {
1227 assert_eq!(table_name.to_string(), "events");
1228 assert!(columns.is_empty());
1229 assert_eq!(values.len(), 3);
1230 }
1231 _ => panic!("Expected InsertInto, got {stmt:?}"),
1232 }
1233 }
1234
1235 #[test]
1238 fn test_parse_create_stream() {
1239 let stmt = parse_one(
1240 "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1241 );
1242 match stmt {
1243 StreamingStatement::CreateStream {
1244 name,
1245 or_replace,
1246 if_not_exists,
1247 emit_clause,
1248 ..
1249 } => {
1250 assert_eq!(name.to_string(), "session_activity");
1251 assert!(!or_replace);
1252 assert!(!if_not_exists);
1253 assert!(emit_clause.is_none());
1254 }
1255 _ => panic!("Expected CreateStream, got {stmt:?}"),
1256 }
1257 }
1258
1259 #[test]
1260 fn test_parse_create_or_replace_stream() {
1261 let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1262 match stmt {
1263 StreamingStatement::CreateStream { or_replace, .. } => {
1264 assert!(or_replace);
1265 }
1266 _ => panic!("Expected CreateStream, got {stmt:?}"),
1267 }
1268 }
1269
1270 #[test]
1271 fn test_parse_create_stream_if_not_exists() {
1272 let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1273 match stmt {
1274 StreamingStatement::CreateStream { if_not_exists, .. } => {
1275 assert!(if_not_exists);
1276 }
1277 _ => panic!("Expected CreateStream, got {stmt:?}"),
1278 }
1279 }
1280
1281 #[test]
1282 fn test_parse_create_stream_with_emit() {
1283 let stmt =
1284 parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1285 match stmt {
1286 StreamingStatement::CreateStream { emit_clause, .. } => {
1287 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1288 }
1289 _ => panic!("Expected CreateStream, got {stmt:?}"),
1290 }
1291 }
1292
1293 #[test]
1294 fn test_parse_drop_stream() {
1295 let stmt = parse_one("DROP STREAM my_stream");
1296 match stmt {
1297 StreamingStatement::DropStream {
1298 name,
1299 if_exists,
1300 cascade,
1301 } => {
1302 assert_eq!(name.to_string(), "my_stream");
1303 assert!(!if_exists);
1304 assert!(!cascade);
1305 }
1306 _ => panic!("Expected DropStream, got {stmt:?}"),
1307 }
1308 }
1309
1310 #[test]
1311 fn test_parse_drop_stream_if_exists() {
1312 let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1313 match stmt {
1314 StreamingStatement::DropStream {
1315 name,
1316 if_exists,
1317 cascade,
1318 } => {
1319 assert_eq!(name.to_string(), "my_stream");
1320 assert!(if_exists);
1321 assert!(!cascade);
1322 }
1323 _ => panic!("Expected DropStream, got {stmt:?}"),
1324 }
1325 }
1326
1327 #[test]
1328 fn test_parse_drop_stream_cascade() {
1329 let stmt = parse_one("DROP STREAM my_stream CASCADE");
1330 match stmt {
1331 StreamingStatement::DropStream {
1332 name,
1333 if_exists,
1334 cascade,
1335 } => {
1336 assert_eq!(name.to_string(), "my_stream");
1337 assert!(!if_exists);
1338 assert!(cascade);
1339 }
1340 _ => panic!("Expected DropStream, got {stmt:?}"),
1341 }
1342 }
1343
1344 #[test]
1345 fn test_parse_show_streams() {
1346 let stmt = parse_one("SHOW STREAMS");
1347 assert!(matches!(
1348 stmt,
1349 StreamingStatement::Show(ShowCommand::Streams)
1350 ));
1351 }
1352
1353 #[test]
1354 fn test_parse_alter_source_add_column() {
1355 let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1356 match stmt {
1357 StreamingStatement::AlterSource { name, operation } => {
1358 assert_eq!(name.to_string(), "events");
1359 match operation {
1360 statements::AlterSourceOperation::AddColumn { column_def } => {
1361 assert_eq!(column_def.name.value, "new_col");
1362 assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1363 }
1364 statements::AlterSourceOperation::SetProperties { .. } => {
1365 panic!("Expected AddColumn")
1366 }
1367 }
1368 }
1369 _ => panic!("Expected AlterSource, got {stmt:?}"),
1370 }
1371 }
1372
1373 #[test]
1374 fn test_parse_alter_source_set_properties() {
1375 let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1376 match stmt {
1377 StreamingStatement::AlterSource { name, operation } => {
1378 assert_eq!(name.to_string(), "events");
1379 match operation {
1380 statements::AlterSourceOperation::SetProperties { properties } => {
1381 assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1382 assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1383 }
1384 statements::AlterSourceOperation::AddColumn { .. } => {
1385 panic!("Expected SetProperties")
1386 }
1387 }
1388 }
1389 _ => panic!("Expected AlterSource, got {stmt:?}"),
1390 }
1391 }
1392
1393 #[test]
1394 fn test_parse_checkpoint() {
1395 let stmt = parse_one("CHECKPOINT");
1396 assert!(
1397 matches!(stmt, StreamingStatement::Checkpoint),
1398 "Expected Checkpoint, got {stmt:?}"
1399 );
1400 }
1401
1402 #[test]
1403 fn test_parse_show_checkpoint_status() {
1404 let stmt = parse_one("SHOW CHECKPOINT STATUS");
1405 assert!(
1406 matches!(
1407 stmt,
1408 StreamingStatement::Show(ShowCommand::CheckpointStatus)
1409 ),
1410 "Expected Show(CheckpointStatus), got {stmt:?}"
1411 );
1412 }
1413
1414 #[test]
1415 fn test_parse_restore_checkpoint() {
1416 let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1417 match stmt {
1418 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1419 assert_eq!(checkpoint_id, 42);
1420 }
1421 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1422 }
1423 }
1424
1425 #[test]
1426 fn test_parse_restore_checkpoint_large_id() {
1427 let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1428 match stmt {
1429 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1430 assert_eq!(checkpoint_id, 123_456);
1431 }
1432 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1433 }
1434 }
1435
1436 #[test]
1437 fn test_parse_show_create_source() {
1438 let stmt = parse_one("SHOW CREATE SOURCE events");
1439 match stmt {
1440 StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1441 assert_eq!(name.to_string(), "events");
1442 }
1443 _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1444 }
1445 }
1446
1447 #[test]
1448 fn test_parse_show_create_sink() {
1449 let stmt = parse_one("SHOW CREATE SINK output");
1450 match stmt {
1451 StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1452 assert_eq!(name.to_string(), "output");
1453 }
1454 _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1455 }
1456 }
1457}