1use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// Selects what a `SHOW` statement reports; carried by
/// [`StreamingStatement::Show`].
///
/// NOTE(review): the SQL keywords that map to each variant are chosen by the
/// parser, which is not part of this file. The per-variant notes below follow
/// the variant names and should be confirmed against the parser.
#[derive(Debug, Clone, PartialEq)]
pub enum ShowCommand {
    /// Sources.
    Sources,
    /// Sinks.
    Sinks,
    /// Queries.
    Queries,
    /// Materialized views.
    MaterializedViews,
    /// Streams.
    Streams,
    /// Tables.
    Tables,
    /// Checkpoint status.
    CheckpointStatus,
    /// A single source, identified by name (presumably `SHOW CREATE SOURCE` —
    /// confirm).
    CreateSource {
        /// Name of the source.
        name: ObjectName,
    },
    /// A single sink, identified by name (presumably `SHOW CREATE SINK` —
    /// confirm).
    CreateSink {
        /// Name of the sink.
        name: ObjectName,
    },
}
42
/// A statement in the streaming SQL dialect: either a plain `sqlparser`
/// statement or one of the streaming-specific forms declared here.
///
/// Variants that embed another statement hold it as
/// `Box<StreamingStatement>`; the indirection is required because the type is
/// recursive.
///
/// NOTE(review): the parser that fills these fields is outside this file, so
/// the field notes below describe the data carried rather than exact SQL
/// syntax.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamingStatement {
    /// A statement represented directly by the `sqlparser` AST.
    Standard(Box<sqlparser::ast::Statement>),

    /// Source creation; see [`CreateSourceStatement`].
    CreateSource(Box<CreateSourceStatement>),

    /// Sink creation; see [`CreateSinkStatement`].
    CreateSink(Box<CreateSinkStatement>),

    /// Creation of a named continuous query.
    CreateContinuousQuery {
        /// Name of the continuous query.
        name: ObjectName,
        /// The statement the continuous query is defined over.
        query: Box<StreamingStatement>,
        /// Emit clause attached to the query, if any.
        emit_clause: Option<EmitClause>,
    },

    /// Removal of a source.
    DropSource {
        /// Name of the source.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
        /// `CASCADE` flag.
        cascade: bool,
    },

    /// Removal of a sink.
    DropSink {
        /// Name of the sink.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
        /// `CASCADE` flag.
        cascade: bool,
    },

    /// Removal of a materialized view.
    DropMaterializedView {
        /// Name of the materialized view.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
        /// `CASCADE` flag.
        cascade: bool,
    },

    /// A `SHOW` statement; the payload selects what is shown.
    Show(ShowCommand),

    /// A describe request for a named object.
    Describe {
        /// Name of the object.
        name: ObjectName,
        /// Extended-output flag.
        extended: bool,
    },

    /// An explain request wrapping another statement.
    Explain {
        /// The statement being explained.
        statement: Box<StreamingStatement>,
        /// Analyze flag.
        analyze: bool,
    },

    /// Creation of a materialized view.
    CreateMaterializedView {
        /// Name of the materialized view.
        name: ObjectName,
        /// The statement the view is defined over.
        query: Box<StreamingStatement>,
        /// Emit clause attached to the view, if any.
        emit_clause: Option<EmitClause>,
        /// `OR REPLACE` flag.
        or_replace: bool,
        /// `IF NOT EXISTS` flag.
        if_not_exists: bool,
    },

    /// Creation of a named stream.
    CreateStream {
        /// Name of the stream.
        name: ObjectName,
        /// The statement the stream is defined over.
        query: Box<StreamingStatement>,
        /// Emit clause attached to the stream, if any.
        emit_clause: Option<EmitClause>,
        /// `OR REPLACE` flag.
        or_replace: bool,
        /// `IF NOT EXISTS` flag.
        if_not_exists: bool,
    },

    /// Removal of a stream.
    DropStream {
        /// Name of the stream.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
        /// `CASCADE` flag.
        cascade: bool,
    },

    /// Alteration of an existing source.
    AlterSource {
        /// Name of the source.
        name: ObjectName,
        /// The change to apply; see [`AlterSourceOperation`].
        operation: AlterSourceOperation,
    },

    /// Row insertion into a named table.
    InsertInto {
        /// Target table name.
        table_name: ObjectName,
        /// Column identifiers listed by the statement.
        columns: Vec<Ident>,
        /// One inner vector of expressions per row.
        values: Vec<Vec<Expr>>,
    },

    /// Lookup-table creation; the payload type lives in the sibling
    /// `lookup_table` module.
    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),

    /// Removal of a lookup table. Unlike the other drop variants this one
    /// carries no `cascade` flag.
    DropLookupTable {
        /// Name of the lookup table.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
    },

    /// A checkpoint request; carries no arguments.
    Checkpoint,

    /// A request to restore from a checkpoint.
    RestoreCheckpoint {
        /// Identifier of the checkpoint to restore.
        checkpoint_id: u64,
    },
}
190
/// The change requested by [`StreamingStatement::AlterSource`].
#[derive(Debug, Clone, PartialEq)]
pub enum AlterSourceOperation {
    /// Add a column to the source.
    AddColumn {
        /// Definition of the column to add.
        column_def: ColumnDef,
    },
    /// Set string key/value properties on the source.
    SetProperties {
        /// Property names mapped to their values.
        properties: HashMap<String, String>,
    },
}
205
/// A format specification: a format name plus string options.
///
/// Used as the `format` field of [`CreateSourceStatement`] and
/// [`CreateSinkStatement`].
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format, stored as given by the parser.
    // NOTE(review): whether the name is case-normalized is decided by the
    // parser — confirm there.
    pub format_type: String,
    /// Format options as key/value strings.
    pub options: HashMap<String, String>,
}
214
/// Payload of [`StreamingStatement::CreateSource`].
///
/// NOTE(review): the relationship between `with_options` and
/// `connector_options` (and which SQL clause feeds each) is determined by the
/// parser, which is not visible here — confirm before relying on it.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSourceStatement {
    /// Name of the source.
    pub name: ObjectName,
    /// Column definitions of the source.
    pub columns: Vec<ColumnDef>,
    /// Watermark definition, if one was declared.
    pub watermark: Option<WatermarkDef>,
    /// Options as key/value strings.
    pub with_options: HashMap<String, String>,
    /// `OR REPLACE` flag.
    pub or_replace: bool,
    /// `IF NOT EXISTS` flag.
    pub if_not_exists: bool,
    /// Connector type name, if one was given.
    pub connector_type: Option<String>,
    /// Connector options as key/value strings.
    pub connector_options: HashMap<String, String>,
    /// Format specification, if one was given.
    pub format: Option<FormatSpec>,
    /// Wildcard flag (presumably set when the column list contains a
    /// wildcard — confirm against the parser).
    pub has_wildcard: bool,
    /// Prefix associated with the wildcard, if any.
    pub wildcard_prefix: Option<String>,
}
241
/// Payload of [`StreamingStatement::CreateSink`].
///
/// NOTE(review): which SQL clause feeds `with_options`, `connector_options`
/// and `output_options` is determined by the parser, which is not visible
/// here — confirm before relying on it.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSinkStatement {
    /// Name of the sink.
    pub name: ObjectName,
    /// Where the sink reads from: a named table or a nested statement.
    pub from: SinkFrom,
    /// Options as key/value strings.
    pub with_options: HashMap<String, String>,
    /// `OR REPLACE` flag.
    pub or_replace: bool,
    /// `IF NOT EXISTS` flag.
    pub if_not_exists: bool,
    /// Filter expression, if one was given.
    pub filter: Option<Expr>,
    /// Connector type name, if one was given.
    pub connector_type: Option<String>,
    /// Connector options as key/value strings.
    pub connector_options: HashMap<String, String>,
    /// Format specification, if one was given.
    pub format: Option<FormatSpec>,
    /// Output options as key/value strings.
    pub output_options: HashMap<String, String>,
}
266
/// Input of a sink, as stored in [`CreateSinkStatement::from`].
#[derive(Debug, Clone, PartialEq)]
pub enum SinkFrom {
    /// The sink reads from an object referenced by name.
    Table(ObjectName),
    /// The sink reads from a nested statement.
    Query(Box<StreamingStatement>),
}
275
/// Watermark declaration attached to a [`CreateSourceStatement`].
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkDef {
    /// Column the watermark is declared on.
    pub column: Ident,
    /// Watermark expression, if one was given.
    // NOTE(review): how a missing expression is interpreted is decided by
    // consumers outside this file — confirm there.
    pub expression: Option<Expr>,
}
286
/// Late-data handling settings: an optional allowed-lateness expression and
/// an optional side-output name.
///
/// The derived `Default` leaves both fields as `None`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct LateDataClause {
    /// Allowed-lateness expression. `to_allowed_lateness` converts it to a
    /// `Duration` and reports `Duration::ZERO` when this is `None`.
    pub allowed_lateness: Option<Box<Expr>>,
    /// Name of the side output, if one was specified.
    pub side_output: Option<String>,
}
299
300impl LateDataClause {
301 #[must_use]
303 pub fn with_allowed_lateness(lateness: Expr) -> Self {
304 Self {
305 allowed_lateness: Some(Box::new(lateness)),
306 side_output: None,
307 }
308 }
309
310 #[must_use]
312 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
313 Self {
314 allowed_lateness: Some(Box::new(lateness)),
315 side_output: Some(side_output),
316 }
317 }
318
319 #[must_use]
321 pub fn side_output_only(side_output: String) -> Self {
322 Self {
323 allowed_lateness: None,
324 side_output: Some(side_output),
325 }
326 }
327
328 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
334 match &self.allowed_lateness {
335 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
336 None => Ok(Duration::ZERO),
337 }
338 }
339
340 #[must_use]
342 pub fn has_side_output(&self) -> bool {
343 self.side_output.is_some()
344 }
345
346 #[must_use]
348 pub fn get_side_output(&self) -> Option<&str> {
349 self.side_output.as_deref()
350 }
351}
352
/// Resolved emission strategy, produced from an `EmitClause` by
/// `EmitClause::to_emit_strategy`.
///
/// Unlike the clause, the periodic variant holds an already-converted
/// [`Duration`] rather than an unevaluated expression.
///
/// `Eq` and `Hash` are derived in addition to `PartialEq`: every payload is
/// `Eq + Hash`, so the strategy can be used as a set member or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Produced from `EmitClause::AfterWatermark`.
    OnWatermark,
    /// Produced from `EmitClause::OnWindowClose`.
    OnWindowClose,
    /// Produced from `EmitClause::Periodically`; holds the converted interval.
    Periodic(Duration),
    /// Produced from `EmitClause::OnUpdate`.
    OnUpdate,
    /// Produced from `EmitClause::Changes`.
    Changelog,
    /// Produced from `EmitClause::Final`.
    FinalOnly,
}
371
/// An emit clause as written in a statement, before its interval (if any) is
/// evaluated.
///
/// The `Display` implementation renders each variant as the text quoted
/// below, and `to_emit_strategy` resolves each variant to an `EmitStrategy`.
#[derive(Debug, Clone, PartialEq)]
pub enum EmitClause {
    /// Rendered as `EMIT AFTER WATERMARK`.
    AfterWatermark,

    /// Rendered as `EMIT ON WINDOW CLOSE`.
    OnWindowClose,

    /// Rendered as `EMIT EVERY <interval>`.
    Periodically {
        /// Interval expression; converted to a `Duration` by
        /// `to_emit_strategy`.
        interval: Box<Expr>,
    },

    /// Rendered as `EMIT ON UPDATE`.
    OnUpdate,

    /// Rendered as `EMIT CHANGES`.
    Changes,

    /// Rendered as `EMIT FINAL`.
    Final,
}
429
430impl std::fmt::Display for EmitClause {
431 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432 match self {
433 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
434 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
435 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
436 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
437 EmitClause::Changes => write!(f, "EMIT CHANGES"),
438 EmitClause::Final => write!(f, "EMIT FINAL"),
439 }
440 }
441}
442
443impl EmitClause {
444 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
450 match self {
451 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
452 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
453 EmitClause::Periodically { interval } => {
454 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
455 Ok(EmitStrategy::Periodic(duration))
456 }
457 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
458 EmitClause::Changes => Ok(EmitStrategy::Changelog),
459 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
460 }
461 }
462
463 #[must_use]
465 pub fn requires_changelog(&self) -> bool {
466 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
467 }
468
469 #[must_use]
471 pub fn is_append_only(&self) -> bool {
472 matches!(
473 self,
474 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
475 )
476 }
477
478 #[must_use]
484 pub fn requires_watermark(&self) -> bool {
485 matches!(
486 self,
487 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
488 )
489 }
490}
491
/// A windowing function call with its arguments kept as unevaluated
/// expressions.
///
/// NOTE(review): the precise semantics of each window kind are implemented by
/// consumers outside this file; the notes below only describe the data held.
#[derive(Debug, Clone, PartialEq)]
pub enum WindowFunction {
    /// Tumbling window.
    Tumble {
        /// Expression naming the time column.
        time_column: Box<Expr>,
        /// Window interval expression.
        interval: Box<Expr>,
        /// Optional offset expression.
        offset: Option<Box<Expr>>,
    },
    /// Hopping window.
    Hop {
        /// Expression naming the time column.
        time_column: Box<Expr>,
        /// Slide interval expression.
        slide_interval: Box<Expr>,
        /// Window interval expression.
        window_interval: Box<Expr>,
        /// Optional offset expression.
        offset: Option<Box<Expr>>,
    },
    /// Session window.
    Session {
        /// Expression naming the time column.
        time_column: Box<Expr>,
        /// Gap interval expression.
        gap_interval: Box<Expr>,
    },
    /// Cumulating window.
    Cumulate {
        /// Expression naming the time column.
        time_column: Box<Expr>,
        /// Step interval expression.
        step_interval: Box<Expr>,
        /// Maximum size interval expression.
        max_size_interval: Box<Expr>,
    },
}
532
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part object name from a bare identifier.
    fn object_name(part: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(part))])
    }

    /// Wraps a bare identifier in an expression. Used wherever a test only
    /// needs some `Expr` value and never evaluates it.
    fn ident_expr(text: &str) -> Expr {
        Expr::Identifier(Ident::new(text))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement as
    /// a `Standard` streaming statement.
    fn parse_standard(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let parsed = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(parsed.into_iter().next().unwrap()))
    }

    #[test]
    fn test_create_source_statement() {
        let columns = vec![
            ColumnDef {
                name: Ident::new("id"),
                data_type: DataType::BigInt(None),
                options: Vec::new(),
            },
            ColumnDef {
                name: Ident::new("timestamp"),
                data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                options: Vec::new(),
            },
        ];
        let watermark = WatermarkDef {
            column: Ident::new("timestamp"),
            expression: Some(ident_expr("timestamp")),
        };
        let with_options: HashMap<String, String> = [("connector", "kafka"), ("topic", "events")]
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns,
            watermark: Some(watermark),
            with_options,
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector").map(String::as_str),
            Some("kafka")
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let after_watermark = EmitClause::AfterWatermark;
        let on_window_close = EmitClause::OnWindowClose;
        let periodically = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        let on_update = EmitClause::OnUpdate;

        assert!(
            matches!(after_watermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(on_window_close, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodically, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(on_update, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };
        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
    }

    #[test]
    fn test_late_data_clause_default() {
        let LateDataClause {
            allowed_lateness,
            side_output,
        } = LateDataClause::default();
        assert!(allowed_lateness.is_none());
        assert!(side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let clause = LateDataClause::with_allowed_lateness(ident_expr("INTERVAL '1' HOUR"));
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let clause = LateDataClause::with_side_output(
            ident_expr("INTERVAL '1' HOUR"),
            "late_events".to_string(),
        );
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let show_sources = ShowCommand::Sources;
        let show_sinks = ShowCommand::Sinks;
        let show_queries = ShowCommand::Queries;
        let show_views = ShowCommand::MaterializedViews;

        assert_eq!(show_sources, ShowCommand::Sources);
        assert_eq!(show_sinks, ShowCommand::Sinks);
        assert_eq!(show_queries, ShowCommand::Queries);
        assert_eq!(show_views, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_clone() {
        let original = ShowCommand::Sources;
        let copy = original.clone();
        assert_eq!(original, copy);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        if let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "events");
            assert!(if_exists);
            assert!(!cascade);
        } else {
            panic!("Expected DropSource");
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        if let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "output");
            assert!(!if_exists);
            assert!(!cascade);
        } else {
            panic!("Expected DropSink");
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        if let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "live_stats");
            assert!(if_exists);
            assert!(cascade);
        } else {
            panic!("Expected DropMaterializedView");
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        if let StreamingStatement::Describe { name, extended } = stmt {
            assert_eq!(name.to_string(), "events");
            assert!(extended);
        } else {
            panic!("Expected Describe");
        }
    }

    #[test]
    fn test_explain_statement() {
        let stmt = StreamingStatement::Explain {
            statement: Box::new(parse_standard("SELECT 1")),
            analyze: false,
        };
        if let StreamingStatement::Explain { statement, .. } = stmt {
            assert!(matches!(*statement, StreamingStatement::Standard(_)));
        } else {
            panic!("Expected Explain");
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(parse_standard("SELECT COUNT(*) FROM events")),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        if let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        {
            assert_eq!(name.to_string(), "live_stats");
            assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            assert!(!or_replace);
            assert!(if_not_exists);
        } else {
            panic!("Expected CreateMaterializedView");
        }
    }

    #[test]
    fn test_insert_into_statement() {
        use sqlparser::ast::Value;

        let row = vec![
            Expr::Value(Value::Number("1".to_string(), false).into()),
            Expr::Value(Value::SingleQuotedString("test".to_string()).into()),
        ];
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![row],
        };
        if let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        {
            assert_eq!(table_name.to_string(), "events");
            assert_eq!(columns.len(), 2);
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].len(), 2);
        } else {
            panic!("Expected InsertInto");
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Clauses that must report a watermark requirement.
        let needs_watermark = [
            EmitClause::OnWindowClose,
            EmitClause::Final,
            EmitClause::AfterWatermark,
        ];
        for clause in &needs_watermark {
            assert!(clause.requires_watermark());
        }

        // Clauses that must not.
        let no_watermark = [
            EmitClause::OnUpdate,
            EmitClause::Changes,
            EmitClause::Periodically {
                interval: Box::new(ident_expr("5_SECONDS")),
            },
        ];
        for clause in &no_watermark {
            assert!(!clause.requires_watermark());
        }
    }
}