Skip to main content

laminar_sql/parser/
statements.rs

1//! Streaming SQL statement types
2//!
3//! This module defines AST types for streaming SQL extensions and provides
4//! conversion methods to translate them to runtime operator configurations.
5
6use std::collections::HashMap;
7use std::time::Duration;
8
9use sqlparser::ast::{ColumnDef, Expr, Ident, ObjectName};
10
11use super::window_rewriter::WindowRewriter;
12use super::ParseError;
13
/// Variants of the `SHOW` command, one per kind of streaming object that can be listed.
///
/// The enum is fieldless, so it derives `Eq` and `Hash` in addition to
/// `PartialEq`; this lets a command be used as a set member or map key.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ShowCommand {
    /// `SHOW SOURCES`: list all registered sources.
    Sources,
    /// `SHOW SINKS`: list all registered sinks.
    Sinks,
    /// `SHOW QUERIES`: list all running continuous queries.
    Queries,
    /// `SHOW MATERIALIZED VIEWS`: list all materialized views.
    MaterializedViews,
    /// `SHOW STREAMS`: list all named streams.
    Streams,
}
28
29/// Streaming-specific SQL statements
30#[derive(Debug, Clone, PartialEq)]
31pub enum StreamingStatement {
32    /// Standard SQL statement
33    Standard(Box<sqlparser::ast::Statement>),
34
35    /// CREATE SOURCE statement
36    CreateSource(Box<CreateSourceStatement>),
37
38    /// CREATE SINK statement
39    CreateSink(Box<CreateSinkStatement>),
40
41    /// CREATE CONTINUOUS QUERY
42    CreateContinuousQuery {
43        /// Query name
44        name: ObjectName,
45        /// SQL query with streaming extensions
46        query: Box<StreamingStatement>,
47        /// EMIT clause if present
48        emit_clause: Option<EmitClause>,
49    },
50
51    /// DROP SOURCE statement
52    DropSource {
53        /// Source name to drop
54        name: ObjectName,
55        /// Whether IF EXISTS was specified
56        if_exists: bool,
57    },
58
59    /// DROP SINK statement
60    DropSink {
61        /// Sink name to drop
62        name: ObjectName,
63        /// Whether IF EXISTS was specified
64        if_exists: bool,
65    },
66
67    /// DROP MATERIALIZED VIEW statement
68    DropMaterializedView {
69        /// View name to drop
70        name: ObjectName,
71        /// Whether IF EXISTS was specified
72        if_exists: bool,
73        /// Whether CASCADE was specified
74        cascade: bool,
75    },
76
77    /// SHOW SOURCES/SINKS/QUERIES/MATERIALIZED VIEWS
78    Show(ShowCommand),
79
80    /// DESCRIBE source, sink, or other streaming object
81    Describe {
82        /// Object name to describe
83        name: ObjectName,
84        /// Whether EXTENDED was specified for additional detail
85        extended: bool,
86    },
87
88    /// EXPLAIN a streaming query plan
89    Explain {
90        /// The statement to explain
91        statement: Box<StreamingStatement>,
92    },
93
94    /// CREATE MATERIALIZED VIEW
95    CreateMaterializedView {
96        /// View name
97        name: ObjectName,
98        /// The backing query
99        query: Box<StreamingStatement>,
100        /// Optional EMIT clause
101        emit_clause: Option<EmitClause>,
102        /// Whether OR REPLACE was specified
103        or_replace: bool,
104        /// Whether IF NOT EXISTS was specified
105        if_not_exists: bool,
106    },
107
108    /// CREATE STREAM — named streaming pipeline
109    CreateStream {
110        /// Stream name
111        name: ObjectName,
112        /// Backing query (AS SELECT ...)
113        query: Box<StreamingStatement>,
114        /// Optional EMIT clause
115        emit_clause: Option<EmitClause>,
116        /// Whether OR REPLACE was specified
117        or_replace: bool,
118        /// Whether IF NOT EXISTS was specified
119        if_not_exists: bool,
120    },
121
122    /// DROP STREAM statement
123    DropStream {
124        /// Stream name to drop
125        name: ObjectName,
126        /// Whether IF EXISTS was specified
127        if_exists: bool,
128    },
129
130    /// INSERT INTO a streaming source or table
131    InsertInto {
132        /// Target table or source name
133        table_name: ObjectName,
134        /// Column names (empty if not specified)
135        columns: Vec<Ident>,
136        /// Row values
137        values: Vec<Vec<Expr>>,
138    },
139}
140
/// Serialization format specification (e.g., `FORMAT JSON`, `FORMAT AVRO`).
///
/// Both fields are `Eq`, so the type derives `Eq` alongside `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FormatSpec {
    /// Format type (e.g., "JSON", "AVRO", "PROTOBUF").
    pub format_type: String,
    /// Additional format options (from the WITH clause after FORMAT).
    pub options: HashMap<String, String>,
}
149
150/// CREATE SOURCE statement
151#[derive(Debug, Clone, PartialEq)]
152pub struct CreateSourceStatement {
153    /// Source name
154    pub name: ObjectName,
155    /// Column definitions
156    pub columns: Vec<ColumnDef>,
157    /// Watermark definition
158    pub watermark: Option<WatermarkDef>,
159    /// Source connector options (from WITH clause)
160    pub with_options: HashMap<String, String>,
161    /// Whether to replace existing source
162    pub or_replace: bool,
163    /// Whether to skip if exists
164    pub if_not_exists: bool,
165    /// Connector type (e.g., "KAFKA") from `FROM KAFKA (...)` syntax
166    pub connector_type: Option<String>,
167    /// Connector-specific options (from `FROM KAFKA (...)`)
168    pub connector_options: HashMap<String, String>,
169    /// Format specification (e.g., `FORMAT JSON`)
170    pub format: Option<FormatSpec>,
171}
172
173/// CREATE SINK statement
174#[derive(Debug, Clone, PartialEq)]
175pub struct CreateSinkStatement {
176    /// Sink name
177    pub name: ObjectName,
178    /// Input query or table
179    pub from: SinkFrom,
180    /// Sink connector options (from WITH clause)
181    pub with_options: HashMap<String, String>,
182    /// Whether to replace existing sink
183    pub or_replace: bool,
184    /// Whether to skip if exists
185    pub if_not_exists: bool,
186    /// Optional WHERE filter expression
187    pub filter: Option<Expr>,
188    /// Connector type (e.g., "KAFKA") from `INTO KAFKA (...)` syntax
189    pub connector_type: Option<String>,
190    /// Connector-specific options (from `INTO KAFKA (...)`)
191    pub connector_options: HashMap<String, String>,
192    /// Format specification (e.g., `FORMAT JSON`)
193    pub format: Option<FormatSpec>,
194    /// Output options (from `WITH (key = ...)` after FORMAT)
195    pub output_options: HashMap<String, String>,
196}
197
198/// Source for a sink
199#[derive(Debug, Clone, PartialEq)]
200pub enum SinkFrom {
201    /// From a table or source
202    Table(ObjectName),
203    /// From a SELECT query
204    Query(Box<StreamingStatement>),
205}
206
207/// Watermark definition
208#[derive(Debug, Clone, PartialEq)]
209pub struct WatermarkDef {
210    /// Column to use for watermark
211    pub column: Ident,
212    /// Watermark expression (e.g., column - INTERVAL '5' SECOND)
213    pub expression: Expr,
214}
215
216/// Late data handling clause.
217///
218/// Controls what happens to events that arrive after their window has closed.
219/// This is the SQL AST representation of late data configuration.
220/// See `laminar_core::operator::window::LateDataConfig` for the runtime representation.
221#[derive(Debug, Clone, PartialEq, Default)]
222pub struct LateDataClause {
223    /// Allowed lateness duration (e.g., `INTERVAL '1' HOUR`)
224    pub allowed_lateness: Option<Box<Expr>>,
225    /// Side output name for late events (e.g., `late_events`)
226    pub side_output: Option<String>,
227}
228
229impl LateDataClause {
230    /// Creates a clause with allowed lateness only.
231    #[must_use]
232    pub fn with_allowed_lateness(lateness: Expr) -> Self {
233        Self {
234            allowed_lateness: Some(Box::new(lateness)),
235            side_output: None,
236        }
237    }
238
239    /// Creates a clause with both allowed lateness and side output.
240    #[must_use]
241    pub fn with_side_output(lateness: Expr, side_output: String) -> Self {
242        Self {
243            allowed_lateness: Some(Box::new(lateness)),
244            side_output: Some(side_output),
245        }
246    }
247
248    /// Creates a clause with side output only (uses default lateness).
249    #[must_use]
250    pub fn side_output_only(side_output: String) -> Self {
251        Self {
252            allowed_lateness: None,
253            side_output: Some(side_output),
254        }
255    }
256
257    /// Convert to allowed lateness Duration.
258    ///
259    /// # Errors
260    ///
261    /// Returns `ParseError::WindowError` if the interval cannot be parsed.
262    pub fn to_allowed_lateness(&self) -> Result<Duration, ParseError> {
263        match &self.allowed_lateness {
264            Some(expr) => WindowRewriter::parse_interval_to_duration(expr),
265            None => Ok(Duration::ZERO),
266        }
267    }
268
269    /// Check if this clause has a side output configured.
270    #[must_use]
271    pub fn has_side_output(&self) -> bool {
272        self.side_output.is_some()
273    }
274
275    /// Get the side output name, if configured.
276    #[must_use]
277    pub fn get_side_output(&self) -> Option<&str> {
278        self.side_output.as_deref()
279    }
280}
281
/// Emit strategy for runtime operator configuration.
///
/// This is the runtime representation that operators use.
///
/// The only payload is a [`Duration`], which is `Eq + Hash`, so the enum
/// derives both; this lets strategies be used as set members or map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum EmitStrategy {
    /// Emit when the watermark passes the window end.
    OnWatermark,
    /// Emit only when the window closes (no intermediate results).
    OnWindowClose,
    /// Emit at fixed intervals of the given duration.
    Periodic(Duration),
    /// Emit on every state change.
    OnUpdate,
    /// Emit changelog records with Z-set weights.
    Changelog,
    /// Emit only final results, suppressing all intermediate ones.
    FinalOnly,
}
300
301/// EMIT clause for controlling output timing.
302///
303/// This is the SQL AST representation of emit strategies.
304/// See `laminar_core::operator::window::EmitStrategy` for the runtime representation.
305#[derive(Debug, Clone, PartialEq)]
306pub enum EmitClause {
307    // === Existing (F011) ===
308    /// EMIT AFTER WATERMARK (or EMIT ON WATERMARK)
309    ///
310    /// Emit results when the watermark passes the window end.
311    /// This is the most efficient strategy.
312    AfterWatermark,
313
314    /// EMIT ON WINDOW CLOSE
315    ///
316    /// For append-only sinks (Kafka, S3, Delta Lake, Iceberg).
317    /// Only emits when window closes, no intermediate results.
318    /// Unlike `AfterWatermark`, this is NOT a synonym - it has distinct behavior.
319    OnWindowClose,
320
321    /// EMIT EVERY INTERVAL 'N' unit (or EMIT PERIODICALLY)
322    ///
323    /// Emit intermediate results at fixed intervals.
324    /// Final results are still emitted on watermark.
325    Periodically {
326        /// The interval expression (e.g., INTERVAL '5' SECOND)
327        interval: Box<Expr>,
328    },
329
330    /// EMIT ON UPDATE
331    ///
332    /// Emit updated results after every state change.
333    /// This provides lowest latency but highest overhead.
334    OnUpdate,
335
336    // === New (F011B) ===
337    /// EMIT CHANGES
338    ///
339    /// Emit changelog records with Z-set weights for CDC pipelines.
340    /// Every emission includes operation type and weight:
341    /// - Insert (+1 weight)
342    /// - Delete (-1 weight)
343    /// - Update (retraction pair: -1 old, +1 new)
344    ///
345    /// Required for:
346    /// - CDC pipelines
347    /// - Cascading materialized views
348    /// - Downstream consumers that need to track changes
349    Changes,
350
351    /// EMIT FINAL
352    ///
353    /// Suppress ALL intermediate results, emit only finalized.
354    /// Also drops late data entirely after window close.
355    /// Use for BI reporting where only final, exact results matter.
356    Final,
357}
358
359impl EmitClause {
360    /// Convert to runtime EmitStrategy.
361    ///
362    /// # Errors
363    ///
364    /// Returns `ParseError::WindowError` if the periodic interval cannot be parsed.
365    pub fn to_emit_strategy(&self) -> Result<EmitStrategy, ParseError> {
366        match self {
367            EmitClause::AfterWatermark => Ok(EmitStrategy::OnWatermark),
368            EmitClause::OnWindowClose => Ok(EmitStrategy::OnWindowClose),
369            EmitClause::Periodically { interval } => {
370                let duration = WindowRewriter::parse_interval_to_duration(interval)?;
371                Ok(EmitStrategy::Periodic(duration))
372            }
373            EmitClause::OnUpdate => Ok(EmitStrategy::OnUpdate),
374            EmitClause::Changes => Ok(EmitStrategy::Changelog),
375            EmitClause::Final => Ok(EmitStrategy::FinalOnly),
376        }
377    }
378
379    /// Check if this emit strategy requires changelog/retraction support.
380    #[must_use]
381    pub fn requires_changelog(&self) -> bool {
382        matches!(self, EmitClause::Changes | EmitClause::OnUpdate)
383    }
384
385    /// Check if this emit strategy is append-only (no retractions).
386    #[must_use]
387    pub fn is_append_only(&self) -> bool {
388        matches!(
389            self,
390            EmitClause::OnWindowClose | EmitClause::Final | EmitClause::AfterWatermark
391        )
392    }
393}
394
395/// Window function types
396#[derive(Debug, Clone, PartialEq)]
397pub enum WindowFunction {
398    /// TUMBLE(column, interval)
399    Tumble {
400        /// The time column to window on
401        time_column: Box<Expr>,
402        /// The window interval
403        interval: Box<Expr>,
404    },
405    /// HOP(column, slide, size)
406    Hop {
407        /// The time column to window on
408        time_column: Box<Expr>,
409        /// The slide interval (how often to create a new window)
410        slide_interval: Box<Expr>,
411        /// The window size interval
412        window_interval: Box<Expr>,
413    },
414    /// SESSION(column, gap)
415    Session {
416        /// The time column to window on
417        time_column: Box<Expr>,
418        /// The gap interval (max gap between events in same session)
419        gap_interval: Box<Expr>,
420    },
421}
422
#[cfg(test)]
mod tests {
    use super::*;
    use sqlparser::ast::{DataType, Expr, ObjectNamePart};

    /// Builds a single-part object name for test fixtures.
    fn object_name(name: &str) -> ObjectName {
        ObjectName(vec![ObjectNamePart::Identifier(Ident::new(name))])
    }

    /// Wraps a bare identifier in an expression; used as a placeholder where
    /// a test only needs *some* expression and never evaluates it.
    fn ident_expr(name: &str) -> Expr {
        Expr::Identifier(Ident::new(name))
    }

    /// Parses `sql` with the generic dialect and wraps the first statement.
    fn standard_statement(sql: &str) -> StreamingStatement {
        let dialect = sqlparser::dialect::GenericDialect {};
        let stmts = sqlparser::parser::Parser::parse_sql(&dialect, sql).unwrap();
        StreamingStatement::Standard(Box::new(stmts.into_iter().next().unwrap()))
    }

    #[test]
    fn test_create_source_statement() {
        let stmt = CreateSourceStatement {
            name: object_name("events"),
            columns: vec![
                ColumnDef {
                    name: Ident::new("id"),
                    data_type: DataType::BigInt(None),
                    options: vec![],
                },
                ColumnDef {
                    name: Ident::new("timestamp"),
                    data_type: DataType::Timestamp(None, sqlparser::ast::TimezoneInfo::None),
                    options: vec![],
                },
            ],
            watermark: Some(WatermarkDef {
                column: Ident::new("timestamp"),
                expression: ident_expr("timestamp"),
            }),
            with_options: HashMap::from([
                ("connector".to_string(), "kafka".to_string()),
                ("topic".to_string(), "events".to_string()),
            ]),
            or_replace: false,
            if_not_exists: true,
            connector_type: None,
            connector_options: HashMap::new(),
            format: None,
        };

        // Check the statement fields
        assert_eq!(stmt.columns.len(), 2);
        assert!(stmt.watermark.is_some());
        assert_eq!(
            stmt.with_options.get("connector"),
            Some(&"kafka".to_string())
        );
    }

    #[test]
    fn test_emit_clause_variants() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };

        assert!(
            matches!(EmitClause::AfterWatermark, EmitClause::AfterWatermark),
            "Expected AfterWatermark"
        );
        assert!(
            matches!(EmitClause::OnWindowClose, EmitClause::OnWindowClose),
            "Expected OnWindowClose"
        );
        assert!(
            matches!(periodic, EmitClause::Periodically { .. }),
            "Expected Periodically"
        );
        assert!(
            matches!(EmitClause::OnUpdate, EmitClause::OnUpdate),
            "Expected OnUpdate"
        );
        assert!(
            matches!(EmitClause::Changes, EmitClause::Changes),
            "Expected Changes"
        );
        assert!(
            matches!(EmitClause::Final, EmitClause::Final),
            "Expected Final"
        );
    }

    #[test]
    fn test_emit_clause_to_emit_strategy() {
        // Only the variants without an interval are covered here; they map
        // one-to-one and never touch the interval parser.
        assert!(matches!(
            EmitClause::AfterWatermark.to_emit_strategy(),
            Ok(EmitStrategy::OnWatermark)
        ));
        assert!(matches!(
            EmitClause::OnWindowClose.to_emit_strategy(),
            Ok(EmitStrategy::OnWindowClose)
        ));
        assert!(matches!(
            EmitClause::OnUpdate.to_emit_strategy(),
            Ok(EmitStrategy::OnUpdate)
        ));
        assert!(matches!(
            EmitClause::Changes.to_emit_strategy(),
            Ok(EmitStrategy::Changelog)
        ));
        assert!(matches!(
            EmitClause::Final.to_emit_strategy(),
            Ok(EmitStrategy::FinalOnly)
        ));
    }

    #[test]
    fn test_emit_clause_changelog_and_append_only() {
        let periodic = EmitClause::Periodically {
            interval: Box::new(ident_expr("5_SECONDS")),
        };

        assert!(EmitClause::Changes.requires_changelog());
        assert!(EmitClause::OnUpdate.requires_changelog());
        assert!(!EmitClause::AfterWatermark.requires_changelog());
        assert!(!EmitClause::OnWindowClose.requires_changelog());
        assert!(!EmitClause::Final.requires_changelog());
        assert!(!periodic.requires_changelog());

        assert!(EmitClause::AfterWatermark.is_append_only());
        assert!(EmitClause::OnWindowClose.is_append_only());
        assert!(EmitClause::Final.is_append_only());
        assert!(!EmitClause::Changes.is_append_only());
        assert!(!EmitClause::OnUpdate.is_append_only());
        assert!(!periodic.is_append_only());
    }

    #[test]
    fn test_window_functions() {
        let tumble = WindowFunction::Tumble {
            time_column: Box::new(ident_expr("event_time")),
            interval: Box::new(ident_expr("5_MINUTES")),
        };

        let hop = WindowFunction::Hop {
            time_column: Box::new(ident_expr("event_time")),
            slide_interval: Box::new(ident_expr("1_MINUTE")),
            window_interval: Box::new(ident_expr("5_MINUTES")),
        };

        let session = WindowFunction::Session {
            time_column: Box::new(ident_expr("event_time")),
            gap_interval: Box::new(ident_expr("30_SECONDS")),
        };

        assert!(
            matches!(tumble, WindowFunction::Tumble { .. }),
            "Expected Tumble"
        );
        assert!(matches!(hop, WindowFunction::Hop { .. }), "Expected Hop");
        assert!(
            matches!(session, WindowFunction::Session { .. }),
            "Expected Session"
        );
    }

    #[test]
    fn test_late_data_clause_default() {
        let clause = LateDataClause::default();
        assert!(clause.allowed_lateness.is_none());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_allowed_lateness() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_allowed_lateness(lateness_expr);
        assert!(clause.allowed_lateness.is_some());
        assert!(clause.side_output.is_none());
    }

    #[test]
    fn test_late_data_clause_with_side_output() {
        let lateness_expr = ident_expr("INTERVAL '1' HOUR");
        let clause = LateDataClause::with_side_output(lateness_expr, "late_events".to_string());
        assert!(clause.allowed_lateness.is_some());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_only() {
        let clause = LateDataClause::side_output_only("late_events".to_string());
        assert!(clause.allowed_lateness.is_none());
        assert_eq!(clause.side_output, Some("late_events".to_string()));
    }

    #[test]
    fn test_late_data_clause_side_output_accessors() {
        let with_output = LateDataClause::side_output_only("late_events".to_string());
        assert!(with_output.has_side_output());
        assert_eq!(with_output.get_side_output(), Some("late_events"));

        let without_output = LateDataClause::default();
        assert!(!without_output.has_side_output());
        assert_eq!(without_output.get_side_output(), None);
    }

    #[test]
    fn test_late_data_clause_missing_lateness_is_zero() {
        // With no lateness expression the interval parser is never invoked.
        let default_clause = LateDataClause::default();
        assert!(matches!(
            default_clause.to_allowed_lateness(),
            Ok(lateness) if lateness == Duration::ZERO
        ));

        let side_only = LateDataClause::side_output_only("late_events".to_string());
        assert!(matches!(
            side_only.to_allowed_lateness(),
            Ok(lateness) if lateness == Duration::ZERO
        ));
    }

    #[test]
    fn test_show_command_variants() {
        let sources = ShowCommand::Sources;
        let sinks = ShowCommand::Sinks;
        let queries = ShowCommand::Queries;
        let mvs = ShowCommand::MaterializedViews;
        let streams = ShowCommand::Streams;

        assert_eq!(sources, ShowCommand::Sources);
        assert_eq!(sinks, ShowCommand::Sinks);
        assert_eq!(queries, ShowCommand::Queries);
        assert_eq!(mvs, ShowCommand::MaterializedViews);
        assert_eq!(streams, ShowCommand::Streams);
        assert_ne!(streams, ShowCommand::Sources);
    }

    #[test]
    fn test_show_command_clone() {
        let cmd = ShowCommand::Sources;
        let cloned = cmd.clone();
        assert_eq!(cmd, cloned);
    }

    #[test]
    fn test_drop_source_statement() {
        let stmt = StreamingStatement::DropSource {
            name: object_name("events"),
            if_exists: true,
        };
        let StreamingStatement::DropSource { name, if_exists } = stmt else {
            panic!("Expected DropSource");
        };
        assert_eq!(name.to_string(), "events");
        assert!(if_exists);
    }

    #[test]
    fn test_drop_sink_statement() {
        let stmt = StreamingStatement::DropSink {
            name: object_name("output"),
            if_exists: false,
        };
        let StreamingStatement::DropSink { name, if_exists } = stmt else {
            panic!("Expected DropSink");
        };
        assert_eq!(name.to_string(), "output");
        assert!(!if_exists);
    }

    #[test]
    fn test_drop_materialized_view_statement() {
        let stmt = StreamingStatement::DropMaterializedView {
            name: object_name("live_stats"),
            if_exists: true,
            cascade: true,
        };
        let StreamingStatement::DropMaterializedView {
            name,
            if_exists,
            cascade,
        } = stmt
        else {
            panic!("Expected DropMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert!(if_exists);
        assert!(cascade);
    }

    #[test]
    fn test_show_statement() {
        let stmt = StreamingStatement::Show(ShowCommand::Sources);
        assert!(
            matches!(stmt, StreamingStatement::Show(ShowCommand::Sources)),
            "Expected Show(Sources)"
        );
    }

    #[test]
    fn test_describe_statement() {
        let stmt = StreamingStatement::Describe {
            name: object_name("events"),
            extended: true,
        };
        let StreamingStatement::Describe { name, extended } = stmt else {
            panic!("Expected Describe");
        };
        assert_eq!(name.to_string(), "events");
        assert!(extended);
    }

    #[test]
    fn test_explain_statement() {
        // Build an inner Standard statement using sqlparser
        let inner = standard_statement("SELECT 1");

        let stmt = StreamingStatement::Explain {
            statement: Box::new(inner),
        };
        let StreamingStatement::Explain { statement } = stmt else {
            panic!("Expected Explain");
        };
        assert!(matches!(*statement, StreamingStatement::Standard(_)));
    }

    #[test]
    fn test_create_materialized_view_statement() {
        // Build a query statement using sqlparser
        let query = standard_statement("SELECT COUNT(*) FROM events");

        let stmt = StreamingStatement::CreateMaterializedView {
            name: object_name("live_stats"),
            query: Box::new(query),
            emit_clause: Some(EmitClause::OnWindowClose),
            or_replace: false,
            if_not_exists: true,
        };
        let StreamingStatement::CreateMaterializedView {
            name,
            emit_clause,
            or_replace,
            if_not_exists,
            ..
        } = stmt
        else {
            panic!("Expected CreateMaterializedView");
        };
        assert_eq!(name.to_string(), "live_stats");
        assert_eq!(emit_clause, Some(EmitClause::OnWindowClose));
        assert!(!or_replace);
        assert!(if_not_exists);
    }

    #[test]
    fn test_insert_into_statement() {
        let stmt = StreamingStatement::InsertInto {
            table_name: object_name("events"),
            columns: vec![Ident::new("id"), Ident::new("name")],
            values: vec![vec![
                Expr::Value(sqlparser::ast::Value::Number("1".to_string(), false).into()),
                Expr::Value(sqlparser::ast::Value::SingleQuotedString("test".to_string()).into()),
            ]],
        };
        let StreamingStatement::InsertInto {
            table_name,
            columns,
            values,
        } = stmt
        else {
            panic!("Expected InsertInto");
        };
        assert_eq!(table_name.to_string(), "events");
        assert_eq!(columns.len(), 2);
        assert_eq!(values.len(), 1);
        assert_eq!(values[0].len(), 2);
    }
}