1use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// The object kind targeted by a [`StreamingStatement::Show`] statement.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq`: the enum is fieldless,
/// so equality is total and values can be used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ShowCommand {
    /// Sources (the objects created by `StreamingStatement::CreateSource`).
    Sources,
    /// Sinks (the objects created by `StreamingStatement::CreateSink`).
    Sinks,
    /// Queries (presumably the continuous queries created by
    /// `StreamingStatement::CreateContinuousQuery` — confirm in the executor).
    Queries,
    /// Materialized views (created by `StreamingStatement::CreateMaterializedView`).
    MaterializedViews,
    /// Streams (created by `StreamingStatement::CreateStream`).
    Streams,
}
28
/// A parsed statement: either a plain `sqlparser` statement or one of the
/// streaming-specific statement forms defined in this module.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamingStatement {
    /// A statement represented directly by `sqlparser`'s AST, boxed to keep
    /// this enum small.
    Standard(Box<sqlparser::ast::Statement>),

    /// Create a source; all details live in [`CreateSourceStatement`].
    CreateSource(Box<CreateSourceStatement>),

    /// Create a sink; all details live in [`CreateSinkStatement`].
    CreateSink(Box<CreateSinkStatement>),

    /// Create a named continuous query.
    CreateContinuousQuery {
        /// Name of the continuous query.
        name: ObjectName,
        /// The query body, itself a (boxed) streaming statement.
        query: Box<StreamingStatement>,
        /// Optional emit clause; see [`EmitClause`].
        emit_clause: Option<EmitClause>,
    },

    /// Drop a source.
    DropSource {
        /// Name of the source to drop.
        name: ObjectName,
        /// Whether `IF EXISTS` was given.
        if_exists: bool,
    },

    /// Drop a sink.
    DropSink {
        /// Name of the sink to drop.
        name: ObjectName,
        /// Whether `IF EXISTS` was given.
        if_exists: bool,
    },

    /// Drop a materialized view.
    DropMaterializedView {
        /// Name of the materialized view to drop.
        name: ObjectName,
        /// Whether `IF EXISTS` was given.
        if_exists: bool,
        /// Whether `CASCADE` was given.
        cascade: bool,
    },

    /// List objects of the kind given by [`ShowCommand`].
    Show(ShowCommand),

    /// Describe a named object.
    Describe {
        /// Name of the object to describe.
        name: ObjectName,
        /// Whether the extended form was requested (presumably
        /// `DESCRIBE EXTENDED` — confirm in the parser).
        extended: bool,
    },

    /// Explain a nested statement.
    Explain {
        /// The statement being explained.
        statement: Box<StreamingStatement>,
    },

    /// Create a materialized view from a query.
    CreateMaterializedView {
        /// Name of the materialized view.
        name: ObjectName,
        /// The defining query, itself a (boxed) streaming statement.
        query: Box<StreamingStatement>,
        /// Optional emit clause; see [`EmitClause`].
        emit_clause: Option<EmitClause>,
        /// Whether `OR REPLACE` was given.
        or_replace: bool,
        /// Whether `IF NOT EXISTS` was given.
        if_not_exists: bool,
    },

    /// Create a named stream from a query.
    CreateStream {
        /// Name of the stream.
        name: ObjectName,
        /// The defining query, itself a (boxed) streaming statement.
        query: Box<StreamingStatement>,
        /// Optional emit clause; see [`EmitClause`].
        emit_clause: Option<EmitClause>,
        /// Whether `OR REPLACE` was given.
        or_replace: bool,
        /// Whether `IF NOT EXISTS` was given.
        if_not_exists: bool,
    },

    /// Drop a stream.
    DropStream {
        /// Name of the stream to drop.
        name: ObjectName,
        /// Whether `IF EXISTS` was given.
        if_exists: bool,
    },

    /// Insert literal rows into a table.
    InsertInto {
        /// Target table.
        table_name: ObjectName,
        /// Explicit target column list.
        columns: Vec<Ident>,
        /// Value rows, one inner `Vec` per row (presumably aligned with
        /// `columns` — confirm in the parser/executor).
        values: Vec<Vec<Expr>>,
    },
}
140
/// A data format specification: a format type name plus free-form options.
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format type, kept as written (no validation happens here).
    pub format_type: String,
    /// Format-specific key/value options.
    pub options: HashMap<String, String>,
}
149
/// Details of a [`StreamingStatement::CreateSource`] statement.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSourceStatement {
    /// Name of the source.
    pub name: ObjectName,
    /// Column definitions of the source schema.
    pub columns: Vec<ColumnDef>,
    /// Optional watermark definition; see [`WatermarkDef`].
    pub watermark: Option<WatermarkDef>,
    /// Generic key/value options (the unit test uses e.g. `connector` and
    /// `topic`).
    pub with_options: HashMap<String, String>,
    /// Whether `OR REPLACE` was given.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Optional connector type name.
    pub connector_type: Option<String>,
    /// Connector-specific key/value options.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification; see [`FormatSpec`].
    pub format: Option<FormatSpec>,
}
172
/// Details of a [`StreamingStatement::CreateSink`] statement.
#[derive(Debug, Clone, PartialEq)]
pub struct CreateSinkStatement {
    /// Name of the sink.
    pub name: ObjectName,
    /// Where the sink reads its input from; see [`SinkFrom`].
    pub from: SinkFrom,
    /// Generic key/value options.
    pub with_options: HashMap<String, String>,
    /// Whether `OR REPLACE` was given.
    pub or_replace: bool,
    /// Whether `IF NOT EXISTS` was given.
    pub if_not_exists: bool,
    /// Optional filter expression (presumably a `WHERE` predicate applied to
    /// the sink input — confirm in the parser).
    pub filter: Option<Expr>,
    /// Optional connector type name.
    pub connector_type: Option<String>,
    /// Connector-specific key/value options.
    pub connector_options: HashMap<String, String>,
    /// Optional data format specification; see [`FormatSpec`].
    pub format: Option<FormatSpec>,
    /// Output-specific key/value options.
    pub output_options: HashMap<String, String>,
}
197
/// The input of a sink: either a named object or an inline query.
#[derive(Debug, Clone, PartialEq)]
pub enum SinkFrom {
    /// Read from a named table (or other named object).
    Table(ObjectName),
    /// Read from an inline query, itself a (boxed) streaming statement.
    Query(Box<StreamingStatement>),
}
206
/// A watermark definition attached to a source.
#[derive(Debug, Clone, PartialEq)]
pub struct WatermarkDef {
    /// Column the watermark is defined on.
    pub column: Ident,
    /// Watermark expression, kept unevaluated (in the unit test it is just
    /// the column identifier itself).
    pub expression: Expr,
}
215
/// Late-data handling: an optional allowed-lateness interval and an optional
/// side-output name.
///
/// `Default` yields neither; `to_allowed_lateness` treats a missing interval
/// as `Duration::ZERO`.
#[derive(Debug, Clone, PartialEq, Default)]
pub struct LateDataClause {
    /// Unevaluated interval expression; parsed on demand by
    /// `to_allowed_lateness`.
    pub allowed_lateness: Option<Box<Expr>>,
    /// Name of the side output, if one was specified.
    pub side_output: Option<String>,
}
228
229impl LateDataClause {
230 #[must_use]
232 pub fn with_allowed_lateness(lateness: Expr) -> Self {
233 Self {
234 allowed_lateness: Some(Box::new(lateness)),
235 side_output: None,
236 }
237 }
238
239 #[must_use]
241 pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
242 Self {
243 allowed_lateness: Some(Box::new(lateness)),
244 side_output: Some(side_output),
245 }
246 }
247
248 #[must_use]
250 pub fn side_output_only(side_output: String) -> Self {
251 Self {
252 allowed_lateness: None,
253 side_output: Some(side_output),
254 }
255 }
256
257 pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
263 match &self.allowed_lateness {
264 Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
265 None => Ok(Duration::ZERO),
266 }
267 }
268
269 #[must_use]
271 pub fn has_side_output(&self) -> bool {
272 self.side_output.is_some()
273 }
274
275 #[must_use]
277 pub fn get_side_output(&self) -> Option<&str> {
278 self.side_output.as_deref()
279 }
280}
281
/// Resolved emission strategy, produced from an `EmitClause` by
/// `EmitClause::to_emit_strategy`.
///
/// Derives `Eq` and `Hash` as well as `PartialEq`. The only payload is a
/// `Duration`, which implements both, so equality is total and strategies
/// can be used as set/map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Produced from `EmitClause::AfterWatermark`.
    OnWatermark,
    /// Produced from `EmitClause::OnWindowClose`.
    OnWindowClose,
    /// Produced from `EmitClause::Periodically`; carries the parsed interval.
    Periodic(Duration),
    /// Produced from `EmitClause::OnUpdate`.
    OnUpdate,
    /// Produced from `EmitClause::Changes`.
    Changelog,
    /// Produced from `EmitClause::Final`.
    FinalOnly,
}
300
/// An emit clause attached to a continuous query, materialized view, or stream
/// (see the `emit_clause` fields of [`StreamingStatement`]).
///
/// Convert it to a runtime [`EmitStrategy`] with `EmitClause::to_emit_strategy`.
#[derive(Debug, Clone, PartialEq)]
pub enum EmitClause {
    /// Maps to `EmitStrategy::OnWatermark`; `is_append_only` returns `true`.
    AfterWatermark,

    /// Maps to `EmitStrategy::OnWindowClose`; `is_append_only` returns `true`.
    OnWindowClose,

    /// Maps to `EmitStrategy::Periodic` once `interval` has been converted to
    /// a `Duration`. Neither changelog nor append-only per the helper methods.
    Periodically {
        /// Unevaluated interval expression, converted by
        /// `WindowRewriter::parse_interval_to_duration`.
        interval: Box<Expr>,
    },

    /// Maps to `EmitStrategy::OnUpdate`; `requires_changelog` returns `true`.
    OnUpdate,

    /// Maps to `EmitStrategy::Changelog`; `requires_changelog` returns `true`.
    Changes,

    /// Maps to `EmitStrategy::FinalOnly`; `is_append_only` returns `true`.
    Final,
}
358
359impl EmitClause {
360 pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
366 match self {
367 EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
368 EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
369 EmitClause::Periodically { interval } => {
370 let duration = WindowRewriter::parse_interval_to_duration(interval)?;
371 Ok(EmitStrategy::Periodic(duration))
372 }
373 EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
374 EmitClause::Changes => Ok(EmitStrategy::Changelog),
375 EmitClause::Final => Ok(EmitStrategy::FinalOnly),
376 }
377 }
378
379 #[must_use]
381 pub fn requires_changelog(&self) -> bool {
382 matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
383 }
384
385 #[must_use]
387 pub fn is_append_only(&self) -> bool {
388 matches!(
389 self,
390 EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
391 )
392 }
393}
394
/// A windowing function over a time column. The interval arguments are kept
/// as unevaluated SQL expressions.
#[derive(Debug, Clone, PartialEq)]
pub enum WindowFunction {
    /// Tumbling window with a single size interval.
    Tumble {
        /// Expression selecting the time column.
        time_column: Box<Expr>,
        /// Window size interval expression.
        interval: Box<Expr>,
    },
    /// Hopping window with separate slide and window-size intervals.
    Hop {
        /// Expression selecting the time column.
        time_column: Box<Expr>,
        /// Slide (hop) interval expression.
        slide_interval: Box<Expr>,
        /// Window size interval expression.
        window_interval: Box<Expr>,
    },
    /// Session window defined by a gap interval.
    Session {
        /// Expression selecting the time column.
        time_column: Box<Expr>,
        /// Gap interval expression.
        gap_interval: Box<Expr>,
    },
}
422
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part `ObjectName` from a bare identifier.
    fn object_name(name: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(name))])
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Expr::Identifier(Ident::new("timestamp")),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
        };

        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let emit1 = EmitClause::AfterWatermark;
        let emit2 = EmitClause::OnWindowClose;
        let emit3 = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };
        let emit4 = EmitClause::OnUpdate;

        match emit1 {
            EmitClause::AfterWatermark => (),
            _ => panic!("Expected AfterWatermark"),
        }

        match emit2 {
            EmitClause::OnWindowClose => (),
            _ => panic!("Expected OnWindowClose"),
        }

        match emit3 {
            EmitClause::Periodically { .. } => (),
            _ => panic!("Expected Periodically"),
        }

        match emit4 {
            EmitClause::OnUpdate => (),
            _ => panic!("Expected OnUpdate"),
        }
    }

    #[test]
    fn test_emit_clause_to_emit_strategy_without_interval() {
        // Only `Periodically` needs interval parsing; every other variant maps
        // directly and must never fail.
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only_classification() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
        };
        // (clause, expected requires_changelog, expected is_append_only)
        let cases = [
            (EmitClause::AfterWatermark, false, true),
            (EmitClause::OnWindowClose, false, true),
            (periodic, false, false),
            (EmitClause::OnUpdate, true, false),
            (EmitClause::Changes, true, false),
            (EmitClause::Final, false, true),
        ];
        for (clause, changelog, append_only) in cases {
            assert_eq!(
                clause.requires_changelog(),
                changelog,
                "requires_changelog for {:?}",
                clause
            );
            assert_eq!(
                clause.is_append_only(),
                append_only,
                "is_append_only for {:?}",
                clause
            );
        }
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
        };

        let hop = WindowFunction::Hop {
            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
            slide_interval: Box::new(Expr::Identifier(Ident::new("1_MINUTE"))),
            window_interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
        };

        match tumble {
            WindowFunction::Tumble { .. } => (),
            _ => panic!("Expected Tumble"),
        }

        match hop {
            WindowFunction::Hop { .. } => (),
            _ => panic!("Expected Hop"),
        }
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_default_lateness_is_zero() {
        // No interval expression means no parsing and a zero lateness.
        let clause = LateDataClause::default();
        assert!(matches!(
            clause.to_allowed_lateness(),
            Ok(d) if d == Duration::ZERO
        ));
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_accessors() {
        let with_output = LateDataClause::side_output_only("late_events".to_string());
        assert!(with_output.has_side_output());
        assert_eq!(with_output.get_side_output(), Some("late_events"));

        let without_output = LateDataClause::default();
        assert!(!without_output.has_side_output());
        assert_eq!(without_output.get_side_output(), None);
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;
        let streams = ShowCommand::Streams;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
        assert_eq!(streams, ShowCommand::Streams);
        assert_ne!(streams, ShowCommand::Sources);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
        };
        match stmt {
            StreamingStatement::DropSource { name, if_exists } => {
                assert_eq!(name.to_string(), "events");
                assert!(if_exists);
            }
            _ => panic!("Expected DropSource"),
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
        };
        match stmt {
            StreamingStatement::DropSink { name, if_exists } => {
                assert_eq!(name.to_string(), "output");
                assert!(!if_exists);
            }
            _ => panic!("Expected DropSink"),
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        match stmt {
            StreamingStatement::DropMaterializedView {
                name,
                if_exists,
                cascade,
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert!(if_exists);
                assert!(cascade);
            }
            _ => panic!("Expected DropMaterializedView"),
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        match stmt {
            StreamingStatement::Show(ShowCommand::Sources) => (),
            _ => panic!("Expected Show(Sources)"),
        }
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        match stmt {
            StreamingStatement::Describe { name, extended } => {
                assert_eq!(name.to_string(), "events");
                assert!(extended);
            }
            _ => panic!("Expected Describe"),
        }
    }

    #[test]
    fn test_explain_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
        let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::Explain {
            statement: Box::new(inner),
        };
        match stmt {
            StreamingStatement::Explain { statement } => {
                assert!(matches!(*statement, StreamingStatement::Standard(_)));
            }
            _ => panic!("Expected Explain"),
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts =
            sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
        let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));

        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(query),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        match stmt {
            StreamingStatement::CreateMaterializedView {
                name,
                emit_clause,
                or_replace,
                if_not_exists,
                ..
            } => {
                assert_eq!(name.to_string(), "live_stats");
                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
                assert!(!or_replace);
                assert!(if_not_exists);
            }
            _ => panic!("Expected CreateMaterializedView"),
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        match stmt {
            StreamingStatement::InsertInto {
                table_name,
                columns,
                values,
            } => {
                assert_eq!(table_name.to_string(), "events");
                assert_eq!(columns.len(), 2);
                assert_eq!(values.len(), 1);
                assert_eq!(values[0].len(), 2);
            }
            _ => panic!("Expected InsertInto"),
        }
    }
}