Skip to main content

laminar_sql/planner/
mod.rs

1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
7pub mod channel_derivation;
8/// Optimizer rules for lookup join rewriting.
9pub mod lookup_join;
10/// Predicate splitting and pushdown for lookup joins.
11pub mod predicate_split;
12
13use std::collections::HashMap;
14
15use datafusion::logical_expr::LogicalPlan;
16use datafusion::prelude::SessionContext;
17use sqlparser::ast::{ObjectName, SetExpr, Statement};
18
19use crate::parser::aggregation_parser::analyze_aggregates;
20use crate::parser::analytic_parser::{
21    analyze_analytic_functions, analyze_window_frames, FrameBound,
22};
23use crate::parser::join_parser::analyze_joins;
24use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
25use crate::parser::order_analyzer::analyze_order_by;
26use crate::parser::{
27    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
28    StreamingStatement, WindowFunction, WindowRewriter,
29};
30use crate::translator::{
31    AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
32    OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
33};
34
/// Information about a registered lookup table.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name.
    pub name: String,
    /// Column names and types, as `(column_name, SQL type string)` pairs.
    pub columns: Vec<(String, String)>,
    /// Primary key columns.
    pub primary_key: Vec<String>,
    /// Properties validated by `validate_properties` at planning time.
    pub properties: LookupTableProperties,
}
47
/// Streaming query planner.
///
/// Keeps in-memory registries of sources, sinks and lookup tables declared
/// via DDL, and translates parsed `StreamingStatement`s into `StreamingPlan`s.
pub struct StreamingPlanner {
    /// Registered sources, keyed by source name
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by sink name
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by table name
    lookup_tables: HashMap<String, LookupTableInfo>,
}
57
/// Information about a registered source
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name
    pub name: String,
    /// Watermark column (if configured), taken from the WATERMARK clause
    pub watermark_column: Option<String>,
    /// Connector options copied from the statement's WITH options
    pub options: HashMap<String, String>,
}
68
/// Information about a registered sink
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name
    pub name: String,
    /// Source table or query name this sink reads from
    pub from: String,
    /// Connector options copied from the statement's WITH options
    pub options: HashMap<String, String>,
}
79
/// Result of planning a streaming statement
#[derive(Debug)]
// Variant payload sizes vary widely; boxing the large ones is deliberately
// avoided to keep the API simple.
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Source registration (DDL)
    RegisterSource(SourceInfo),

    /// Sink registration (DDL)
    RegisterSink(SinkInfo),

    /// Query plan with streaming configurations
    Query(QueryPlan),

    /// Standard SQL statement (pass-through to DataFusion)
    Standard(Box<Statement>),

    /// DAG topology explanation (from EXPLAIN DAG)
    DagExplain(DagExplainOutput),

    /// Lookup table registration (DDL)
    RegisterLookupTable(LookupTableInfo),

    /// Drop a lookup table
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
108
/// A query plan with streaming operator configurations
#[derive(Debug)]
pub struct QueryPlan {
    /// Optional name for the continuous query (`None` for ad-hoc SELECTs)
    pub name: Option<String>,
    /// Window configuration if the query has windowed aggregation
    pub window_config: Option<WindowOperatorConfig>,
    /// Join configuration(s) if the query has joins (one per join step)
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration if the query has ordering
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic window function configuration (LAG/LEAD/etc.)
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration
    pub having_config: Option<HavingFilterConfig>,
    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN)
    pub frame_config: Option<WindowFrameConfig>,
    /// Emit strategy (only set for continuous queries / streams)
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement; rendered back to SQL text and re-parsed
    /// by DataFusion in `to_logical_plan`
    pub statement: Box<Statement>,
}
131
132impl StreamingPlanner {
133    /// Creates a new streaming planner
134    #[must_use]
135    pub fn new() -> Self {
136        Self {
137            sources: HashMap::new(),
138            sinks: HashMap::new(),
139            lookup_tables: HashMap::new(),
140        }
141    }
142
    /// Plans a streaming statement.
    ///
    /// Dispatches DDL to the registration helpers, continuous queries and
    /// streams to `plan_continuous_query`, and plain SQL to
    /// `plan_standard_statement`.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if the statement cannot be planned.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share one planning
            // path: both wrap an inner query plus an optional EMIT clause.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            // Variants listed explicitly (no catch-all) so that adding a new
            // StreamingStatement variant forces a decision here at compile time.
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. } => {
                // These statements are handled directly by the database facade
                // and don't need query planning, so the planner rejects them
                // with `UnsupportedSql` rather than passing them through.
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
187
188    /// Plans a CREATE SOURCE statement.
189    fn plan_create_source(
190        &mut self,
191        source: &CreateSourceStatement,
192    ) -> Result<StreamingPlan, PlanningError> {
193        let name = object_name_to_string(&source.name);
194
195        // Check for existing source
196        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
197            return Err(PlanningError::InvalidQuery(format!(
198                "Source '{}' already exists",
199                name
200            )));
201        }
202
203        // Extract watermark column
204        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
205
206        let info = SourceInfo {
207            name: name.clone(),
208            watermark_column,
209            options: source.with_options.clone(),
210        };
211
212        // Register the source
213        self.sources.insert(name, info.clone());
214
215        Ok(StreamingPlan::RegisterSource(info))
216    }
217
218    /// Plans a CREATE SINK statement.
219    fn plan_create_sink(
220        &mut self,
221        sink: &CreateSinkStatement,
222    ) -> Result<StreamingPlan, PlanningError> {
223        let name = object_name_to_string(&sink.name);
224
225        // Check for existing sink
226        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
227            return Err(PlanningError::InvalidQuery(format!(
228                "Sink '{}' already exists",
229                name
230            )));
231        }
232
233        // Determine the source
234        let from = match &sink.from {
235            SinkFrom::Table(table) => object_name_to_string(table),
236            SinkFrom::Query(_) => format!("{}_query", name),
237        };
238
239        let info = SinkInfo {
240            name: name.clone(),
241            from,
242            options: sink.with_options.clone(),
243        };
244
245        // Register the sink
246        self.sinks.insert(name, info.clone());
247
248        Ok(StreamingPlan::RegisterSink(info))
249    }
250
251    /// Plans a CREATE CONTINUOUS QUERY statement.
252    #[allow(clippy::unused_self)] // Will use planner state for query registration
253    fn plan_continuous_query(
254        &mut self,
255        name: &ObjectName,
256        query: &StreamingStatement,
257        emit_clause: Option<&EmitClause>,
258    ) -> Result<StreamingPlan, PlanningError> {
259        // The query inside should be a standard SELECT
260        let stmt = match query {
261            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
262            _ => {
263                return Err(PlanningError::InvalidQuery(
264                    "Continuous query must contain a SELECT statement".to_string(),
265                ))
266            }
267        };
268
269        // Analyze the query for streaming features
270        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
271
272        Ok(StreamingPlan::Query(QueryPlan {
273            name: Some(object_name_to_string(name)),
274            window_config: query_plan.window_config,
275            join_config: query_plan.join_config,
276            order_config: query_plan.order_config,
277            analytic_config: query_plan.analytic_config,
278            having_config: query_plan.having_config,
279            frame_config: query_plan.frame_config,
280            emit_clause: emit_clause.cloned(),
281            statement: Box::new(stmt),
282        }))
283    }
284
    /// Plans a standard SQL statement.
    ///
    /// SELECTs are probed for streaming features (windowed GROUP BY, joins,
    /// ORDER BY, analytic functions, HAVING, window frames). If any are
    /// present a [`StreamingPlan::Query`] is built; otherwise the statement
    /// is passed through unchanged for DataFusion to execute.
    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        // Check if it's a query that might have streaming features
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Check for window functions in GROUP BY
                let window_function = Self::extract_window_from_select(select);

                // Check for joins (multi-way)
                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                // Check for ORDER BY
                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                // Check for analytic functions (LAG/LEAD/etc.)
                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                // Check for HAVING clause
                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
                // NOTE(review): this duplicates the check in `analyze_query`;
                // consider extracting a shared validator.
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // A QueryPlan is only built when at least one streaming
                // feature is present; otherwise fall through to pass-through.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // Pass through standard SQL
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
368
    /// Analyzes a query for streaming features.
    ///
    /// Used by `plan_continuous_query`; unlike `plan_standard_statement`
    /// this path also applies the EMIT clause to the window configuration.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError::InvalidQuery` if any analysis step fails or
    /// a window frame ends with UNBOUNDED FOLLOWING.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Extract window function from the GROUP BY clause
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // Apply emit clause if present
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                // Extract join info (multi-way)
                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // Extract ORDER BY info
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        // Extract analytic function info (LAG/LEAD/etc.)
        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        // Extract HAVING clause
        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Validate: reject UNBOUNDED FOLLOWING — a streaming operator
            // cannot buffer an unbounded amount of future input
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
432
433    /// Extracts window function from a SELECT.
434    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
435        // Check GROUP BY for window functions
436        use sqlparser::ast::GroupByExpr;
437        match &select.group_by {
438            GroupByExpr::Expressions(exprs, _modifiers) => {
439                for group_by_expr in exprs {
440                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
441                    {
442                        return Some(window);
443                    }
444                }
445            }
446            GroupByExpr::All(_) => {}
447        }
448        None
449    }
450
451    /// Plans a CREATE LOOKUP TABLE statement.
452    fn plan_create_lookup_table(
453        &mut self,
454        lt: &CreateLookupTableStatement,
455    ) -> Result<StreamingPlan, PlanningError> {
456        let name = object_name_to_string(&lt.name);
457
458        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
459            return Err(PlanningError::InvalidQuery(format!(
460                "Lookup table '{}' already exists",
461                name
462            )));
463        }
464
465        let columns: Vec<(String, String)> = lt
466            .columns
467            .iter()
468            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
469            .collect();
470
471        let properties = validate_properties(&lt.with_options).map_err(|e| {
472            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
473        })?;
474
475        let info = LookupTableInfo {
476            name: name.clone(),
477            columns,
478            primary_key: lt.primary_key.clone(),
479            properties,
480        };
481
482        self.lookup_tables.insert(name, info.clone());
483
484        Ok(StreamingPlan::RegisterLookupTable(info))
485    }
486
487    /// Plans a DROP LOOKUP TABLE statement.
488    fn plan_drop_lookup_table(
489        &mut self,
490        name: &ObjectName,
491        if_exists: bool,
492    ) -> Result<StreamingPlan, PlanningError> {
493        let name_str = object_name_to_string(name);
494
495        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
496            return Err(PlanningError::InvalidQuery(format!(
497                "Lookup table '{}' does not exist",
498                name_str
499            )));
500        }
501
502        self.lookup_tables.remove(&name_str);
503
504        Ok(StreamingPlan::DropLookupTable { name: name_str })
505    }
506
    /// Gets a registered source by name.
    ///
    /// Returns `None` if no source with that name has been registered.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }
512
    /// Gets a registered sink by name.
    ///
    /// Returns `None` if no sink with that name has been registered.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }
518
    /// Lists all registered sources.
    ///
    /// Iteration order is unspecified (backed by a `HashMap`).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }
524
    /// Lists all registered sinks.
    ///
    /// Iteration order is unspecified (backed by a `HashMap`).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }
530
    /// Gets a registered lookup table by name.
    ///
    /// Returns `None` if no lookup table with that name has been registered.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }
536
    /// Lists all registered lookup tables.
    ///
    /// Iteration order is unspecified (backed by a `HashMap`).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }
542
543    /// Creates a `DataFusion` logical plan from a query plan.
544    ///
545    /// Converts the query plan's SQL statement into a `DataFusion`
546    /// `LogicalPlan` using the session context's state. Window UDFs
547    /// (TUMBLE, HOP, SESSION) must be registered on the context via
548    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
549    /// for windowed queries to resolve correctly.
550    ///
551    /// # Arguments
552    ///
553    /// * `plan` - The streaming query plan containing the SQL statement
554    /// * `ctx` - `DataFusion` session context with registered UDFs
555    ///
556    /// # Errors
557    ///
558    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
559    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
560    pub async fn to_logical_plan(
561        &self,
562        plan: &QueryPlan,
563        ctx: &SessionContext,
564    ) -> Result<LogicalPlan, PlanningError> {
565        // Convert the AST statement back to SQL and let DataFusion re-parse
566        // it with its own sqlparser version. This avoids version mismatches
567        // between our sqlparser (0.60) and DataFusion's (0.59).
568        let sql = plan.statement.to_string();
569        ctx.state()
570            .create_logical_plan(&sql)
571            .await
572            .map_err(PlanningError::DataFusion)
573    }
574}
575
576impl Default for StreamingPlanner {
577    fn default() -> Self {
578        Self::new()
579    }
580}
581
/// Intermediate query analysis result
///
/// Collected by `StreamingPlanner::analyze_query` and copied into the
/// final [`QueryPlan`].
#[derive(Debug, Default)]
// Fields deliberately share the `_config` suffix to mirror `QueryPlan`.
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// Windowed aggregation config (TUMBLE/HOP/SESSION in GROUP BY)
    window_config: Option<WindowOperatorConfig>,
    /// One config per join step for multi-way joins
    join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY config
    order_config: Option<OrderOperatorConfig>,
    /// Analytic window function config (LAG/LEAD/etc.)
    analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING filter config
    having_config: Option<HavingFilterConfig>,
    /// Window frame (ROWS/RANGE BETWEEN) config
    frame_config: Option<WindowFrameConfig>,
}
593
/// Helper to convert `ObjectName` to String via its `Display` impl.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
598
/// Planning errors
///
/// `Display` is implemented manually below (no `#[error]` attributes) so the
/// `DataFusion` variant's message can be translated at display time; the
/// `thiserror` derive still supplies the `Error` impl and the `From`
/// conversion for `DataFusionError`.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Unsupported SQL feature
    UnsupportedSql(String),

    /// Invalid query
    InvalidQuery(String),

    /// Source not found
    SourceNotFound(String),

    /// Sink not found
    SinkNotFound(String),

    /// `DataFusion` error during logical plan creation (translated on display)
    DataFusion(#[from] datafusion_common::DataFusionError),
}
617
618impl std::fmt::Display for PlanningError {
619    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
620        match self {
621            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
622            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
623            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
624            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
625            Self::DataFusion(e) => {
626                let translated = crate::error::translate_datafusion_error(&e.to_string());
627                write!(f, "{translated}")
628            }
629        }
630    }
631}
632
633#[cfg(test)]
634mod tests {
635    use super::*;
636    use crate::parser::StreamingParser;
637
638    #[test]
639    fn test_plan_create_source() {
640        let mut planner = StreamingPlanner::new();
641        let statements =
642            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
643
644        let plan = planner.plan(&statements[0]).unwrap();
645        match plan {
646            StreamingPlan::RegisterSource(info) => {
647                assert_eq!(info.name, "events");
648            }
649            _ => panic!("Expected RegisterSource plan"),
650        }
651    }
652
653    #[test]
654    fn test_plan_create_sink() {
655        let mut planner = StreamingPlanner::new();
656        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
657
658        let plan = planner.plan(&statements[0]).unwrap();
659        match plan {
660            StreamingPlan::RegisterSink(info) => {
661                assert_eq!(info.name, "output");
662                assert_eq!(info.from, "events");
663            }
664            _ => panic!("Expected RegisterSink plan"),
665        }
666    }
667
668    #[test]
669    fn test_plan_duplicate_source() {
670        let mut planner = StreamingPlanner::new();
671
672        // First source
673        let statements =
674            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
675        planner.plan(&statements[0]).unwrap();
676
677        // Duplicate should fail
678        let result = planner.plan(&statements[0]);
679        assert!(result.is_err());
680    }
681
682    #[test]
683    fn test_plan_source_if_not_exists() {
684        let mut planner = StreamingPlanner::new();
685
686        // First source
687        let statements =
688            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
689        planner.plan(&statements[0]).unwrap();
690
691        // IF NOT EXISTS should succeed
692        let statements =
693            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
694                .unwrap();
695        let result = planner.plan(&statements[0]);
696        assert!(result.is_ok());
697    }
698
699    #[test]
700    fn test_plan_source_or_replace() {
701        let mut planner = StreamingPlanner::new();
702
703        // First source
704        let statements =
705            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
706        planner.plan(&statements[0]).unwrap();
707
708        // OR REPLACE should succeed
709        let statements =
710            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
711                .unwrap();
712        let result = planner.plan(&statements[0]);
713        assert!(result.is_ok());
714    }
715
716    #[test]
717    fn test_plan_source_with_watermark() {
718        let mut planner = StreamingPlanner::new();
719        let statements = StreamingParser::parse_sql(
720            "CREATE SOURCE events (
721                id INT,
722                ts TIMESTAMP,
723                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
724            )",
725        )
726        .unwrap();
727
728        let plan = planner.plan(&statements[0]).unwrap();
729        match plan {
730            StreamingPlan::RegisterSource(info) => {
731                assert_eq!(info.name, "events");
732                assert_eq!(info.watermark_column, Some("ts".to_string()));
733            }
734            _ => panic!("Expected RegisterSource plan"),
735        }
736    }
737
738    #[test]
739    fn test_plan_standard_select() {
740        let mut planner = StreamingPlanner::new();
741        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
742
743        let plan = planner.plan(&statements[0]).unwrap();
744        match plan {
745            StreamingPlan::Standard(_) => {}
746            _ => panic!("Expected Standard plan for simple SELECT"),
747        }
748    }
749
750    #[test]
751    fn test_list_sources_and_sinks() {
752        let mut planner = StreamingPlanner::new();
753
754        // Create sources
755        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
756        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
757        planner.plan(&s1[0]).unwrap();
758        planner.plan(&s2[0]).unwrap();
759
760        // Create sinks
761        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
762        planner.plan(&k1[0]).unwrap();
763
764        assert_eq!(planner.list_sources().len(), 2);
765        assert_eq!(planner.list_sinks().len(), 1);
766        assert!(planner.get_source("src1").is_some());
767        assert!(planner.get_sink("sink1").is_some());
768    }
769
770    #[test]
771    fn test_plan_query_with_window() {
772        let mut planner = StreamingPlanner::new();
773        let statements = StreamingParser::parse_sql(
774            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
775        )
776        .unwrap();
777
778        let plan = planner.plan(&statements[0]).unwrap();
779        match plan {
780            StreamingPlan::Query(query_plan) => {
781                assert!(query_plan.window_config.is_some());
782                let config = query_plan.window_config.unwrap();
783                assert_eq!(config.time_column, "event_time");
784                assert_eq!(config.size.as_secs(), 300);
785            }
786            _ => panic!("Expected Query plan"),
787        }
788    }
789
790    #[test]
791    fn test_plan_query_with_join() {
792        let mut planner = StreamingPlanner::new();
793        let statements = StreamingParser::parse_sql(
794            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
795        )
796        .unwrap();
797
798        let plan = planner.plan(&statements[0]).unwrap();
799        match plan {
800            StreamingPlan::Query(query_plan) => {
801                assert!(query_plan.join_config.is_some());
802                let configs = query_plan.join_config.unwrap();
803                assert_eq!(configs.len(), 1);
804                assert_eq!(configs[0].left_key(), "order_id");
805                assert_eq!(configs[0].right_key(), "order_id");
806            }
807            _ => panic!("Expected Query plan"),
808        }
809    }
810
811    #[test]
812    fn test_plan_query_with_lag() {
813        let mut planner = StreamingPlanner::new();
814        let statements = StreamingParser::parse_sql(
815            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
816        )
817        .unwrap();
818
819        let plan = planner.plan(&statements[0]).unwrap();
820        match plan {
821            StreamingPlan::Query(query_plan) => {
822                assert!(query_plan.analytic_config.is_some());
823                let config = query_plan.analytic_config.unwrap();
824                assert_eq!(config.functions.len(), 1);
825                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
826            }
827            _ => panic!("Expected Query plan with analytic config"),
828        }
829    }
830
831    #[test]
832    fn test_plan_query_with_having() {
833        let mut planner = StreamingPlanner::new();
834        let statements = StreamingParser::parse_sql(
835            "SELECT symbol, COUNT(*) AS cnt FROM trades \
836             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
837             HAVING COUNT(*) > 10",
838        )
839        .unwrap();
840
841        let plan = planner.plan(&statements[0]).unwrap();
842        match plan {
843            StreamingPlan::Query(query_plan) => {
844                assert!(query_plan.window_config.is_some());
845                assert!(query_plan.having_config.is_some());
846                let config = query_plan.having_config.unwrap();
847                assert!(
848                    config.predicate().contains("COUNT(*)"),
849                    "predicate was: {}",
850                    config.predicate()
851                );
852            }
853            _ => panic!("Expected Query plan with having config"),
854        }
855    }
856
857    #[test]
858    fn test_plan_query_without_having() {
859        let mut planner = StreamingPlanner::new();
860        let statements = StreamingParser::parse_sql(
861            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
862        )
863        .unwrap();
864
865        let plan = planner.plan(&statements[0]).unwrap();
866        match plan {
867            StreamingPlan::Query(query_plan) => {
868                assert!(query_plan.having_config.is_none());
869            }
870            _ => panic!("Expected Query plan"),
871        }
872    }
873
874    #[test]
875    fn test_plan_having_only_produces_query_plan() {
876        // HAVING without window function still produces a Query plan
877        let mut planner = StreamingPlanner::new();
878        let statements = StreamingParser::parse_sql(
879            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
880        )
881        .unwrap();
882
883        let plan = planner.plan(&statements[0]).unwrap();
884        match plan {
885            StreamingPlan::Query(query_plan) => {
886                assert!(query_plan.having_config.is_some());
887                assert!(query_plan.window_config.is_none());
888            }
889            _ => panic!("Expected Query plan for HAVING-only query"),
890        }
891    }
892
893    #[test]
894    fn test_plan_having_compound_predicate() {
895        let mut planner = StreamingPlanner::new();
896        let statements = StreamingParser::parse_sql(
897            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
898             FROM trades GROUP BY symbol \
899             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
900        )
901        .unwrap();
902
903        let plan = planner.plan(&statements[0]).unwrap();
904        match plan {
905            StreamingPlan::Query(query_plan) => {
906                let config = query_plan.having_config.unwrap();
907                let pred = config.predicate();
908                assert!(pred.contains("AND"), "predicate was: {pred}");
909            }
910            _ => panic!("Expected Query plan"),
911        }
912    }
913
914    #[test]
915    fn test_plan_query_with_lead() {
916        let mut planner = StreamingPlanner::new();
917        let statements = StreamingParser::parse_sql(
918            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
919        )
920        .unwrap();
921
922        let plan = planner.plan(&statements[0]).unwrap();
923        match plan {
924            StreamingPlan::Query(query_plan) => {
925                assert!(query_plan.analytic_config.is_some());
926                let config = query_plan.analytic_config.unwrap();
927                assert!(config.has_lookahead());
928                assert_eq!(config.functions[0].offset, 2);
929            }
930            _ => panic!("Expected Query plan with analytic config"),
931        }
932    }
933
934    // -- Multi-way join planner tests --
935
936    #[test]
937    fn test_plan_single_join_produces_vec_of_one() {
938        let mut planner = StreamingPlanner::new();
939        let statements =
940            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
941
942        let plan = planner.plan(&statements[0]).unwrap();
943        match plan {
944            StreamingPlan::Query(qp) => {
945                let configs = qp.join_config.unwrap();
946                assert_eq!(configs.len(), 1);
947            }
948            _ => panic!("Expected Query plan"),
949        }
950    }
951
952    #[test]
953    fn test_plan_two_way_join() {
954        let mut planner = StreamingPlanner::new();
955        let statements = StreamingParser::parse_sql(
956            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
957        )
958        .unwrap();
959
960        let plan = planner.plan(&statements[0]).unwrap();
961        match plan {
962            StreamingPlan::Query(qp) => {
963                let configs = qp.join_config.unwrap();
964                assert_eq!(configs.len(), 2);
965                assert_eq!(configs[0].left_key(), "id");
966                assert_eq!(configs[0].right_key(), "a_id");
967                assert_eq!(configs[1].left_key(), "id");
968                assert_eq!(configs[1].right_key(), "b_id");
969            }
970            _ => panic!("Expected Query plan"),
971        }
972    }
973
974    #[test]
975    fn test_plan_mixed_join_types() {
976        let mut planner = StreamingPlanner::new();
977        let statements = StreamingParser::parse_sql(
978            "SELECT * FROM orders o \
979             JOIN payments p ON o.id = p.order_id \
980                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
981             JOIN customers c ON p.cust_id = c.id",
982        )
983        .unwrap();
984
985        let plan = planner.plan(&statements[0]).unwrap();
986        match plan {
987            StreamingPlan::Query(qp) => {
988                let configs = qp.join_config.unwrap();
989                assert_eq!(configs.len(), 2);
990                assert!(configs[0].is_stream_stream());
991                assert!(configs[1].is_lookup());
992            }
993            _ => panic!("Expected Query plan"),
994        }
995    }
996
997    #[test]
998    fn test_plan_backward_compat_no_join() {
999        let mut planner = StreamingPlanner::new();
1000        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1001
1002        let plan = planner.plan(&statements[0]).unwrap();
1003        match plan {
1004            StreamingPlan::Standard(_) => {} // No join → pass-through
1005            _ => panic!("Expected Standard plan for simple SELECT"),
1006        }
1007    }
1008
1009    // -- Window Frame planner tests --
1010
1011    #[test]
1012    fn test_plan_query_with_rows_frame() {
1013        let mut planner = StreamingPlanner::new();
1014        let statements = StreamingParser::parse_sql(
1015            "SELECT AVG(price) OVER (ORDER BY ts \
1016             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1017        )
1018        .unwrap();
1019
1020        let plan = planner.plan(&statements[0]).unwrap();
1021        match plan {
1022            StreamingPlan::Query(qp) => {
1023                assert!(qp.frame_config.is_some());
1024                let fc = qp.frame_config.unwrap();
1025                assert_eq!(fc.functions.len(), 1);
1026                assert_eq!(fc.functions[0].source_column, "price");
1027            }
1028            _ => panic!("Expected Query plan with frame_config"),
1029        }
1030    }
1031
1032    #[test]
1033    fn test_plan_frame_with_partition() {
1034        let mut planner = StreamingPlanner::new();
1035        let statements = StreamingParser::parse_sql(
1036            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1037             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1038        )
1039        .unwrap();
1040
1041        let plan = planner.plan(&statements[0]).unwrap();
1042        match plan {
1043            StreamingPlan::Query(qp) => {
1044                let fc = qp.frame_config.unwrap();
1045                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1046                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1047            }
1048            _ => panic!("Expected Query plan with frame_config"),
1049        }
1050    }
1051
1052    #[test]
1053    fn test_plan_no_frame_is_standard() {
1054        let mut planner = StreamingPlanner::new();
1055        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1056
1057        let plan = planner.plan(&statements[0]).unwrap();
1058        match plan {
1059            StreamingPlan::Standard(_) => {} // No frame → pass-through
1060            _ => panic!("Expected Standard plan for simple SELECT"),
1061        }
1062    }
1063
1064    #[test]
1065    fn test_plan_unbounded_following_rejected() {
1066        let mut planner = StreamingPlanner::new();
1067        let statements = StreamingParser::parse_sql(
1068            "SELECT SUM(amount) OVER (ORDER BY id \
1069             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1070             FROM orders",
1071        )
1072        .unwrap();
1073
1074        let result = planner.plan(&statements[0]);
1075        assert!(result.is_err());
1076        let err = result.unwrap_err().to_string();
1077        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1078    }
1079}