1pub mod aggregation_parser;
8pub mod analytic_parser;
9mod continuous_query_parser;
10pub(crate) mod dialect;
11mod emit_parser;
12pub mod interval_rewriter;
14pub mod join_parser;
15mod late_data_parser;
16pub mod lookup_table;
18pub mod order_analyzer;
19mod sink_parser;
20mod source_parser;
21mod statements;
22mod tokenizer;
23mod window_rewriter;
24
25pub use lookup_table::CreateLookupTableStatement;
26pub use statements::{
27 AlterSourceOperation, CreateSinkStatement, CreateSourceStatement, EmitClause, EmitStrategy,
28 FormatSpec, LateDataClause, ShowCommand, SinkFrom, StreamingStatement, WatermarkDef,
29 WindowFunction,
30};
31pub use window_rewriter::WindowRewriter;
32
33use dialect::LaminarDialect;
34use tokenizer::{detect_streaming_ddl, StreamingDdlKind};
35
36pub fn parse_streaming_sql(sql: &str) -> Result<Vec<StreamingStatement>, ParseError> {
45 StreamingParser::parse_sql(sql).map_err(ParseError::SqlParseError)
46}
47
48pub struct StreamingParser;
54
55impl StreamingParser {
56 #[allow(clippy::too_many_lines)]
69 pub fn parse_sql(sql: &str) -> Result<Vec<StreamingStatement>, sqlparser::parser::ParserError> {
70 let sql_trimmed = sql.trim();
71 if sql_trimmed.is_empty() {
72 return Err(sqlparser::parser::ParserError::ParserError(
73 "Empty SQL statement".to_string(),
74 ));
75 }
76
77 let dialect = LaminarDialect::default();
78
79 let tokens = sqlparser::tokenizer::Tokenizer::new(&dialect, sql_trimmed)
81 .tokenize_with_location()
82 .map_err(|e| {
83 sqlparser::parser::ParserError::ParserError(format!("Tokenization error: {e}"))
84 })?;
85
86 match detect_streaming_ddl(&tokens) {
88 StreamingDdlKind::CreateSource { .. } => {
89 let mut parser =
90 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
91 let source = source_parser::parse_create_source(&mut parser)
92 .map_err(parse_error_to_parser_error)?;
93 Ok(vec![StreamingStatement::CreateSource(Box::new(source))])
94 }
95 StreamingDdlKind::CreateSink { .. } => {
96 let mut parser =
97 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
98 let sink = sink_parser::parse_create_sink(&mut parser)
99 .map_err(parse_error_to_parser_error)?;
100 Ok(vec![StreamingStatement::CreateSink(Box::new(sink))])
101 }
102 StreamingDdlKind::CreateContinuousQuery { .. } => {
103 let mut parser =
104 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
105 let stmt = continuous_query_parser::parse_continuous_query(&mut parser)
106 .map_err(parse_error_to_parser_error)?;
107 Ok(vec![stmt])
108 }
109 StreamingDdlKind::DropSource { .. } => {
110 let mut parser =
111 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
112 let stmt = parse_drop_source(&mut parser).map_err(parse_error_to_parser_error)?;
113 Ok(vec![stmt])
114 }
115 StreamingDdlKind::DropSink { .. } => {
116 let mut parser =
117 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
118 let stmt = parse_drop_sink(&mut parser).map_err(parse_error_to_parser_error)?;
119 Ok(vec![stmt])
120 }
121 StreamingDdlKind::DropMaterializedView { .. } => {
122 let mut parser =
123 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
124 let stmt = parse_drop_materialized_view(&mut parser)
125 .map_err(parse_error_to_parser_error)?;
126 Ok(vec![stmt])
127 }
128 StreamingDdlKind::ShowSources => {
129 Ok(vec![StreamingStatement::Show(ShowCommand::Sources)])
130 }
131 StreamingDdlKind::ShowSinks => Ok(vec![StreamingStatement::Show(ShowCommand::Sinks)]),
132 StreamingDdlKind::ShowQueries => {
133 Ok(vec![StreamingStatement::Show(ShowCommand::Queries)])
134 }
135 StreamingDdlKind::ShowMaterializedViews => Ok(vec![StreamingStatement::Show(
136 ShowCommand::MaterializedViews,
137 )]),
138 StreamingDdlKind::DescribeSource => {
139 let mut parser =
140 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
141 let stmt = parse_describe(&mut parser).map_err(parse_error_to_parser_error)?;
142 Ok(vec![stmt])
143 }
144 StreamingDdlKind::ExplainStreaming => {
145 let mut parser =
146 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
147 let stmt =
148 parse_explain(&mut parser, sql_trimmed).map_err(parse_error_to_parser_error)?;
149 Ok(vec![stmt])
150 }
151 StreamingDdlKind::CreateMaterializedView { .. } => {
152 let mut parser =
153 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
154 let stmt = parse_create_materialized_view(&mut parser)
155 .map_err(parse_error_to_parser_error)?;
156 Ok(vec![stmt])
157 }
158 StreamingDdlKind::CreateStream { .. } => {
159 let mut parser =
160 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
161 let stmt = parse_create_stream(&mut parser, sql_trimmed)
162 .map_err(parse_error_to_parser_error)?;
163 Ok(vec![stmt])
164 }
165 StreamingDdlKind::DropStream { .. } => {
166 let mut parser =
167 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
168 let stmt = parse_drop_stream(&mut parser).map_err(parse_error_to_parser_error)?;
169 Ok(vec![stmt])
170 }
171 StreamingDdlKind::ShowStreams => {
172 Ok(vec![StreamingStatement::Show(ShowCommand::Streams)])
173 }
174 StreamingDdlKind::ShowTables => Ok(vec![StreamingStatement::Show(ShowCommand::Tables)]),
175 StreamingDdlKind::CreateLookupTable { .. } => {
176 let mut parser =
177 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
178 let lt = lookup_table::parse_create_lookup_table(&mut parser)
179 .map_err(parse_error_to_parser_error)?;
180 Ok(vec![StreamingStatement::CreateLookupTable(Box::new(lt))])
181 }
182 StreamingDdlKind::DropLookupTable { .. } => {
183 let mut parser =
184 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
185 let (name, if_exists) = lookup_table::parse_drop_lookup_table(&mut parser)
186 .map_err(parse_error_to_parser_error)?;
187 Ok(vec![StreamingStatement::DropLookupTable {
188 name,
189 if_exists,
190 }])
191 }
192 StreamingDdlKind::AlterSource => {
193 let mut parser =
194 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
195 let stmt = parse_alter_source(&mut parser).map_err(parse_error_to_parser_error)?;
196 Ok(vec![stmt])
197 }
198 StreamingDdlKind::ShowCheckpointStatus => Ok(vec![StreamingStatement::Show(
199 ShowCommand::CheckpointStatus,
200 )]),
201 StreamingDdlKind::ShowCreateSource => {
202 let mut parser =
203 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
204 let stmt =
205 parse_show_create_source(&mut parser).map_err(parse_error_to_parser_error)?;
206 Ok(vec![stmt])
207 }
208 StreamingDdlKind::ShowCreateSink => {
209 let mut parser =
210 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
211 let stmt =
212 parse_show_create_sink(&mut parser).map_err(parse_error_to_parser_error)?;
213 Ok(vec![stmt])
214 }
215 StreamingDdlKind::Checkpoint => Ok(vec![StreamingStatement::Checkpoint]),
216 StreamingDdlKind::RestoreCheckpoint => {
217 let mut parser =
218 sqlparser::parser::Parser::new(&dialect).with_tokens_with_locations(tokens);
219 let stmt =
220 parse_restore_checkpoint(&mut parser).map_err(parse_error_to_parser_error)?;
221 Ok(vec![stmt])
222 }
223 StreamingDdlKind::None => {
224 let statements = sqlparser::parser::Parser::parse_sql(&dialect, sql_trimmed)?;
226 Ok(statements
227 .into_iter()
228 .map(convert_standard_statement)
229 .collect())
230 }
231 }
232 }
233
234 #[must_use]
236 pub fn has_window_function(expr: &sqlparser::ast::Expr) -> bool {
237 match expr {
238 sqlparser::ast::Expr::Function(func) => {
239 if let Some(name) = func.name.0.last() {
240 let func_name = name.to_string().to_uppercase();
241 matches!(func_name.as_str(), "TUMBLE" | "HOP" | "SESSION")
242 } else {
243 false
244 }
245 }
246 _ => false,
247 }
248 }
249
250 pub fn parse_emit_clause(sql: &str) -> Result<Option<EmitClause>, ParseError> {
256 emit_parser::parse_emit_clause_from_sql(sql)
257 }
258
259 pub fn parse_late_data_clause(sql: &str) -> Result<Option<LateDataClause>, ParseError> {
265 late_data_parser::parse_late_data_clause_from_sql(sql)
266 }
267}
268
269fn parse_error_to_parser_error(e: ParseError) -> sqlparser::parser::ParserError {
271 match e {
272 ParseError::SqlParseError(pe) => pe,
273 ParseError::StreamingError(msg) => sqlparser::parser::ParserError::ParserError(msg),
274 ParseError::WindowError(msg) => {
275 sqlparser::parser::ParserError::ParserError(format!("Window error: {msg}"))
276 }
277 ParseError::ValidationError(msg) => {
278 sqlparser::parser::ParserError::ParserError(format!("Validation error: {msg}"))
279 }
280 }
281}
282
283fn parse_restore_checkpoint(
291 parser: &mut sqlparser::parser::Parser,
292) -> Result<StreamingStatement, ParseError> {
293 tokenizer::expect_custom_keyword(parser, "RESTORE")?;
295 if !parser.parse_keyword(sqlparser::keywords::Keyword::FROM) {
297 return Err(ParseError::StreamingError(
298 "Expected FROM after RESTORE".to_string(),
299 ));
300 }
301 tokenizer::expect_custom_keyword(parser, "CHECKPOINT")?;
303 let token = parser.next_token();
305 match &token.token {
306 sqlparser::tokenizer::Token::Number(n, _) => {
307 let id: u64 = n
308 .parse()
309 .map_err(|_| ParseError::StreamingError(format!("Invalid checkpoint ID: {n}")))?;
310 Ok(StreamingStatement::RestoreCheckpoint { checkpoint_id: id })
311 }
312 other => Err(ParseError::StreamingError(format!(
313 "Expected checkpoint ID (number), found {other}"
314 ))),
315 }
316}
317
318fn convert_standard_statement(stmt: sqlparser::ast::Statement) -> StreamingStatement {
323 if let sqlparser::ast::Statement::Insert(insert) = &stmt {
324 if let sqlparser::ast::TableObject::TableName(ref name) = insert.table {
326 let table_name = name.clone();
327 let columns = insert.columns.clone();
328
329 if let Some(ref source) = insert.source {
331 if let sqlparser::ast::SetExpr::Values(ref values) = *source.body {
332 let rows: Vec<Vec<sqlparser::ast::Expr>> = values.rows.clone();
333 return StreamingStatement::InsertInto {
334 table_name,
335 columns,
336 values: rows,
337 };
338 }
339 }
340 }
341 }
342 StreamingStatement::Standard(Box::new(stmt))
343}
344
345fn parse_drop_source(
353 parser: &mut sqlparser::parser::Parser,
354) -> Result<StreamingStatement, ParseError> {
355 parser
356 .expect_keyword(sqlparser::keywords::Keyword::DROP)
357 .map_err(ParseError::SqlParseError)?;
358 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
359 let if_exists = parser.parse_keywords(&[
360 sqlparser::keywords::Keyword::IF,
361 sqlparser::keywords::Keyword::EXISTS,
362 ]);
363 let name = parser
364 .parse_object_name(false)
365 .map_err(ParseError::SqlParseError)?;
366 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
367 Ok(StreamingStatement::DropSource {
368 name,
369 if_exists,
370 cascade,
371 })
372}
373
374fn parse_drop_sink(
382 parser: &mut sqlparser::parser::Parser,
383) -> Result<StreamingStatement, ParseError> {
384 parser
385 .expect_keyword(sqlparser::keywords::Keyword::DROP)
386 .map_err(ParseError::SqlParseError)?;
387 tokenizer::expect_custom_keyword(parser, "SINK")?;
388 let if_exists = parser.parse_keywords(&[
389 sqlparser::keywords::Keyword::IF,
390 sqlparser::keywords::Keyword::EXISTS,
391 ]);
392 let name = parser
393 .parse_object_name(false)
394 .map_err(ParseError::SqlParseError)?;
395 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
396 Ok(StreamingStatement::DropSink {
397 name,
398 if_exists,
399 cascade,
400 })
401}
402
403fn parse_drop_materialized_view(
411 parser: &mut sqlparser::parser::Parser,
412) -> Result<StreamingStatement, ParseError> {
413 parser
414 .expect_keyword(sqlparser::keywords::Keyword::DROP)
415 .map_err(ParseError::SqlParseError)?;
416 parser
417 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
418 .map_err(ParseError::SqlParseError)?;
419 parser
420 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
421 .map_err(ParseError::SqlParseError)?;
422 let if_exists = parser.parse_keywords(&[
423 sqlparser::keywords::Keyword::IF,
424 sqlparser::keywords::Keyword::EXISTS,
425 ]);
426 let name = parser
427 .parse_object_name(false)
428 .map_err(ParseError::SqlParseError)?;
429 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
430 Ok(StreamingStatement::DropMaterializedView {
431 name,
432 if_exists,
433 cascade,
434 })
435}
436
437fn parse_create_stream(
445 parser: &mut sqlparser::parser::Parser,
446 _original_sql: &str,
447) -> Result<StreamingStatement, ParseError> {
448 parser
449 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
450 .map_err(ParseError::SqlParseError)?;
451
452 let or_replace = parser.parse_keywords(&[
453 sqlparser::keywords::Keyword::OR,
454 sqlparser::keywords::Keyword::REPLACE,
455 ]);
456
457 tokenizer::expect_custom_keyword(parser, "STREAM")?;
458
459 let if_not_exists = parser.parse_keywords(&[
460 sqlparser::keywords::Keyword::IF,
461 sqlparser::keywords::Keyword::NOT,
462 sqlparser::keywords::Keyword::EXISTS,
463 ]);
464
465 let name = parser
466 .parse_object_name(false)
467 .map_err(ParseError::SqlParseError)?;
468
469 parser
470 .expect_keyword(sqlparser::keywords::Keyword::AS)
471 .map_err(ParseError::SqlParseError)?;
472
473 let remaining = collect_remaining_tokens(parser);
475 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
476
477 let stream_dialect = LaminarDialect::default();
478
479 let query = if query_tokens.is_empty() {
480 return Err(ParseError::StreamingError(
481 "Expected SELECT query after AS".to_string(),
482 ));
483 } else {
484 let mut query_parser = sqlparser::parser::Parser::new(&stream_dialect)
485 .with_tokens_with_locations(query_tokens);
486 query_parser
487 .parse_query()
488 .map_err(ParseError::SqlParseError)?
489 };
490
491 let query_stmt =
492 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
493
494 let emit_clause = if emit_tokens.is_empty() {
495 None
496 } else {
497 let mut emit_parser =
498 sqlparser::parser::Parser::new(&stream_dialect).with_tokens_with_locations(emit_tokens);
499 emit_parser::parse_emit_clause(&mut emit_parser)?
500 };
501
502 Ok(StreamingStatement::CreateStream {
503 name,
504 query: Box::new(query_stmt),
505 emit_clause,
506 or_replace,
507 if_not_exists,
508 })
509}
510
511fn parse_drop_stream(
519 parser: &mut sqlparser::parser::Parser,
520) -> Result<StreamingStatement, ParseError> {
521 parser
522 .expect_keyword(sqlparser::keywords::Keyword::DROP)
523 .map_err(ParseError::SqlParseError)?;
524 tokenizer::expect_custom_keyword(parser, "STREAM")?;
525 let if_exists = parser.parse_keywords(&[
526 sqlparser::keywords::Keyword::IF,
527 sqlparser::keywords::Keyword::EXISTS,
528 ]);
529 let name = parser
530 .parse_object_name(false)
531 .map_err(ParseError::SqlParseError)?;
532 let cascade = parser.parse_keyword(sqlparser::keywords::Keyword::CASCADE);
533 Ok(StreamingStatement::DropStream {
534 name,
535 if_exists,
536 cascade,
537 })
538}
539
540fn parse_alter_source(
553 parser: &mut sqlparser::parser::Parser,
554) -> Result<StreamingStatement, ParseError> {
555 parser
556 .expect_keyword(sqlparser::keywords::Keyword::ALTER)
557 .map_err(ParseError::SqlParseError)?;
558 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
559 let name = parser
560 .parse_object_name(false)
561 .map_err(ParseError::SqlParseError)?;
562
563 if parser.parse_keywords(&[
565 sqlparser::keywords::Keyword::ADD,
566 sqlparser::keywords::Keyword::COLUMN,
567 ]) {
568 let col_name = parser
570 .parse_identifier()
571 .map_err(ParseError::SqlParseError)?;
572 let data_type = parser
573 .parse_data_type()
574 .map_err(ParseError::SqlParseError)?;
575 let column_def = sqlparser::ast::ColumnDef {
576 name: col_name,
577 data_type,
578 options: vec![],
579 };
580 Ok(StreamingStatement::AlterSource {
581 name,
582 operation: statements::AlterSourceOperation::AddColumn { column_def },
583 })
584 } else if parser.parse_keyword(sqlparser::keywords::Keyword::SET) {
585 parser
587 .expect_token(&sqlparser::tokenizer::Token::LParen)
588 .map_err(ParseError::SqlParseError)?;
589 #[allow(clippy::disallowed_types)] let mut properties = std::collections::HashMap::new();
591 loop {
592 let key = parser
593 .parse_literal_string()
594 .map_err(ParseError::SqlParseError)?;
595 parser
596 .expect_token(&sqlparser::tokenizer::Token::Eq)
597 .map_err(ParseError::SqlParseError)?;
598 let value = parser
599 .parse_literal_string()
600 .map_err(ParseError::SqlParseError)?;
601 properties.insert(key, value);
602 if !parser.consume_token(&sqlparser::tokenizer::Token::Comma) {
603 break;
604 }
605 }
606 parser
607 .expect_token(&sqlparser::tokenizer::Token::RParen)
608 .map_err(ParseError::SqlParseError)?;
609 Ok(StreamingStatement::AlterSource {
610 name,
611 operation: statements::AlterSourceOperation::SetProperties { properties },
612 })
613 } else {
614 Err(ParseError::StreamingError(
615 "Expected ADD COLUMN or SET after ALTER SOURCE <name>".to_string(),
616 ))
617 }
618}
619
620fn parse_describe(
626 parser: &mut sqlparser::parser::Parser,
627) -> Result<StreamingStatement, ParseError> {
628 let token = parser.next_token();
630 match &token.token {
631 sqlparser::tokenizer::Token::Word(w)
632 if w.keyword == sqlparser::keywords::Keyword::DESCRIBE
633 || w.keyword == sqlparser::keywords::Keyword::DESC => {}
634 _ => {
635 return Err(ParseError::StreamingError(
636 "Expected DESCRIBE or DESC".to_string(),
637 ));
638 }
639 }
640 let extended = tokenizer::try_parse_custom_keyword(parser, "EXTENDED");
641 let name = parser
642 .parse_object_name(false)
643 .map_err(ParseError::SqlParseError)?;
644 Ok(StreamingStatement::Describe { name, extended })
645}
646
647fn parse_show_create_source(
649 parser: &mut sqlparser::parser::Parser,
650) -> Result<StreamingStatement, ParseError> {
651 parser
653 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
654 .map_err(ParseError::SqlParseError)?;
655 parser
656 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
657 .map_err(ParseError::SqlParseError)?;
658 tokenizer::expect_custom_keyword(parser, "SOURCE")?;
659 let name = parser
660 .parse_object_name(false)
661 .map_err(ParseError::SqlParseError)?;
662 Ok(StreamingStatement::Show(ShowCommand::CreateSource { name }))
663}
664
665fn parse_show_create_sink(
667 parser: &mut sqlparser::parser::Parser,
668) -> Result<StreamingStatement, ParseError> {
669 parser
671 .expect_keyword(sqlparser::keywords::Keyword::SHOW)
672 .map_err(ParseError::SqlParseError)?;
673 parser
674 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
675 .map_err(ParseError::SqlParseError)?;
676 tokenizer::expect_custom_keyword(parser, "SINK")?;
677 let name = parser
678 .parse_object_name(false)
679 .map_err(ParseError::SqlParseError)?;
680 Ok(StreamingStatement::Show(ShowCommand::CreateSink { name }))
681}
682
683fn parse_explain(
691 parser: &mut sqlparser::parser::Parser,
692 original_sql: &str,
693) -> Result<StreamingStatement, ParseError> {
694 parser
695 .expect_keyword(sqlparser::keywords::Keyword::EXPLAIN)
696 .map_err(ParseError::SqlParseError)?;
697
698 let analyze = tokenizer::try_parse_custom_keyword(parser, "ANALYZE");
700
701 let explain_prefix_upper = original_sql.to_uppercase();
703 let skip_keyword = if analyze { "ANALYZE" } else { "EXPLAIN" };
704 let inner_start = if analyze {
705 explain_prefix_upper
706 .find("ANALYZE")
707 .map_or(0, |pos| pos + "ANALYZE".len())
708 } else {
709 explain_prefix_upper
710 .find("EXPLAIN")
711 .map_or(0, |pos| pos + "EXPLAIN".len())
712 };
713 let inner_sql = original_sql[inner_start..].trim();
714 let _ = skip_keyword; let inner_stmts = StreamingParser::parse_sql(inner_sql)?;
718 let inner = inner_stmts.into_iter().next().ok_or_else(|| {
719 sqlparser::parser::ParserError::ParserError("Expected statement after EXPLAIN".to_string())
720 })?;
721 Ok(StreamingStatement::Explain {
722 statement: Box::new(inner),
723 analyze,
724 })
725}
726
727fn parse_create_materialized_view(
740 parser: &mut sqlparser::parser::Parser,
741) -> Result<StreamingStatement, ParseError> {
742 parser
743 .expect_keyword(sqlparser::keywords::Keyword::CREATE)
744 .map_err(ParseError::SqlParseError)?;
745
746 let or_replace = parser.parse_keywords(&[
747 sqlparser::keywords::Keyword::OR,
748 sqlparser::keywords::Keyword::REPLACE,
749 ]);
750
751 parser
752 .expect_keyword(sqlparser::keywords::Keyword::MATERIALIZED)
753 .map_err(ParseError::SqlParseError)?;
754 parser
755 .expect_keyword(sqlparser::keywords::Keyword::VIEW)
756 .map_err(ParseError::SqlParseError)?;
757
758 let if_not_exists = parser.parse_keywords(&[
759 sqlparser::keywords::Keyword::IF,
760 sqlparser::keywords::Keyword::NOT,
761 sqlparser::keywords::Keyword::EXISTS,
762 ]);
763
764 let name = parser
765 .parse_object_name(false)
766 .map_err(ParseError::SqlParseError)?;
767
768 parser
769 .expect_keyword(sqlparser::keywords::Keyword::AS)
770 .map_err(ParseError::SqlParseError)?;
771
772 let remaining = collect_remaining_tokens(parser);
774 let (query_tokens, emit_tokens) = split_at_emit(&remaining);
775
776 let mv_dialect = LaminarDialect::default();
777
778 let query = if query_tokens.is_empty() {
779 return Err(ParseError::StreamingError(
780 "Expected SELECT query after AS".to_string(),
781 ));
782 } else {
783 let mut query_parser =
784 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(query_tokens);
785 query_parser
786 .parse_query()
787 .map_err(ParseError::SqlParseError)?
788 };
789
790 let query_stmt =
791 StreamingStatement::Standard(Box::new(sqlparser::ast::Statement::Query(query)));
792
793 let emit_clause = if emit_tokens.is_empty() {
794 None
795 } else {
796 let mut emit_parser =
797 sqlparser::parser::Parser::new(&mv_dialect).with_tokens_with_locations(emit_tokens);
798 emit_parser::parse_emit_clause(&mut emit_parser)?
799 };
800
801 Ok(StreamingStatement::CreateMaterializedView {
802 name,
803 query: Box::new(query_stmt),
804 emit_clause,
805 or_replace,
806 if_not_exists,
807 })
808}
809
810fn collect_remaining_tokens(
812 parser: &mut sqlparser::parser::Parser,
813) -> Vec<sqlparser::tokenizer::TokenWithSpan> {
814 let mut tokens = Vec::new();
815 loop {
816 let token = parser.next_token();
817 if token.token == sqlparser::tokenizer::Token::EOF {
818 tokens.push(token);
819 break;
820 }
821 tokens.push(token);
822 }
823 tokens
824}
825
826fn split_at_emit(
831 tokens: &[sqlparser::tokenizer::TokenWithSpan],
832) -> (
833 Vec<sqlparser::tokenizer::TokenWithSpan>,
834 Vec<sqlparser::tokenizer::TokenWithSpan>,
835) {
836 let mut depth: i32 = 0;
837 for (i, token) in tokens.iter().enumerate() {
838 match &token.token {
839 sqlparser::tokenizer::Token::LParen => depth += 1,
840 sqlparser::tokenizer::Token::RParen => {
841 depth -= 1;
842 }
843 sqlparser::tokenizer::Token::Word(w)
844 if depth == 0 && w.value.eq_ignore_ascii_case("EMIT") =>
845 {
846 let mut query_tokens = tokens[..i].to_vec();
847 query_tokens.push(sqlparser::tokenizer::TokenWithSpan {
848 token: sqlparser::tokenizer::Token::EOF,
849 span: sqlparser::tokenizer::Span::empty(),
850 });
851 let emit_tokens = tokens[i..].to_vec();
852 return (query_tokens, emit_tokens);
853 }
854 _ => {}
855 }
856 }
857 (tokens.to_vec(), vec![])
858}
859
/// Errors produced by the streaming SQL parser.
///
/// `SqlParseError` wraps failures from the `sqlparser` crate and is created
/// automatically by `?` through the `#[from]` conversion. The other variants
/// carry a human-readable message.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
    /// Error reported by the underlying `sqlparser` crate.
    #[error("SQL parse error: {0}")]
    SqlParseError(#[from] sqlparser::parser::ParserError),

    /// Error in streaming-specific syntax, e.g. a malformed
    /// `RESTORE FROM CHECKPOINT` or `ALTER SOURCE` statement.
    #[error("Streaming SQL error: {0}")]
    StreamingError(String),

    /// Window-function error (displayed as `Window function error: ...`).
    #[error("Window function error: {0}")]
    WindowError(String),

    /// Validation error (displayed as `Validation error: ...`).
    #[error("Validation error: {0}")]
    ValidationError(String),
}
879
880#[cfg(test)]
881mod tests {
882 use super::*;
883
884 fn parse_one(sql: &str) -> StreamingStatement {
886 let stmts = StreamingParser::parse_sql(sql).unwrap();
887 assert_eq!(stmts.len(), 1, "Expected exactly 1 statement");
888 stmts.into_iter().next().unwrap()
889 }
890
891 #[test]
892 fn test_parse_drop_source() {
893 let stmt = parse_one("DROP SOURCE events");
894 match stmt {
895 StreamingStatement::DropSource {
896 name,
897 if_exists,
898 cascade,
899 } => {
900 assert_eq!(name.to_string(), "events");
901 assert!(!if_exists);
902 assert!(!cascade);
903 }
904 _ => panic!("Expected DropSource, got {stmt:?}"),
905 }
906 }
907
908 #[test]
909 fn test_parse_drop_source_if_exists() {
910 let stmt = parse_one("DROP SOURCE IF EXISTS events");
911 match stmt {
912 StreamingStatement::DropSource {
913 name,
914 if_exists,
915 cascade,
916 } => {
917 assert_eq!(name.to_string(), "events");
918 assert!(if_exists);
919 assert!(!cascade);
920 }
921 _ => panic!("Expected DropSource, got {stmt:?}"),
922 }
923 }
924
925 #[test]
926 fn test_parse_drop_source_cascade() {
927 let stmt = parse_one("DROP SOURCE IF EXISTS events CASCADE");
928 match stmt {
929 StreamingStatement::DropSource {
930 name,
931 if_exists,
932 cascade,
933 } => {
934 assert_eq!(name.to_string(), "events");
935 assert!(if_exists);
936 assert!(cascade);
937 }
938 _ => panic!("Expected DropSource, got {stmt:?}"),
939 }
940 }
941
942 #[test]
943 fn test_parse_drop_sink() {
944 let stmt = parse_one("DROP SINK output");
945 match stmt {
946 StreamingStatement::DropSink {
947 name,
948 if_exists,
949 cascade,
950 } => {
951 assert_eq!(name.to_string(), "output");
952 assert!(!if_exists);
953 assert!(!cascade);
954 }
955 _ => panic!("Expected DropSink, got {stmt:?}"),
956 }
957 }
958
959 #[test]
960 fn test_parse_drop_sink_if_exists() {
961 let stmt = parse_one("DROP SINK IF EXISTS output");
962 match stmt {
963 StreamingStatement::DropSink {
964 name,
965 if_exists,
966 cascade,
967 } => {
968 assert_eq!(name.to_string(), "output");
969 assert!(if_exists);
970 assert!(!cascade);
971 }
972 _ => panic!("Expected DropSink, got {stmt:?}"),
973 }
974 }
975
976 #[test]
977 fn test_parse_drop_sink_cascade() {
978 let stmt = parse_one("DROP SINK output CASCADE");
979 match stmt {
980 StreamingStatement::DropSink {
981 name,
982 if_exists,
983 cascade,
984 } => {
985 assert_eq!(name.to_string(), "output");
986 assert!(!if_exists);
987 assert!(cascade);
988 }
989 _ => panic!("Expected DropSink, got {stmt:?}"),
990 }
991 }
992
993 #[test]
994 fn test_parse_drop_materialized_view() {
995 let stmt = parse_one("DROP MATERIALIZED VIEW live_stats");
996 match stmt {
997 StreamingStatement::DropMaterializedView {
998 name,
999 if_exists,
1000 cascade,
1001 } => {
1002 assert_eq!(name.to_string(), "live_stats");
1003 assert!(!if_exists);
1004 assert!(!cascade);
1005 }
1006 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1007 }
1008 }
1009
1010 #[test]
1011 fn test_parse_drop_materialized_view_if_exists_cascade() {
1012 let stmt = parse_one("DROP MATERIALIZED VIEW IF EXISTS live_stats CASCADE");
1013 match stmt {
1014 StreamingStatement::DropMaterializedView {
1015 name,
1016 if_exists,
1017 cascade,
1018 } => {
1019 assert_eq!(name.to_string(), "live_stats");
1020 assert!(if_exists);
1021 assert!(cascade);
1022 }
1023 _ => panic!("Expected DropMaterializedView, got {stmt:?}"),
1024 }
1025 }
1026
1027 #[test]
1028 fn test_parse_show_sources() {
1029 let stmt = parse_one("SHOW SOURCES");
1030 assert!(matches!(
1031 stmt,
1032 StreamingStatement::Show(ShowCommand::Sources)
1033 ));
1034 }
1035
1036 #[test]
1037 fn test_parse_show_sinks() {
1038 let stmt = parse_one("SHOW SINKS");
1039 assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sinks)));
1040 }
1041
1042 #[test]
1043 fn test_parse_show_queries() {
1044 let stmt = parse_one("SHOW QUERIES");
1045 assert!(matches!(
1046 stmt,
1047 StreamingStatement::Show(ShowCommand::Queries)
1048 ));
1049 }
1050
1051 #[test]
1052 fn test_parse_show_materialized_views() {
1053 let stmt = parse_one("SHOW MATERIALIZED VIEWS");
1054 assert!(matches!(
1055 stmt,
1056 StreamingStatement::Show(ShowCommand::MaterializedViews)
1057 ));
1058 }
1059
1060 #[test]
1061 fn test_parse_describe() {
1062 let stmt = parse_one("DESCRIBE events");
1063 match stmt {
1064 StreamingStatement::Describe { name, extended } => {
1065 assert_eq!(name.to_string(), "events");
1066 assert!(!extended);
1067 }
1068 _ => panic!("Expected Describe, got {stmt:?}"),
1069 }
1070 }
1071
1072 #[test]
1073 fn test_parse_describe_extended() {
1074 let stmt = parse_one("DESCRIBE EXTENDED my_schema.events");
1075 match stmt {
1076 StreamingStatement::Describe { name, extended } => {
1077 assert_eq!(name.to_string(), "my_schema.events");
1078 assert!(extended);
1079 }
1080 _ => panic!("Expected Describe, got {stmt:?}"),
1081 }
1082 }
1083
1084 #[test]
1085 fn test_parse_explain_select() {
1086 let stmt = parse_one("EXPLAIN SELECT * FROM events");
1087 match stmt {
1088 StreamingStatement::Explain {
1089 statement, analyze, ..
1090 } => {
1091 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1092 assert!(!analyze);
1093 }
1094 _ => panic!("Expected Explain, got {stmt:?}"),
1095 }
1096 }
1097
1098 #[test]
1099 fn test_parse_explain_create_source() {
1100 let stmt = parse_one("EXPLAIN CREATE SOURCE events (id BIGINT)");
1101 match stmt {
1102 StreamingStatement::Explain { statement, .. } => {
1103 assert!(matches!(*statement, StreamingStatement::CreateSource(_)));
1104 }
1105 _ => panic!("Expected Explain wrapping CreateSource, got {stmt:?}"),
1106 }
1107 }
1108
1109 #[test]
1110 fn test_parse_explain_analyze_select() {
1111 let stmt = parse_one("EXPLAIN ANALYZE SELECT * FROM events");
1112 match stmt {
1113 StreamingStatement::Explain {
1114 statement, analyze, ..
1115 } => {
1116 assert!(matches!(*statement, StreamingStatement::Standard(_)));
1117 assert!(analyze, "Expected analyze=true for EXPLAIN ANALYZE");
1118 }
1119 _ => panic!("Expected Explain, got {stmt:?}"),
1120 }
1121 }
1122
1123 #[test]
1124 fn test_parse_create_materialized_view() {
1125 let stmt = parse_one("CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events");
1126 match stmt {
1127 StreamingStatement::CreateMaterializedView {
1128 name,
1129 emit_clause,
1130 or_replace,
1131 if_not_exists,
1132 ..
1133 } => {
1134 assert_eq!(name.to_string(), "live_stats");
1135 assert!(emit_clause.is_none());
1136 assert!(!or_replace);
1137 assert!(!if_not_exists);
1138 }
1139 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1140 }
1141 }
1142
1143 #[test]
1144 fn test_parse_create_materialized_view_with_emit() {
1145 let stmt = parse_one(
1146 "CREATE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE",
1147 );
1148 match stmt {
1149 StreamingStatement::CreateMaterializedView {
1150 name, emit_clause, ..
1151 } => {
1152 assert_eq!(name.to_string(), "live_stats");
1153 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1154 }
1155 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1156 }
1157 }
1158
1159 #[test]
1160 fn test_parse_create_or_replace_materialized_view() {
1161 let stmt = parse_one(
1162 "CREATE OR REPLACE MATERIALIZED VIEW live_stats AS SELECT COUNT(*) FROM events",
1163 );
1164 match stmt {
1165 StreamingStatement::CreateMaterializedView {
1166 name,
1167 or_replace,
1168 if_not_exists,
1169 ..
1170 } => {
1171 assert_eq!(name.to_string(), "live_stats");
1172 assert!(or_replace);
1173 assert!(!if_not_exists);
1174 }
1175 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1176 }
1177 }
1178
1179 #[test]
1180 fn test_parse_create_materialized_view_if_not_exists() {
1181 let stmt = parse_one(
1182 "CREATE MATERIALIZED VIEW IF NOT EXISTS live_stats AS SELECT COUNT(*) FROM events",
1183 );
1184 match stmt {
1185 StreamingStatement::CreateMaterializedView {
1186 name,
1187 or_replace,
1188 if_not_exists,
1189 ..
1190 } => {
1191 assert_eq!(name.to_string(), "live_stats");
1192 assert!(!or_replace);
1193 assert!(if_not_exists);
1194 }
1195 _ => panic!("Expected CreateMaterializedView, got {stmt:?}"),
1196 }
1197 }
1198
1199 #[test]
1200 fn test_parse_insert_into() {
1201 let stmt = parse_one("INSERT INTO events (id, name) VALUES (1, 'test')");
1202 match stmt {
1203 StreamingStatement::InsertInto {
1204 table_name,
1205 columns,
1206 values,
1207 } => {
1208 assert_eq!(table_name.to_string(), "events");
1209 assert_eq!(columns.len(), 2);
1210 assert_eq!(columns[0].to_string(), "id");
1211 assert_eq!(columns[1].to_string(), "name");
1212 assert_eq!(values.len(), 1);
1213 assert_eq!(values[0].len(), 2);
1214 }
1215 _ => panic!("Expected InsertInto, got {stmt:?}"),
1216 }
1217 }
1218
1219 #[test]
1220 fn test_parse_insert_into_multiple_rows() {
1221 let stmt = parse_one("INSERT INTO events VALUES (1, 'a'), (2, 'b'), (3, 'c')");
1222 match stmt {
1223 StreamingStatement::InsertInto {
1224 table_name,
1225 columns,
1226 values,
1227 } => {
1228 assert_eq!(table_name.to_string(), "events");
1229 assert!(columns.is_empty());
1230 assert_eq!(values.len(), 3);
1231 }
1232 _ => panic!("Expected InsertInto, got {stmt:?}"),
1233 }
1234 }
1235
1236 #[test]
1239 fn test_parse_create_stream() {
1240 let stmt = parse_one(
1241 "CREATE STREAM session_activity AS SELECT session_id, COUNT(*) as cnt FROM clicks GROUP BY session_id",
1242 );
1243 match stmt {
1244 StreamingStatement::CreateStream {
1245 name,
1246 or_replace,
1247 if_not_exists,
1248 emit_clause,
1249 ..
1250 } => {
1251 assert_eq!(name.to_string(), "session_activity");
1252 assert!(!or_replace);
1253 assert!(!if_not_exists);
1254 assert!(emit_clause.is_none());
1255 }
1256 _ => panic!("Expected CreateStream, got {stmt:?}"),
1257 }
1258 }
1259
1260 #[test]
1261 fn test_parse_create_or_replace_stream() {
1262 let stmt = parse_one("CREATE OR REPLACE STREAM metrics AS SELECT AVG(value) FROM events");
1263 match stmt {
1264 StreamingStatement::CreateStream { or_replace, .. } => {
1265 assert!(or_replace);
1266 }
1267 _ => panic!("Expected CreateStream, got {stmt:?}"),
1268 }
1269 }
1270
1271 #[test]
1272 fn test_parse_create_stream_if_not_exists() {
1273 let stmt = parse_one("CREATE STREAM IF NOT EXISTS counts AS SELECT COUNT(*) FROM events");
1274 match stmt {
1275 StreamingStatement::CreateStream { if_not_exists, .. } => {
1276 assert!(if_not_exists);
1277 }
1278 _ => panic!("Expected CreateStream, got {stmt:?}"),
1279 }
1280 }
1281
1282 #[test]
1283 fn test_parse_create_stream_with_emit() {
1284 let stmt =
1285 parse_one("CREATE STREAM windowed AS SELECT COUNT(*) FROM events EMIT ON WINDOW CLOSE");
1286 match stmt {
1287 StreamingStatement::CreateStream { emit_clause, .. } => {
1288 assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
1289 }
1290 _ => panic!("Expected CreateStream, got {stmt:?}"),
1291 }
1292 }
1293
1294 #[test]
1295 fn test_parse_drop_stream() {
1296 let stmt = parse_one("DROP STREAM my_stream");
1297 match stmt {
1298 StreamingStatement::DropStream {
1299 name,
1300 if_exists,
1301 cascade,
1302 } => {
1303 assert_eq!(name.to_string(), "my_stream");
1304 assert!(!if_exists);
1305 assert!(!cascade);
1306 }
1307 _ => panic!("Expected DropStream, got {stmt:?}"),
1308 }
1309 }
1310
1311 #[test]
1312 fn test_parse_drop_stream_if_exists() {
1313 let stmt = parse_one("DROP STREAM IF EXISTS my_stream");
1314 match stmt {
1315 StreamingStatement::DropStream {
1316 name,
1317 if_exists,
1318 cascade,
1319 } => {
1320 assert_eq!(name.to_string(), "my_stream");
1321 assert!(if_exists);
1322 assert!(!cascade);
1323 }
1324 _ => panic!("Expected DropStream, got {stmt:?}"),
1325 }
1326 }
1327
1328 #[test]
1329 fn test_parse_drop_stream_cascade() {
1330 let stmt = parse_one("DROP STREAM my_stream CASCADE");
1331 match stmt {
1332 StreamingStatement::DropStream {
1333 name,
1334 if_exists,
1335 cascade,
1336 } => {
1337 assert_eq!(name.to_string(), "my_stream");
1338 assert!(!if_exists);
1339 assert!(cascade);
1340 }
1341 _ => panic!("Expected DropStream, got {stmt:?}"),
1342 }
1343 }
1344
1345 #[test]
1346 fn test_parse_show_streams() {
1347 let stmt = parse_one("SHOW STREAMS");
1348 assert!(matches!(
1349 stmt,
1350 StreamingStatement::Show(ShowCommand::Streams)
1351 ));
1352 }
1353
1354 #[test]
1355 fn test_parse_alter_source_add_column() {
1356 let stmt = parse_one("ALTER SOURCE events ADD COLUMN new_col INT");
1357 match stmt {
1358 StreamingStatement::AlterSource { name, operation } => {
1359 assert_eq!(name.to_string(), "events");
1360 match operation {
1361 statements::AlterSourceOperation::AddColumn { column_def } => {
1362 assert_eq!(column_def.name.value, "new_col");
1363 assert_eq!(column_def.data_type, sqlparser::ast::DataType::Int(None));
1364 }
1365 statements::AlterSourceOperation::SetProperties { .. } => {
1366 panic!("Expected AddColumn")
1367 }
1368 }
1369 }
1370 _ => panic!("Expected AlterSource, got {stmt:?}"),
1371 }
1372 }
1373
1374 #[test]
1375 fn test_parse_alter_source_set_properties() {
1376 let stmt = parse_one("ALTER SOURCE events SET ('batch.size' = '1000', 'timeout' = '5s')");
1377 match stmt {
1378 StreamingStatement::AlterSource { name, operation } => {
1379 assert_eq!(name.to_string(), "events");
1380 match operation {
1381 statements::AlterSourceOperation::SetProperties { properties } => {
1382 assert_eq!(properties.get("batch.size"), Some(&"1000".to_string()));
1383 assert_eq!(properties.get("timeout"), Some(&"5s".to_string()));
1384 }
1385 statements::AlterSourceOperation::AddColumn { .. } => {
1386 panic!("Expected SetProperties")
1387 }
1388 }
1389 }
1390 _ => panic!("Expected AlterSource, got {stmt:?}"),
1391 }
1392 }
1393
1394 #[test]
1395 fn test_parse_checkpoint() {
1396 let stmt = parse_one("CHECKPOINT");
1397 assert!(
1398 matches!(stmt, StreamingStatement::Checkpoint),
1399 "Expected Checkpoint, got {stmt:?}"
1400 );
1401 }
1402
1403 #[test]
1404 fn test_parse_show_checkpoint_status() {
1405 let stmt = parse_one("SHOW CHECKPOINT STATUS");
1406 assert!(
1407 matches!(
1408 stmt,
1409 StreamingStatement::Show(ShowCommand::CheckpointStatus)
1410 ),
1411 "Expected Show(CheckpointStatus), got {stmt:?}"
1412 );
1413 }
1414
1415 #[test]
1416 fn test_parse_restore_checkpoint() {
1417 let stmt = parse_one("RESTORE FROM CHECKPOINT 42");
1418 match stmt {
1419 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1420 assert_eq!(checkpoint_id, 42);
1421 }
1422 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1423 }
1424 }
1425
1426 #[test]
1427 fn test_parse_restore_checkpoint_large_id() {
1428 let stmt = parse_one("RESTORE FROM CHECKPOINT 123456");
1429 match stmt {
1430 StreamingStatement::RestoreCheckpoint { checkpoint_id } => {
1431 assert_eq!(checkpoint_id, 123_456);
1432 }
1433 _ => panic!("Expected RestoreCheckpoint, got {stmt:?}"),
1434 }
1435 }
1436
1437 #[test]
1438 fn test_parse_show_create_source() {
1439 let stmt = parse_one("SHOW CREATE SOURCE events");
1440 match stmt {
1441 StreamingStatement::Show(ShowCommand::CreateSource { name }) => {
1442 assert_eq!(name.to_string(), "events");
1443 }
1444 _ => panic!("Expected Show(CreateSource), got {stmt:?}"),
1445 }
1446 }
1447
1448 #[test]
1449 fn test_parse_show_create_sink() {
1450 let stmt = parse_one("SHOW CREATE SINK output");
1451 match stmt {
1452 StreamingStatement::Show(ShowCommand::CreateSink { name }) => {
1453 assert_eq!(name.to_string(), "output");
1454 }
1455 _ => panic!("Expected Show(CreateSink), got {stmt:?}"),
1456 }
1457 }
1458}