Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
14/// SHOW command variants for listing streaming objects.
15#[derive(Debug, Clone, PartialEq)]
16pub enum ShowCommand {
17    /// SHOW SOURCES - list all registered sources
18    Sources,
19    /// SHOW SINKS - list all registered sinks
20    Sinks,
21    /// SHOW QUERIES - list all running continuous queries
22    Queries,
23    /// SHOW MATERIALIZED VIEWS - list all materialized views
24    MaterializedViews,
25    /// SHOW STREAMS - list all named streams
26    Streams,
27    /// SHOW TABLES - list all reference/dimension tables
28    Tables,
29}
30
31/// Streaming-specific SQL statements
32#[derive(Debug, Clone, PartialEq)]
33pub enum StreamingStatement {
34    /// Standard SQL statement
35    Standard(Box<sqlparser::ast::Statement>),
36
37    /// CREATE SOURCE statement
38    CreateSource(Box<CreateSourceStatement>),
39
40    /// CREATE SINK statement
41    CreateSink(Box<CreateSinkStatement>),
42
43    /// CREATE CONTINUOUS QUERY
44    CreateContinuousQuery {
45        /// Query name
46        name: ObjectName,
47        /// SQL query with streaming extensions
48        query: Box<StreamingStatement>,
49        /// EMIT clause if present
50        emit_clause: Option<EmitClause>,
51    },
52
53    /// DROP SOURCE statement
54    DropSource {
55        /// Source name to drop
56        name: ObjectName,
57        /// Whether IF EXISTS was specified
58        if_exists: bool,
59    },
60
61    /// DROP SINK statement
62    DropSink {
63        /// Sink name to drop
64        name: ObjectName,
65        /// Whether IF EXISTS was specified
66        if_exists: bool,
67    },
68
69    /// DROP MATERIALIZED VIEW statement
70    DropMaterializedView {
71        /// View name to drop
72        name: ObjectName,
73        /// Whether IF EXISTS was specified
74        if_exists: bool,
75        /// Whether CASCADE was specified
76        cascade: bool,
77    },
78
79    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
80    Show(ShowCommand),
81
82    /// DESCRIBE source, sink, or other streaming object
83    Describe {
84        /// Object name to describe
85        name: ObjectName,
86        /// Whether EXTENDED was specified for additional detail
87        extended: bool,
88    },
89
90    /// EXPLAIN a streaming query plan
91    Explain {
92        /// The statement to explain
93        statement: Box<StreamingStatement>,
94    },
95
96    /// CREATE MATERIALIZED VIEW
97    CreateMaterializedView {
98        /// View name
99        name: ObjectName,
100        /// The backing query
101        query: Box<StreamingStatement>,
102        /// Optional EMIT clause
103        emit_clause: Option<EmitClause>,
104        /// Whether OR REPLACE was specified
105        or_replace: bool,
106        /// Whether IF NOT EXISTS was specified
107        if_not_exists: bool,
108    },
109
110    /// CREATE STREAM — named streaming pipeline
111    CreateStream {
112        /// Stream name
113        name: ObjectName,
114        /// Backing query (AS SELECT ...)
115        query: Box<StreamingStatement>,
116        /// Optional EMIT clause
117        emit_clause: Option<EmitClause>,
118        /// Whether OR REPLACE was specified
119        or_replace: bool,
120        /// Whether IF NOT EXISTS was specified
121        if_not_exists: bool,
122    },
123
124    /// DROP STREAM statement
125    DropStream {
126        /// Stream name to drop
127        name: ObjectName,
128        /// Whether IF EXISTS was specified
129        if_exists: bool,
130    },
131
132    /// INSERT INTO a streaming source or table
133    InsertInto {
134        /// Target table or source name
135        table_name: ObjectName,
136        /// Column names (empty if not specified)
137        columns: Vec<Ident>,
138        /// Row values
139        values: Vec<Vec<Expr>>,
140    },
141}
142
143/// Format specification for serialization (e.g., FORMAT JSON, FORMAT AVRO).
144#[derive(Debug, Clone, PartialEq)]
145pub struct FormatSpec {
146    /// Format type (e.g., "JSON", "AVRO", "PROTOBUF").
147    pub format_type: String,
148    /// Additional format options (from WITH clause after FORMAT).
149    pub options: HashMap<String, String>,
150}
151
152/// CREATE SOURCE statement
153#[derive(Debug, Clone, PartialEq)]
154pub struct CreateSourceStatement {
155    /// Source name
156    pub name: ObjectName,
157    /// Column definitions
158    pub columns: Vec<ColumnDef>,
159    /// Watermark definition
160    pub watermark: Option<WatermarkDef>,
161    /// Source connector options (from WITH clause)
162    pub with_options: HashMap<String, String>,
163    /// Whether to replace existing source
164    pub or_replace: bool,
165    /// Whether to skip if exists
166    pub if_not_exists: bool,
167    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
168    pub connector_type: Option<String>,
169    /// Connector-specific options (from `FROM KAFKA (...)`)
170    pub connector_options: HashMap<String, String>,
171    /// Format specification (e.g., `FORMAT JSON`)
172    pub format: Option<FormatSpec>,
173}
174
175/// CREATE SINK statement
176#[derive(Debug, Clone, PartialEq)]
177pub struct CreateSinkStatement {
178    /// Sink name
179    pub name: ObjectName,
180    /// Input query or table
181    pub from: SinkFrom,
182    /// Sink connector options (from WITH clause)
183    pub with_options: HashMap<String, String>,
184    /// Whether to replace existing sink
185    pub or_replace: bool,
186    /// Whether to skip if exists
187    pub if_not_exists: bool,
188    /// Optional WHERE filter expression
189    pub filter: Option<Expr>,
190    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
191    pub connector_type: Option<String>,
192    /// Connector-specific options (from `INTO KAFKA (...)`)
193    pub connector_options: HashMap<String, String>,
194    /// Format specification (e.g., `FORMAT JSON`)
195    pub format: Option<FormatSpec>,
196    /// Output options (from `WITH (key = ...)` after FORMAT)
197    pub output_options: HashMap<String, String>,
198}
199
200/// Source for a sink
201#[derive(Debug, Clone, PartialEq)]
202pub enum SinkFrom {
203    /// From a table or source
204    Table(ObjectName),
205    /// From a SELECT query
206    Query(Box<StreamingStatement>),
207}
208
209/// Watermark definition
210#[derive(Debug, Clone, PartialEq)]
211pub struct WatermarkDef {
212    /// Column to use for watermark
213    pub column: Ident,
214    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
215    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
216    /// meaning watermark advances via `source.watermark()` with zero delay.
217    pub expression: Option<Expr>,
218}
219
220/// Late data handling clause.
221///
222/// Controls what happens to events that arrive after their window has closed.
223/// This is the SQL AST representation of late data configuration.
224/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
225#[derive(Debug, Clone, PartialEq, Default)]
226pub struct LateDataClause {
227    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
228    pub allowed_lateness: Option<Box<Expr>>,
229    /// Side output name for late events (e.g., `late_events`)
230    pub side_output: Option<String>,
231}
232
233impl LateDataClause {
234    /// Creates a clause with allowed lateness only.
235    #[must_use]
236    pub fn with_allowed_lateness(lateness: Expr) -> Self {
237        Self {
238            allowed_lateness: Some(Box::new(lateness)),
239            side_output: None,
240        }
241    }
242
243    /// Creates a clause with both allowed lateness and side output.
244    #[must_use]
245    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
246        Self {
247            allowed_lateness: Some(Box::new(lateness)),
248            side_output: Some(side_output),
249        }
250    }
251
252    /// Creates a clause with side output only (uses default lateness).
253    #[must_use]
254    pub fn side_output_only(side_output: String) -> Self {
255        Self {
256            allowed_lateness: None,
257            side_output: Some(side_output),
258        }
259    }
260
261    /// Convert to allowed lateness Duration.
262    ///
263    /// # Errors
264    ///
265    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
266    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
267        match &self.allowed_lateness {
268            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
269            None => Ok(Duration::ZERO),
270        }
271    }
272
273    /// Check if this clause has a side output configured.
274    #[must_use]
275    pub fn has_side_output(&self) -> bool {
276        self.side_output.is_some()
277    }
278
279    /// Get the side output name, if configured.
280    #[must_use]
281    pub fn get_side_output(&self) -> Option<&str> {
282        self.side_output.as_deref()
283    }
284}
285
286/// Emit strategy for runtime operator configuration.
287///
288/// This is the runtime representation that operators use.
289#[derive(Debug, Clone, PartialEq)]
290pub enum EmitStrategy {
291    /// Emit when watermark passes window end
292    OnWatermark,
293    /// Emit only when window closes (no intermediate results)
294    OnWindowClose,
295    /// Emit at fixed intervals
296    Periodic(Duration),
297    /// Emit on every state change
298    OnUpdate,
299    /// Emit changelog records with Z-set weights
300    Changelog,
301    /// Emit only final results, suppress all intermediate
302    FinalOnly,
303}
304
305/// EMIT clause for controlling output timing.
306///
307/// This is the SQL AST representation of emit strategies.
308/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
309#[derive(Debug, Clone, PartialEq)]
310pub enum EmitClause {
311    // === Existing (F011) ===
312    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
313    ///
314    /// Emit results when the watermark passes the window end.
315    /// This is the most efficient strategy.
316    AfterWatermark,
317
318    /// EMIT ON WINDOW CLOSE
319    ///
320    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
321    /// Only emits when window closes, no intermediate results.
322    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
323    OnWindowClose,
324
325    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
326    ///
327    /// Emit intermediate results at fixed intervals.
328    /// Final results are still emitted on watermark.
329    Periodically {
330        /// The interval expression (e.g., INTERVAL '5' SECOND)
331        interval: Box<Expr>,
332    },
333
334    /// EMIT ON UPDATE
335    ///
336    /// Emit updated results after every state change.
337    /// This provides lowest latency but highest overhead.
338    OnUpdate,
339
340    // === New (F011B) ===
341    /// EMIT CHANGES
342    ///
343    /// Emit changelog records with Z-set weights for CDC pipelines.
344    /// Every emission includes operation type and weight:
345    /// - Insert (+1 weight)
346    /// - Delete (-1 weight)
347    /// - Update (retraction pair: -1 old, +1 new)
348    ///
349    /// Required for:
350    /// - CDC pipelines
351    /// - Cascading materialized views
352    /// - Downstream consumers that need to track changes
353    Changes,
354
355    /// EMIT FINAL
356    ///
357    /// Suppress ALL intermediate results, emit only finalized.
358    /// Also drops late data entirely after window close.
359    /// Use for BI reporting where only final, exact results matter.
360    Final,
361}
362
363impl std::fmt::Display for EmitClause {
364    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
365        match self {
366            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
367            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
368            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
369            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
370            EmitClause::Changes => write!(f, "EMIT CHANGES"),
371            EmitClause::Final => write!(f, "EMIT FINAL"),
372        }
373    }
374}
375
376impl EmitClause {
377    /// Convert to runtime EmitStrategy.
378    ///
379    /// # Errors
380    ///
381    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
382    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
383        match self {
384            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
385            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
386            EmitClause::Periodically { interval } => {
387                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
388                Ok(EmitStrategy::Periodic(duration))
389            }
390            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
391            EmitClause::Changes => Ok(EmitStrategy::Changelog),
392            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
393        }
394    }
395
396    /// Check if this emit strategy requires changelog/retraction support.
397    #[must_use]
398    pub fn requires_changelog(&self) -> bool {
399        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
400    }
401
402    /// Check if this emit strategy is append-only (no retractions).
403    #[must_use]
404    pub fn is_append_only(&self) -> bool {
405        matches!(
406            self,
407            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
408        )
409    }
410
411    /// Returns true if this emit strategy requires a watermark on the source.
412    ///
413    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
414    /// advancement to trigger window closure. Without a watermark, timers will
415    /// never fire and windows will never close.
416    #[must_use]
417    pub fn requires_watermark(&self) -> bool {
418        matches!(
419            self,
420            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
421        )
422    }
423}
424
425/// Window function types
426#[derive(Debug, Clone, PartialEq)]
427pub enum WindowFunction {
428    /// TUMBLE(column, interval)
429    Tumble {
430        /// The time column to window on
431        time_column: Box<Expr>,
432        /// The window interval
433        interval: Box<Expr>,
434    },
435    /// HOP(column, slide, size)
436    Hop {
437        /// The time column to window on
438        time_column: Box<Expr>,
439        /// The slide interval (how often to create a new window)
440        slide_interval: Box<Expr>,
441        /// The window size interval
442        window_interval: Box<Expr>,
443    },
444    /// SESSION(column, gap)
445    Session {
446        /// The time column to window on
447        time_column: Box<Expr>,
448        /// The gap interval (max gap between events in same session)
449        gap_interval: Box<Expr>,
450    },
451}
452
453#[cfg(test)]
454mod tests {
455    use super::*;
456    use sqlparser::ast::{DataType, Expr, ObjectNamePart};
457
458    #[test]
459    fn test_create_source_statement() {
460        let stmt = CreateSourceStatement {
461            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
462            columns: vec![
463                ColumnDef {
464                    name: Ident::new("id"),
465                    data_type: DataType::BigInt(None),
466                    options: vec![],
467                },
468                ColumnDef {
469                    name: Ident::new("timestamp"),
470                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
471                    options: vec![],
472                },
473            ],
474            watermark: Some(WatermarkDef {
475                column: Ident::new("timestamp"),
476                expression: Some(Expr::Identifier(Ident::new("timestamp"))),
477            }),
478            with_options: HashMap::from([
479                ("connector".to_string(), "kafka".to_string()),
480                ("topic".to_string(), "events".to_string()),
481            ]),
482            or_replace: false,
483            if_not_exists: true,
484            connector_type: None,
485            connector_options: HashMap::new(),
486            format: None,
487        };
488
489        // Check the statement fields
490        assert_eq!(stmt.columns.len(), 2);
491        assert!(stmt.watermark.is_some());
492        assert_eq!(
493            stmt.with_options.get("connector"),
494            Some(&"kafka".to_string())
495        );
496    }
497
498    #[test]
499    fn test_emit_clause_variants() {
500        let emit1 = EmitClause::AfterWatermark;
501        let emit2 = EmitClause::OnWindowClose;
502        let emit3 = EmitClause::Periodically {
503            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
504        };
505        let emit4 = EmitClause::OnUpdate;
506
507        match emit1 {
508            EmitClause::AfterWatermark => (),
509            _ => panic!("Expected AfterWatermark"),
510        }
511
512        match emit2 {
513            EmitClause::OnWindowClose => (),
514            _ => panic!("Expected OnWindowClose"),
515        }
516
517        match emit3 {
518            EmitClause::Periodically { .. } => (),
519            _ => panic!("Expected Periodically"),
520        }
521
522        match emit4 {
523            EmitClause::OnUpdate => (),
524            _ => panic!("Expected OnUpdate"),
525        }
526    }
527
528    #[test]
529    fn test_window_functions() {
530        let tumble = WindowFunction::Tumble {
531            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
532            interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
533        };
534
535        let hop = WindowFunction::Hop {
536            time_column: Box::new(Expr::Identifier(Ident::new("event_time"))),
537            slide_interval: Box::new(Expr::Identifier(Ident::new("1_MINUTE"))),
538            window_interval: Box::new(Expr::Identifier(Ident::new("5_MINUTES"))),
539        };
540
541        match tumble {
542            WindowFunction::Tumble { .. } => (),
543            _ => panic!("Expected Tumble"),
544        }
545
546        match hop {
547            WindowFunction::Hop { .. } => (),
548            _ => panic!("Expected Hop"),
549        }
550    }
551
552    #[test]
553    fn test_late_data_clause_default() {
554        let clause = LateDataClause::default();
555        assert!(clause.allowed_lateness.is_none());
556        assert!(clause.side_output.is_none());
557    }
558
559    #[test]
560    fn test_late_data_clause_with_allowed_lateness() {
561        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
562        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
563        assert!(clause.allowed_lateness.is_some());
564        assert!(clause.side_output.is_none());
565    }
566
567    #[test]
568    fn test_late_data_clause_with_side_output() {
569        let lateness_expr = Expr::Identifier(Ident::new("INTERVAL '1' HOUR"));
570        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
571        assert!(clause.allowed_lateness.is_some());
572        assert_eq!(clause.side_output, Some("late_events".to_string()));
573    }
574
575    #[test]
576    fn test_late_data_clause_side_output_only() {
577        let clause = LateDataClause::side_output_only("late_events".to_string());
578        assert!(clause.allowed_lateness.is_none());
579        assert_eq!(clause.side_output, Some("late_events".to_string()));
580    }
581
582    #[test]
583    fn test_show_command_variants() {
584        let sources = ShowCommand::Sources;
585        let sinks = ShowCommand::Sinks;
586        let queries = ShowCommand::Queries;
587        let mvs = ShowCommand::MaterializedViews;
588
589        assert_eq!(sources, ShowCommand::Sources);
590        assert_eq!(sinks, ShowCommand::Sinks);
591        assert_eq!(queries, ShowCommand::Queries);
592        assert_eq!(mvs, ShowCommand::MaterializedViews);
593    }
594
595    #[test]
596    fn test_show_command_clone() {
597        let cmd = ShowCommand::Sources;
598        let cloned = cmd.clone();
599        assert_eq!(cmd, cloned);
600    }
601
602    #[test]
603    fn test_drop_source_statement() {
604        let stmt = StreamingStatement::DropSource {
605            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
606            if_exists: true,
607        };
608        match stmt {
609            StreamingStatement::DropSource { name, if_exists } => {
610                assert_eq!(name.to_string(), "events");
611                assert!(if_exists);
612            }
613            _ => panic!("Expected DropSource"),
614        }
615    }
616
617    #[test]
618    fn test_drop_sink_statement() {
619        let stmt = StreamingStatement::DropSink {
620            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("output"))]),
621            if_exists: false,
622        };
623        match stmt {
624            StreamingStatement::DropSink { name, if_exists } => {
625                assert_eq!(name.to_string(), "output");
626                assert!(!if_exists);
627            }
628            _ => panic!("Expected DropSink"),
629        }
630    }
631
632    #[test]
633    fn test_drop_materialized_view_statement() {
634        let stmt = StreamingStatement::DropMaterializedView {
635            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
636            if_exists: true,
637            cascade: true,
638        };
639        match stmt {
640            StreamingStatement::DropMaterializedView {
641                name,
642                if_exists,
643                cascade,
644            } => {
645                assert_eq!(name.to_string(), "live_stats");
646                assert!(if_exists);
647                assert!(cascade);
648            }
649            _ => panic!("Expected DropMaterializedView"),
650        }
651    }
652
653    #[test]
654    fn test_show_statement() {
655        let stmt = StreamingStatement::Show(ShowCommand::Sources);
656        match stmt {
657            StreamingStatement::Show(ShowCommand::Sources) => (),
658            _ => panic!("Expected Show(Sources)"),
659        }
660    }
661
662    #[test]
663    fn test_describe_statement() {
664        let stmt = StreamingStatement::Describe {
665            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
666            extended: true,
667        };
668        match stmt {
669            StreamingStatement::Describe { name, extended } => {
670                assert_eq!(name.to_string(), "events");
671                assert!(extended);
672            }
673            _ => panic!("Expected Describe"),
674        }
675    }
676
677    #[test]
678    fn test_explain_statement() {
679        // Build an inner Standard statement using sqlparser
680        let dialect = sqlparser::dialect::GenericDialect {};
681        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, "SELECT 1").unwrap();
682        let inner = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));
683
684        let stmt = StreamingStatement::Explain {
685            statement: Box::new(inner),
686        };
687        match stmt {
688            StreamingStatement::Explain { statement } => {
689                assert!(matches!(*statement, StreamingStatement::Standard(_)));
690            }
691            _ => panic!("Expected Explain"),
692        }
693    }
694
695    #[test]
696    fn test_create_materialized_view_statement() {
697        // Build a query statement using sqlparser
698        let dialect = sqlparser::dialect::GenericDialect {};
699        let stmts =
700            sqlparser::parser::Parser::parse_sql(&dialect, "SELECT COUNT(*) FROM events").unwrap();
701        let query = StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()));
702
703        let stmt = StreamingStatement::CreateMaterializedView {
704            name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("live_stats"))]),
705            query: Box::new(query),
706            emit_clause: Some(EmitClause::OnWindowClose),
707            or_replace: false,
708            if_not_exists: true,
709        };
710        match stmt {
711            StreamingStatement::CreateMaterializedView {
712                name,
713                emit_clause,
714                or_replace,
715                if_not_exists,
716                ..
717            } => {
718                assert_eq!(name.to_string(), "live_stats");
719                assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
720                assert!(!or_replace);
721                assert!(if_not_exists);
722            }
723            _ => panic!("Expected CreateMaterializedView"),
724        }
725    }
726
727    #[test]
728    fn test_insert_into_statement() {
729        let stmt = StreamingStatement::InsertInto {
730            table_name: ObjectName(vec![ObjectNamePart::Identifier(Ident::new("events"))]),
731            columns: vec![Ident::new("id"), Ident::new("name")],
732            values: vec![vec![
733                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
734                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
735            ]],
736        };
737        match stmt {
738            StreamingStatement::InsertInto {
739                table_name,
740                columns,
741                values,
742            } => {
743                assert_eq!(table_name.to_string(), "events");
744                assert_eq!(columns.len(), 2);
745                assert_eq!(values.len(), 1);
746                assert_eq!(values[0].len(), 2);
747            }
748            _ => panic!("Expected InsertInto"),
749        }
750    }
751
752    #[test]
753    fn test_eowc_requires_watermark_helper() {
754        // Watermark-dependent strategies
755        assert!(EmitClause::OnWindowClose.requires_watermark());
756        assert!(EmitClause::Final.requires_watermark());
757        assert!(EmitClause::AfterWatermark.requires_watermark());
758
759        // Non-watermark strategies
760        assert!(!EmitClause::OnUpdate.requires_watermark());
761        assert!(!EmitClause::Changes.requires_watermark());
762        let periodic = EmitClause::Periodically {
763            interval: Box::new(Expr::Identifier(Ident::new("5_SECONDS"))),
764        };
765        assert!(!periodic.requires_watermark());
766    }
767}