1use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// Object kind listed by a `SHOW` statement; carried by [`StreamingStatement::Show`].
#[derive(Debug, Clone, PartialEq)]
pub enum ShowCommand {
    /// Show sources.
    Sources,
    /// Show sinks.
    Sinks,
    /// Show (continuous) queries.
    Queries,
    /// Show materialized views.
    MaterializedViews,
    /// Show streams.
    Streams,
    /// Show tables.
    Tables,
}
30
/// A parsed statement: either a plain `sqlparser` statement ([`StreamingStatement::Standard`])
/// or one of the streaming-specific statement forms below.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamingStatement {
    /// A statement produced by `sqlparser` that carries no streaming-specific structure.
    Standard(Box<sqlparser::ast::Statement>),

    /// Source definition (see [`CreateSourceStatement`]).
    CreateSource(Box<CreateSourceStatement>),

    /// Sink definition (see [`CreateSinkStatement`]).
    CreateSink(Box<CreateSinkStatement>),

    /// Continuous query definition.
    CreateContinuousQuery {
        /// Name of the continuous query.
        name: ObjectName,
        /// Query body; boxed because it is itself a `StreamingStatement`.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause; `None` when the statement had none.
        emit_clause: Option<EmitClause>,
    },

    /// Drop a source.
    DropSource {
        /// Name of the source to drop.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
    },

    /// Drop a sink.
    DropSink {
        /// Name of the sink to drop.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
    },

    /// Drop a materialized view.
    DropMaterializedView {
        /// Name of the materialized view to drop.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
        /// `CASCADE` flag (presumably also drops dependents — confirm in the executor).
        cascade: bool,
    },

    /// `SHOW` statement for the given object kind.
    Show(ShowCommand),

    /// Describe a named object.
    Describe {
        /// Name of the object to describe.
        name: ObjectName,
        /// Whether the extended form was requested.
        extended: bool,
    },

    /// Explain wrapper around another statement.
    Explain {
        /// The statement being explained; boxed because it is itself a `StreamingStatement`.
        statement: Box<StreamingStatement>,
    },

    /// Materialized view definition.
    CreateMaterializedView {
        /// Name of the materialized view.
        name: ObjectName,
        /// Defining query.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause; `None` when the statement had none.
        emit_clause: Option<EmitClause>,
        /// `OR REPLACE` flag.
        or_replace: bool,
        /// `IF NOT EXISTS` flag.
        if_not_exists: bool,
    },

    /// Stream definition.
    CreateStream {
        /// Name of the stream.
        name: ObjectName,
        /// Defining query.
        query: Box<StreamingStatement>,
        /// Optional `EMIT` clause; `None` when the statement had none.
        emit_clause: Option<EmitClause>,
        /// `OR REPLACE` flag.
        or_replace: bool,
        /// `IF NOT EXISTS` flag.
        if_not_exists: bool,
    },

    /// Drop a stream.
    DropStream {
        /// Name of the stream to drop.
        name: ObjectName,
        /// `IF EXISTS` flag.
        if_exists: bool,
    },

    /// Insert literal rows into a table.
    InsertInto {
        /// Target table.
        table_name: ObjectName,
        /// Target column list (may be empty — presumably meaning "all columns"; confirm).
        columns: Vec<Ident>,
        /// One inner `Vec` per row of values.
        values: Vec<Vec<Expr>>,
    },
}
142
/// Data format attached to a source or sink: a format name plus free-form options.
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Format identifier as written in the statement.
    pub format_type: String,
    /// Format-specific key/value options.
    pub options: HashMap<String, String>,
}
151
/// Parsed form of a source definition (wrapped by [`StreamingStatement::CreateSource`]).
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSourceStatement {
    /// Source name.
    pub name: ObjectName,
    /// Declared columns.
    pub columns: Vec<ColumnDef>,
    /// Optional watermark definition.
    pub watermark: Option<WatermarkDef>,
    /// Raw key/value options. NOTE(review): tests store a `connector` key here too;
    /// how this relates to `connector_type` is not visible in this file.
    pub with_options: HashMap<String, String>,
    /// `OR REPLACE` flag.
    pub or_replace: bool,
    /// `IF NOT EXISTS` flag.
    pub if_not_exists: bool,
    /// Connector type, when given explicitly.
    pub connector_type: Option<String>,
    /// Connector-specific key/value options.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification.
    pub format: Option<FormatSpec>,
}
174
/// Parsed form of a sink definition (wrapped by [`StreamingStatement::CreateSink`]).
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSinkStatement {
    /// Sink name.
    pub name: ObjectName,
    /// What the sink reads from: a named table or an inline query.
    pub from: SinkFrom,
    /// Raw key/value options.
    pub with_options: HashMap<String, String>,
    /// `OR REPLACE` flag.
    pub or_replace: bool,
    /// `IF NOT EXISTS` flag.
    pub if_not_exists: bool,
    /// Optional filter expression (presumably a `WHERE` predicate — confirm in the parser).
    pub filter: Option<Expr>,
    /// Connector type, when given explicitly.
    pub connector_type: Option<String>,
    /// Connector-specific key/value options.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification.
    pub format: Option<FormatSpec>,
    /// Additional output key/value options.
    pub output_options: HashMap<String, String>,
}
199
/// Input of a sink: either a named table or an inline query.
#[derive(Debug, Clone, PartialEq)]
pub enum SinkFrom {
    /// Read from the named table.
    Table(ObjectName),
    /// Read from an inline query; boxed because it is itself a `StreamingStatement`.
    Query(Box<StreamingStatement>),
}
208
/// Watermark declaration on a source column.
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkDef {
    /// Event-time column the watermark is defined on.
    pub column: Ident,
    /// Optional watermark expression. NOTE(review): the meaning of `None` (e.g. use the
    /// column value directly) is decided elsewhere — confirm in the planner.
    pub expression: Option<Expr>,
}
219
/// Late-data handling options: an allowed-lateness interval and/or a side output name.
///
/// `Default` yields a clause with neither set.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct LateDataClause {
    /// Allowed-lateness interval expression; `None` resolves to a zero duration
    /// in `to_allowed_lateness`.
    pub allowed_lateness: Option<Box<Expr>>,
    /// Name of the side output, if any.
    pub side_output: Option<String>,
}
232
233impl LateDataClause {
234 #[must_use]
236 pub fn with_allowed_lateness(lateness: Expr) -> Self {
237 Self {
238 allowed_lateness: Some(Box::new(lateness)),
239 side_output: None,
240 }
241 }
242
243 #[must_use]
245 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
246 Self {
247 allowed_lateness: Some(Box::new(lateness)),
248 side_output: Some(side_output),
249 }
250 }
251
252 #[must_use]
254 pub fn side_output_only(side_output: String) -> Self {
255 Self {
256 allowed_lateness: None,
257 side_output: Some(side_output),
258 }
259 }
260
261 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
267 match &self.allowed_lateness {
268 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
269 None => Ok(Duration::ZERO),
270 }
271 }
272
273 #[must_use]
275 pub fn has_side_output(&self) -> bool {
276 self.side_output.is_some()
277 }
278
279 #[must_use]
281 pub fn get_side_output(&self) -> Option<&str> {
282 self.side_output.as_deref()
283 }
284}
285
/// Emission strategy derived from an [`EmitClause`] by `EmitClause::to_emit_strategy`.
#[derive(Debug, Clone, PartialEq)]
pub enum EmitStrategy {
    /// Produced from `EMIT AFTER WATERMARK`.
    OnWatermark,
    /// Produced from `EMIT ON WINDOW CLOSE`.
    OnWindowClose,
    /// Produced from `EMIT EVERY <interval>`; holds the parsed interval.
    Periodic(Duration),
    /// Produced from `EMIT ON UPDATE`.
    OnUpdate,
    /// Produced from `EMIT CHANGES`.
    Changelog,
    /// Produced from `EMIT FINAL`.
    FinalOnly,
}
304
/// Parsed `EMIT` clause. The SQL text of each variant is the one produced by this
/// type's `Display` impl.
#[derive(Debug, Clone, PartialEq)]
pub enum EmitClause {
    /// `EMIT AFTER WATERMARK`.
    AfterWatermark,

    /// `EMIT ON WINDOW CLOSE`.
    OnWindowClose,

    /// `EMIT EVERY <interval>`.
    Periodically {
        /// Interval expression; converted to a `Duration` by `to_emit_strategy`.
        interval: Box<Expr>,
    },

    /// `EMIT ON UPDATE`.
    OnUpdate,

    /// `EMIT CHANGES`.
    Changes,

    /// `EMIT FINAL`.
    Final,
}
362
363impl std::fmt::Display for EmitClause {
364 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365 match self {
366 EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
367 EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
368 EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
369 EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
370 EmitClause::Changes => write!(f, "EMIT CHANGES"),
371 EmitClause::Final => write!(f, "EMIT FINAL"),
372 }
373 }
374}
375
376impl EmitClause {
377 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
383 match self {
384 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
385 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
386 EmitClause::Periodically { interval } => {
387 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
388 Ok(EmitStrategy::Periodic(duration))
389 }
390 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
391 EmitClause::Changes => Ok(EmitStrategy::Changelog),
392 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
393 }
394 }
395
396 #[must_use]
398 pub fn requires_changelog(&self) -> bool {
399 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
400 }
401
402 #[must_use]
404 pub fn is_append_only(&self) -> bool {
405 matches!(
406 self,
407 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
408 )
409 }
410
411 #[must_use]
417 pub fn requires_watermark(&self) -> bool {
418 matches!(
419 self,
420 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
421 )
422 }
423}
424
/// Windowing function call with its arguments kept as unevaluated expressions.
#[derive(Debug, Clone, PartialEq)]
pub enum WindowFunction {
    /// Tumbling window.
    Tumble {
        /// Time column the window is assigned on.
        time_column: Box<Expr>,
        /// Window size interval.
        interval: Box<Expr>,
    },
    /// Hopping (sliding) window.
    Hop {
        /// Time column the window is assigned on.
        time_column: Box<Expr>,
        /// Slide interval between window starts.
        slide_interval: Box<Expr>,
        /// Window size interval.
        window_interval: Box<Expr>,
    },
    /// Session window.
    Session {
        /// Time column the window is assigned on.
        time_column: Box<Expr>,
        /// Gap interval separating sessions.
        gap_interval: Box<Expr>,
    },
}
452
#[cfg(test)]
mod tests {
    //! Unit tests for the streaming AST types and the helper methods on
    //! `EmitClause` and `LateDataClause`.

    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(Expr::Identifier(Ident::new("timestamp"))),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let emit1 = EmitClause::AfterWatermark;
        let emit2 = EmitClause::OnWindowClose;
        let emit3 = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };
        let emit4 = EmitClause::OnUpdate;

        match emit1 {
            EmitClause::AfterWatermark => (),
            _ => panic!("Expected AfterWatermark"),
        }

        match emit2 {
            EmitClause::OnWindowClose => (),
            _ => panic!("Expected OnWindowClose"),
        }

        match emit3 {
            EmitClause::Periodically { .. } => (),
            _ => panic!("Expected Periodically"),
        }

        match emit4 {
            EmitClause::OnUpdate => (),
            _ => panic!("Expected OnUpdate"),
        }
    }

    #[test]
    fn test_emit_clause_display() {
        assert_eq!(EmitClause::AfterWatermark.to_string(), "EMIT AFTER WATERMARK");
        assert_eq!(EmitClause::OnWindowClose.to_string(), "EMIT ON WINDOW CLOSE");
        assert_eq!(EmitClause::OnUpdate.to_string(), "EMIT ON UPDATE");
        assert_eq!(EmitClause::Changes.to_string(), "EMIT CHANGES");
        assert_eq!(EmitClause::Final.to_string(), "EMIT FINAL");

        let periodic = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };
        assert_eq!(periodic.to_string(), "EMIT EVERY 5_SECONDS");
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_non_interval_variants() {
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only_helpers() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };

        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());
        assert!(!EmitClause::AfterWatermark.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!periodic.requires_changelog());

        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(EmitClause::AfterWatermark.is_append_only());
        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        assert!(!periodic.is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
        };

        let hop = WindowFunction::Hop {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            slide_interval: Box::new(Expr::Identifier(Ident::new("1_MINUTE"))),
            window_interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
        };

        match tumble {
            WindowFunction::Tumble { .. } => (),
            _ => panic!("Expected Tumble"),
        }

        match hop {
            WindowFunction::Hop { .. } => (),
            _ => panic!("Expected Hop"),
        }
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_accessors() {
        let empty = LateDataClause::default();
        assert!(!empty.has_side_output());
        assert_eq!(empty.get_side_output(), None);
        // No lateness expression resolves to a zero duration without parsing anything.
        assert!(matches!(empty.to_allowed_lateness(), Ok(d) if d == Duration::ZERO));

        let named = LateDataClause::side_output_only("late_events".to_string());
        assert!(named.has_side_output());
        assert_eq!(named.get_side_output(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_streams_and_tables() {
        assert_eq!(ShowCommand::Streams, ShowCommand::Streams);
        assert_eq!(ShowCommand::Tables, ShowCommand::Tables);
        assert_ne!(ShowCommand::Streams, ShowCommand::Tables);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
            if_exists: true,
        };
        match stmt {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
            }
            _ => panic!("Expected DropSource"),
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("output"))]),
            if_exists: false,
        };
        match stmt {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropSink"),
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
            if_exists: true,
            cascade: true,
        };
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            _ => panic!("Expected DropMaterializedView"),
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        match stmt {
            StreamingStatement::Show(ShowCommand::Sources) => (),
            _ => panic!("Expected Show(Sources)"),
        }
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
            extended: true,
        };
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(extended);
            }
            _ => panic!("Expected Describe"),
        }
    }

    #[test]
    fn test_explain_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
        let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::Explain {
            statement: Box::new(inner),
        };
        match stmt {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            _ => panic!("Expected Explain"),
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts =
            sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
        let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::CreateMaterializedView {
            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
            query: Box::new(query),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView"),
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            _ => panic!("Expected InsertInto"),
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };
        assert!(!periodic.requires_watermark());
    }
}