// laminar_sql/planner/mod.rs
1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
/// Channel derivation for streaming query plans.
pub mod channel_derivation;
/// Optimizer rules for lookup join rewriting.
pub mod lookup_join;
/// Predicate splitting and pushdown for lookup joins.
pub mod predicate_split;
/// Physical optimizer rule for streaming plan validation.
pub mod streaming_optimizer;
15use std::collections::HashMap;
16
17use datafusion::logical_expr::LogicalPlan;
18use datafusion::prelude::SessionContext;
19use sqlparser::ast::{ObjectName, SetExpr, Statement};
20
21use crate::parser::aggregation_parser::analyze_aggregates;
22use crate::parser::analytic_parser::{
23    analyze_analytic_functions, analyze_window_frames, FrameBound,
24};
25use crate::parser::join_parser::analyze_joins;
26use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
27use crate::parser::order_analyzer::analyze_order_by;
28use crate::parser::{
29    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
30    StreamingStatement, WindowFunction, WindowRewriter,
31};
32use crate::translator::{
33    AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
34    OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
35};
36
/// Information about a registered lookup table.
///
/// Built by [`StreamingPlanner::plan_create_lookup_table`] from a
/// CREATE LOOKUP TABLE statement after its WITH options have been validated.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name (rendered from the statement's `ObjectName`).
    pub name: String,
    /// Column names and their SQL type names, in declaration order.
    pub columns: Vec<(String, String)>,
    /// Primary key columns.
    pub primary_key: Vec<String>,
    /// Properties validated from the WITH clause.
    pub properties: LookupTableProperties,
}
49
/// Streaming query planner.
///
/// Holds the catalog of registered sources, sinks, and lookup tables, and
/// translates parsed streaming statements into [`StreamingPlan`]s.
pub struct StreamingPlanner {
    /// Registered sources, keyed by name.
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by name.
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by name.
    lookup_tables: HashMap<String, LookupTableInfo>,
}
59
/// Information about a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Watermark column, if a WATERMARK FOR clause was present.
    pub watermark_column: Option<String>,
    /// Connector options from the WITH clause (kept as raw key/value pairs).
    pub options: HashMap<String, String>,
}
70
/// Information about a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
    /// Name of the table the sink reads from, or a synthesized
    /// `<name>_query` identifier when the sink wraps an inline query.
    pub from: String,
    /// Connector options from the WITH clause (kept as raw key/value pairs).
    pub options: HashMap<String, String>,
}
81
/// Result of planning a streaming statement
#[derive(Debug)]
// NOTE(review): presumably the Query(QueryPlan) variant is the large one;
// boxing it would churn call sites — confirm before changing.
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Source registration (DDL)
    RegisterSource(SourceInfo),

    /// Sink registration (DDL)
    RegisterSink(SinkInfo),

    /// Query plan with streaming configurations
    Query(QueryPlan),

    /// Standard SQL statement (pass-through to DataFusion)
    Standard(Box<Statement>),

    /// DAG topology explanation (from EXPLAIN DAG)
    DagExplain(DagExplainOutput),

    /// Lookup table registration (DDL)
    RegisterLookupTable(LookupTableInfo),

    /// Drop a lookup table
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
110
/// A query plan with streaming operator configurations.
///
/// Each `Option` field is `Some` only when the corresponding streaming
/// feature was detected in the SQL statement.
#[derive(Debug)]
pub struct QueryPlan {
    /// Optional name for the continuous query (set for CREATE CONTINUOUS
    /// QUERY / CREATE STREAM; `None` for ad-hoc queries).
    pub name: Option<String>,
    /// Window configuration if the query has windowed aggregation.
    pub window_config: Option<WindowOperatorConfig>,
    /// Join configuration(s) if the query has joins (one per join step).
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration if the query has ordering.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic window function configuration (LAG/LEAD/etc.).
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration.
    pub having_config: Option<HavingFilterConfig>,
    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN).
    pub frame_config: Option<WindowFrameConfig>,
    /// Emit strategy.
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement.
    pub statement: Box<Statement>,
}
133
134impl StreamingPlanner {
135    /// Creates a new streaming planner
136    #[must_use]
137    pub fn new() -> Self {
138        Self {
139            sources: HashMap::new(),
140            sinks: HashMap::new(),
141            lookup_tables: HashMap::new(),
142        }
143    }
144
    /// Plans a streaming statement.
    ///
    /// DDL statements (CREATE SOURCE/SINK/LOOKUP TABLE, DROP LOOKUP TABLE)
    /// mutate the planner's catalogs; queries are analyzed for streaming
    /// features.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if the statement cannot be planned, including
    /// `UnsupportedSql` for statement kinds handled by the database layer.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share one planning
            // path: both wrap an inner SELECT plus an optional EMIT clause.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                // These statement kinds are handled directly by the database
                // facade and never need query planning, so reaching the
                // planner with one of them is reported as an error.
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
191
192    /// Plans a CREATE SOURCE statement.
193    fn plan_create_source(
194        &mut self,
195        source: &CreateSourceStatement,
196    ) -> Result<StreamingPlan, PlanningError> {
197        let name = object_name_to_string(&source.name);
198
199        // Check for existing source
200        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
201            return Err(PlanningError::InvalidQuery(format!(
202                "Source '{}' already exists",
203                name
204            )));
205        }
206
207        // Extract watermark column
208        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
209
210        let info = SourceInfo {
211            name: name.clone(),
212            watermark_column,
213            options: source.with_options.clone(),
214        };
215
216        // Register the source
217        self.sources.insert(name, info.clone());
218
219        Ok(StreamingPlan::RegisterSource(info))
220    }
221
222    /// Plans a CREATE SINK statement.
223    fn plan_create_sink(
224        &mut self,
225        sink: &CreateSinkStatement,
226    ) -> Result<StreamingPlan, PlanningError> {
227        let name = object_name_to_string(&sink.name);
228
229        // Check for existing sink
230        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
231            return Err(PlanningError::InvalidQuery(format!(
232                "Sink '{}' already exists",
233                name
234            )));
235        }
236
237        // Determine the source
238        let from = match &sink.from {
239            SinkFrom::Table(table) => object_name_to_string(table),
240            SinkFrom::Query(_) => format!("{}_query", name),
241        };
242
243        let info = SinkInfo {
244            name: name.clone(),
245            from,
246            options: sink.with_options.clone(),
247        };
248
249        // Register the sink
250        self.sinks.insert(name, info.clone());
251
252        Ok(StreamingPlan::RegisterSink(info))
253    }
254
255    /// Plans a CREATE CONTINUOUS QUERY statement.
256    #[allow(clippy::unused_self)] // Will use planner state for query registration
257    fn plan_continuous_query(
258        &mut self,
259        name: &ObjectName,
260        query: &StreamingStatement,
261        emit_clause: Option<&EmitClause>,
262    ) -> Result<StreamingPlan, PlanningError> {
263        // The query inside should be a standard SELECT
264        let stmt = match query {
265            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
266            _ => {
267                return Err(PlanningError::InvalidQuery(
268                    "Continuous query must contain a SELECT statement".to_string(),
269                ))
270            }
271        };
272
273        // Analyze the query for streaming features
274        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
275
276        Ok(StreamingPlan::Query(QueryPlan {
277            name: Some(object_name_to_string(name)),
278            window_config: query_plan.window_config,
279            join_config: query_plan.join_config,
280            order_config: query_plan.order_config,
281            analytic_config: query_plan.analytic_config,
282            having_config: query_plan.having_config,
283            frame_config: query_plan.frame_config,
284            emit_clause: emit_clause.cloned(),
285            statement: Box::new(stmt),
286        }))
287    }
288
    /// Plans a standard SQL statement.
    ///
    /// A SELECT that uses any streaming feature (window in GROUP BY, join,
    /// ORDER BY, analytic function, HAVING, or window frame) becomes a
    /// `StreamingPlan::Query`; anything else passes through unchanged as
    /// `StreamingPlan::Standard` for DataFusion to execute.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError::InvalidQuery` if join analysis, ORDER BY
    /// translation, or window configuration fails, or if a window frame uses
    /// UNBOUNDED FOLLOWING.
    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        // Check if it's a query that might have streaming features
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Check for window functions in GROUP BY
                let window_function = Self::extract_window_from_select(select);

                // Check for joins (multi-way)
                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                // Check for ORDER BY
                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                // Check for analytic functions (LAG/LEAD/etc.)
                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                // Check for HAVING clause
                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer
                // infinite future).
                // NOTE(review): this validation is duplicated in
                // `analyze_query`; keep the two in sync.
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // Any single streaming feature upgrades this to a Query plan.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // Pass through standard SQL
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
372
    /// Analyzes a query for streaming features.
    ///
    /// Populates a `QueryAnalysis` with only the features actually present
    /// in `stmt`. The optional `emit_clause` is folded into the window
    /// configuration when a window function is found.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError::InvalidQuery` when window, join, ORDER BY, or
    /// emit-clause translation fails, or when a window frame uses
    /// UNBOUNDED FOLLOWING.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Extract window function
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // Apply emit clause if present
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                // Extract join info (multi-way)
                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // Extract ORDER BY info
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        // Extract analytic function info (LAG/LEAD/etc.)
        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        // Extract HAVING clause
        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Validate: reject UNBOUNDED FOLLOWING.
            // NOTE(review): duplicated in `plan_standard_statement`; keep in sync.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
436
437    /// Extracts window function from a SELECT.
438    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
439        // Check GROUP BY for window functions
440        use sqlparser::ast::GroupByExpr;
441        match &select.group_by {
442            GroupByExpr::Expressions(exprs, _modifiers) => {
443                for group_by_expr in exprs {
444                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
445                    {
446                        return Some(window);
447                    }
448                }
449            }
450            GroupByExpr::All(_) => {}
451        }
452        None
453    }
454
455    /// Plans a CREATE LOOKUP TABLE statement.
456    fn plan_create_lookup_table(
457        &mut self,
458        lt: &CreateLookupTableStatement,
459    ) -> Result<StreamingPlan, PlanningError> {
460        let name = object_name_to_string(&lt.name);
461
462        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
463            return Err(PlanningError::InvalidQuery(format!(
464                "Lookup table '{}' already exists",
465                name
466            )));
467        }
468
469        let columns: Vec<(String, String)> = lt
470            .columns
471            .iter()
472            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
473            .collect();
474
475        let properties = validate_properties(&lt.with_options).map_err(|e| {
476            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
477        })?;
478
479        let info = LookupTableInfo {
480            name: name.clone(),
481            columns,
482            primary_key: lt.primary_key.clone(),
483            properties,
484        };
485
486        self.lookup_tables.insert(name, info.clone());
487
488        Ok(StreamingPlan::RegisterLookupTable(info))
489    }
490
491    /// Plans a DROP LOOKUP TABLE statement.
492    fn plan_drop_lookup_table(
493        &mut self,
494        name: &ObjectName,
495        if_exists: bool,
496    ) -> Result<StreamingPlan, PlanningError> {
497        let name_str = object_name_to_string(name);
498
499        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
500            return Err(PlanningError::InvalidQuery(format!(
501                "Lookup table '{}' does not exist",
502                name_str
503            )));
504        }
505
506        self.lookup_tables.remove(&name_str);
507
508        Ok(StreamingPlan::DropLookupTable { name: name_str })
509    }
510
    /// Gets a registered source by name, or `None` if not registered.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Gets a registered sink by name, or `None` if not registered.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Lists all registered sources (in unspecified order).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Lists all registered sinks (in unspecified order).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Gets a registered lookup table by name, or `None` if not registered.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Lists all registered lookup tables (in unspecified order).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }
546
547    /// Creates a `DataFusion` logical plan from a query plan.
548    ///
549    /// Converts the query plan's SQL statement into a `DataFusion`
550    /// `LogicalPlan` using the session context's state. Window UDFs
551    /// (TUMBLE, HOP, SESSION) must be registered on the context via
552    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
553    /// for windowed queries to resolve correctly.
554    ///
555    /// # Arguments
556    ///
557    /// * `plan` - The streaming query plan containing the SQL statement
558    /// * `ctx` - `DataFusion` session context with registered UDFs
559    ///
560    /// # Errors
561    ///
562    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
563    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
564    pub async fn to_logical_plan(
565        &self,
566        plan: &QueryPlan,
567        ctx: &SessionContext,
568    ) -> Result<LogicalPlan, PlanningError> {
569        // Convert the AST statement back to SQL and let DataFusion re-parse
570        // it with its own sqlparser version. This avoids version mismatches
571        // between our sqlparser (0.60) and DataFusion's (0.59).
572        let sql = plan.statement.to_string();
573        ctx.state()
574            .create_logical_plan(&sql)
575            .await
576            .map_err(PlanningError::DataFusion)
577    }
578}
579
impl Default for StreamingPlanner {
    /// Equivalent to [`StreamingPlanner::new`].
    fn default() -> Self {
        Self::new()
    }
}
585
/// Intermediate query analysis result.
///
/// Collects the per-feature operator configurations detected while analyzing
/// a single SELECT statement; every field defaults to `None`.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    // Windowed aggregation (TUMBLE/HOP/SESSION) configuration.
    window_config: Option<WindowOperatorConfig>,
    // One configuration per join step for multi-way joins.
    join_config: Option<Vec<JoinOperatorConfig>>,
    // ORDER BY configuration.
    order_config: Option<OrderOperatorConfig>,
    // Analytic window functions (LAG/LEAD/etc.).
    analytic_config: Option<AnalyticWindowConfig>,
    // HAVING clause filter.
    having_config: Option<HavingFilterConfig>,
    // Window frames (ROWS BETWEEN / RANGE BETWEEN).
    frame_config: Option<WindowFrameConfig>,
}
597
/// Helper to convert `ObjectName` to String.
///
/// Delegates to the type's `Display` implementation.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
602
/// Planning errors.
///
/// `Display` is implemented manually (see the `impl` below) instead of via
/// `#[error]` attributes; `thiserror` still derives `std::error::Error` and
/// the `From<DataFusionError>` conversion for the `DataFusion` variant.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Unsupported SQL feature
    UnsupportedSql(String),

    /// Invalid query
    InvalidQuery(String),

    /// Source not found
    SourceNotFound(String),

    /// Sink not found
    SinkNotFound(String),

    /// `DataFusion` error during logical plan creation (translated on display)
    DataFusion(#[from] datafusion_common::DataFusionError),
}
621
impl std::fmt::Display for PlanningError {
    // Formats each variant with a human-readable prefix; DataFusion errors
    // are additionally passed through the crate's error translator so users
    // see friendlier messages.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
            Self::DataFusion(e) => {
                let translated = crate::error::translate_datafusion_error(&e.to_string());
                write!(f, "{translated}")
            }
        }
    }
}
636
637#[cfg(test)]
638mod tests {
639    use super::*;
640    use crate::parser::StreamingParser;
641
642    #[test]
643    fn test_plan_create_source() {
644        let mut planner = StreamingPlanner::new();
645        let statements =
646            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
647
648        let plan = planner.plan(&statements[0]).unwrap();
649        match plan {
650            StreamingPlan::RegisterSource(info) => {
651                assert_eq!(info.name, "events");
652            }
653            _ => panic!("Expected RegisterSource plan"),
654        }
655    }
656
657    #[test]
658    fn test_plan_create_sink() {
659        let mut planner = StreamingPlanner::new();
660        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
661
662        let plan = planner.plan(&statements[0]).unwrap();
663        match plan {
664            StreamingPlan::RegisterSink(info) => {
665                assert_eq!(info.name, "output");
666                assert_eq!(info.from, "events");
667            }
668            _ => panic!("Expected RegisterSink plan"),
669        }
670    }
671
672    #[test]
673    fn test_plan_duplicate_source() {
674        let mut planner = StreamingPlanner::new();
675
676        // First source
677        let statements =
678            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
679        planner.plan(&statements[0]).unwrap();
680
681        // Duplicate should fail
682        let result = planner.plan(&statements[0]);
683        assert!(result.is_err());
684    }
685
686    #[test]
687    fn test_plan_source_if_not_exists() {
688        let mut planner = StreamingPlanner::new();
689
690        // First source
691        let statements =
692            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
693        planner.plan(&statements[0]).unwrap();
694
695        // IF NOT EXISTS should succeed
696        let statements =
697            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
698                .unwrap();
699        let result = planner.plan(&statements[0]);
700        assert!(result.is_ok());
701    }
702
703    #[test]
704    fn test_plan_source_or_replace() {
705        let mut planner = StreamingPlanner::new();
706
707        // First source
708        let statements =
709            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
710        planner.plan(&statements[0]).unwrap();
711
712        // OR REPLACE should succeed
713        let statements =
714            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
715                .unwrap();
716        let result = planner.plan(&statements[0]);
717        assert!(result.is_ok());
718    }
719
720    #[test]
721    fn test_plan_source_with_watermark() {
722        let mut planner = StreamingPlanner::new();
723        let statements = StreamingParser::parse_sql(
724            "CREATE SOURCE events (
725                id INT,
726                ts TIMESTAMP,
727                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
728            )",
729        )
730        .unwrap();
731
732        let plan = planner.plan(&statements[0]).unwrap();
733        match plan {
734            StreamingPlan::RegisterSource(info) => {
735                assert_eq!(info.name, "events");
736                assert_eq!(info.watermark_column, Some("ts".to_string()));
737            }
738            _ => panic!("Expected RegisterSource plan"),
739        }
740    }
741
742    #[test]
743    fn test_plan_standard_select() {
744        let mut planner = StreamingPlanner::new();
745        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
746
747        let plan = planner.plan(&statements[0]).unwrap();
748        match plan {
749            StreamingPlan::Standard(_) => {}
750            _ => panic!("Expected Standard plan for simple SELECT"),
751        }
752    }
753
754    #[test]
755    fn test_list_sources_and_sinks() {
756        let mut planner = StreamingPlanner::new();
757
758        // Create sources
759        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
760        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
761        planner.plan(&s1[0]).unwrap();
762        planner.plan(&s2[0]).unwrap();
763
764        // Create sinks
765        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
766        planner.plan(&k1[0]).unwrap();
767
768        assert_eq!(planner.list_sources().len(), 2);
769        assert_eq!(planner.list_sinks().len(), 1);
770        assert!(planner.get_source("src1").is_some());
771        assert!(planner.get_sink("sink1").is_some());
772    }
773
774    #[test]
775    fn test_plan_query_with_window() {
776        let mut planner = StreamingPlanner::new();
777        let statements = StreamingParser::parse_sql(
778            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
779        )
780        .unwrap();
781
782        let plan = planner.plan(&statements[0]).unwrap();
783        match plan {
784            StreamingPlan::Query(query_plan) => {
785                assert!(query_plan.window_config.is_some());
786                let config = query_plan.window_config.unwrap();
787                assert_eq!(config.time_column, "event_time");
788                assert_eq!(config.size.as_secs(), 300);
789            }
790            _ => panic!("Expected Query plan"),
791        }
792    }
793
794    #[test]
795    fn test_plan_query_with_join() {
796        let mut planner = StreamingPlanner::new();
797        let statements = StreamingParser::parse_sql(
798            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
799        )
800        .unwrap();
801
802        let plan = planner.plan(&statements[0]).unwrap();
803        match plan {
804            StreamingPlan::Query(query_plan) => {
805                assert!(query_plan.join_config.is_some());
806                let configs = query_plan.join_config.unwrap();
807                assert_eq!(configs.len(), 1);
808                assert_eq!(configs[0].left_key(), "order_id");
809                assert_eq!(configs[0].right_key(), "order_id");
810            }
811            _ => panic!("Expected Query plan"),
812        }
813    }
814
815    #[test]
816    fn test_plan_query_with_lag() {
817        let mut planner = StreamingPlanner::new();
818        let statements = StreamingParser::parse_sql(
819            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
820        )
821        .unwrap();
822
823        let plan = planner.plan(&statements[0]).unwrap();
824        match plan {
825            StreamingPlan::Query(query_plan) => {
826                assert!(query_plan.analytic_config.is_some());
827                let config = query_plan.analytic_config.unwrap();
828                assert_eq!(config.functions.len(), 1);
829                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
830            }
831            _ => panic!("Expected Query plan with analytic config"),
832        }
833    }
834
835    #[test]
836    fn test_plan_query_with_having() {
837        let mut planner = StreamingPlanner::new();
838        let statements = StreamingParser::parse_sql(
839            "SELECT symbol, COUNT(*) AS cnt FROM trades \
840             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
841             HAVING COUNT(*) > 10",
842        )
843        .unwrap();
844
845        let plan = planner.plan(&statements[0]).unwrap();
846        match plan {
847            StreamingPlan::Query(query_plan) => {
848                assert!(query_plan.window_config.is_some());
849                assert!(query_plan.having_config.is_some());
850                let config = query_plan.having_config.unwrap();
851                assert!(
852                    config.predicate().contains("COUNT(*)"),
853                    "predicate was: {}",
854                    config.predicate()
855                );
856            }
857            _ => panic!("Expected Query plan with having config"),
858        }
859    }
860
861    #[test]
862    fn test_plan_query_without_having() {
863        let mut planner = StreamingPlanner::new();
864        let statements = StreamingParser::parse_sql(
865            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
866        )
867        .unwrap();
868
869        let plan = planner.plan(&statements[0]).unwrap();
870        match plan {
871            StreamingPlan::Query(query_plan) => {
872                assert!(query_plan.having_config.is_none());
873            }
874            _ => panic!("Expected Query plan"),
875        }
876    }
877
878    #[test]
879    fn test_plan_having_only_produces_query_plan() {
880        // HAVING without window function still produces a Query plan
881        let mut planner = StreamingPlanner::new();
882        let statements = StreamingParser::parse_sql(
883            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
884        )
885        .unwrap();
886
887        let plan = planner.plan(&statements[0]).unwrap();
888        match plan {
889            StreamingPlan::Query(query_plan) => {
890                assert!(query_plan.having_config.is_some());
891                assert!(query_plan.window_config.is_none());
892            }
893            _ => panic!("Expected Query plan for HAVING-only query"),
894        }
895    }
896
897    #[test]
898    fn test_plan_having_compound_predicate() {
899        let mut planner = StreamingPlanner::new();
900        let statements = StreamingParser::parse_sql(
901            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
902             FROM trades GROUP BY symbol \
903             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
904        )
905        .unwrap();
906
907        let plan = planner.plan(&statements[0]).unwrap();
908        match plan {
909            StreamingPlan::Query(query_plan) => {
910                let config = query_plan.having_config.unwrap();
911                let pred = config.predicate();
912                assert!(pred.contains("AND"), "predicate was: {pred}");
913            }
914            _ => panic!("Expected Query plan"),
915        }
916    }
917
918    #[test]
919    fn test_plan_query_with_lead() {
920        let mut planner = StreamingPlanner::new();
921        let statements = StreamingParser::parse_sql(
922            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
923        )
924        .unwrap();
925
926        let plan = planner.plan(&statements[0]).unwrap();
927        match plan {
928            StreamingPlan::Query(query_plan) => {
929                assert!(query_plan.analytic_config.is_some());
930                let config = query_plan.analytic_config.unwrap();
931                assert!(config.has_lookahead());
932                assert_eq!(config.functions[0].offset, 2);
933            }
934            _ => panic!("Expected Query plan with analytic config"),
935        }
936    }
937
938    // -- Multi-way join planner tests --
939
940    #[test]
941    fn test_plan_single_join_produces_vec_of_one() {
942        let mut planner = StreamingPlanner::new();
943        let statements =
944            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
945
946        let plan = planner.plan(&statements[0]).unwrap();
947        match plan {
948            StreamingPlan::Query(qp) => {
949                let configs = qp.join_config.unwrap();
950                assert_eq!(configs.len(), 1);
951            }
952            _ => panic!("Expected Query plan"),
953        }
954    }
955
956    #[test]
957    fn test_plan_two_way_join() {
958        let mut planner = StreamingPlanner::new();
959        let statements = StreamingParser::parse_sql(
960            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
961        )
962        .unwrap();
963
964        let plan = planner.plan(&statements[0]).unwrap();
965        match plan {
966            StreamingPlan::Query(qp) => {
967                let configs = qp.join_config.unwrap();
968                assert_eq!(configs.len(), 2);
969                assert_eq!(configs[0].left_key(), "id");
970                assert_eq!(configs[0].right_key(), "a_id");
971                assert_eq!(configs[1].left_key(), "id");
972                assert_eq!(configs[1].right_key(), "b_id");
973            }
974            _ => panic!("Expected Query plan"),
975        }
976    }
977
978    #[test]
979    fn test_plan_mixed_join_types() {
980        let mut planner = StreamingPlanner::new();
981        let statements = StreamingParser::parse_sql(
982            "SELECT * FROM orders o \
983             JOIN payments p ON o.id = p.order_id \
984                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
985             JOIN customers c ON p.cust_id = c.id",
986        )
987        .unwrap();
988
989        let plan = planner.plan(&statements[0]).unwrap();
990        match plan {
991            StreamingPlan::Query(qp) => {
992                let configs = qp.join_config.unwrap();
993                assert_eq!(configs.len(), 2);
994                assert!(configs[0].is_stream_stream());
995                assert!(configs[1].is_lookup());
996            }
997            _ => panic!("Expected Query plan"),
998        }
999    }
1000
1001    #[test]
1002    fn test_plan_backward_compat_no_join() {
1003        let mut planner = StreamingPlanner::new();
1004        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1005
1006        let plan = planner.plan(&statements[0]).unwrap();
1007        match plan {
1008            StreamingPlan::Standard(_) => {} // No join → pass-through
1009            _ => panic!("Expected Standard plan for simple SELECT"),
1010        }
1011    }
1012
1013    // -- Window Frame planner tests --
1014
1015    #[test]
1016    fn test_plan_query_with_rows_frame() {
1017        let mut planner = StreamingPlanner::new();
1018        let statements = StreamingParser::parse_sql(
1019            "SELECT AVG(price) OVER (ORDER BY ts \
1020             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1021        )
1022        .unwrap();
1023
1024        let plan = planner.plan(&statements[0]).unwrap();
1025        match plan {
1026            StreamingPlan::Query(qp) => {
1027                assert!(qp.frame_config.is_some());
1028                let fc = qp.frame_config.unwrap();
1029                assert_eq!(fc.functions.len(), 1);
1030                assert_eq!(fc.functions[0].source_column, "price");
1031            }
1032            _ => panic!("Expected Query plan with frame_config"),
1033        }
1034    }
1035
1036    #[test]
1037    fn test_plan_frame_with_partition() {
1038        let mut planner = StreamingPlanner::new();
1039        let statements = StreamingParser::parse_sql(
1040            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1041             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1042        )
1043        .unwrap();
1044
1045        let plan = planner.plan(&statements[0]).unwrap();
1046        match plan {
1047            StreamingPlan::Query(qp) => {
1048                let fc = qp.frame_config.unwrap();
1049                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1050                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1051            }
1052            _ => panic!("Expected Query plan with frame_config"),
1053        }
1054    }
1055
1056    #[test]
1057    fn test_plan_no_frame_is_standard() {
1058        let mut planner = StreamingPlanner::new();
1059        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1060
1061        let plan = planner.plan(&statements[0]).unwrap();
1062        match plan {
1063            StreamingPlan::Standard(_) => {} // No frame → pass-through
1064            _ => panic!("Expected Standard plan for simple SELECT"),
1065        }
1066    }
1067
1068    #[test]
1069    fn test_plan_unbounded_following_rejected() {
1070        let mut planner = StreamingPlanner::new();
1071        let statements = StreamingParser::parse_sql(
1072            "SELECT SUM(amount) OVER (ORDER BY id \
1073             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1074             FROM orders",
1075        )
1076        .unwrap();
1077
1078        let result = planner.plan(&statements[0]);
1079        assert!(result.is_err());
1080        let err = result.unwrap_err().to_string();
1081        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1082    }
1083}