Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// Variants of the `SHOW` command, one per kind of streaming object that can be listed.
///
/// The enum consists solely of unit variants, so in addition to `PartialEq` it
/// derives `Eq` and `Hash`; this allows it to be used as a map key or stored in
/// a set (for example when dispatching on the requested listing).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ShowCommand {
    /// `SHOW SOURCES` — list all registered sources.
    Sources,
    /// `SHOW SINKS` — list all registered sinks.
    Sinks,
    /// `SHOW QUERIES` — list all running continuous queries.
    Queries,
    /// `SHOW MATERIALIZED VIEWS` — list all materialized views.
    MaterializedViews,
    /// `SHOW STREAMS` — list all named streams.
    Streams,
    /// `SHOW TABLES` — list all reference/dimension tables.
    Tables,
}
30
31/// Streaming-specific SQL statements
32#[derive(Debug, Clone, PartialEq)]
33pub enum StreamingStatement {
34    /// Standard SQL statement
35    Standard(Box<sqlparser::ast::Statement>),
36
37    /// CREATE SOURCE statement
38    CreateSource(Box<CreateSourceStatement>),
39
40    /// CREATE SINK statement
41    CreateSink(Box<CreateSinkStatement>),
42
43    /// CREATE CONTINUOUS QUERY
44    CreateContinuousQuery {
45        /// Query name
46        name: ObjectName,
47        /// SQL query with streaming extensions
48        query: Box<StreamingStatement>,
49        /// EMIT clause if present
50        emit_clause: Option<EmitClause>,
51    },
52
53    /// DROP SOURCE statement
54    DropSource {
55        /// Source name to drop
56        name: ObjectName,
57        /// Whether IF EXISTS was specified
58        if_exists: bool,
59        /// Whether CASCADE was specified (drops dependent streams/MVs)
60        cascade: bool,
61    },
62
63    /// DROP SINK statement
64    DropSink {
65        /// Sink name to drop
66        name: ObjectName,
67        /// Whether IF EXISTS was specified
68        if_exists: bool,
69        /// Whether CASCADE was specified
70        cascade: bool,
71    },
72
73    /// DROP MATERIALIZED VIEW statement
74    DropMaterializedView {
75        /// View name to drop
76        name: ObjectName,
77        /// Whether IF EXISTS was specified
78        if_exists: bool,
79        /// Whether CASCADE was specified
80        cascade: bool,
81    },
82
83    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
84    Show(ShowCommand),
85
86    /// DESCRIBE source, sink, or other streaming object
87    Describe {
88        /// Object name to describe
89        name: ObjectName,
90        /// Whether EXTENDED was specified for additional detail
91        extended: bool,
92    },
93
94    /// EXPLAIN a streaming query plan
95    Explain {
96        /// The statement to explain
97        statement: Box<StreamingStatement>,
98    },
99
100    /// CREATE MATERIALIZED VIEW
101    CreateMaterializedView {
102        /// View name
103        name: ObjectName,
104        /// The backing query
105        query: Box<StreamingStatement>,
106        /// Optional EMIT clause
107        emit_clause: Option<EmitClause>,
108        /// Whether OR REPLACE was specified
109        or_replace: bool,
110        /// Whether IF NOT EXISTS was specified
111        if_not_exists: bool,
112    },
113
114    /// CREATE STREAM — named streaming pipeline
115    CreateStream {
116        /// Stream name
117        name: ObjectName,
118        /// Backing query (AS SELECT ...)
119        query: Box<StreamingStatement>,
120        /// Optional EMIT clause
121        emit_clause: Option<EmitClause>,
122        /// Whether OR REPLACE was specified
123        or_replace: bool,
124        /// Whether IF NOT EXISTS was specified
125        if_not_exists: bool,
126    },
127
128    /// DROP STREAM statement
129    DropStream {
130        /// Stream name to drop
131        name: ObjectName,
132        /// Whether IF EXISTS was specified
133        if_exists: bool,
134        /// Whether CASCADE was specified
135        cascade: bool,
136    },
137
138    /// ALTER SOURCE — modify a source definition
139    AlterSource {
140        /// Source name to alter
141        name: ObjectName,
142        /// The alteration to apply
143        operation: AlterSourceOperation,
144    },
145
146    /// INSERT INTO a streaming source or table
147    InsertInto {
148        /// Target table or source name
149        table_name: ObjectName,
150        /// Column names (empty if not specified)
151        columns: Vec<Ident>,
152        /// Row values
153        values: Vec<Vec<Expr>>,
154    },
155
156    /// CREATE LOOKUP TABLE statement
157    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
158
159    /// DROP LOOKUP TABLE statement
160    DropLookupTable {
161        /// Lookup table name to drop
162        name: ObjectName,
163        /// Whether IF EXISTS was specified
164        if_exists: bool,
165    },
166}
167
168/// Operations for ALTER SOURCE statements.
169#[derive(Debug, Clone, PartialEq)]
170pub enum AlterSourceOperation {
171    /// Add a new column: `ALTER SOURCE name ADD COLUMN col_name data_type`
172    AddColumn {
173        /// Column definition
174        column_def: ColumnDef,
175    },
176    /// Set source properties: `ALTER SOURCE name SET ('key' = 'value', ...)`
177    SetProperties {
178        /// Key-value pairs
179        properties: HashMap<String, String>,
180    },
181}
182
/// Serialization format attached to a source or sink (e.g. `FORMAT JSON`,
/// `FORMAT AVRO`).
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format, e.g. `"JSON"`, `"AVRO"`, or `"PROTOBUF"`.
    pub format_type: String,
    /// Extra format options taken from the `WITH` clause that follows `FORMAT`.
    pub options: HashMap<String, String>,
}
191
192/// CREATE SOURCE statement
193#[derive(Debug, Clone, PartialEq)]
194pub struct CreateSourceStatement {
195    /// Source name
196    pub name: ObjectName,
197    /// Column definitions
198    pub columns: Vec<ColumnDef>,
199    /// Watermark definition
200    pub watermark: Option<WatermarkDef>,
201    /// Source connector options (from WITH clause)
202    pub with_options: HashMap<String, String>,
203    /// Whether to replace existing source
204    pub or_replace: bool,
205    /// Whether to skip if exists
206    pub if_not_exists: bool,
207    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
208    pub connector_type: Option<String>,
209    /// Connector-specific options (from `FROM KAFKA (...)`)
210    pub connector_options: HashMap<String, String>,
211    /// Format specification (e.g., `FORMAT JSON`)
212    pub format: Option<FormatSpec>,
213    /// Whether the column list includes a `*` wildcard for schema inference.
214    pub has_wildcard: bool,
215    /// Optional prefix for wildcard-expanded columns (from `PREFIX 'str'`).
216    pub wildcard_prefix: Option<String>,
217}
218
219/// CREATE SINK statement
220#[derive(Debug, Clone, PartialEq)]
221pub struct CreateSinkStatement {
222    /// Sink name
223    pub name: ObjectName,
224    /// Input query or table
225    pub from: SinkFrom,
226    /// Sink connector options (from WITH clause)
227    pub with_options: HashMap<String, String>,
228    /// Whether to replace existing sink
229    pub or_replace: bool,
230    /// Whether to skip if exists
231    pub if_not_exists: bool,
232    /// Optional WHERE filter expression
233    pub filter: Option<Expr>,
234    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
235    pub connector_type: Option<String>,
236    /// Connector-specific options (from `INTO KAFKA (...)`)
237    pub connector_options: HashMap<String, String>,
238    /// Format specification (e.g., `FORMAT JSON`)
239    pub format: Option<FormatSpec>,
240    /// Output options (from `WITH (key = ...)` after FORMAT)
241    pub output_options: HashMap<String, String>,
242}
243
244/// Source for a sink
245#[derive(Debug, Clone, PartialEq)]
246pub enum SinkFrom {
247    /// From a table or source
248    Table(ObjectName),
249    /// From a SELECT query
250    Query(Box<StreamingStatement>),
251}
252
253/// Watermark definition
254#[derive(Debug, Clone, PartialEq)]
255pub struct WatermarkDef {
256    /// Column to use for watermark
257    pub column: Ident,
258    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
259    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
260    /// meaning watermark advances via `source.watermark()` with zero delay.
261    pub expression: Option<Expr>,
262}
263
264/// Late data handling clause.
265///
266/// Controls what happens to events that arrive after their window has closed.
267/// This is the SQL AST representation of late data configuration.
268/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
269#[derive(Debug, Clone, PartialEq, Default)]
270pub struct LateDataClause {
271    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
272    pub allowed_lateness: Option<Box<Expr>>,
273    /// Side output name for late events (e.g., `late_events`)
274    pub side_output: Option<String>,
275}
276
277impl LateDataClause {
278    /// Creates a clause with allowed lateness only.
279    #[must_use]
280    pub fn with_allowed_lateness(lateness: Expr) -> Self {
281        Self {
282            allowed_lateness: Some(Box::new(lateness)),
283            side_output: None,
284        }
285    }
286
287    /// Creates a clause with both allowed lateness and side output.
288    #[must_use]
289    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
290        Self {
291            allowed_lateness: Some(Box::new(lateness)),
292            side_output: Some(side_output),
293        }
294    }
295
296    /// Creates a clause with side output only (uses default lateness).
297    #[must_use]
298    pub fn side_output_only(side_output: String) -> Self {
299        Self {
300            allowed_lateness: None,
301            side_output: Some(side_output),
302        }
303    }
304
305    /// Convert to allowed lateness Duration.
306    ///
307    /// # Errors
308    ///
309    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
310    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
311        match &self.allowed_lateness {
312            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
313            None => Ok(Duration::ZERO),
314        }
315    }
316
317    /// Check if this clause has a side output configured.
318    #[must_use]
319    pub fn has_side_output(&self) -> bool {
320        self.side_output.is_some()
321    }
322
323    /// Get the side output name, if configured.
324    #[must_use]
325    pub fn get_side_output(&self) -> Option<&str> {
326        self.side_output.as_deref()
327    }
328}
329
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime-facing counterpart of the SQL-level `EmitClause`: the
/// periodic interval is already resolved to a concrete `Duration` rather than
/// an unparsed SQL expression.
///
/// Every payload is `Eq + Hash`, so the enum derives both in addition to
/// `PartialEq`; this lets strategies be used as map keys or deduplicated in a
/// set.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end.
    OnWatermark,
    /// Emit only when the window closes (no intermediate results).
    OnWindowClose,
    /// Emit at fixed intervals of the given duration.
    Periodic(Duration),
    /// Emit on every state change.
    OnUpdate,
    /// Emit changelog records with Z-set weights.
    Changelog,
    /// Emit only final results, suppressing all intermediate ones.
    FinalOnly,
}
348
349/// EMIT clause for controlling output timing.
350///
351/// This is the SQL AST representation of emit strategies.
352/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
353#[derive(Debug, Clone, PartialEq)]
354pub enum EmitClause {
355    // === Existing ===
356    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
357    ///
358    /// Emit results when the watermark passes the window end.
359    /// This is the most efficient strategy.
360    AfterWatermark,
361
362    /// EMIT ON WINDOW CLOSE
363    ///
364    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
365    /// Only emits when window closes, no intermediate results.
366    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
367    OnWindowClose,
368
369    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
370    ///
371    /// Emit intermediate results at fixed intervals.
372    /// Final results are still emitted on watermark.
373    Periodically {
374        /// The interval expression (e.g., INTERVAL '5' SECOND)
375        interval: Box<Expr>,
376    },
377
378    /// EMIT ON UPDATE
379    ///
380    /// Emit updated results after every state change.
381    /// This provides lowest latency but highest overhead.
382    OnUpdate,
383
384    // === New ===
385    /// EMIT CHANGES
386    ///
387    /// Emit changelog records with Z-set weights for CDC pipelines.
388    /// Every emission includes operation type and weight:
389    /// - Insert (+1 weight)
390    /// - Delete (-1 weight)
391    /// - Update (retraction pair: -1 old, +1 new)
392    ///
393    /// Required for:
394    /// - CDC pipelines
395    /// - Cascading materialized views
396    /// - Downstream consumers that need to track changes
397    Changes,
398
399    /// EMIT FINAL
400    ///
401    /// Suppress ALL intermediate results, emit only finalized.
402    /// Also drops late data entirely after window close.
403    /// Use for BI reporting where only final, exact results matter.
404    Final,
405}
406
407impl std::fmt::Display for EmitClause {
408    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
409        match self {
410            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
411            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
412            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
413            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
414            EmitClause::Changes => write!(f, "EMIT CHANGES"),
415            EmitClause::Final => write!(f, "EMIT FINAL"),
416        }
417    }
418}
419
420impl EmitClause {
421    /// Convert to runtime EmitStrategy.
422    ///
423    /// # Errors
424    ///
425    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
426    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
427        match self {
428            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
429            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
430            EmitClause::Periodically { interval } => {
431                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
432                Ok(EmitStrategy::Periodic(duration))
433            }
434            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
435            EmitClause::Changes => Ok(EmitStrategy::Changelog),
436            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
437        }
438    }
439
440    /// Check if this emit strategy requires changelog/retraction support.
441    #[must_use]
442    pub fn requires_changelog(&self) -> bool {
443        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
444    }
445
446    /// Check if this emit strategy is append-only (no retractions).
447    #[must_use]
448    pub fn is_append_only(&self) -> bool {
449        matches!(
450            self,
451            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
452        )
453    }
454
455    /// Returns true if this emit strategy requires a watermark on the source.
456    ///
457    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
458    /// advancement to trigger window closure. Without a watermark, timers will
459    /// never fire and windows will never close.
460    #[must_use]
461    pub fn requires_watermark(&self) -> bool {
462        matches!(
463            self,
464            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
465        )
466    }
467}
468
469/// Window function types
470#[derive(Debug, Clone, PartialEq)]
471pub enum WindowFunction {
472    /// TUMBLE(column, interval)
473    Tumble {
474        /// The time column to window on
475        time_column: Box<Expr>,
476        /// The window interval
477        interval: Box<Expr>,
478    },
479    /// HOP(column, slide, size)
480    Hop {
481        /// The time column to window on
482        time_column: Box<Expr>,
483        /// The slide interval (how often to create a new window)
484        slide_interval: Box<Expr>,
485        /// The window size interval
486        window_interval: Box<Expr>,
487    },
488    /// SESSION(column, gap)
489    Session {
490        /// The time column to window on
491        time_column: Box<Expr>,
492        /// The gap interval (max gap between events in same session)
493        gap_interval: Box<Expr>,
494    },
495    /// CUMULATE(column, step, size)
496    Cumulate {
497        /// The time column to window on
498        time_column: Box<Expr>,
499        /// The step interval (window growth increment)
500        step_interval: Box<Expr>,
501        /// The max window size interval (epoch size)
502        max_size_interval: Box<Expr>,
503    },
504}
505
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part `ObjectName` from a bare identifier.
    fn object_name(ident: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(ident))])
    }

    /// Wraps a bare identifier in an `Expr`, used as a placeholder expression.
    fn ident_expr(ident: &str) -> Expr {
        Expr::Identifier(Ident::new(ident))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement as
    /// a `StreamingStatement::Standard`.
    fn standard_statement(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let mut parsed = sqlparser::parser::Parser::parse_sql(&dialect, sql)
            .unwrap()
            .into_iter();
        StreamingStatement::Standard(Box::new(parsed.next().unwrap()))
    }

    #[test]
    fn test_create_source_statement() {
        let columns = vec![
            ColumnDef {
                name: Ident::new("id"),
                data_type: DataType::BigInt(None),
                options: vec![],
            },
            ColumnDef {
                name: Ident::new("timestamp"),
                data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                options: vec![],
            },
        ];
        let with_options: HashMap<String, String> = [("connector", "kafka"), ("topic", "events")]
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect();

        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns,
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(ident_expr("timestamp")),
            }),
            with_options,
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        // Spot-check the fields that were populated above.
        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector").map(String::as_str),
            Some("kafka")
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let after_watermark = EmitClause::AfterWatermark;
        let on_window_close = EmitClause::OnWindowClose;
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        let on_update = EmitClause::OnUpdate;

        assert!(
            matches!(after_watermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(on_window_close, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodic, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(on_update, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
        };
        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
    }

    #[test]
    fn test_late_data_clause_default() {
        let LateDataClause {
            allowed_lateness,
            side_output,
        } = LateDataClause::default();
        assert!(allowed_lateness.is_none());
        assert!(side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let clause = LateDataClause::with_allowed_lateness(ident_expr("INTERVAL '1' HOUR"));
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let clause = LateDataClause::with_side_output(
            ident_expr("INTERVAL '1' HOUR"),
            "late_events".to_string(),
        );
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_clone() {
        let original = ShowCommand::Sources;
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSource");
        };
        assert_eq!(name.to_string(), "events");
        assert!(if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropSink");
        };
        assert_eq!(name.to_string(), "output");
        assert!(!if_exists);
        assert!(!cascade);
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert!(if_exists);
        assert!(cascade);
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        let StreamingStatement::Describe { name, extended } = stmt else {
            panic!("Expected Describe");
        };
        assert_eq!(name.to_string(), "events");
        assert!(extended);
    }

    #[test]
    fn test_explain_statement() {
        // The explained statement is an ordinary SQL query parsed by sqlparser.
        let stmt = StreamingStatement::Explain {
            statement: Box::new(standard_statement("SELECT 1")),
        };
        let StreamingStatement::Explain { statement } = stmt else {
            panic!("Expected Explain");
        };
        assert!(matches!(*statement, StreamingStatement::Standard(_)));
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // The backing query is an ordinary SQL query parsed by sqlparser.
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(standard_statement("SELECT COUNT(*) FROM events")),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        else {
            panic!("Expected CreateMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        assert!(!or_replace);
        assert!(if_not_exists);
    }

    #[test]
    fn test_insert_into_statement() {
        let row = vec![
            Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
            Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
        ];
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![row],
        };
        let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        else {
            panic!("Expected InsertInto");
        };
        assert_eq!(table_name.to_string(), "events");
        assert_eq!(columns.len(), 2);
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].len(), 2);
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Strategies that depend on watermark advancement.
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        // Strategies that do not depend on a watermark.
        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.requires_watermark());
    }
}