// laminar_sql/planner/mod.rs
//! Query planner for streaming SQL
//!
//! This module translates parsed streaming SQL statements into execution plans.
//! It integrates with the parser and translator modules to produce complete
//! operator configurations for Ring 0 execution.

/// Channel derivation for streaming plans.
pub mod channel_derivation;
/// Optimizer rules for lookup join rewriting.
pub mod lookup_join;
/// Predicate splitting and pushdown for lookup joins.
pub mod predicate_split;
/// Physical optimizer rule for streaming plan validation.
pub mod streaming_optimizer;

#[allow(clippy::disallowed_types)] // cold path: query planning
use std::collections::HashMap;
use std::sync::Arc;

use arrow::datatypes::{Field, Schema, SchemaRef};
use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
use sqlparser::ast::{ObjectName, SetExpr, Statement};

use crate::parser::aggregation_parser::analyze_aggregates;
use crate::parser::analytic_parser::{
    analyze_analytic_functions, analyze_window_frames, FrameBound,
};
use crate::parser::join_parser::analyze_joins;
use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
use crate::parser::order_analyzer::analyze_order_by;
use crate::parser::{
    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
    StreamingStatement, WindowFunction, WindowRewriter,
};
use crate::translator::{
    AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
    OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
};
39
/// Information about a registered lookup table.
///
/// Built by [`StreamingPlanner::plan_create_lookup_table`] from a
/// `CREATE LOOKUP TABLE` statement and stored in the planner's registry.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name.
    pub name: String,
    /// Column names and types, as `(name, SQL type text)` pairs in
    /// declaration order.
    pub columns: Vec<(String, String)>,
    /// Primary key columns.
    pub primary_key: Vec<String>,
    /// Validated properties.
    pub properties: LookupTableProperties,
    /// Pre-computed Arrow schema from column definitions.
    pub arrow_schema: SchemaRef,
    /// Raw WITH options for connector configuration pass-through.
    pub raw_options: HashMap<String, String>,
}
56
/// Streaming query planner
///
/// Holds the DDL registries (sources, sinks, lookup tables) keyed by the
/// object name as written, and turns parsed statements into
/// [`StreamingPlan`]s.
pub struct StreamingPlanner {
    /// Registered sources
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Information about a registered source
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name
    pub name: String,
    /// Watermark column (if configured via `WATERMARK FOR <col>`)
    pub watermark_column: Option<String>,
    /// Connector options (raw WITH options, passed through verbatim)
    pub options: HashMap<String, String>,
}
77
/// Information about a registered sink
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name
    pub name: String,
    /// Source table name, or a synthesized `<name>_query` identifier when
    /// the sink was created FROM an inline query
    pub from: String,
    /// Connector options (raw WITH options, passed through verbatim)
    pub options: HashMap<String, String>,
}
88
/// Result of planning a streaming statement
#[derive(Debug)]
// NOTE(review): Query(QueryPlan) is the large variant; consider
// Query(Box<QueryPlan>) instead of allowing the lint.
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Source registration (DDL)
    RegisterSource(SourceInfo),

    /// Sink registration (DDL)
    RegisterSink(SinkInfo),

    /// Query plan with streaming configurations
    Query(QueryPlan),

    /// Standard SQL statement (pass-through to DataFusion)
    Standard(Box<Statement>),

    /// DAG topology explanation (from EXPLAIN DAG)
    DagExplain(DagExplainOutput),

    /// Lookup table registration (DDL)
    RegisterLookupTable(LookupTableInfo),

    /// Drop a lookup table
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
117
/// A query plan with streaming operator configurations
///
/// Each config is independently optional; a query may carry any subset of
/// them alongside the underlying SQL statement.
#[derive(Debug)]
pub struct QueryPlan {
    /// Optional name for the continuous query
    pub name: Option<String>,
    /// Window configuration if the query has windowed aggregation
    pub window_config: Option<WindowOperatorConfig>,
    /// Join configuration(s) if the query has joins (one per join step)
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration if the query has ordering
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic window function configuration (LAG/LEAD/etc.)
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration
    pub having_config: Option<HavingFilterConfig>,
    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN)
    pub frame_config: Option<WindowFrameConfig>,
    /// Emit strategy
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement
    pub statement: Box<Statement>,
}
140
141impl StreamingPlanner {
142    /// Creates a new streaming planner
143    #[must_use]
144    pub fn new() -> Self {
145        Self {
146            sources: HashMap::new(),
147            sinks: HashMap::new(),
148            lookup_tables: HashMap::new(),
149        }
150    }
151
    /// Plans a streaming statement.
    ///
    /// DDL statements (CREATE SOURCE / SINK / LOOKUP TABLE, DROP LOOKUP
    /// TABLE) mutate the planner's registries; SELECT-style statements are
    /// analyzed for streaming features. Statement kinds owned by the
    /// database facade are rejected with `UnsupportedSql`.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if the statement cannot be planned.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share a shape and
            // are planned identically.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                // These statements are handled directly by the database facade
                // and should never reach the planner, so they are rejected
                // explicitly rather than passed through.
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
198
    /// Plans a CREATE SOURCE statement.
    ///
    /// Registers the source in the planner's registry and returns a
    /// `RegisterSource` plan for the database layer to act on.
    fn plan_create_source(
        &mut self,
        source: &CreateSourceStatement,
    ) -> Result<StreamingPlan, PlanningError> {
        let name = object_name_to_string(&source.name);

        // Check for existing source
        // NOTE(review): when IF NOT EXISTS is given and the source already
        // exists, control falls through and the insert below replaces the
        // prior registration. Standard SQL IF NOT EXISTS semantics would
        // keep the existing entry — confirm the overwrite is intended.
        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
            return Err(PlanningError::InvalidQuery(format!(
                "Source '{}' already exists",
                name
            )));
        }

        // Extract watermark column
        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());

        let info = SourceInfo {
            name: name.clone(),
            watermark_column,
            options: source.with_options.clone(),
        };

        // Register the source
        self.sources.insert(name, info.clone());

        Ok(StreamingPlan::RegisterSource(info))
    }
228
229    /// Plans a CREATE SINK statement.
230    fn plan_create_sink(
231        &mut self,
232        sink: &CreateSinkStatement,
233    ) -> Result<StreamingPlan, PlanningError> {
234        let name = object_name_to_string(&sink.name);
235
236        // Check for existing sink
237        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
238            return Err(PlanningError::InvalidQuery(format!(
239                "Sink '{}' already exists",
240                name
241            )));
242        }
243
244        // Determine the source
245        let from = match &sink.from {
246            SinkFrom::Table(table) => object_name_to_string(table),
247            SinkFrom::Query(_) => format!("{}_query", name),
248        };
249
250        let info = SinkInfo {
251            name: name.clone(),
252            from,
253            options: sink.with_options.clone(),
254        };
255
256        // Register the sink
257        self.sinks.insert(name, info.clone());
258
259        Ok(StreamingPlan::RegisterSink(info))
260    }
261
262    /// Plans a CREATE CONTINUOUS QUERY statement.
263    #[allow(clippy::unused_self)] // Will use planner state for query registration
264    fn plan_continuous_query(
265        &mut self,
266        name: &ObjectName,
267        query: &StreamingStatement,
268        emit_clause: Option<&EmitClause>,
269    ) -> Result<StreamingPlan, PlanningError> {
270        // The query inside should be a standard SELECT
271        let stmt = match query {
272            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
273            _ => {
274                return Err(PlanningError::InvalidQuery(
275                    "Continuous query must contain a SELECT statement".to_string(),
276                ))
277            }
278        };
279
280        // Analyze the query for streaming features
281        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
282
283        Ok(StreamingPlan::Query(QueryPlan {
284            name: Some(object_name_to_string(name)),
285            window_config: query_plan.window_config,
286            join_config: query_plan.join_config,
287            order_config: query_plan.order_config,
288            analytic_config: query_plan.analytic_config,
289            having_config: query_plan.having_config,
290            frame_config: query_plan.frame_config,
291            emit_clause: emit_clause.cloned(),
292            statement: Box::new(stmt),
293        }))
294    }
295
    /// Plans a standard SQL statement.
    ///
    /// Probes a SELECT for streaming features (windows, joins, ORDER BY,
    /// analytic functions, HAVING, window frames). If any are found the
    /// statement becomes a [`StreamingPlan::Query`]; otherwise it is passed
    /// through untouched for DataFusion to execute.
    ///
    /// NOTE(review): the feature probing here largely mirrors
    /// `analyze_query`; keep the two in sync when adding features.
    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        // Check if it's a query that might have streaming features
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Check for window functions in GROUP BY
                let window_function = Self::extract_window_from_select(select);

                // Check for joins (multi-way)
                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                // Check for ORDER BY
                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                // Check for analytic functions (LAG/LEAD/etc.)
                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                // Check for HAVING clause
                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // A single streaming feature is enough to route the query
                // through the streaming path.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // Pass through standard SQL
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
379
    /// Analyzes a query for streaming features.
    ///
    /// Collects the optional window, join, order, analytic, HAVING, and
    /// frame configurations into a [`QueryAnalysis`]. Unlike
    /// `plan_standard_statement`, this path also applies the EMIT clause to
    /// the window config.
    ///
    /// NOTE(review): this duplicates much of the probing logic in
    /// `plan_standard_statement`; keep the two in sync.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Extract window function
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // Apply emit clause if present
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                // Extract join info (multi-way)
                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // Extract ORDER BY info
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        // Extract analytic function info (LAG/LEAD/etc.)
        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        // Extract HAVING clause
        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer
            // an unbounded future)
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
443
444    /// Extracts window function from a SELECT.
445    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
446        // Check GROUP BY for window functions
447        use sqlparser::ast::GroupByExpr;
448        match &select.group_by {
449            GroupByExpr::Expressions(exprs, _modifiers) => {
450                for group_by_expr in exprs {
451                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
452                    {
453                        return Some(window);
454                    }
455                }
456            }
457            GroupByExpr::All(_) => {}
458        }
459        None
460    }
461
    /// Plans a CREATE LOOKUP TABLE statement.
    ///
    /// Validates the WITH options, pre-computes the Arrow schema from the
    /// column definitions, and registers the table.
    fn plan_create_lookup_table(
        &mut self,
        lt: &CreateLookupTableStatement,
    ) -> Result<StreamingPlan, PlanningError> {
        let name = object_name_to_string(&lt.name);

        // NOTE(review): as with sources/sinks, IF NOT EXISTS falls through
        // to the insert below and replaces the existing registration —
        // confirm the overwrite is intended.
        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
            return Err(PlanningError::InvalidQuery(format!(
                "Lookup table '{}' already exists",
                name
            )));
        }

        // Keep the SQL-level column declarations (name, type text) for
        // display/pass-through purposes.
        let columns: Vec<(String, String)> = lt
            .columns
            .iter()
            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
            .collect();

        let properties = validate_properties(&lt.with_options).map_err(|e| {
            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
        })?;

        // Compute Arrow schema from column definitions
        let arrow_fields: Vec<Field> = lt
            .columns
            .iter()
            .map(|c| {
                let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
                    .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                // Columns are nullable unless declared NOT NULL.
                let nullable = !c
                    .options
                    .iter()
                    .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
                Ok(Field::new(&c.name.value, dt, nullable))
            })
            .collect::<Result<_, PlanningError>>()?;
        let arrow_schema = Arc::new(Schema::new(arrow_fields));

        let info = LookupTableInfo {
            name: name.clone(),
            columns,
            primary_key: lt.primary_key.clone(),
            properties,
            arrow_schema,
            raw_options: lt.with_options.clone(),
        };

        self.lookup_tables.insert(name, info.clone());

        Ok(StreamingPlan::RegisterLookupTable(info))
    }
515
516    /// Plans a DROP LOOKUP TABLE statement.
517    fn plan_drop_lookup_table(
518        &mut self,
519        name: &ObjectName,
520        if_exists: bool,
521    ) -> Result<StreamingPlan, PlanningError> {
522        let name_str = object_name_to_string(name);
523
524        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
525            return Err(PlanningError::InvalidQuery(format!(
526                "Lookup table '{}' does not exist",
527                name_str
528            )));
529        }
530
531        self.lookup_tables.remove(&name_str);
532
533        Ok(StreamingPlan::DropLookupTable { name: name_str })
534    }
535
536    /// Gets a registered source by name.
537    #[must_use]
538    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
539        self.sources.get(name)
540    }
541
542    /// Gets a registered sink by name.
543    #[must_use]
544    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
545        self.sinks.get(name)
546    }
547
548    /// Lists all registered sources.
549    #[must_use]
550    pub fn list_sources(&self) -> Vec<&SourceInfo> {
551        self.sources.values().collect()
552    }
553
554    /// Lists all registered sinks.
555    #[must_use]
556    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
557        self.sinks.values().collect()
558    }
559
560    /// Gets a registered lookup table by name.
561    #[must_use]
562    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
563        self.lookup_tables.get(name)
564    }
565
566    /// Lists all registered lookup tables.
567    #[must_use]
568    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
569        self.lookup_tables.values().collect()
570    }
571
572    /// Returns a clone of the lookup tables map for optimizer rule construction.
573    #[must_use]
574    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
575        self.lookup_tables.clone()
576    }
577
    /// Creates a `DataFusion` logical plan from a query plan.
    ///
    /// Converts the query plan's SQL statement into a `DataFusion`
    /// `LogicalPlan` using the session context's state. Window UDFs
    /// (TUMBLE, HOP, SESSION) must be registered on the context via
    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
    /// for windowed queries to resolve correctly.
    ///
    /// Only the SQL text is consumed here; the streaming operator configs
    /// travel separately on the [`QueryPlan`].
    ///
    /// # Arguments
    ///
    /// * `plan` - The streaming query plan containing the SQL statement
    /// * `ctx` - `DataFusion` session context with registered UDFs
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
    pub async fn to_logical_plan(
        &self,
        plan: &QueryPlan,
        ctx: &SessionContext,
    ) -> Result<LogicalPlan, PlanningError> {
        // Convert the AST statement back to SQL and let DataFusion re-parse
        // it with its own sqlparser version. This avoids version mismatches
        // between our sqlparser (0.60) and DataFusion's (0.59).
        let sql = plan.statement.to_string();
        ctx.state()
            .create_logical_plan(&sql)
            .await
            .map_err(PlanningError::DataFusion)
    }
609}
610
611impl Default for StreamingPlanner {
612    fn default() -> Self {
613        Self::new()
614    }
615}
616
/// Intermediate query analysis result
///
/// Mirrors the optional streaming configs of [`QueryPlan`] so that
/// `analyze_query` can collect them before the final plan is assembled.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// Windowed aggregation config (TUMBLE/HOP/SESSION), if any.
    window_config: Option<WindowOperatorConfig>,
    /// One config per join step, if the query joins streams.
    join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY config, if present.
    order_config: Option<OrderOperatorConfig>,
    /// Analytic function config (LAG/LEAD/etc.), if present.
    analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING filter config, if present.
    having_config: Option<HavingFilterConfig>,
    /// Window frame config (ROWS/RANGE BETWEEN), if present.
    frame_config: Option<WindowFrameConfig>,
}
628
/// Helper to convert `ObjectName` to String
///
/// Delegates to the AST node's `Display` implementation.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
633
/// Planning errors
///
/// No `#[error]` attributes are used: `Display` is implemented manually
/// below, so the `thiserror` derive only supplies the `Error` impl and the
/// `#[from]` conversion for `DataFusionError`.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Unsupported SQL feature
    UnsupportedSql(String),

    /// Invalid query
    InvalidQuery(String),

    /// Source not found
    SourceNotFound(String),

    /// Sink not found
    SinkNotFound(String),

    /// `DataFusion` error during logical plan creation (translated on display)
    DataFusion(#[from] datafusion_common::DataFusionError),
}
652
653impl std::fmt::Display for PlanningError {
654    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
655        match self {
656            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
657            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
658            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
659            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
660            Self::DataFusion(e) => {
661                let translated = crate::error::translate_datafusion_error(&e.to_string());
662                write!(f, "{translated}")
663            }
664        }
665    }
666}
667
668#[cfg(test)]
669mod tests {
670    use super::*;
671    use crate::parser::StreamingParser;
672
673    #[test]
674    fn test_plan_create_source() {
675        let mut planner = StreamingPlanner::new();
676        let statements =
677            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
678
679        let plan = planner.plan(&statements[0]).unwrap();
680        match plan {
681            StreamingPlan::RegisterSource(info) => {
682                assert_eq!(info.name, "events");
683            }
684            _ => panic!("Expected RegisterSource plan"),
685        }
686    }
687
688    #[test]
689    fn test_plan_create_sink() {
690        let mut planner = StreamingPlanner::new();
691        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
692
693        let plan = planner.plan(&statements[0]).unwrap();
694        match plan {
695            StreamingPlan::RegisterSink(info) => {
696                assert_eq!(info.name, "output");
697                assert_eq!(info.from, "events");
698            }
699            _ => panic!("Expected RegisterSink plan"),
700        }
701    }
702
703    #[test]
704    fn test_plan_duplicate_source() {
705        let mut planner = StreamingPlanner::new();
706
707        // First source
708        let statements =
709            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
710        planner.plan(&statements[0]).unwrap();
711
712        // Duplicate should fail
713        let result = planner.plan(&statements[0]);
714        assert!(result.is_err());
715    }
716
717    #[test]
718    fn test_plan_source_if_not_exists() {
719        let mut planner = StreamingPlanner::new();
720
721        // First source
722        let statements =
723            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
724        planner.plan(&statements[0]).unwrap();
725
726        // IF NOT EXISTS should succeed
727        let statements =
728            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
729                .unwrap();
730        let result = planner.plan(&statements[0]);
731        assert!(result.is_ok());
732    }
733
734    #[test]
735    fn test_plan_source_or_replace() {
736        let mut planner = StreamingPlanner::new();
737
738        // First source
739        let statements =
740            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
741        planner.plan(&statements[0]).unwrap();
742
743        // OR REPLACE should succeed
744        let statements =
745            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
746                .unwrap();
747        let result = planner.plan(&statements[0]);
748        assert!(result.is_ok());
749    }
750
751    #[test]
752    fn test_plan_source_with_watermark() {
753        let mut planner = StreamingPlanner::new();
754        let statements = StreamingParser::parse_sql(
755            "CREATE SOURCE events (
756                id INT,
757                ts TIMESTAMP,
758                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
759            )",
760        )
761        .unwrap();
762
763        let plan = planner.plan(&statements[0]).unwrap();
764        match plan {
765            StreamingPlan::RegisterSource(info) => {
766                assert_eq!(info.name, "events");
767                assert_eq!(info.watermark_column, Some("ts".to_string()));
768            }
769            _ => panic!("Expected RegisterSource plan"),
770        }
771    }
772
773    #[test]
774    fn test_plan_standard_select() {
775        let mut planner = StreamingPlanner::new();
776        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
777
778        let plan = planner.plan(&statements[0]).unwrap();
779        match plan {
780            StreamingPlan::Standard(_) => {}
781            _ => panic!("Expected Standard plan for simple SELECT"),
782        }
783    }
784
785    #[test]
786    fn test_list_sources_and_sinks() {
787        let mut planner = StreamingPlanner::new();
788
789        // Create sources
790        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
791        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
792        planner.plan(&s1[0]).unwrap();
793        planner.plan(&s2[0]).unwrap();
794
795        // Create sinks
796        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
797        planner.plan(&k1[0]).unwrap();
798
799        assert_eq!(planner.list_sources().len(), 2);
800        assert_eq!(planner.list_sinks().len(), 1);
801        assert!(planner.get_source("src1").is_some());
802        assert!(planner.get_sink("sink1").is_some());
803    }
804
805    #[test]
806    fn test_plan_query_with_window() {
807        let mut planner = StreamingPlanner::new();
808        let statements = StreamingParser::parse_sql(
809            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
810        )
811        .unwrap();
812
813        let plan = planner.plan(&statements[0]).unwrap();
814        match plan {
815            StreamingPlan::Query(query_plan) => {
816                assert!(query_plan.window_config.is_some());
817                let config = query_plan.window_config.unwrap();
818                assert_eq!(config.time_column, "event_time");
819                assert_eq!(config.size.as_secs(), 300);
820            }
821            _ => panic!("Expected Query plan"),
822        }
823    }
824
825    #[test]
826    fn test_plan_query_with_join() {
827        let mut planner = StreamingPlanner::new();
828        let statements = StreamingParser::parse_sql(
829            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
830        )
831        .unwrap();
832
833        let plan = planner.plan(&statements[0]).unwrap();
834        match plan {
835            StreamingPlan::Query(query_plan) => {
836                assert!(query_plan.join_config.is_some());
837                let configs = query_plan.join_config.unwrap();
838                assert_eq!(configs.len(), 1);
839                assert_eq!(configs[0].left_key(), "order_id");
840                assert_eq!(configs[0].right_key(), "order_id");
841            }
842            _ => panic!("Expected Query plan"),
843        }
844    }
845
846    #[test]
847    fn test_plan_query_with_lag() {
848        let mut planner = StreamingPlanner::new();
849        let statements = StreamingParser::parse_sql(
850            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
851        )
852        .unwrap();
853
854        let plan = planner.plan(&statements[0]).unwrap();
855        match plan {
856            StreamingPlan::Query(query_plan) => {
857                assert!(query_plan.analytic_config.is_some());
858                let config = query_plan.analytic_config.unwrap();
859                assert_eq!(config.functions.len(), 1);
860                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
861            }
862            _ => panic!("Expected Query plan with analytic config"),
863        }
864    }
865
866    #[test]
867    fn test_plan_query_with_having() {
868        let mut planner = StreamingPlanner::new();
869        let statements = StreamingParser::parse_sql(
870            "SELECT symbol, COUNT(*) AS cnt FROM trades \
871             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
872             HAVING COUNT(*) > 10",
873        )
874        .unwrap();
875
876        let plan = planner.plan(&statements[0]).unwrap();
877        match plan {
878            StreamingPlan::Query(query_plan) => {
879                assert!(query_plan.window_config.is_some());
880                assert!(query_plan.having_config.is_some());
881                let config = query_plan.having_config.unwrap();
882                assert!(
883                    config.predicate().contains("COUNT(*)"),
884                    "predicate was: {}",
885                    config.predicate()
886                );
887            }
888            _ => panic!("Expected Query plan with having config"),
889        }
890    }
891
892    #[test]
893    fn test_plan_query_without_having() {
894        let mut planner = StreamingPlanner::new();
895        let statements = StreamingParser::parse_sql(
896            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
897        )
898        .unwrap();
899
900        let plan = planner.plan(&statements[0]).unwrap();
901        match plan {
902            StreamingPlan::Query(query_plan) => {
903                assert!(query_plan.having_config.is_none());
904            }
905            _ => panic!("Expected Query plan"),
906        }
907    }
908
909    #[test]
910    fn test_plan_having_only_produces_query_plan() {
911        // HAVING without window function still produces a Query plan
912        let mut planner = StreamingPlanner::new();
913        let statements = StreamingParser::parse_sql(
914            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
915        )
916        .unwrap();
917
918        let plan = planner.plan(&statements[0]).unwrap();
919        match plan {
920            StreamingPlan::Query(query_plan) => {
921                assert!(query_plan.having_config.is_some());
922                assert!(query_plan.window_config.is_none());
923            }
924            _ => panic!("Expected Query plan for HAVING-only query"),
925        }
926    }
927
928    #[test]
929    fn test_plan_having_compound_predicate() {
930        let mut planner = StreamingPlanner::new();
931        let statements = StreamingParser::parse_sql(
932            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
933             FROM trades GROUP BY symbol \
934             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
935        )
936        .unwrap();
937
938        let plan = planner.plan(&statements[0]).unwrap();
939        match plan {
940            StreamingPlan::Query(query_plan) => {
941                let config = query_plan.having_config.unwrap();
942                let pred = config.predicate();
943                assert!(pred.contains("AND"), "predicate was: {pred}");
944            }
945            _ => panic!("Expected Query plan"),
946        }
947    }
948
949    #[test]
950    fn test_plan_query_with_lead() {
951        let mut planner = StreamingPlanner::new();
952        let statements = StreamingParser::parse_sql(
953            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
954        )
955        .unwrap();
956
957        let plan = planner.plan(&statements[0]).unwrap();
958        match plan {
959            StreamingPlan::Query(query_plan) => {
960                assert!(query_plan.analytic_config.is_some());
961                let config = query_plan.analytic_config.unwrap();
962                assert!(config.has_lookahead());
963                assert_eq!(config.functions[0].offset, 2);
964            }
965            _ => panic!("Expected Query plan with analytic config"),
966        }
967    }
968
969    // -- Multi-way join planner tests --
970
971    #[test]
972    fn test_plan_single_join_produces_vec_of_one() {
973        let mut planner = StreamingPlanner::new();
974        let statements =
975            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
976
977        let plan = planner.plan(&statements[0]).unwrap();
978        match plan {
979            StreamingPlan::Query(qp) => {
980                let configs = qp.join_config.unwrap();
981                assert_eq!(configs.len(), 1);
982            }
983            _ => panic!("Expected Query plan"),
984        }
985    }
986
987    #[test]
988    fn test_plan_two_way_join() {
989        let mut planner = StreamingPlanner::new();
990        let statements = StreamingParser::parse_sql(
991            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
992        )
993        .unwrap();
994
995        let plan = planner.plan(&statements[0]).unwrap();
996        match plan {
997            StreamingPlan::Query(qp) => {
998                let configs = qp.join_config.unwrap();
999                assert_eq!(configs.len(), 2);
1000                assert_eq!(configs[0].left_key(), "id");
1001                assert_eq!(configs[0].right_key(), "a_id");
1002                assert_eq!(configs[1].left_key(), "id");
1003                assert_eq!(configs[1].right_key(), "b_id");
1004            }
1005            _ => panic!("Expected Query plan"),
1006        }
1007    }
1008
1009    #[test]
1010    fn test_plan_mixed_join_types() {
1011        let mut planner = StreamingPlanner::new();
1012        let statements = StreamingParser::parse_sql(
1013            "SELECT * FROM orders o \
1014             JOIN payments p ON o.id = p.order_id \
1015                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
1016             JOIN customers c ON p.cust_id = c.id",
1017        )
1018        .unwrap();
1019
1020        let plan = planner.plan(&statements[0]).unwrap();
1021        match plan {
1022            StreamingPlan::Query(qp) => {
1023                let configs = qp.join_config.unwrap();
1024                assert_eq!(configs.len(), 2);
1025                assert!(configs[0].is_stream_stream());
1026                assert!(configs[1].is_lookup());
1027            }
1028            _ => panic!("Expected Query plan"),
1029        }
1030    }
1031
1032    #[test]
1033    fn test_plan_backward_compat_no_join() {
1034        let mut planner = StreamingPlanner::new();
1035        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1036
1037        let plan = planner.plan(&statements[0]).unwrap();
1038        match plan {
1039            StreamingPlan::Standard(_) => {} // No join → pass-through
1040            _ => panic!("Expected Standard plan for simple SELECT"),
1041        }
1042    }
1043
1044    // -- Window Frame planner tests --
1045
1046    #[test]
1047    fn test_plan_query_with_rows_frame() {
1048        let mut planner = StreamingPlanner::new();
1049        let statements = StreamingParser::parse_sql(
1050            "SELECT AVG(price) OVER (ORDER BY ts \
1051             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1052        )
1053        .unwrap();
1054
1055        let plan = planner.plan(&statements[0]).unwrap();
1056        match plan {
1057            StreamingPlan::Query(qp) => {
1058                assert!(qp.frame_config.is_some());
1059                let fc = qp.frame_config.unwrap();
1060                assert_eq!(fc.functions.len(), 1);
1061                assert_eq!(fc.functions[0].source_column, "price");
1062            }
1063            _ => panic!("Expected Query plan with frame_config"),
1064        }
1065    }
1066
1067    #[test]
1068    fn test_plan_frame_with_partition() {
1069        let mut planner = StreamingPlanner::new();
1070        let statements = StreamingParser::parse_sql(
1071            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1072             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1073        )
1074        .unwrap();
1075
1076        let plan = planner.plan(&statements[0]).unwrap();
1077        match plan {
1078            StreamingPlan::Query(qp) => {
1079                let fc = qp.frame_config.unwrap();
1080                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1081                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1082            }
1083            _ => panic!("Expected Query plan with frame_config"),
1084        }
1085    }
1086
1087    #[test]
1088    fn test_plan_no_frame_is_standard() {
1089        let mut planner = StreamingPlanner::new();
1090        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1091
1092        let plan = planner.plan(&statements[0]).unwrap();
1093        match plan {
1094            StreamingPlan::Standard(_) => {} // No frame → pass-through
1095            _ => panic!("Expected Standard plan for simple SELECT"),
1096        }
1097    }
1098
1099    #[test]
1100    fn test_plan_unbounded_following_rejected() {
1101        let mut planner = StreamingPlanner::new();
1102        let statements = StreamingParser::parse_sql(
1103            "SELECT SUM(amount) OVER (ORDER BY id \
1104             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1105             FROM orders",
1106        )
1107        .unwrap();
1108
1109        let result = planner.plan(&statements[0]);
1110        assert!(result.is_err());
1111        let err = result.unwrap_err().to_string();
1112        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1113    }
1114}