1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod join_parser;
13mod late_data_parser;
14pub mod lookup_table;
16pub mod order_analyzer;
17mod sink_parser;
18mod source_parser;
19mod statements;
20mod tokenizer;
21mod window_rewriter;
22
23pub use lookup_table::CreateLookupTableStatement;
24pub use statements::{
25 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
26 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
27 WindowFunction,
28};
29pub use window_rewriter::WindowRewriter;
30
31use dialect::LaminarDialect;
32use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
33
34pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
43 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
44}
45
/// Entry point for parsing streaming SQL.
///
/// This is a unit struct: it holds no state and only serves as a namespace for
/// the associated parsing functions defined in its `impl` block.
pub struct StreamingParser;
52
53impl StreamingParser {
54 #[allow(clippy::too_many_lines)]
67 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
68 let sql_trimmed = sql.trim();
69 if sql_trimmed.is_empty() {
70 return Err(sqlparser::parser::ParserError::ParserError(
71 "Empty SQL statement".to_string(),
72 ));
73 }
74
75 let dialect = LaminarDialect::default();
76
77 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
79 .tokenize_with_location()
80 .map_err(|e| {
81 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
82 })?;
83
84 match detect_streaming_ddl(&tokens) {
86 StreamingDdlKind::CreateSource { .. } => {
87 let mut parser =
88 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
89 let source = source_parser::parse_create_source(&mut parser)
90 .map_err(parse_error_to_parser_error)?;
91 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
92 }
93 StreamingDdlKind::CreateSink { .. } => {
94 let mut parser =
95 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
96 let sink = sink_parser::parse_create_sink(&mut parser)
97 .map_err(parse_error_to_parser_error)?;
98 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
99 }
100 StreamingDdlKind::CreateContinuousQuery { .. } => {
101 let mut parser =
102 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
103 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
104 .map_err(parse_error_to_parser_error)?;
105 Ok(vec![stmt])
106 }
107 StreamingDdlKind::DropSource { .. } => {
108 let mut parser =
109 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
110 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
111 Ok(vec![stmt])
112 }
113 StreamingDdlKind::DropSink { .. } => {
114 let mut parser =
115 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
116 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
117 Ok(vec![stmt])
118 }
119 StreamingDdlKind::DropMaterializedView { .. } => {
120 let mut parser =
121 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
122 let stmt = parse_drop_materialized_view(&mut parser)
123 .map_err(parse_error_to_parser_error)?;
124 Ok(vec![stmt])
125 }
126 StreamingDdlKind::ShowSources => {
127 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
128 }
129 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
130 StreamingDdlKind::ShowQueries => {
131 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
132 }
133 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
134 ShowCommand::MaterializedViews,
135 )]),
136 StreamingDdlKind::DescribeSource => {
137 let mut parser =
138 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
139 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
140 Ok(vec![stmt])
141 }
142 StreamingDdlKind::ExplainStreaming => {
143 let mut parser =
144 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
145 let stmt =
146 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
147 Ok(vec![stmt])
148 }
149 StreamingDdlKind::CreateMaterializedView { .. } => {
150 let mut parser =
151 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
152 let stmt = parse_create_materialized_view(&mut parser)
153 .map_err(parse_error_to_parser_error)?;
154 Ok(vec![stmt])
155 }
156 StreamingDdlKind::CreateStream { .. } => {
157 let mut parser =
158 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
159 let stmt = parse_create_stream(&mut parser, sql_trimmed)
160 .map_err(parse_error_to_parser_error)?;
161 Ok(vec![stmt])
162 }
163 StreamingDdlKind::DropStream { .. } => {
164 let mut parser =
165 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
166 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
167 Ok(vec![stmt])
168 }
169 StreamingDdlKind::ShowStreams => {
170 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
171 }
172 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
173 StreamingDdlKind::CreateLookupTable { .. } => {
174 let mut parser =
175 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
176 let lt = lookup_table::parse_create_lookup_table(&mut parser)
177 .map_err(parse_error_to_parser_error)?;
178 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
179 }
180 StreamingDdlKind::DropLookupTable { .. } => {
181 let mut parser =
182 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
183 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
184 .map_err(parse_error_to_parser_error)?;
185 Ok(vec![StreamingStatement::DropLookupTable {
186 name,
187 if_exists,
188 }])
189 }
190 StreamingDdlKind::AlterSource => {
191 let mut parser =
192 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
193 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
194 Ok(vec![stmt])
195 }
196 StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
197 ShowCommand::CheckpointStatus,
198 )]),
199 StreamingDdlKind::ShowCreateSource => {
200 let mut parser =
201 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
202 let stmt =
203 parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
204 Ok(vec![stmt])
205 }
206 StreamingDdlKind::ShowCreateSink => {
207 let mut parser =
208 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
209 let stmt =
210 parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
211 Ok(vec![stmt])
212 }
213 StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
214 StreamingDdlKind::RestoreCheckpoint => {
215 let mut parser =
216 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
217 let stmt =
218 parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
219 Ok(vec![stmt])
220 }
221 StreamingDdlKind::None => {
222 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
224 Ok(statements
225 .into_iter()
226 .map(convert_standard_statement)
227 .collect())
228 }
229 }
230 }
231
232 #[must_use]
234 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
235 match expr {
236 sqlparser::ast::Expr::Function(func) => {
237 if let Some(name) = func.name.0.last() {
238 let func_name = name.to_string().to_uppercase();
239 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
240 } else {
241 false
242 }
243 }
244 _ => false,
245 }
246 }
247
    /// Extracts the `EMIT` clause from raw SQL text.
    ///
    /// Delegates directly to `emit_parser::parse_emit_clause_from_sql`; all
    /// parsing rules live there.
    ///
    /// # Errors
    ///
    /// Propagates any [`ParseError`] returned by the delegate.
    // NOTE(review): presumably `Ok(None)` means the SQL has no EMIT clause —
    // confirm against `emit_parser`.
    pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
        emit_parser::parse_emit_clause_from_sql(sql)
    }
256
    /// Extracts the late-data clause from raw SQL text.
    ///
    /// Delegates directly to `late_data_parser::parse_late_data_clause_from_sql`;
    /// all parsing rules live there.
    ///
    /// # Errors
    ///
    /// Propagates any [`ParseError`] returned by the delegate.
    // NOTE(review): presumably `Ok(None)` means the SQL has no late-data
    // clause — confirm against `late_data_parser`.
    pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
        late_data_parser::parse_late_data_clause_from_sql(sql)
    }
265}
266
267fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
269 match e {
270 ParseError::SqlParseError(pe) => pe,
271 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
272 ParseError::WindowError(msg) => {
273 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
274 }
275 ParseError::ValidationError(msg) => {
276 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
277 }
278 }
279}
280
281fn parse_restore_checkpoint(
289 parser: &mut sqlparser::parser::Parser,
290) -> Result<StreamingStatement, ParseError> {
291 tokenizer::expect_custom_keyword(parser, "RESTORE")?;
293 if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
295 return Err(ParseError::StreamingError(
296 "Expected FROM after RESTORE".to_string(),
297 ));
298 }
299 tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
301 let token = parser.next_token();
303 match &token.token {
304 sqlparser::tokenizer::Token::Number(n, _) => {
305 let id: u64 = n
306 .parse()
307 .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
308 Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
309 }
310 other => Err(ParseError::StreamingError(format!(
311 "Expected checkpoint ID (number), found {other}"
312 ))),
313 }
314}
315
316fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
321 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
322 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
324 let table_name = name.clone();
325 let columns = insert.columns.clone();
326
327 if let Some(ref source) = insert.source {
329 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
330 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
331 return StreamingStatement::InsertInto {
332 table_name,
333 columns,
334 values: rows,
335 };
336 }
337 }
338 }
339 }
340 StreamingStatement::Standard(Box::new(stmt))
341}
342
343fn parse_drop_source(
351 parser: &mut sqlparser::parser::Parser,
352) -> Result<StreamingStatement, ParseError> {
353 parser
354 .expect_keyword(sqlparser::keywords::Keyword::DROP)
355 .map_err(ParseError::SqlParseError)?;
356 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
357 let if_exists = parser.parse_keywords(&[
358 sqlparser::keywords::Keyword::IF,
359 sqlparser::keywords::Keyword::EXISTS,
360 ]);
361 let name = parser
362 .parse_object_name(false)
363 .map_err(ParseError::SqlParseError)?;
364 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
365 Ok(StreamingStatement::DropSource {
366 name,
367 if_exists,
368 cascade,
369 })
370}
371
372fn parse_drop_sink(
380 parser: &mut sqlparser::parser::Parser,
381) -> Result<StreamingStatement, ParseError> {
382 parser
383 .expect_keyword(sqlparser::keywords::Keyword::DROP)
384 .map_err(ParseError::SqlParseError)?;
385 tokenizer::expect_custom_keyword(parser, "SINK")?;
386 let if_exists = parser.parse_keywords(&[
387 sqlparser::keywords::Keyword::IF,
388 sqlparser::keywords::Keyword::EXISTS,
389 ]);
390 let name = parser
391 .parse_object_name(false)
392 .map_err(ParseError::SqlParseError)?;
393 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
394 Ok(StreamingStatement::DropSink {
395 name,
396 if_exists,
397 cascade,
398 })
399}
400
401fn parse_drop_materialized_view(
409 parser: &mut sqlparser::parser::Parser,
410) -> Result<StreamingStatement, ParseError> {
411 parser
412 .expect_keyword(sqlparser::keywords::Keyword::DROP)
413 .map_err(ParseError::SqlParseError)?;
414 parser
415 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
416 .map_err(ParseError::SqlParseError)?;
417 parser
418 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
419 .map_err(ParseError::SqlParseError)?;
420 let if_exists = parser.parse_keywords(&[
421 sqlparser::keywords::Keyword::IF,
422 sqlparser::keywords::Keyword::EXISTS,
423 ]);
424 let name = parser
425 .parse_object_name(false)
426 .map_err(ParseError::SqlParseError)?;
427 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
428 Ok(StreamingStatement::DropMaterializedView {
429 name,
430 if_exists,
431 cascade,
432 })
433}
434
435fn parse_create_stream(
443 parser: &mut sqlparser::parser::Parser,
444 _original_sql: &str,
445) -> Result<StreamingStatement, ParseError> {
446 parser
447 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
448 .map_err(ParseError::SqlParseError)?;
449
450 let or_replace = parser.parse_keywords(&[
451 sqlparser::keywords::Keyword::OR,
452 sqlparser::keywords::Keyword::REPLACE,
453 ]);
454
455 tokenizer::expect_custom_keyword(parser, "STREAM")?;
456
457 let if_not_exists = parser.parse_keywords(&[
458 sqlparser::keywords::Keyword::IF,
459 sqlparser::keywords::Keyword::NOT,
460 sqlparser::keywords::Keyword::EXISTS,
461 ]);
462
463 let name = parser
464 .parse_object_name(false)
465 .map_err(ParseError::SqlParseError)?;
466
467 parser
468 .expect_keyword(sqlparser::keywords::Keyword::AS)
469 .map_err(ParseError::SqlParseError)?;
470
471 let remaining = collect_remaining_tokens(parser);
473 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
474
475 let stream_dialect = LaminarDialect::default();
476
477 let query = if query_tokens.is_empty() {
478 return Err(ParseError::StreamingError(
479 "Expected SELECT query after AS".to_string(),
480 ));
481 } else {
482 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
483 .with_tokens_with_locations(query_tokens);
484 query_parser
485 .parse_query()
486 .map_err(ParseError::SqlParseError)?
487 };
488
489 let query_stmt =
490 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
491
492 let emit_clause = if emit_tokens.is_empty() {
493 None
494 } else {
495 let mut emit_parser =
496 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
497 emit_parser::parse_emit_clause(&mut emit_parser)?
498 };
499
500 Ok(StreamingStatement::CreateStream {
501 name,
502 query: Box::new(query_stmt),
503 emit_clause,
504 or_replace,
505 if_not_exists,
506 })
507}
508
509fn parse_drop_stream(
517 parser: &mut sqlparser::parser::Parser,
518) -> Result<StreamingStatement, ParseError> {
519 parser
520 .expect_keyword(sqlparser::keywords::Keyword::DROP)
521 .map_err(ParseError::SqlParseError)?;
522 tokenizer::expect_custom_keyword(parser, "STREAM")?;
523 let if_exists = parser.parse_keywords(&[
524 sqlparser::keywords::Keyword::IF,
525 sqlparser::keywords::Keyword::EXISTS,
526 ]);
527 let name = parser
528 .parse_object_name(false)
529 .map_err(ParseError::SqlParseError)?;
530 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
531 Ok(StreamingStatement::DropStream {
532 name,
533 if_exists,
534 cascade,
535 })
536}
537
538fn parse_alter_source(
551 parser: &mut sqlparser::parser::Parser,
552) -> Result<StreamingStatement, ParseError> {
553 parser
554 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
555 .map_err(ParseError::SqlParseError)?;
556 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
557 let name = parser
558 .parse_object_name(false)
559 .map_err(ParseError::SqlParseError)?;
560
561 if parser.parse_keywords(&[
563 sqlparser::keywords::Keyword::ADD,
564 sqlparser::keywords::Keyword::COLUMN,
565 ]) {
566 let col_name = parser
568 .parse_identifier()
569 .map_err(ParseError::SqlParseError)?;
570 let data_type = parser
571 .parse_data_type()
572 .map_err(ParseError::SqlParseError)?;
573 let column_def = sqlparser::ast::ColumnDef {
574 name: col_name,
575 data_type,
576 options: vec![],
577 };
578 Ok(StreamingStatement::AlterSource {
579 name,
580 operation: statements::AlterSourceOperation::AddColumn { column_def },
581 })
582 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
583 parser
585 .expect_token(&sqlparser::tokenizer::Token::LParen)
586 .map_err(ParseError::SqlParseError)?;
587 #[allow(clippy::disallowed_types)] let mut properties = std::collections::HashMap::new();
589 loop {
590 let key = parser
591 .parse_literal_string()
592 .map_err(ParseError::SqlParseError)?;
593 parser
594 .expect_token(&sqlparser::tokenizer::Token::Eq)
595 .map_err(ParseError::SqlParseError)?;
596 let value = parser
597 .parse_literal_string()
598 .map_err(ParseError::SqlParseError)?;
599 properties.insert(key, value);
600 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
601 break;
602 }
603 }
604 parser
605 .expect_token(&sqlparser::tokenizer::Token::RParen)
606 .map_err(ParseError::SqlParseError)?;
607 Ok(StreamingStatement::AlterSource {
608 name,
609 operation: statements::AlterSourceOperation::SetProperties { properties },
610 })
611 } else {
612 Err(ParseError::StreamingError(
613 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
614 ))
615 }
616}
617
618fn parse_describe(
624 parser: &mut sqlparser::parser::Parser,
625) -> Result<StreamingStatement, ParseError> {
626 let token = parser.next_token();
628 match &token.token {
629 sqlparser::tokenizer::Token::Word(w)
630 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
631 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
632 _ => {
633 return Err(ParseError::StreamingError(
634 "Expected DESCRIBE or DESC".to_string(),
635 ));
636 }
637 }
638 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
639 let name = parser
640 .parse_object_name(false)
641 .map_err(ParseError::SqlParseError)?;
642 Ok(StreamingStatement::Describe { name, extended })
643}
644
645fn parse_show_create_source(
647 parser: &mut sqlparser::parser::Parser,
648) -> Result<StreamingStatement, ParseError> {
649 parser
651 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
652 .map_err(ParseError::SqlParseError)?;
653 parser
654 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
655 .map_err(ParseError::SqlParseError)?;
656 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
657 let name = parser
658 .parse_object_name(false)
659 .map_err(ParseError::SqlParseError)?;
660 Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
661}
662
663fn parse_show_create_sink(
665 parser: &mut sqlparser::parser::Parser,
666) -> Result<StreamingStatement, ParseError> {
667 parser
669 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
670 .map_err(ParseError::SqlParseError)?;
671 parser
672 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
673 .map_err(ParseError::SqlParseError)?;
674 tokenizer::expect_custom_keyword(parser, "SINK")?;
675 let name = parser
676 .parse_object_name(false)
677 .map_err(ParseError::SqlParseError)?;
678 Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
679}
680
681fn parse_explain(
689 parser: &mut sqlparser::parser::Parser,
690 original_sql: &str,
691) -> Result<StreamingStatement, ParseError> {
692 parser
693 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
694 .map_err(ParseError::SqlParseError)?;
695
696 let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
698
699 let explain_prefix_upper = original_sql.to_uppercase();
701 let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
702 let inner_start = if analyze {
703 explain_prefix_upper
704 .find("ANALYZE")
705 .map_or(0, |pos| pos + "ANALYZE".len())
706 } else {
707 explain_prefix_upper
708 .find("EXPLAIN")
709 .map_or(0, |pos| pos + "EXPLAIN".len())
710 };
711 let inner_sql = original_sql[inner_start..].trim();
712 let _ = skip_keyword; let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
716 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
717 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
718 })?;
719 Ok(StreamingStatement::Explain {
720 statement: Box::new(inner),
721 analyze,
722 })
723}
724
725fn parse_create_materialized_view(
738 parser: &mut sqlparser::parser::Parser,
739) -> Result<StreamingStatement, ParseError> {
740 parser
741 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
742 .map_err(ParseError::SqlParseError)?;
743
744 let or_replace = parser.parse_keywords(&[
745 sqlparser::keywords::Keyword::OR,
746 sqlparser::keywords::Keyword::REPLACE,
747 ]);
748
749 parser
750 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
751 .map_err(ParseError::SqlParseError)?;
752 parser
753 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
754 .map_err(ParseError::SqlParseError)?;
755
756 let if_not_exists = parser.parse_keywords(&[
757 sqlparser::keywords::Keyword::IF,
758 sqlparser::keywords::Keyword::NOT,
759 sqlparser::keywords::Keyword::EXISTS,
760 ]);
761
762 let name = parser
763 .parse_object_name(false)
764 .map_err(ParseError::SqlParseError)?;
765
766 parser
767 .expect_keyword(sqlparser::keywords::Keyword::AS)
768 .map_err(ParseError::SqlParseError)?;
769
770 let remaining = collect_remaining_tokens(parser);
772 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
773
774 let mv_dialect = LaminarDialect::default();
775
776 let query = if query_tokens.is_empty() {
777 return Err(ParseError::StreamingError(
778 "Expected SELECT query after AS".to_string(),
779 ));
780 } else {
781 let mut query_parser =
782 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
783 query_parser
784 .parse_query()
785 .map_err(ParseError::SqlParseError)?
786 };
787
788 let query_stmt =
789 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
790
791 let emit_clause = if emit_tokens.is_empty() {
792 None
793 } else {
794 let mut emit_parser =
795 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
796 emit_parser::parse_emit_clause(&mut emit_parser)?
797 };
798
799 Ok(StreamingStatement::CreateMaterializedView {
800 name,
801 query: Box::new(query_stmt),
802 emit_clause,
803 or_replace,
804 if_not_exists,
805 })
806}
807
808fn collect_remaining_tokens(
810 parser: &mut sqlparser::parser::Parser,
811) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
812 let mut tokens = Vec::new();
813 loop {
814 let token = parser.next_token();
815 if token.token == sqlparser::tokenizer::Token::EOF {
816 tokens.push(token);
817 break;
818 }
819 tokens.push(token);
820 }
821 tokens
822}
823
824fn split_at_emit(
829 tokens: &[sqlparser::tokenizer::TokenWithSpan],
830) -> (
831 Vec<sqlparser::tokenizer::TokenWithSpan>,
832 Vec<sqlparser::tokenizer::TokenWithSpan>,
833) {
834 let mut depth: i32 = 0;
835 for (i, token) in tokens.iter().enumerate() {
836 match &token.token {
837 sqlparser::tokenizer::Token::LParen => depth += 1,
838 sqlparser::tokenizer::Token::RParen => {
839 depth -= 1;
840 }
841 sqlparser::tokenizer::Token::Word(w)
842 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
843 {
844 let mut query_tokens = tokens[..i].to_vec();
845 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
846 token: sqlparser::tokenizer::Token::EOF,
847 span: sqlparser::tokenizer::Span::empty(),
848 });
849 let emit_tokens = tokens[i..].to_vec();
850 return (query_tokens, emit_tokens);
851 }
852 _ => {}
853 }
854 }
855 (tokens.to_vec(), vec![])
856}
857
/// Errors reported by the streaming SQL parser.
///
/// The `Display` text of each variant is the prefix given in its `#[error]`
/// attribute followed by the wrapped value.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// An error produced by `sqlparser`. The `#[from]` attribute lets `?`
    /// convert a `ParserError` into this variant automatically.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// A streaming-specific syntax problem, described by the message.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// A window-function problem, described by the message.
    #[error("Window function error: {0}")]
    WindowError(String),

    /// A validation problem, described by the message.
    #[error("Validation error: {0}")]
    ValidationError(String),
}
877
878#[cfg(test)]
879mod tests {
880 use super::*;
881
882 fn parse_one(sql: &str) -> StreamingStatement {
884 let stmts = StreamingParser::parse_sql(sql).unwrap();
885 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
886 stmts.into_iter().next().unwrap()
887 }
888
889 #[test]
890 fn test_parse_drop_source() {
891 let stmt = parse_one("DROP SOURCE events");
892 match stmt {
893 StreamingStatement::DropSource {
894 name,
895 if_exists,
896 cascade,
897 } => {
898 assert_eq!(name.to_string(), "events");
899 assert!(!if_exists);
900 assert!(!cascade);
901 }
902 _ => panic!("Expected DropSource, got {stmt:?}"),
903 }
904 }
905
906 #[test]
907 fn test_parse_drop_source_if_exists() {
908 let stmt = parse_one("DROP SOURCE IF EXISTS events");
909 match stmt {
910 StreamingStatement::DropSource {
911 name,
912 if_exists,
913 cascade,
914 } => {
915 assert_eq!(name.to_string(), "events");
916 assert!(if_exists);
917 assert!(!cascade);
918 }
919 _ => panic!("Expected DropSource, got {stmt:?}"),
920 }
921 }
922
923 #[test]
924 fn test_parse_drop_source_cascade() {
925 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
926 match stmt {
927 StreamingStatement::DropSource {
928 name,
929 if_exists,
930 cascade,
931 } => {
932 assert_eq!(name.to_string(), "events");
933 assert!(if_exists);
934 assert!(cascade);
935 }
936 _ => panic!("Expected DropSource, got {stmt:?}"),
937 }
938 }
939
940 #[test]
941 fn test_parse_drop_sink() {
942 let stmt = parse_one("DROP SINK output");
943 match stmt {
944 StreamingStatement::DropSink {
945 name,
946 if_exists,
947 cascade,
948 } => {
949 assert_eq!(name.to_string(), "output");
950 assert!(!if_exists);
951 assert!(!cascade);
952 }
953 _ => panic!("Expected DropSink, got {stmt:?}"),
954 }
955 }
956
957 #[test]
958 fn test_parse_drop_sink_if_exists() {
959 let stmt = parse_one("DROP SINK IF EXISTS output");
960 match stmt {
961 StreamingStatement::DropSink {
962 name,
963 if_exists,
964 cascade,
965 } => {
966 assert_eq!(name.to_string(), "output");
967 assert!(if_exists);
968 assert!(!cascade);
969 }
970 _ => panic!("Expected DropSink, got {stmt:?}"),
971 }
972 }
973
974 #[test]
975 fn test_parse_drop_sink_cascade() {
976 let stmt = parse_one("DROP SINK output CASCADE");
977 match stmt {
978 StreamingStatement::DropSink {
979 name,
980 if_exists,
981 cascade,
982 } => {
983 assert_eq!(name.to_string(), "output");
984 assert!(!if_exists);
985 assert!(cascade);
986 }
987 _ => panic!("Expected DropSink, got {stmt:?}"),
988 }
989 }
990
991 #[test]
992 fn test_parse_drop_materialized_view() {
993 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
994 match stmt {
995 StreamingStatement::DropMaterializedView {
996 name,
997 if_exists,
998 cascade,
999 } => {
1000 assert_eq!(name.to_string(), "live_stats");
1001 assert!(!if_exists);
1002 assert!(!cascade);
1003 }
1004 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1005 }
1006 }
1007
1008 #[test]
1009 fn test_parse_drop_materialized_view_if_exists_cascade() {
1010 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1011 match stmt {
1012 StreamingStatement::DropMaterializedView {
1013 name,
1014 if_exists,
1015 cascade,
1016 } => {
1017 assert_eq!(name.to_string(), "live_stats");
1018 assert!(if_exists);
1019 assert!(cascade);
1020 }
1021 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1022 }
1023 }
1024
1025 #[test]
1026 fn test_parse_show_sources() {
1027 let stmt = parse_one("SHOW SOURCES");
1028 assert!(matches!(
1029 stmt,
1030 StreamingStatement::Show(ShowCommand::Sources)
1031 ));
1032 }
1033
1034 #[test]
1035 fn test_parse_show_sinks() {
1036 let stmt = parse_one("SHOW SINKS");
1037 assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1038 }
1039
1040 #[test]
1041 fn test_parse_show_queries() {
1042 let stmt = parse_one("SHOW QUERIES");
1043 assert!(matches!(
1044 stmt,
1045 StreamingStatement::Show(ShowCommand::Queries)
1046 ));
1047 }
1048
1049 #[test]
1050 fn test_parse_show_materialized_views() {
1051 let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1052 assert!(matches!(
1053 stmt,
1054 StreamingStatement::Show(ShowCommand::MaterializedViews)
1055 ));
1056 }
1057
1058 #[test]
1059 fn test_parse_describe() {
1060 let stmt = parse_one("DESCRIBE events");
1061 match stmt {
1062 StreamingStatement::Describe { name, extended } => {
1063 assert_eq!(name.to_string(), "events");
1064 assert!(!extended);
1065 }
1066 _ => panic!("Expected Describe, got {stmt:?}"),
1067 }
1068 }
1069
1070 #[test]
1071 fn test_parse_describe_extended() {
1072 let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1073 match stmt {
1074 StreamingStatement::Describe { name, extended } => {
1075 assert_eq!(name.to_string(), "my_schema.events");
1076 assert!(extended);
1077 }
1078 _ => panic!("Expected Describe, got {stmt:?}"),
1079 }
1080 }
1081
1082 #[test]
1083 fn test_parse_explain_select() {
1084 let stmt = parse_one("EXPLAIN SELECT * FROM events");
1085 match stmt {
1086 StreamingStatement::Explain {
1087 statement, analyze, ..
1088 } => {
1089 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1090 assert!(!analyze);
1091 }
1092 _ => panic!("Expected Explain, got {stmt:?}"),
1093 }
1094 }
1095
1096 #[test]
1097 fn test_parse_explain_create_source() {
1098 let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1099 match stmt {
1100 StreamingStatement::Explain { statement, .. } => {
1101 assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1102 }
1103 _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1104 }
1105 }
1106
1107 #[test]
1108 fn test_parse_explain_analyze_select() {
1109 let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1110 match stmt {
1111 StreamingStatement::Explain {
1112 statement, analyze, ..
1113 } => {
1114 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1115 assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1116 }
1117 _ => panic!("Expected Explain, got {stmt:?}"),
1118 }
1119 }
1120
1121 #[test]
1122 fn test_parse_create_materialized_view() {
1123 let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1124 match stmt {
1125 StreamingStatement::CreateMaterializedView {
1126 name,
1127 emit_clause,
1128 or_replace,
1129 if_not_exists,
1130 ..
1131 } => {
1132 assert_eq!(name.to_string(), "live_stats");
1133 assert!(emit_clause.is_none());
1134 assert!(!or_replace);
1135 assert!(!if_not_exists);
1136 }
1137 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1138 }
1139 }
1140
1141 #[test]
1142 fn test_parse_create_materialized_view_with_emit() {
1143 let stmt = parse_one(
1144 "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1145 );
1146 match stmt {
1147 StreamingStatement::CreateMaterializedView {
1148 name, emit_clause, ..
1149 } => {
1150 assert_eq!(name.to_string(), "live_stats");
1151 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1152 }
1153 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1154 }
1155 }
1156
1157 #[test]
1158 fn test_parse_create_or_replace_materialized_view() {
1159 let stmt = parse_one(
1160 "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1161 );
1162 match stmt {
1163 StreamingStatement::CreateMaterializedView {
1164 name,
1165 or_replace,
1166 if_not_exists,
1167 ..
1168 } => {
1169 assert_eq!(name.to_string(), "live_stats");
1170 assert!(or_replace);
1171 assert!(!if_not_exists);
1172 }
1173 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1174 }
1175 }
1176
1177 #[test]
1178 fn test_parse_create_materialized_view_if_not_exists() {
1179 let stmt = parse_one(
1180 "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1181 );
1182 match stmt {
1183 StreamingStatement::CreateMaterializedView {
1184 name,
1185 or_replace,
1186 if_not_exists,
1187 ..
1188 } => {
1189 assert_eq!(name.to_string(), "live_stats");
1190 assert!(!or_replace);
1191 assert!(if_not_exists);
1192 }
1193 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1194 }
1195 }
1196
/// Checks that a single-row `INSERT INTO` with an explicit column list exposes
/// the table name, the two column names in order, and one row of two values.
#[test]
fn test_parse_insert_into() {
    let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");

    let StreamingStatement::InsertInto {
        table_name,
        columns,
        values,
    } = stmt
    else {
        panic!("Expected InsertInto, got {stmt:?}");
    };

    assert_eq!(table_name.to_string(), "events");

    // Comparing the rendered names as a whole covers both count and order.
    let column_names: Vec<String> = columns.iter().map(|col| col.to_string()).collect();
    assert_eq!(column_names, ["id", "name"]);

    // Exactly one row, holding two values.
    let row_widths: Vec<usize> = values.iter().map(|row| row.len()).collect();
    assert_eq!(row_widths, [2]);
}
1216
/// Checks that `INSERT INTO` without a column list and with three `VALUES`
/// tuples yields an empty column list and three rows.
#[test]
fn test_parse_insert_into_multiple_rows() {
    let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");

    if let StreamingStatement::InsertInto {
        table_name,
        columns,
        values,
    } = stmt
    {
        assert_eq!(table_name.to_string(), "events");
        // (number of explicit columns, number of rows)
        assert_eq!((columns.len(), values.len()), (0, 3));
    } else {
        panic!("Expected InsertInto, got {stmt:?}");
    }
}
1233
/// Checks that a plain `CREATE STREAM ... AS SELECT` parses to `CreateStream`
/// with the given name, both modifier flags false, and no emit clause.
#[test]
fn test_parse_create_stream() {
    let sql = "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id";
    let stmt = parse_one(sql);

    let StreamingStatement::CreateStream {
        name,
        or_replace,
        if_not_exists,
        emit_clause,
        ..
    } = stmt
    else {
        panic!("Expected CreateStream, got {stmt:?}");
    };

    assert_eq!(name.to_string(), "session_activity");
    // Both flags are compared at once: (or_replace, if_not_exists).
    assert_eq!((or_replace, if_not_exists), (false, false));
    assert!(emit_clause.is_none());
}
1257
/// Checks that `CREATE OR REPLACE STREAM` sets the `or_replace` flag.
#[test]
fn test_parse_create_or_replace_stream() {
    let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");

    if let StreamingStatement::CreateStream { or_replace, .. } = stmt {
        assert!(or_replace);
    } else {
        panic!("Expected CreateStream, got {stmt:?}");
    }
}
1268
/// Checks that `CREATE STREAM IF NOT EXISTS` sets the `if_not_exists` flag.
#[test]
fn test_parse_create_stream_if_not_exists() {
    let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");

    let StreamingStatement::CreateStream {
        if_not_exists: guarded,
        ..
    } = stmt
    else {
        panic!("Expected CreateStream, got {stmt:?}");
    };

    assert!(guarded);
}
1279
/// Checks that `EMIT ON WINDOW CLOSE` on `CREATE STREAM` is reported as
/// `Some(EmitClause::OnWindowClose)`.
#[test]
fn test_parse_create_stream_with_emit() {
    let sql = "CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE";
    let stmt = parse_one(sql);

    if let StreamingStatement::CreateStream { emit_clause, .. } = stmt {
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
    } else {
        panic!("Expected CreateStream, got {stmt:?}");
    }
}
1291
/// Checks that a bare `DROP STREAM <name>` parses with both `if_exists` and
/// `cascade` false.
#[test]
fn test_parse_drop_stream() {
    let stmt = parse_one("DROP STREAM my_stream");

    let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    else {
        panic!("Expected DropStream, got {stmt:?}");
    };

    assert_eq!(name.to_string(), "my_stream");
    // Both flags are compared at once: (if_exists, cascade).
    assert_eq!((if_exists, cascade), (false, false));
}
1308
/// Checks that `DROP STREAM IF EXISTS` sets `if_exists` and leaves `cascade`
/// false.
#[test]
fn test_parse_drop_stream_if_exists() {
    let stmt = parse_one("DROP STREAM IF EXISTS my_stream");

    if let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    {
        assert_eq!(name.to_string(), "my_stream");
        // Both flags are compared at once: (if_exists, cascade).
        assert_eq!((if_exists, cascade), (true, false));
    } else {
        panic!("Expected DropStream, got {stmt:?}");
    }
}
1325
/// Checks that `DROP STREAM <name> CASCADE` sets `cascade` and leaves
/// `if_exists` false.
#[test]
fn test_parse_drop_stream_cascade() {
    let stmt = parse_one("DROP STREAM my_stream CASCADE");

    let StreamingStatement::DropStream {
        name,
        if_exists,
        cascade,
    } = stmt
    else {
        panic!("Expected DropStream, got {stmt:?}");
    };

    assert_eq!(name.to_string(), "my_stream");
    // Both flags are compared at once: (if_exists, cascade).
    assert_eq!((if_exists, cascade), (false, true));
}
1342
/// Checks that `SHOW STREAMS` parses to `Show(ShowCommand::Streams)`.
#[test]
fn test_parse_show_streams() {
    assert!(matches!(
        parse_one("SHOW STREAMS"),
        StreamingStatement::Show(ShowCommand::Streams)
    ));
}
1351
/// Checks that `ALTER SOURCE ... ADD COLUMN` parses to an `AddColumn`
/// operation carrying the new column's name and `INT` data type.
#[test]
fn test_parse_alter_source_add_column() {
    let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");

    let StreamingStatement::AlterSource { name, operation } = stmt else {
        panic!("Expected AlterSource, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");

    // The source name is verified before the operation kind is inspected.
    let statements::AlterSourceOperation::AddColumn { column_def } = operation else {
        panic!("Expected AddColumn")
    };
    assert_eq!(column_def.name.value, "new_col");
    assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
}
1371
/// Checks that `ALTER SOURCE ... SET (...)` parses to a `SetProperties`
/// operation containing each quoted key/value pair.
#[test]
fn test_parse_alter_source_set_properties() {
    let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");

    let StreamingStatement::AlterSource { name, operation } = stmt else {
        panic!("Expected AlterSource, got {stmt:?}");
    };
    assert_eq!(name.to_string(), "events");

    // The source name is verified before the operation kind is inspected.
    let statements::AlterSourceOperation::SetProperties { properties } = operation else {
        panic!("Expected SetProperties")
    };
    for (key, expected) in [("batch.size", "1000"), ("timeout", "5s")] {
        assert_eq!(properties.get(key).map(String::as_str), Some(expected));
    }
}
1391
/// Checks that the bare `CHECKPOINT` command parses to
/// `StreamingStatement::Checkpoint`.
#[test]
fn test_parse_checkpoint() {
    let stmt = parse_one("CHECKPOINT");

    // The pattern binds nothing; it only verifies the variant.
    let StreamingStatement::Checkpoint = stmt else {
        panic!("Expected Checkpoint, got {stmt:?}");
    };
}
1400
/// Checks that `SHOW CHECKPOINT STATUS` parses to
/// `Show(ShowCommand::CheckpointStatus)`.
#[test]
fn test_parse_show_checkpoint_status() {
    let stmt = parse_one("SHOW CHECKPOINT STATUS");

    match stmt {
        StreamingStatement::Show(ShowCommand::CheckpointStatus) => {}
        _ => panic!("Expected Show(CheckpointStatus), got {stmt:?}"),
    }
}
1412
/// Checks that `RESTORE FROM CHECKPOINT 42` parses to `RestoreCheckpoint`
/// with `checkpoint_id == 42`.
#[test]
fn test_parse_restore_checkpoint() {
    let stmt = parse_one("RESTORE FROM CHECKPOINT 42");

    let StreamingStatement::RestoreCheckpoint { checkpoint_id } = stmt else {
        panic!("Expected RestoreCheckpoint, got {stmt:?}");
    };

    assert_eq!(checkpoint_id, 42);
}
1423
/// Checks that a six-digit checkpoint id (`123456`) is parsed intact by
/// `RESTORE FROM CHECKPOINT`.
#[test]
fn test_parse_restore_checkpoint_large_id() {
    let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");

    if let StreamingStatement::RestoreCheckpoint { checkpoint_id } = stmt {
        assert_eq!(checkpoint_id, 123_456);
    } else {
        panic!("Expected RestoreCheckpoint, got {stmt:?}");
    }
}
1434
/// Checks that `SHOW CREATE SOURCE <name>` parses to
/// `Show(ShowCommand::CreateSource { name })` with the given name.
#[test]
fn test_parse_show_create_source() {
    let stmt = parse_one("SHOW CREATE SOURCE events");

    let StreamingStatement::Show(ShowCommand::CreateSource { name }) = stmt else {
        panic!("Expected Show(CreateSource), got {stmt:?}");
    };

    assert_eq!(name.to_string(), "events");
}
1445
/// Checks that `SHOW CREATE SINK <name>` parses to
/// `Show(ShowCommand::CreateSink { name })` with the given name.
#[test]
fn test_parse_show_create_sink() {
    let stmt = parse_one("SHOW CREATE SINK output");

    if let StreamingStatement::Show(ShowCommand::CreateSink { name }) = stmt {
        assert_eq!(name.to_string(), "output");
    } else {
        panic!("Expected Show(CreateSink), got {stmt:?}");
    }
}
1456}