1use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// Object kinds that can be listed with a `SHOW` statement.
///
/// The enum carries no data, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`. Callers can therefore use it as a `HashMap`/`HashSet` key,
/// for example to dispatch or deduplicate `SHOW` requests.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ShowCommand {
    /// `SHOW SOURCES`
    Sources,
    /// `SHOW SINKS`
    Sinks,
    /// `SHOW QUERIES`
    Queries,
    /// `SHOW MATERIALIZED VIEWS`
    MaterializedViews,
    /// `SHOW STREAMS`
    Streams,
    /// `SHOW TABLES`
    Tables,
}
30
31#[derive(Debug, Clone, PartialEq)]
33pub enum StreamingStatement {
34 Standard(Box<sqlparser::ast::Statement>),
36
37 CreateSource(Box<CreateSourceStatement>),
39
40 CreateSink(Box<CreateSinkStatement>),
42
43 CreateContinuousQuery {
45 name: ObjectName,
47 query: Box<StreamingStatement>,
49 emit_clause: Option<EmitClause>,
51 },
52
53 DropSource {
55 name: ObjectName,
57 if_exists: bool,
59 cascade: bool,
61 },
62
63 DropSink {
65 name: ObjectName,
67 if_exists: bool,
69 cascade: bool,
71 },
72
73 DropMaterializedView {
75 name: ObjectName,
77 if_exists: bool,
79 cascade: bool,
81 },
82
83 Show(ShowCommand),
85
86 Describe {
88 name: ObjectName,
90 extended: bool,
92 },
93
94 Explain {
96 statement: Box<StreamingStatement>,
98 },
99
100 CreateMaterializedView {
102 name: ObjectName,
104 query: Box<StreamingStatement>,
106 emit_clause: Option<EmitClause>,
108 or_replace: bool,
110 if_not_exists: bool,
112 },
113
114 CreateStream {
116 name: ObjectName,
118 query: Box<StreamingStatement>,
120 emit_clause: Option<EmitClause>,
122 or_replace: bool,
124 if_not_exists: bool,
126 },
127
128 DropStream {
130 name: ObjectName,
132 if_exists: bool,
134 cascade: bool,
136 },
137
138 AlterSource {
140 name: ObjectName,
142 operation: AlterSourceOperation,
144 },
145
146 InsertInto {
148 table_name: ObjectName,
150 columns: Vec<Ident>,
152 values: Vec<Vec<Expr>>,
154 },
155
156 CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
158
159 DropLookupTable {
161 name: ObjectName,
163 if_exists: bool,
165 },
166}
167
168#[derive(Debug, Clone, PartialEq)]
170pub enum AlterSourceOperation {
171 AddColumn {
173 column_def: ColumnDef,
175 },
176 SetProperties {
178 properties: HashMap<String, String>,
180 },
181}
182
/// Serialization format attached to a source or sink definition.
#[derive(Clone, Debug, PartialEq)]
pub struct FormatSpec {
    /// Format identifier as written in the statement (e.g. a name like `json`).
    pub format_type: String,
    /// Additional format-specific options, keyed by option name.
    pub options: HashMap<String, String>,
}
191
192#[derive(Debug, Clone, PartialEq)]
194pub struct CreateSourceStatement {
195 pub name: ObjectName,
197 pub columns: Vec<ColumnDef>,
199 pub watermark: Option<WatermarkDef>,
201 pub with_options: HashMap<String, String>,
203 pub or_replace: bool,
205 pub if_not_exists: bool,
207 pub connector_type: Option<String>,
209 pub connector_options: HashMap<String, String>,
211 pub format: Option<FormatSpec>,
213 pub has_wildcard: bool,
215 pub wildcard_prefix: Option<String>,
217}
218
219#[derive(Debug, Clone, PartialEq)]
221pub struct CreateSinkStatement {
222 pub name: ObjectName,
224 pub from: SinkFrom,
226 pub with_options: HashMap<String, String>,
228 pub or_replace: bool,
230 pub if_not_exists: bool,
232 pub filter: Option<Expr>,
234 pub connector_type: Option<String>,
236 pub connector_options: HashMap<String, String>,
238 pub format: Option<FormatSpec>,
240 pub output_options: HashMap<String, String>,
242}
243
244#[derive(Debug, Clone, PartialEq)]
246pub enum SinkFrom {
247 Table(ObjectName),
249 Query(Box<StreamingStatement>),
251}
252
253#[derive(Debug, Clone, PartialEq)]
255pub struct WatermarkDef {
256 pub column: Ident,
258 pub expression: Option<Expr>,
262}
263
264#[derive(Debug, Clone, PartialEq, Default)]
270pub struct LateDataClause {
271 pub allowed_lateness: Option<Box<Expr>>,
273 pub side_output: Option<String>,
275}
276
277impl LateDataClause {
278 #[must_use]
280 pub fn with_allowed_lateness(lateness: Expr) -> Self {
281 Self {
282 allowed_lateness: Some(Box::new(lateness)),
283 side_output: None,
284 }
285 }
286
287 #[must_use]
289 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
290 Self {
291 allowed_lateness: Some(Box::new(lateness)),
292 side_output: Some(side_output),
293 }
294 }
295
296 #[must_use]
298 pub fn side_output_only(side_output: String) -> Self {
299 Self {
300 allowed_lateness: None,
301 side_output: Some(side_output),
302 }
303 }
304
305 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
311 match &self.allowed_lateness {
312 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
313 None => Ok(Duration::ZERO),
314 }
315 }
316
317 #[must_use]
319 pub fn has_side_output(&self) -> bool {
320 self.side_output.is_some()
321 }
322
323 #[must_use]
325 pub fn get_side_output(&self) -> Option<&str> {
326 self.side_output.as_deref()
327 }
328}
329
/// Runtime emission policy derived from an [`EmitClause`].
///
/// `EmitClause::to_emit_strategy` maps each clause onto exactly one of these.
#[derive(Clone, Debug, PartialEq)]
pub enum EmitStrategy {
    /// From `EMIT AFTER WATERMARK`.
    OnWatermark,
    /// From `EMIT ON WINDOW CLOSE`.
    OnWindowClose,
    /// From `EMIT EVERY <interval>`, carrying the parsed interval.
    Periodic(Duration),
    /// From `EMIT ON UPDATE`.
    OnUpdate,
    /// From `EMIT CHANGES`.
    Changelog,
    /// From `EMIT FINAL`.
    FinalOnly,
}
348
349#[derive(Debug, Clone, PartialEq)]
354pub enum EmitClause {
355 AfterWatermark,
361
362 OnWindowClose,
368
369 Periodically {
374 interval: Box<Expr>,
376 },
377
378 OnUpdate,
383
384 Changes,
398
399 Final,
405}
406
407impl std::fmt::Display for EmitClause {
408 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409 match self {
410 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
411 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
412 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
413 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
414 EmitClause::Changes => write!(f, "EMIT CHANGES"),
415 EmitClause::Final => write!(f, "EMIT FINAL"),
416 }
417 }
418}
419
420impl EmitClause {
421 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
427 match self {
428 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
429 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
430 EmitClause::Periodically { interval } => {
431 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
432 Ok(EmitStrategy::Periodic(duration))
433 }
434 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
435 EmitClause::Changes => Ok(EmitStrategy::Changelog),
436 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
437 }
438 }
439
440 #[must_use]
442 pub fn requires_changelog(&self) -> bool {
443 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
444 }
445
446 #[must_use]
448 pub fn is_append_only(&self) -> bool {
449 matches!(
450 self,
451 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
452 )
453 }
454
455 #[must_use]
461 pub fn requires_watermark(&self) -> bool {
462 matches!(
463 self,
464 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
465 )
466 }
467}
468
469#[derive(Debug, Clone, PartialEq)]
471pub enum WindowFunction {
472 Tumble {
474 time_column: Box<Expr>,
476 interval: Box<Expr>,
478 },
479 Hop {
481 time_column: Box<Expr>,
483 slide_interval: Box<Expr>,
485 window_interval: Box<Expr>,
487 },
488 Session {
490 time_column: Box<Expr>,
492 gap_interval: Box<Expr>,
494 },
495 Cumulate {
497 time_column: Box<Expr>,
499 step_interval: Box<Expr>,
501 max_size_interval: Box<Expr>,
503 },
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part `ObjectName` such as `events`.
    fn object_name(ident: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(ident))])
    }

    /// Parses `sql` with the generic dialect and wraps its first statement.
    fn standard_statement(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()))
    }

    /// Boxed identifier used as a stand-in interval expression. No test here
    /// parses it into a duration.
    fn placeholder_interval(text: &str) -> Box<Expr> {
        Box::new(Expr::Identifier(Ident::new(text)))
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(Expr::Identifier(Ident::new("timestamp"))),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let periodic = EmitClause::Periodically {
            interval: placeholder_interval("5_SECONDS"),
        };

        assert!(matches!(EmitClause::AfterWatermark, EmitClause::AfterWatermark));
        assert!(matches!(EmitClause::OnWindowClose, EmitClause::OnWindowClose));
        assert!(matches!(periodic, EmitClause::Periodically { .. }));
        assert!(matches!(EmitClause::OnUpdate, EmitClause::OnUpdate));
        assert_ne!(EmitClause::AfterWatermark, EmitClause::OnWindowClose);
    }

    #[test]
    fn test_emit_clause_display() {
        assert_eq!(EmitClause::AfterWatermark.to_string(), "EMIT AFTER WATERMARK");
        assert_eq!(EmitClause::OnWindowClose.to_string(), "EMIT ON WINDOW CLOSE");
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");

        let periodic = EmitClause::Periodically {
            interval: placeholder_interval("5_SECONDS"),
        };
        assert_eq!(periodic.to_string(), "EMIT EVERY 5_SECONDS");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_without_interval() {
        let cases = [
            (EmitClause::AfterWatermark, EmitStrategy::OnWatermark),
            (EmitClause::OnWindowClose, EmitStrategy::OnWindowClose),
            (EmitClause::OnUpdate, EmitStrategy::OnUpdate),
            (EmitClause::Changes, EmitStrategy::Changelog),
            (EmitClause::Final, EmitStrategy::FinalOnly),
        ];
        for (clause, expected) in cases {
            assert_eq!(clause.to_emit_strategy().ok(), Some(expected), "{clause}");
        }
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only_helpers() {
        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());

        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());
        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());

        let periodic = EmitClause::Periodically {
            interval: placeholder_interval("5_SECONDS"),
        };
        assert!(!periodic.is_append_only());
        assert!(!periodic.requires_changelog());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: placeholder_interval("event_time"),
            interval: placeholder_interval("5_MINUTES"),
        };
        let hop = WindowFunction::Hop {
            time_column: placeholder_interval("event_time"),
            slide_interval: placeholder_interval("1_MINUTE"),
            window_interval: placeholder_interval("5_MINUTES"),
        };

        assert!(matches!(tumble, WindowFunction::Tumble { .. }));
        assert!(matches!(hop, WindowFunction::Hop { .. }));
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_accessors() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.has_side_output());
        assert_eq!(clause.get_side_output(), Some("late_events"));

        let empty = LateDataClause::default();
        assert!(!empty.has_side_output());
        assert_eq!(empty.get_side_output(), None);
    }

    #[test]
    fn test_late_data_clause_without_lateness_is_zero() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert_eq!(clause.to_allowed_lateness().ok(), Some(Duration::ZERO));
        assert_eq!(
            LateDataClause::default().to_allowed_lateness().ok(),
            Some(Duration::ZERO)
        );
    }

    #[test]
    fn test_show_command_variants() {
        let commands = [
            ShowCommand::Sources,
            ShowCommand::Sinks,
            ShowCommand::Queries,
            ShowCommand::MaterializedViews,
            ShowCommand::Streams,
            ShowCommand::Tables,
        ];
        assert_eq!(commands[0], ShowCommand::Sources);
        assert_eq!(commands[1], ShowCommand::Sinks);
        assert_eq!(commands[2], ShowCommand::Queries);
        assert_eq!(commands[3], ShowCommand::MaterializedViews);
        assert_eq!(commands[4], ShowCommand::Streams);
        assert_eq!(commands[5], ShowCommand::Tables);
        // Every variant is distinct from every other.
        for (i, a) in commands.iter().enumerate() {
            for (j, b) in commands.iter().enumerate() {
                assert_eq!(i == j, a == b, "{a:?} vs {b:?}");
            }
        }
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSource {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSource, got {other:?}"),
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        match stmt {
            StreamingStatement::DropSink {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
                assert!(!cascade);
            }
            other => panic!("Expected DropSink, got {other:?}"),
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            other => panic!("Expected DropMaterializedView, got {other:?}"),
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources), got {stmt:?}"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(extended);
            }
            other => panic!("Expected Describe, got {other:?}"),
        }
    }

    #[test]
    fn test_explain_statement() {
        let stmt = StreamingStatement::Explain {
            statement: Box::new(standard_statement("SELECT 1")),
        };
        match stmt {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            other => panic!("Expected Explain, got {other:?}"),
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(standard_statement("SELECT COUNT(*) FROM events")),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            other => panic!("Expected CreateMaterializedView, got {other:?}"),
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            other => panic!("Expected InsertInto, got {other:?}"),
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: placeholder_interval("5_SECONDS"),
        };
        assert!(!periodic.requires_watermark());
    }
}