Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
14/// SHOW command variants for listing streaming objects.
15#[derive(Debug, Clone, PartialEq)]
16pub enum ShowCommand {
17    /// SHOW SOURCES - list all registered sources
18    Sources,
19    /// SHOW SINKS - list all registered sinks
20    Sinks,
21    /// SHOW QUERIES - list all running continuous queries
22    Queries,
23    /// SHOW MATERIALIZED VIEWS - list all materialized views
24    MaterializedViews,
25    /// SHOW STREAMS - list all named streams
26    Streams,
27    /// SHOW TABLES - list all reference/dimension tables
28    Tables,
29    /// SHOW CHECKPOINT STATUS - display checkpoint state
30    CheckpointStatus,
31    /// `SHOW CREATE SOURCE` — reconstruct source DDL
32    CreateSource {
33        /// Source name
34        name: ObjectName,
35    },
36    /// `SHOW CREATE SINK` — reconstruct sink DDL
37    CreateSink {
38        /// Sink name
39        name: ObjectName,
40    },
41}
42
43/// Streaming-specific SQL statements
44#[derive(Debug, Clone, PartialEq)]
45pub enum StreamingStatement {
46    /// Standard SQL statement
47    Standard(Box<sqlparser::ast::Statement>),
48
49    /// CREATE SOURCE statement
50    CreateSource(Box<CreateSourceStatement>),
51
52    /// CREATE SINK statement
53    CreateSink(Box<CreateSinkStatement>),
54
55    /// CREATE CONTINUOUS QUERY
56    CreateContinuousQuery {
57        /// Query name
58        name: ObjectName,
59        /// SQL query with streaming extensions
60        query: Box<StreamingStatement>,
61        /// EMIT clause if present
62        emit_clause: Option<EmitClause>,
63    },
64
65    /// DROP SOURCE statement
66    DropSource {
67        /// Source name to drop
68        name: ObjectName,
69        /// Whether IF EXISTS was specified
70        if_exists: bool,
71        /// Whether CASCADE was specified (drops dependent streams/MVs)
72        cascade: bool,
73    },
74
75    /// DROP SINK statement
76    DropSink {
77        /// Sink name to drop
78        name: ObjectName,
79        /// Whether IF EXISTS was specified
80        if_exists: bool,
81        /// Whether CASCADE was specified
82        cascade: bool,
83    },
84
85    /// DROP MATERIALIZED VIEW statement
86    DropMaterializedView {
87        /// View name to drop
88        name: ObjectName,
89        /// Whether IF EXISTS was specified
90        if_exists: bool,
91        /// Whether CASCADE was specified
92        cascade: bool,
93    },
94
95    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
96    Show(ShowCommand),
97
98    /// DESCRIBE source, sink, or other streaming object
99    Describe {
100        /// Object name to describe
101        name: ObjectName,
102        /// Whether EXTENDED was specified for additional detail
103        extended: bool,
104    },
105
106    /// `EXPLAIN [ANALYZE]` a streaming query plan
107    Explain {
108        /// The statement to explain
109        statement: Box<StreamingStatement>,
110        /// Whether ANALYZE was specified (execute and collect metrics)
111        analyze: bool,
112    },
113
114    /// CREATE MATERIALIZED VIEW
115    CreateMaterializedView {
116        /// View name
117        name: ObjectName,
118        /// The backing query
119        query: Box<StreamingStatement>,
120        /// Optional EMIT clause
121        emit_clause: Option<EmitClause>,
122        /// Whether OR REPLACE was specified
123        or_replace: bool,
124        /// Whether IF NOT EXISTS was specified
125        if_not_exists: bool,
126    },
127
128    /// CREATE STREAM — named streaming pipeline
129    CreateStream {
130        /// Stream name
131        name: ObjectName,
132        /// Backing query (AS SELECT ...)
133        query: Box<StreamingStatement>,
134        /// Optional EMIT clause
135        emit_clause: Option<EmitClause>,
136        /// Whether OR REPLACE was specified
137        or_replace: bool,
138        /// Whether IF NOT EXISTS was specified
139        if_not_exists: bool,
140    },
141
142    /// DROP STREAM statement
143    DropStream {
144        /// Stream name to drop
145        name: ObjectName,
146        /// Whether IF EXISTS was specified
147        if_exists: bool,
148        /// Whether CASCADE was specified
149        cascade: bool,
150    },
151
152    /// ALTER SOURCE — modify a source definition
153    AlterSource {
154        /// Source name to alter
155        name: ObjectName,
156        /// The alteration to apply
157        operation: AlterSourceOperation,
158    },
159
160    /// INSERT INTO a streaming source or table
161    InsertInto {
162        /// Target table or source name
163        table_name: ObjectName,
164        /// Column names (empty if not specified)
165        columns: Vec<Ident>,
166        /// Row values
167        values: Vec<Vec<Expr>>,
168    },
169
170    /// CREATE LOOKUP TABLE statement
171    CreateLookupTable(Box<super::lookup_table::CreateLookupTableStatement>),
172
173    /// DROP LOOKUP TABLE statement
174    DropLookupTable {
175        /// Lookup table name to drop
176        name: ObjectName,
177        /// Whether IF EXISTS was specified
178        if_exists: bool,
179    },
180
181    /// CHECKPOINT — trigger an immediate checkpoint
182    Checkpoint,
183
184    /// `RESTORE FROM CHECKPOINT <id>`
185    RestoreCheckpoint {
186        /// The checkpoint ID to restore from
187        checkpoint_id: u64,
188    },
189}
190
191/// Operations for ALTER SOURCE statements.
192#[derive(Debug, Clone, PartialEq)]
193pub enum AlterSourceOperation {
194    /// Add a new column: `ALTER SOURCE name ADD COLUMN col_name data_type`
195    AddColumn {
196        /// Column definition
197        column_def: ColumnDef,
198    },
199    /// Set source properties: `ALTER SOURCE name SET ('key' = 'value', ...)`
200    SetProperties {
201        /// Key-value pairs
202        properties: HashMap<String, String>,
203    },
204}
205
/// Serialization format given by a `FORMAT` clause (e.g., `FORMAT JSON`, `FORMAT AVRO`).
#[derive(Debug, Clone, PartialEq)]
pub struct FormatSpec {
    /// Name of the format (e.g., "JSON", "AVRO", "PROTOBUF").
    pub format_type: String,
    /// Extra format options, taken from the WITH clause that follows FORMAT.
    pub options: HashMap<String, String>,
}
214
215/// CREATE SOURCE statement
216#[derive(Debug, Clone, PartialEq)]
217pub struct CreateSourceStatement {
218    /// Source name
219    pub name: ObjectName,
220    /// Column definitions
221    pub columns: Vec<ColumnDef>,
222    /// Watermark definition
223    pub watermark: Option<WatermarkDef>,
224    /// Source connector options (from WITH clause)
225    pub with_options: HashMap<String, String>,
226    /// Whether to replace existing source
227    pub or_replace: bool,
228    /// Whether to skip if exists
229    pub if_not_exists: bool,
230    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
231    pub connector_type: Option<String>,
232    /// Connector-specific options (from `FROM KAFKA (...)`)
233    pub connector_options: HashMap<String, String>,
234    /// Format specification (e.g., `FORMAT JSON`)
235    pub format: Option<FormatSpec>,
236    /// Whether the column list includes a `*` wildcard for schema inference.
237    pub has_wildcard: bool,
238    /// Optional prefix for wildcard-expanded columns (from `PREFIX 'str'`).
239    pub wildcard_prefix: Option<String>,
240}
241
242/// CREATE SINK statement
243#[derive(Debug, Clone, PartialEq)]
244pub struct CreateSinkStatement {
245    /// Sink name
246    pub name: ObjectName,
247    /// Input query or table
248    pub from: SinkFrom,
249    /// Sink connector options (from WITH clause)
250    pub with_options: HashMap<String, String>,
251    /// Whether to replace existing sink
252    pub or_replace: bool,
253    /// Whether to skip if exists
254    pub if_not_exists: bool,
255    /// Optional WHERE filter expression
256    pub filter: Option<Expr>,
257    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
258    pub connector_type: Option<String>,
259    /// Connector-specific options (from `INTO KAFKA (...)`)
260    pub connector_options: HashMap<String, String>,
261    /// Format specification (e.g., `FORMAT JSON`)
262    pub format: Option<FormatSpec>,
263    /// Output options (from `WITH (key = ...)` after FORMAT)
264    pub output_options: HashMap<String, String>,
265}
266
267/// Source for a sink
268#[derive(Debug, Clone, PartialEq)]
269pub enum SinkFrom {
270    /// From a table or source
271    Table(ObjectName),
272    /// From a SELECT query
273    Query(Box<StreamingStatement>),
274}
275
276/// Watermark definition
277#[derive(Debug, Clone, PartialEq)]
278pub struct WatermarkDef {
279    /// Column to use for watermark
280    pub column: Ident,
281    /// Watermark expression (e.g., column - INTERVAL '5' SECOND).
282    /// `None` when `WATERMARK FOR col` is used without `AS expr`,
283    /// meaning watermark advances via `source.watermark()` with zero delay.
284    pub expression: Option<Expr>,
285}
286
287/// Late data handling clause.
288///
289/// Controls what happens to events that arrive after their window has closed.
290/// This is the SQL AST representation of late data configuration.
291/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
292#[derive(Debug, Clone, PartialEq, Default)]
293pub struct LateDataClause {
294    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
295    pub allowed_lateness: Option<Box<Expr>>,
296    /// Side output name for late events (e.g., `late_events`)
297    pub side_output: Option<String>,
298}
299
300impl LateDataClause {
301    /// Creates a clause with allowed lateness only.
302    #[must_use]
303    pub fn with_allowed_lateness(lateness: Expr) -> Self {
304        Self {
305            allowed_lateness: Some(Box::new(lateness)),
306            side_output: None,
307        }
308    }
309
310    /// Creates a clause with both allowed lateness and side output.
311    #[must_use]
312    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
313        Self {
314            allowed_lateness: Some(Box::new(lateness)),
315            side_output: Some(side_output),
316        }
317    }
318
319    /// Creates a clause with side output only (uses default lateness).
320    #[must_use]
321    pub fn side_output_only(side_output: String) -> Self {
322        Self {
323            allowed_lateness: None,
324            side_output: Some(side_output),
325        }
326    }
327
328    /// Convert to allowed lateness Duration.
329    ///
330    /// # Errors
331    ///
332    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
333    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
334        match &self.allowed_lateness {
335            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
336            None => Ok(Duration::ZERO),
337        }
338    }
339
340    /// Check if this clause has a side output configured.
341    #[must_use]
342    pub fn has_side_output(&self) -> bool {
343        self.side_output.is_some()
344    }
345
346    /// Get the side output name, if configured.
347    #[must_use]
348    pub fn get_side_output(&self) -> Option<&str> {
349        self.side_output.as_deref()
350    }
351}
352
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime representation that operators use. Every payload is
/// a plain `Duration`, so the type also derives `Eq` and `Hash` and can be
/// used as a set element or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end.
    OnWatermark,
    /// Emit only when the window closes (no intermediate results).
    OnWindowClose,
    /// Emit at fixed intervals of the given duration.
    Periodic(Duration),
    /// Emit on every state change.
    OnUpdate,
    /// Emit changelog records with Z-set weights.
    Changelog,
    /// Emit only final results, suppressing all intermediate ones.
    FinalOnly,
}
371
372/// EMIT clause for controlling output timing.
373///
374/// This is the SQL AST representation of emit strategies.
375/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
376#[derive(Debug, Clone, PartialEq)]
377pub enum EmitClause {
378    // === Existing ===
379    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
380    ///
381    /// Emit results when the watermark passes the window end.
382    /// This is the most efficient strategy.
383    AfterWatermark,
384
385    /// EMIT ON WINDOW CLOSE
386    ///
387    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
388    /// Only emits when window closes, no intermediate results.
389    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
390    OnWindowClose,
391
392    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
393    ///
394    /// Emit intermediate results at fixed intervals.
395    /// Final results are still emitted on watermark.
396    Periodically {
397        /// The interval expression (e.g., INTERVAL '5' SECOND)
398        interval: Box<Expr>,
399    },
400
401    /// EMIT ON UPDATE
402    ///
403    /// Emit updated results after every state change.
404    /// This provides lowest latency but highest overhead.
405    OnUpdate,
406
407    // === New ===
408    /// EMIT CHANGES
409    ///
410    /// Emit changelog records with Z-set weights for CDC pipelines.
411    /// Every emission includes operation type and weight:
412    /// - Insert (+1 weight)
413    /// - Delete (-1 weight)
414    /// - Update (retraction pair: -1 old, +1 new)
415    ///
416    /// Required for:
417    /// - CDC pipelines
418    /// - Cascading materialized views
419    /// - Downstream consumers that need to track changes
420    Changes,
421
422    /// EMIT FINAL
423    ///
424    /// Suppress ALL intermediate results, emit only finalized.
425    /// Also drops late data entirely after window close.
426    /// Use for BI reporting where only final, exact results matter.
427    Final,
428}
429
430impl std::fmt::Display for EmitClause {
431    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
432        match self {
433            EmitClause::AfterWatermark => write!(f, "EMIT AFTER WATERMARK"),
434            EmitClause::OnWindowClose => write!(f, "EMIT ON WINDOW CLOSE"),
435            EmitClause::Periodically { interval } => write!(f, "EMIT EVERY {interval}"),
436            EmitClause::OnUpdate => write!(f, "EMIT ON UPDATE"),
437            EmitClause::Changes => write!(f, "EMIT CHANGES"),
438            EmitClause::Final => write!(f, "EMIT FINAL"),
439        }
440    }
441}
442
443impl EmitClause {
444    /// Convert to runtime EmitStrategy.
445    ///
446    /// # Errors
447    ///
448    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
449    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
450        match self {
451            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
452            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
453            EmitClause::Periodically { interval } => {
454                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
455                Ok(EmitStrategy::Periodic(duration))
456            }
457            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
458            EmitClause::Changes => Ok(EmitStrategy::Changelog),
459            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
460        }
461    }
462
463    /// Check if this emit strategy requires changelog/retraction support.
464    #[must_use]
465    pub fn requires_changelog(&self) -> bool {
466        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
467    }
468
469    /// Check if this emit strategy is append-only (no retractions).
470    #[must_use]
471    pub fn is_append_only(&self) -> bool {
472        matches!(
473            self,
474            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
475        )
476    }
477
478    /// Returns true if this emit strategy requires a watermark on the source.
479    ///
480    /// `OnWindowClose`, `Final`, and `AfterWatermark` all depend on watermark
481    /// advancement to trigger window closure. Without a watermark, timers will
482    /// never fire and windows will never close.
483    #[must_use]
484    pub fn requires_watermark(&self) -> bool {
485        matches!(
486            self,
487            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
488        )
489    }
490}
491
492/// Window function types
493#[derive(Debug, Clone, PartialEq)]
494pub enum WindowFunction {
495    /// TUMBLE(column, interval [, offset])
496    Tumble {
497        /// The time column to window on
498        time_column: Box<Expr>,
499        /// The window interval
500        interval: Box<Expr>,
501        /// Optional offset for timezone-aligned windows
502        offset: Option<Box<Expr>>,
503    },
504    /// HOP(column, slide, size [, offset])
505    Hop {
506        /// The time column to window on
507        time_column: Box<Expr>,
508        /// The slide interval (how often to create a new window)
509        slide_interval: Box<Expr>,
510        /// The window size interval
511        window_interval: Box<Expr>,
512        /// Optional offset for timezone-aligned windows
513        offset: Option<Box<Expr>>,
514    },
515    /// SESSION(column, gap)
516    Session {
517        /// The time column to window on
518        time_column: Box<Expr>,
519        /// The gap interval (max gap between events in same session)
520        gap_interval: Box<Expr>,
521    },
522    /// CUMULATE(column, step, size)
523    Cumulate {
524        /// The time column to window on
525        time_column: Box<Expr>,
526        /// The step interval (window growth increment)
527        step_interval: Box<Expr>,
528        /// The max window size interval (epoch size)
529        max_size_interval: Box<Expr>,
530    },
531}
532
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part object name from a bare identifier.
    fn object_name(ident: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(ident))])
    }

    /// Builds an identifier expression; the tests use it as a placeholder
    /// wherever an interval or column expression is required.
    fn ident_expr(ident: &str) -> Expr {
        Expr::Identifier(Ident::new(ident))
    }

    /// Parses `sql` with the generic dialect and wraps its first statement.
    fn standard_statement(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let parsed = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(parsed.into_iter().next().unwrap()))
    }

    #[test]
    fn test_create_source_statement() {
        let id_column = ColumnDef {
            name: Ident::new("id"),
            data_type: DataType::BigInt(None),
            options: vec![],
        };
        let timestamp_column = ColumnDef {
            name: Ident::new("timestamp"),
            data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
            options: vec![],
        };

        let mut with_options = HashMap::new();
        with_options.insert("connector".to_string(), "kafka".to_string());
        with_options.insert("topic".to_string(), "events".to_string());

        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![id_column, timestamp_column],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: Some(ident_expr("timestamp")),
            }),
            with_options,
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
            has_wildcard: false,
            wildcard_prefix: None,
        };

        // Spot-check the fields that were populated above.
        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector").map(String::as_str),
            Some("kafka")
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let after_watermark = EmitClause::AfterWatermark;
        let on_window_close = EmitClause::OnWindowClose;
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        let on_update = EmitClause::OnUpdate;

        assert!(
            matches!(after_watermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(on_window_close, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodic, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(on_update, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };
        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
            offset: None,
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
    }

    #[test]
    fn test_late_data_clause_default() {
        let LateDataClause {
            allowed_lateness,
            side_output,
        } = LateDataClause::default();
        assert!(allowed_lateness.is_none());
        assert!(side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let clause = LateDataClause::with_allowed_lateness(ident_expr("INTERVAL '1' HOUR"));
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let clause = LateDataClause::with_side_output(
            ident_expr("INTERVAL '1' HOUR"),
            "late_events".to_string(),
        );
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output.as_deref(), Some("late_events"));
    }

    #[test]
    fn test_show_command_variants() {
        let show_sources = ShowCommand::Sources;
        let show_sinks = ShowCommand::Sinks;
        let show_queries = ShowCommand::Queries;
        let show_views = ShowCommand::MaterializedViews;

        assert_eq!(show_sources, ShowCommand::Sources);
        assert_eq!(show_sinks, ShowCommand::Sinks);
        assert_eq!(show_queries, ShowCommand::Queries);
        assert_eq!(show_views, ShowCommand::MaterializedViews);
    }

    #[test]
    fn test_show_command_clone() {
        let original = ShowCommand::Sources;
        let duplicate = original.clone();
        assert_eq!(original, duplicate);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
            cascade: false,
        };
        if let StreamingStatement::DropSource {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "events");
            assert!(if_exists);
            assert!(!cascade);
        } else {
            panic!("Expected DropSource");
        }
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
            cascade: false,
        };
        if let StreamingStatement::DropSink {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "output");
            assert!(!if_exists);
            assert!(!cascade);
        } else {
            panic!("Expected DropSink");
        }
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        if let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        {
            assert_eq!(name.to_string(), "live_stats");
            assert!(if_exists);
            assert!(cascade);
        } else {
            panic!("Expected DropMaterializedView");
        }
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        if let StreamingStatement::Describe { name, extended } = stmt {
            assert_eq!(name.to_string(), "events");
            assert!(extended);
        } else {
            panic!("Expected Describe");
        }
    }

    #[test]
    fn test_explain_statement() {
        // The explained statement is a plain SQL statement produced by sqlparser.
        let stmt = StreamingStatement::Explain {
            statement: Box::new(standard_statement("SELECT 1")),
            analyze: false,
        };
        if let StreamingStatement::Explain { statement, .. } = stmt {
            assert!(matches!(*statement, StreamingStatement::Standard(_)));
        } else {
            panic!("Expected Explain");
        }
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // The backing query is a plain SQL statement produced by sqlparser.
        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(standard_statement("SELECT COUNT(*) FROM events")),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        if let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        {
            assert_eq!(name.to_string(), "live_stats");
            assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
            assert!(!or_replace);
            assert!(if_not_exists);
        } else {
            panic!("Expected CreateMaterializedView");
        }
    }

    #[test]
    fn test_insert_into_statement() {
        let row = vec![
            Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
            Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
        ];
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![row],
        };
        if let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        {
            assert_eq!(table_name.to_string(), "events");
            assert_eq!(columns.len(), 2);
            assert_eq!(values.len(), 1);
            assert_eq!(values[0].len(), 2);
        } else {
            panic!("Expected InsertInto");
        }
    }

    #[test]
    fn test_eowc_requires_watermark_helper() {
        // Strategies that depend on a watermark.
        assert!(EmitClause::OnWindowClose.requires_watermark());
        assert!(EmitClause::Final.requires_watermark());
        assert!(EmitClause::AfterWatermark.requires_watermark());

        // Strategies that do not depend on a watermark.
        assert!(!EmitClause::OnUpdate.requires_watermark());
        assert!(!EmitClause::Changes.requires_watermark());
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };
        assert!(!periodic.requires_watermark());
    }
}