Skip to main content

laminar_sql/planner/
mod.rs

1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
7pub mod channel_derivation;
8/// Optimizer rules for lookup join rewriting.
9pub mod lookup_join;
10/// Predicate splitting and pushdown for lookup joins.
11pub mod predicate_split;
12/// Physical optimizer rule for streaming plan validation.
13pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] // cold path: query planning
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26    analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::analyze_joins;
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32    CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33    StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36    AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37    WindowFrameConfig, WindowOperatorConfig,
38};
39
/// Information about a registered lookup table.
///
/// Built by the planner from a `CREATE LOOKUP TABLE` statement and handed
/// to the execution layer via [`StreamingPlan::RegisterLookupTable`].
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name.
    pub name: String,
    /// Column names and types, as `(name, SQL type string)` pairs.
    pub columns: Vec<(String, String)>,
    /// Primary key columns.
    pub primary_key: Vec<String>,
    /// Validated properties.
    pub properties: LookupTableProperties,
    /// Pre-computed Arrow schema from column definitions.
    pub arrow_schema: SchemaRef,
    /// Raw WITH options for connector configuration pass-through.
    pub raw_options: HashMap<String, String>,
}
56
/// Streaming query planner.
///
/// Holds the catalog of registered sources, sinks, and lookup tables and
/// translates parsed [`StreamingStatement`]s into [`StreamingPlan`]s.
pub struct StreamingPlanner {
    /// Registered sources, keyed by source name.
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by sink name.
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by table name.
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Information about a registered source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name.
    pub name: String,
    /// Watermark column, if the source DDL configured one.
    pub watermark_column: Option<String>,
    /// Connector options from the statement's WITH clause.
    pub options: HashMap<String, String>,
}
77
/// Information about a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name.
    pub name: String,
    /// Source table or query name. Inline queries get a synthesized
    /// `"<sink>_query"` identifier.
    pub from: String,
    /// Connector options from the statement's WITH clause.
    pub options: HashMap<String, String>,
}
88
/// Result of planning a streaming statement.
///
/// Produced by [`StreamingPlanner::plan`]; DDL variants carry the catalog
/// info that was registered, query variants carry a full [`QueryPlan`].
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Source registration (DDL)
    RegisterSource(SourceInfo),

    /// Sink registration (DDL)
    RegisterSink(SinkInfo),

    /// Query plan with streaming configurations
    Query(QueryPlan),

    /// Standard SQL statement (pass-through to DataFusion)
    Standard(Box<Statement>),

    /// Lookup table registration (DDL)
    RegisterLookupTable(LookupTableInfo),

    /// Drop a lookup table
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
114
/// A query plan with streaming operator configurations.
///
/// Each configuration field is `None` when the corresponding feature is
/// absent from the query; `statement` always holds the underlying SQL.
#[derive(Debug)]
pub struct QueryPlan {
    /// Optional name for the continuous query (set for CREATE CONTINUOUS
    /// QUERY / CREATE STREAM; `None` for ad-hoc SELECTs).
    pub name: Option<String>,
    /// Window configuration if the query has windowed aggregation.
    pub window_config: Option<WindowOperatorConfig>,
    /// Join configuration(s) if the query has joins (one per join step).
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration if the query has ordering.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic window function configuration (LAG/LEAD/etc.).
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter configuration.
    pub having_config: Option<HavingFilterConfig>,
    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN).
    pub frame_config: Option<WindowFrameConfig>,
    /// Emit strategy.
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement.
    pub statement: Box<Statement>,
}
137
138impl StreamingPlanner {
139    /// Creates a new streaming planner
140    #[must_use]
141    pub fn new() -> Self {
142        Self {
143            sources: HashMap::new(),
144            sinks: HashMap::new(),
145            lookup_tables: HashMap::new(),
146        }
147    }
148
    /// Plans a streaming statement.
    ///
    /// DDL statements (`CREATE SOURCE`/`SINK`/`LOOKUP TABLE`, `DROP LOOKUP
    /// TABLE`) update the planner's catalogs; queries are analyzed for
    /// streaming features. Statement kinds owned by the database facade
    /// (SHOW, DESCRIBE, EXPLAIN, checkpointing, ...) are rejected here.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError` if the statement cannot be planned, including
    /// `UnsupportedSql` for statement kinds that must be handled by the
    /// database layer instead of this planner.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share one planning
            // path: both wrap a SELECT with an optional EMIT clause.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                // These statements are handled directly by the database facade
                // and don't need query planning, so reaching the planner with
                // one of them is an error (not a Standard pass-through).
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
195
196    /// Plans a CREATE SOURCE statement.
197    fn plan_create_source(
198        &mut self,
199        source: &CreateSourceStatement,
200    ) -> Result<StreamingPlan, PlanningError> {
201        let name = object_name_to_string(&source.name);
202
203        // Check for existing source
204        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
205            return Err(PlanningError::InvalidQuery(format!(
206                "Source '{}' already exists",
207                name
208            )));
209        }
210
211        // Extract watermark column
212        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
213
214        let info = SourceInfo {
215            name: name.clone(),
216            watermark_column,
217            options: source.with_options.clone(),
218        };
219
220        // Register the source
221        self.sources.insert(name, info.clone());
222
223        Ok(StreamingPlan::RegisterSource(info))
224    }
225
226    /// Plans a CREATE SINK statement.
227    fn plan_create_sink(
228        &mut self,
229        sink: &CreateSinkStatement,
230    ) -> Result<StreamingPlan, PlanningError> {
231        let name = object_name_to_string(&sink.name);
232
233        // Check for existing sink
234        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
235            return Err(PlanningError::InvalidQuery(format!(
236                "Sink '{}' already exists",
237                name
238            )));
239        }
240
241        // Determine the source
242        let from = match &sink.from {
243            SinkFrom::Table(table) => object_name_to_string(table),
244            SinkFrom::Query(_) => format!("{}_query", name),
245        };
246
247        let info = SinkInfo {
248            name: name.clone(),
249            from,
250            options: sink.with_options.clone(),
251        };
252
253        // Register the sink
254        self.sinks.insert(name, info.clone());
255
256        Ok(StreamingPlan::RegisterSink(info))
257    }
258
259    /// Plans a CREATE CONTINUOUS QUERY statement.
260    #[allow(clippy::unused_self)] // Will use planner state for query registration
261    fn plan_continuous_query(
262        &mut self,
263        name: &ObjectName,
264        query: &StreamingStatement,
265        emit_clause: Option<&EmitClause>,
266    ) -> Result<StreamingPlan, PlanningError> {
267        // The query inside should be a standard SELECT
268        let stmt = match query {
269            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
270            _ => {
271                return Err(PlanningError::InvalidQuery(
272                    "Continuous query must contain a SELECT statement".to_string(),
273                ))
274            }
275        };
276
277        // Analyze the query for streaming features
278        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
279
280        Ok(StreamingPlan::Query(QueryPlan {
281            name: Some(object_name_to_string(name)),
282            window_config: query_plan.window_config,
283            join_config: query_plan.join_config,
284            order_config: query_plan.order_config,
285            analytic_config: query_plan.analytic_config,
286            having_config: query_plan.having_config,
287            frame_config: query_plan.frame_config,
288            emit_clause: emit_clause.cloned(),
289            statement: Box::new(stmt),
290        }))
291    }
292
    /// Plans a standard SQL statement.
    ///
    /// A bare SELECT may still carry streaming features (GROUP BY windows,
    /// joins, ORDER BY, analytic functions, HAVING, window frames). If any
    /// are detected the statement becomes a streaming [`QueryPlan`];
    /// otherwise it is passed through unchanged for `DataFusion`.
    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        // Check if it's a query that might have streaming features
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Check for window functions in GROUP BY
                let window_function = Self::extract_window_from_select(select);

                // Check for joins (multi-way)
                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                // Check for ORDER BY
                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                // Check for analytic functions (LAG/LEAD/etc.)
                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                // Check for HAVING clause
                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // Any single detected feature routes the query through the
                // streaming operator pipeline.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    // Convert the detected window (if any); conversion
                    // errors abort planning.
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        // Ad-hoc statements carry no EMIT clause.
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // Pass through standard SQL
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
376
    /// Analyzes a query for streaming features.
    ///
    /// Used for continuous queries; mirrors the feature detection in
    /// `plan_standard_statement` but additionally applies the EMIT clause
    /// to the window configuration.
    ///
    /// # Errors
    ///
    /// Returns `PlanningError::InvalidQuery` when a detected feature cannot
    /// be translated (bad window spec, join analysis failure, unsupported
    /// frame bound, ...).
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Extract window function
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // Apply emit clause if present
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                // Extract join info (multi-way)
                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        // Extract ORDER BY info
        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        // Extract analytic function info (LAG/LEAD/etc.)
        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        // Extract HAVING clause
        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // Validate: reject UNBOUNDED FOLLOWING — a streaming operator
            // cannot buffer an unbounded amount of future input.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
440
441    /// Extracts window function from a SELECT.
442    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
443        // Check GROUP BY for window functions
444        use sqlparser::ast::GroupByExpr;
445        match &select.group_by {
446            GroupByExpr::Expressions(exprs, _modifiers) => {
447                for group_by_expr in exprs {
448                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
449                    {
450                        return Some(window);
451                    }
452                }
453            }
454            GroupByExpr::All(_) => {}
455        }
456        None
457    }
458
459    /// Plans a CREATE LOOKUP TABLE statement.
460    fn plan_create_lookup_table(
461        &mut self,
462        lt: &CreateLookupTableStatement,
463    ) -> Result<StreamingPlan, PlanningError> {
464        let name = object_name_to_string(&lt.name);
465
466        if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
467            return Err(PlanningError::InvalidQuery(format!(
468                "Lookup table '{}' already exists",
469                name
470            )));
471        }
472
473        let columns: Vec<(String, String)> = lt
474            .columns
475            .iter()
476            .map(|c| (c.name.value.clone(), c.data_type.to_string()))
477            .collect();
478
479        let properties = validate_properties(&lt.with_options).map_err(|e| {
480            PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
481        })?;
482
483        // Compute Arrow schema from column definitions
484        let arrow_fields: Vec<Field> = lt
485            .columns
486            .iter()
487            .map(|c| {
488                let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
489                    .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
490                let nullable = !c
491                    .options
492                    .iter()
493                    .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
494                Ok(Field::new(&c.name.value, dt, nullable))
495            })
496            .collect::<Result<_, PlanningError>>()?;
497        let arrow_schema = Arc::new(Schema::new(arrow_fields));
498
499        let info = LookupTableInfo {
500            name: name.clone(),
501            columns,
502            primary_key: lt.primary_key.clone(),
503            properties,
504            arrow_schema,
505            raw_options: lt.with_options.clone(),
506        };
507
508        self.lookup_tables.insert(name, info.clone());
509
510        Ok(StreamingPlan::RegisterLookupTable(info))
511    }
512
513    /// Plans a DROP LOOKUP TABLE statement.
514    fn plan_drop_lookup_table(
515        &mut self,
516        name: &ObjectName,
517        if_exists: bool,
518    ) -> Result<StreamingPlan, PlanningError> {
519        let name_str = object_name_to_string(name);
520
521        if !if_exists && !self.lookup_tables.contains_key(&name_str) {
522            return Err(PlanningError::InvalidQuery(format!(
523                "Lookup table '{}' does not exist",
524                name_str
525            )));
526        }
527
528        self.lookup_tables.remove(&name_str);
529
530        Ok(StreamingPlan::DropLookupTable { name: name_str })
531    }
532
    /// Gets a registered source by name.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Gets a registered sink by name.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Lists all registered sources (in arbitrary `HashMap` order).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Lists all registered sinks (in arbitrary `HashMap` order).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Gets a registered lookup table by name.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Lists all registered lookup tables (in arbitrary `HashMap` order).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }

    /// Returns a clone of the lookup tables map for optimizer rule
    /// construction.
    #[must_use]
    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
        self.lookup_tables.clone()
    }
574
    /// Converts a query plan's SQL statement into a `DataFusion`
    /// `LogicalPlan`. Window UDFs (TUMBLE, HOP, SESSION) must be registered
    /// on `ctx` via
    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
    /// for windowed queries to resolve correctly.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::DataFusion`] if `DataFusion` cannot create
    /// the logical plan (e.g. unknown tables or unresolvable functions).
    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
    pub async fn to_logical_plan(
        &self,
        plan: &QueryPlan,
        ctx: &SessionContext,
    ) -> Result<LogicalPlan, PlanningError> {
        // Convert the AST statement back to SQL and let DataFusion re-parse
        // it with its own sqlparser version. This avoids version mismatches
        // between our sqlparser (0.60) and DataFusion's (0.59).
        let sql = plan.statement.to_string();
        ctx.state()
            .create_logical_plan(&sql)
            .await
            .map_err(PlanningError::DataFusion)
    }
599}
600
// `Default` mirrors `StreamingPlanner::new`: an empty planner.
impl Default for StreamingPlanner {
    fn default() -> Self {
        Self::new()
    }
}
606
/// Intermediate query analysis result collecting every streaming operator
/// configuration detected in a SELECT statement.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// Windowed aggregation (TUMBLE/HOP/SESSION in GROUP BY).
    window_config: Option<WindowOperatorConfig>,
    /// One configuration per join step of a (multi-way) join.
    join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY configuration.
    order_config: Option<OrderOperatorConfig>,
    /// Analytic window functions (LAG/LEAD/etc.).
    analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING clause filter.
    having_config: Option<HavingFilterConfig>,
    /// Window frames (ROWS BETWEEN / RANGE BETWEEN).
    frame_config: Option<WindowFrameConfig>,
}
618
/// Helper to convert an `ObjectName` to a `String` via its `Display` impl.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
623
/// Planning errors.
///
/// `Display` is implemented manually below (rather than via thiserror's
/// `#[error]` attributes) so the `DataFusion` variant can translate the
/// underlying message at display time.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Unsupported SQL feature
    UnsupportedSql(String),

    /// Invalid query
    InvalidQuery(String),

    /// Source not found
    SourceNotFound(String),

    /// Sink not found
    SinkNotFound(String),

    /// `DataFusion` error during logical plan creation (translated on display)
    DataFusion(#[from] datafusion_common::DataFusionError),
}
642
impl std::fmt::Display for PlanningError {
    // Manual Display so the DataFusion variant can run its message through
    // the project's error translator rather than printing it verbatim.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
            Self::DataFusion(e) => {
                // Rewrite DataFusion's message into user-facing wording.
                let translated = crate::error::translate_datafusion_error(&e.to_string());
                write!(f, "{translated}")
            }
        }
    }
}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661    use crate::parser::StreamingParser;
662
    /// CREATE SOURCE registers the source under its name.
    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    /// CREATE SINK records both the sink name and its upstream table.
    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }
692
    /// Re-registering an existing source without a conflict clause fails.
    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // Duplicate should fail
        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    /// IF NOT EXISTS suppresses the duplicate-name error.
    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // IF NOT EXISTS should succeed
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    /// OR REPLACE suppresses the duplicate-name error.
    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // OR REPLACE should succeed
        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }
740
    /// The WATERMARK FOR clause populates `watermark_column`.
    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    /// A plain SELECT with no streaming features passes through.
    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    /// Catalog accessors reflect what has been registered.
    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        // Create sources
        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        // Create sinks
        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }
794
    /// A TUMBLE in GROUP BY yields a window config with the right column
    /// and size.
    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                // 5 minutes = 300 seconds.
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    /// A two-table equi-join yields one join config with matching keys.
    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let configs = query_plan.join_config.unwrap();
                assert_eq!(configs.len(), 1);
                assert_eq!(configs[0].left_key(), "order_id");
                assert_eq!(configs[0].right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    /// LAG ... OVER (PARTITION BY ...) yields an analytic config.
    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }
855
856    #[test]
857    fn test_plan_query_with_having() {
858        let mut planner = StreamingPlanner::new();
859        let statements = StreamingParser::parse_sql(
860            "SELECT symbol, COUNT(*) AS cnt FROM trades \
861             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
862             HAVING COUNT(*) > 10",
863        )
864        .unwrap();
865
866        let plan = planner.plan(&statements[0]).unwrap();
867        match plan {
868            StreamingPlan::Query(query_plan) => {
869                assert!(query_plan.window_config.is_some());
870                assert!(query_plan.having_config.is_some());
871                let config = query_plan.having_config.unwrap();
872                assert!(
873                    config.predicate().contains("COUNT(*)"),
874                    "predicate was: {}",
875                    config.predicate()
876                );
877            }
878            _ => panic!("Expected Query plan with having config"),
879        }
880    }
881
882    #[test]
883    fn test_plan_query_without_having() {
884        let mut planner = StreamingPlanner::new();
885        let statements = StreamingParser::parse_sql(
886            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
887        )
888        .unwrap();
889
890        let plan = planner.plan(&statements[0]).unwrap();
891        match plan {
892            StreamingPlan::Query(query_plan) => {
893                assert!(query_plan.having_config.is_none());
894            }
895            _ => panic!("Expected Query plan"),
896        }
897    }
898
899    #[test]
900    fn test_plan_having_only_produces_query_plan() {
901        // HAVING without window function still produces a Query plan
902        let mut planner = StreamingPlanner::new();
903        let statements = StreamingParser::parse_sql(
904            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
905        )
906        .unwrap();
907
908        let plan = planner.plan(&statements[0]).unwrap();
909        match plan {
910            StreamingPlan::Query(query_plan) => {
911                assert!(query_plan.having_config.is_some());
912                assert!(query_plan.window_config.is_none());
913            }
914            _ => panic!("Expected Query plan for HAVING-only query"),
915        }
916    }
917
918    #[test]
919    fn test_plan_having_compound_predicate() {
920        let mut planner = StreamingPlanner::new();
921        let statements = StreamingParser::parse_sql(
922            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
923             FROM trades GROUP BY symbol \
924             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
925        )
926        .unwrap();
927
928        let plan = planner.plan(&statements[0]).unwrap();
929        match plan {
930            StreamingPlan::Query(query_plan) => {
931                let config = query_plan.having_config.unwrap();
932                let pred = config.predicate();
933                assert!(pred.contains("AND"), "predicate was: {pred}");
934            }
935            _ => panic!("Expected Query plan"),
936        }
937    }
938
939    #[test]
940    fn test_plan_query_with_lead() {
941        let mut planner = StreamingPlanner::new();
942        let statements = StreamingParser::parse_sql(
943            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
944        )
945        .unwrap();
946
947        let plan = planner.plan(&statements[0]).unwrap();
948        match plan {
949            StreamingPlan::Query(query_plan) => {
950                assert!(query_plan.analytic_config.is_some());
951                let config = query_plan.analytic_config.unwrap();
952                assert!(config.has_lookahead());
953                assert_eq!(config.functions[0].offset, 2);
954            }
955            _ => panic!("Expected Query plan with analytic config"),
956        }
957    }
958
959    // -- Multi-way join planner tests --
960
961    #[test]
962    fn test_plan_single_join_produces_vec_of_one() {
963        let mut planner = StreamingPlanner::new();
964        let statements =
965            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
966
967        let plan = planner.plan(&statements[0]).unwrap();
968        match plan {
969            StreamingPlan::Query(qp) => {
970                let configs = qp.join_config.unwrap();
971                assert_eq!(configs.len(), 1);
972            }
973            _ => panic!("Expected Query plan"),
974        }
975    }
976
977    #[test]
978    fn test_plan_two_way_join() {
979        let mut planner = StreamingPlanner::new();
980        let statements = StreamingParser::parse_sql(
981            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
982        )
983        .unwrap();
984
985        let plan = planner.plan(&statements[0]).unwrap();
986        match plan {
987            StreamingPlan::Query(qp) => {
988                let configs = qp.join_config.unwrap();
989                assert_eq!(configs.len(), 2);
990                assert_eq!(configs[0].left_key(), "id");
991                assert_eq!(configs[0].right_key(), "a_id");
992                assert_eq!(configs[1].left_key(), "id");
993                assert_eq!(configs[1].right_key(), "b_id");
994            }
995            _ => panic!("Expected Query plan"),
996        }
997    }
998
999    #[test]
1000    fn test_plan_mixed_join_types() {
1001        let mut planner = StreamingPlanner::new();
1002        let statements = StreamingParser::parse_sql(
1003            "SELECT * FROM orders o \
1004             JOIN payments p ON o.id = p.order_id \
1005                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
1006             JOIN customers c ON p.cust_id = c.id",
1007        )
1008        .unwrap();
1009
1010        let plan = planner.plan(&statements[0]).unwrap();
1011        match plan {
1012            StreamingPlan::Query(qp) => {
1013                let configs = qp.join_config.unwrap();
1014                assert_eq!(configs.len(), 2);
1015                assert!(configs[0].is_stream_stream());
1016                assert!(configs[1].is_lookup());
1017            }
1018            _ => panic!("Expected Query plan"),
1019        }
1020    }
1021
1022    #[test]
1023    fn test_plan_backward_compat_no_join() {
1024        let mut planner = StreamingPlanner::new();
1025        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
1026
1027        let plan = planner.plan(&statements[0]).unwrap();
1028        match plan {
1029            StreamingPlan::Standard(_) => {} // No join → pass-through
1030            _ => panic!("Expected Standard plan for simple SELECT"),
1031        }
1032    }
1033
1034    // -- Window Frame planner tests --
1035
1036    #[test]
1037    fn test_plan_query_with_rows_frame() {
1038        let mut planner = StreamingPlanner::new();
1039        let statements = StreamingParser::parse_sql(
1040            "SELECT AVG(price) OVER (ORDER BY ts \
1041             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1042        )
1043        .unwrap();
1044
1045        let plan = planner.plan(&statements[0]).unwrap();
1046        match plan {
1047            StreamingPlan::Query(qp) => {
1048                assert!(qp.frame_config.is_some());
1049                let fc = qp.frame_config.unwrap();
1050                assert_eq!(fc.functions.len(), 1);
1051                assert_eq!(fc.functions[0].source_column, "price");
1052            }
1053            _ => panic!("Expected Query plan with frame_config"),
1054        }
1055    }
1056
1057    #[test]
1058    fn test_plan_frame_with_partition() {
1059        let mut planner = StreamingPlanner::new();
1060        let statements = StreamingParser::parse_sql(
1061            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
1062             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
1063        )
1064        .unwrap();
1065
1066        let plan = planner.plan(&statements[0]).unwrap();
1067        match plan {
1068            StreamingPlan::Query(qp) => {
1069                let fc = qp.frame_config.unwrap();
1070                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
1071                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
1072            }
1073            _ => panic!("Expected Query plan with frame_config"),
1074        }
1075    }
1076
1077    #[test]
1078    fn test_plan_no_frame_is_standard() {
1079        let mut planner = StreamingPlanner::new();
1080        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
1081
1082        let plan = planner.plan(&statements[0]).unwrap();
1083        match plan {
1084            StreamingPlan::Standard(_) => {} // No frame → pass-through
1085            _ => panic!("Expected Standard plan for simple SELECT"),
1086        }
1087    }
1088
1089    #[test]
1090    fn test_plan_unbounded_following_rejected() {
1091        let mut planner = StreamingPlanner::new();
1092        let statements = StreamingParser::parse_sql(
1093            "SELECT SUM(amount) OVER (ORDER BY id \
1094             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
1095             FROM orders",
1096        )
1097        .unwrap();
1098
1099        let result = planner.plan(&statements[0]);
1100        assert!(result.is_err());
1101        let err = result.unwrap_err().to_string();
1102        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
1103    }
1104}