1#[allow(clippy::disallowed_types)] use std::collections::HashMap;
8use std::time::Duration;
9
10use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
11
12use super::window_rewriter::WindowRewriter;
13use super::ParseError;
14
15#[derive(Debug, Clone, PartialEq)]
17pub enum ShowCommand {
18 Sources,
20 Sinks,
22 Queries,
24 MaterializedViews,
26 Streams,
28 Tables,
30 CheckpointStatus,
32 CreateSource {
34 name: ObjectName,
36 },
37 CreateSink {
39 name: ObjectName,
41 },
42}
43
44#[derive(Debug, Clone, PartialEq)]
46pub enum StreamingStatement {
47 Standard(Box<sqlparser::ast::Statement>),
49
50 CreateSource(Box<CreateSourceStatement>),
52
53 CreateSink(Box<CreateSinkStatement>),
55
56 CreateContinuousQuery {
58 name: ObjectName,
60 query: Box<StreamingStatement>,
62 emit_clause: Option<EmitClause>,
64 },
65
66 DropSource {
68 name: ObjectName,
70 if_exists: bool,
72 cascade: bool,
74 },
75
76 DropSink {
78 name: ObjectName,
80 if_exists: bool,
82 cascade: bool,
84 },
85
86 DropMaterializedView {
88 name: ObjectName,
90 if_exists: bool,
92 cascade: bool,
94 },
95
96 Show(ShowCommand),
98
99 Describe {
101 name: ObjectName,
103 extended: bool,
105 },
106
107 Explain {
109 statement: Box<StreamingStatement>,
111 analyze: bool,
113 },
114
115 CreateMaterializedView {
117 name: ObjectName,
119 query: Box<StreamingStatement>,
121 emit_clause: Option<EmitClause>,
123 or_replace: bool,
125 if_not_exists: bool,
127 },
128
129 CreateStream {
131 name: ObjectName,
133 query: Box<StreamingStatement>,
135 emit_clause: Option<EmitClause>,
137 or_replace: bool,
139 if_not_exists: bool,
141 },
142
143 DropStream {
145 name: ObjectName,
147 if_exists: bool,
149 cascade: bool,
151 },
152
153 AlterSource {
155 name: ObjectName,
157 operation: AlterSourceOperation,
159 },
160
161 InsertInto {
163 table_name: ObjectName,
165 columns: Vec<Ident>,
167 values: Vec<Vec<Expr>>,
169 },
170
171 CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
173
174 DropLookupTable {
176 name: ObjectName,
178 if_exists: bool,
180 },
181
182 Checkpoint,
184
185 RestoreCheckpoint {
187 checkpoint_id: u64,
189 },
190}
191
192#[derive(Debug, Clone, PartialEq)]
194pub enum AlterSourceOperation {
195 AddColumn {
197 column_def: ColumnDef,
199 },
200 SetProperties {
202 properties: HashMap<String, String>,
204 },
205}
206
/// Serialization format attached to a source or sink definition.
#[derive(Clone, Debug, PartialEq)]
pub struct FormatSpec {
    /// Name of the format as written in the statement (presumably e.g. `JSON`).
    pub format_type: String,
    /// Format-specific key/value options.
    pub options: HashMap<String, String>,
}
215
216#[derive(Debug, Clone, PartialEq)]
218pub struct CreateSourceStatement {
219 pub name: ObjectName,
221 pub columns: Vec<ColumnDef>,
223 pub watermark: Option<WatermarkDef>,
225 pub with_options: HashMap<String, String>,
227 pub or_replace: bool,
229 pub if_not_exists: bool,
231 pub connector_type: Option<String>,
233 pub connector_options: HashMap<String, String>,
235 pub format: Option<FormatSpec>,
237 pub has_wildcard: bool,
239 pub wildcard_prefix: Option<String>,
241}
242
243#[derive(Debug, Clone, PartialEq)]
245pub struct CreateSinkStatement {
246 pub name: ObjectName,
248 pub from: SinkFrom,
250 pub with_options: HashMap<String, String>,
252 pub or_replace: bool,
254 pub if_not_exists: bool,
256 pub filter: Option<Expr>,
258 pub connector_type: Option<String>,
260 pub connector_options: HashMap<String, String>,
262 pub format: Option<FormatSpec>,
264 pub output_options: HashMap<String, String>,
266}
267
268#[derive(Debug, Clone, PartialEq)]
270pub enum SinkFrom {
271 Table(ObjectName),
273 Query(Box<StreamingStatement>),
275}
276
277#[derive(Debug, Clone, PartialEq)]
279pub struct WatermarkDef {
280 pub column: Ident,
282 pub expression: Option<Expr>,
286}
287
288#[derive(Debug, Clone, PartialEq, Default)]
294pub struct LateDataClause {
295 pub allowed_lateness: Option<Box<Expr>>,
297 pub side_output: Option<String>,
299}
300
301impl LateDataClause {
302 #[must_use]
304 pub fn with_allowed_lateness(lateness: Expr) -> Self {
305 Self {
306 allowed_lateness: Some(Box::new(lateness)),
307 side_output: None,
308 }
309 }
310
311 #[must_use]
313 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
314 Self {
315 allowed_lateness: Some(Box::new(lateness)),
316 side_output: Some(side_output),
317 }
318 }
319
320 #[must_use]
322 pub fn side_output_only(side_output: String) -> Self {
323 Self {
324 allowed_lateness: None,
325 side_output: Some(side_output),
326 }
327 }
328
329 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
335 match &self.allowed_lateness {
336 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
337 None => Ok(Duration::ZERO),
338 }
339 }
340
341 #[must_use]
343 pub fn has_side_output(&self) -> bool {
344 self.side_output.is_some()
345 }
346
347 #[must_use]
349 pub fn get_side_output(&self) -> Option<&str> {
350 self.side_output.as_deref()
351 }
352}
353
/// Runtime emission strategy, produced from an [`EmitClause`] by
/// `EmitClause::to_emit_strategy`.
#[derive(Clone, Debug, PartialEq)]
pub enum EmitStrategy {
    /// Derived from `EmitClause::AfterWatermark`.
    OnWatermark,
    /// Derived from `EmitClause::OnWindowClose`.
    OnWindowClose,
    /// Derived from `EmitClause::Periodically`, with the parsed interval.
    Periodic(Duration),
    /// Derived from `EmitClause::OnUpdate`.
    OnUpdate,
    /// Derived from `EmitClause::Changes`.
    Changelog,
    /// Derived from `EmitClause::Final`.
    FinalOnly,
}
372
373#[derive(Debug, Clone, PartialEq)]
378pub enum EmitClause {
379 AfterWatermark,
385
386 OnWindowClose,
392
393 Periodically {
398 interval: Box<Expr>,
400 },
401
402 OnUpdate,
407
408 Changes,
422
423 Final,
429}
430
431impl std::fmt::Display for EmitClause {
432 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
433 match self {
434 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
435 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
436 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
437 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
438 EmitClause::Changes => write!(f, "EMIT CHANGES"),
439 EmitClause::Final => write!(f, "EMIT FINAL"),
440 }
441 }
442}
443
444impl EmitClause {
445 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
451 match self {
452 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
453 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
454 EmitClause::Periodically { interval } => {
455 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
456 Ok(EmitStrategy::Periodic(duration))
457 }
458 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
459 EmitClause::Changes => Ok(EmitStrategy::Changelog),
460 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
461 }
462 }
463
464 #[must_use]
466 pub fn requires_changelog(&self) -> bool {
467 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
468 }
469
470 #[must_use]
472 pub fn is_append_only(&self) -> bool {
473 matches!(
474 self,
475 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
476 )
477 }
478
479 #[must_use]
485 pub fn requires_watermark(&self) -> bool {
486 matches!(
487 self,
488 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
489 )
490 }
491}
492
493#[derive(Debug, Clone, PartialEq)]
495pub enum WindowFunction {
496 Tumble {
498 time_column: Box<Expr>,
500 interval: Box<Expr>,
502 offset: Option<Box<Expr>>,
504 },
505 Hop {
507 time_column: Box<Expr>,
509 slide_interval: Box<Expr>,
511 window_interval: Box<Expr>,
513 offset: Option<Box<Expr>>,
515 },
516 Session {
518 time_column: Box<Expr>,
520 gap_interval: Box<Expr>,
522 },
523 Cumulate {
525 time_column: Box<Expr>,
527 step_interval: Box<Expr>,
529 max_size_interval: Box<Expr>,
531 },
532}
533
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part object name such as `events`.
    fn object_name(name: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(name))])
    }

    /// Builds a bare identifier expression, used as a stand-in argument value.
    fn ident_expr(name: &str) -> Expr {
        Expr::Identifier(Ident::new(name))
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(ident_expr("timestamp")),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let emit1 = EmitClause::AfterWatermark;
        let emit2 = EmitClause::OnWindowClose;
        let emit3 = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        let emit4 = EmitClause::OnUpdate;

        assert!(matches!(emit1, EmitClause::AfterWatermark));
        assert!(matches!(emit2, EmitClause::OnWindowClose));
        assert!(matches!(emit3, EmitClause::Periodically { .. }));
        assert!(matches!(emit4, EmitClause::OnUpdate));
    }

    #[test]
    fn test_emit_clause_display() {
        assert_eq!(EmitClause::AfterWatermark.to_string(), "EMIT AFTER WATERMARK");
        assert_eq!(EmitClause::OnWindowClose.to_string(), "EMIT ON WINDOW CLOSE");
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");

        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert_eq!(periodic.to_string(), "EMIT EVERY 5_SECONDS");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_without_interval() {
        // Only the variants that need no interval parsing are checked here;
        // `.ok()` avoids requiring `Debug` on the error type.
        assert_eq!(
            EmitClause::AfterWatermark.to_emit_strategy().ok(),
            Some(EmitStrategy::OnWatermark)
        );
        assert_eq!(
            EmitClause::OnWindowClose.to_emit_strategy().ok(),
            Some(EmitStrategy::OnWindowClose)
        );
        assert_eq!(
            EmitClause::OnUpdate.to_emit_strategy().ok(),
            Some(EmitStrategy::OnUpdate)
        );
        assert_eq!(
            EmitClause::Changes.to_emit_strategy().ok(),
            Some(EmitStrategy::Changelog)
        );
        assert_eq!(
            EmitClause::Final.to_emit_strategy().ok(),
            Some(EmitStrategy::FinalOnly)
        );
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };

        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());
        assert!(!EmitClause::AfterWatermark.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!periodic.requires_changelog());

        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());
        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        assert!(!periodic.is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        assert!(matches!(tumble, WindowFunction::Tumble { .. }));
        assert!(matches!(hop, WindowFunction::Hop { .. }));
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_accessors() {
        let empty = LateDataClause::default();
        assert!(!empty.has_side_output());
        assert_eq!(empty.get_side_output(), None);
        // No lateness expression means zero allowed lateness.
        assert_eq!(empty.to_allowed_lateness().ok(), Some(Duration::ZERO));

        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.has_side_output());
        assert_eq!(clause.get_side_output(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
                assert!(!cascade);
            }
            _ => panic!("Expected DropSource"),
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
                assert!(!cascade);
            }
            _ => panic!("Expected DropSink"),
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            _ => panic!("Expected DropMaterializedView"),
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)));
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(extended);
            }
            _ => panic!("Expected Describe"),
        }
    }

    #[test]
    fn test_explain_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
        let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::Explain {
            statement: Box::new(inner),
            analyze: false,
        };
        match stmt {
            StreamingStatement::Explain { statement, .. } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            _ => panic!("Expected Explain"),
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts =
            sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
        let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(query),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView"),
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            _ => panic!("Expected InsertInto"),
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.requires_watermark());
    }
}
865}