Skip to main content

laminar_sql/planner/
mod.rs

1//! Query planner for streaming SQL
2//!
3//! This module translates parsed streaming SQL statements into execution plans.
4//! It integrates with the parser and translator modules to produce complete
5//! operator configurations for Ring 0 execution.
6
7pub mod channel_derivation;
8
9use std::collections::HashMap;
10
11use datafusion::logical_expr::LogicalPlan;
12use datafusion::prelude::SessionContext;
13use sqlparser::ast::{ObjectName, SetExpr, Statement};
14
15use crate::parser::aggregation_parser::analyze_aggregates;
16use crate::parser::analytic_parser::{
17    analyze_analytic_functions, analyze_window_frames, FrameBound,
18};
19use crate::parser::join_parser::analyze_joins;
20use crate::parser::order_analyzer::analyze_order_by;
21use crate::parser::{
22    CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom, StreamingStatement,
23    WindowFunction, WindowRewriter,
24};
25use crate::translator::{
26    AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
27    OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
28};
29
30/// Streaming query planner
31pub struct StreamingPlanner {
32    /// Registered sources
33    sources: HashMap<String, SourceInfo>,
34    /// Registered sinks
35    sinks: HashMap<String, SinkInfo>,
36}
37
38/// Information about a registered source
39#[derive(Debug, Clone)]
40pub struct SourceInfo {
41    /// Source name
42    pub name: String,
43    /// Watermark column (if configured)
44    pub watermark_column: Option<String>,
45    /// Connector options
46    pub options: HashMap<String, String>,
47}
48
49/// Information about a registered sink
50#[derive(Debug, Clone)]
51pub struct SinkInfo {
52    /// Sink name
53    pub name: String,
54    /// Source table or query name
55    pub from: String,
56    /// Connector options
57    pub options: HashMap<String, String>,
58}
59
60/// Result of planning a streaming statement
61#[derive(Debug)]
62#[allow(clippy::large_enum_variant)]
63pub enum StreamingPlan {
64    /// Source registration (DDL)
65    RegisterSource(SourceInfo),
66
67    /// Sink registration (DDL)
68    RegisterSink(SinkInfo),
69
70    /// Query plan with streaming configurations
71    Query(QueryPlan),
72
73    /// Standard SQL statement (pass-through to DataFusion)
74    Standard(Box<Statement>),
75
76    /// DAG topology explanation (from EXPLAIN DAG)
77    DagExplain(DagExplainOutput),
78}
79
80/// A query plan with streaming operator configurations
81#[derive(Debug)]
82pub struct QueryPlan {
83    /// Optional name for the continuous query
84    pub name: Option<String>,
85    /// Window configuration if the query has windowed aggregation
86    pub window_config: Option<WindowOperatorConfig>,
87    /// Join configuration(s) if the query has joins (one per join step)
88    pub join_config: Option<Vec<JoinOperatorConfig>>,
89    /// ORDER BY configuration if the query has ordering
90    pub order_config: Option<OrderOperatorConfig>,
91    /// Analytic window function configuration (LAG/LEAD/etc.)
92    pub analytic_config: Option<AnalyticWindowConfig>,
93    /// HAVING clause filter configuration
94    pub having_config: Option<HavingFilterConfig>,
95    /// Window frame configuration (ROWS BETWEEN / RANGE BETWEEN)
96    pub frame_config: Option<WindowFrameConfig>,
97    /// Emit strategy
98    pub emit_clause: Option<EmitClause>,
99    /// The underlying SQL statement
100    pub statement: Box<Statement>,
101}
102
103impl StreamingPlanner {
104    /// Creates a new streaming planner
105    #[must_use]
106    pub fn new() -> Self {
107        Self {
108            sources: HashMap::new(),
109            sinks: HashMap::new(),
110        }
111    }
112
113    /// Plans a streaming statement.
114    ///
115    /// # Errors
116    ///
117    /// Returns `PlanningError` if the statement cannot be planned.
118    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
119        match statement {
120            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
121            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
122            StreamingStatement::CreateContinuousQuery {
123                name,
124                query,
125                emit_clause,
126            }
127            | StreamingStatement::CreateStream {
128                name,
129                query,
130                emit_clause,
131                ..
132            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
133            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
134            StreamingStatement::DropSource { .. }
135            | StreamingStatement::DropSink { .. }
136            | StreamingStatement::DropStream { .. }
137            | StreamingStatement::DropMaterializedView { .. }
138            | StreamingStatement::Show(_)
139            | StreamingStatement::Describe { .. }
140            | StreamingStatement::Explain { .. }
141            | StreamingStatement::CreateMaterializedView { .. }
142            | StreamingStatement::InsertInto { .. } => {
143                // These statements are handled directly by the database facade
144                // and don't need query planning. Return as Standard pass-through.
145                Err(PlanningError::UnsupportedSql(format!(
146                    "Statement type {:?} is handled by the database layer, not the planner",
147                    std::mem::discriminant(statement)
148                )))
149            }
150        }
151    }
152
153    /// Plans a CREATE SOURCE statement.
154    fn plan_create_source(
155        &mut self,
156        source: &CreateSourceStatement,
157    ) -> Result<StreamingPlan, PlanningError> {
158        let name = object_name_to_string(&source.name);
159
160        // Check for existing source
161        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
162            return Err(PlanningError::InvalidQuery(format!(
163                "Source '{}' already exists",
164                name
165            )));
166        }
167
168        // Extract watermark column
169        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
170
171        let info = SourceInfo {
172            name: name.clone(),
173            watermark_column,
174            options: source.with_options.clone(),
175        };
176
177        // Register the source
178        self.sources.insert(name, info.clone());
179
180        Ok(StreamingPlan::RegisterSource(info))
181    }
182
183    /// Plans a CREATE SINK statement.
184    fn plan_create_sink(
185        &mut self,
186        sink: &CreateSinkStatement,
187    ) -> Result<StreamingPlan, PlanningError> {
188        let name = object_name_to_string(&sink.name);
189
190        // Check for existing sink
191        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
192            return Err(PlanningError::InvalidQuery(format!(
193                "Sink '{}' already exists",
194                name
195            )));
196        }
197
198        // Determine the source
199        let from = match &sink.from {
200            SinkFrom::Table(table) => object_name_to_string(table),
201            SinkFrom::Query(_) => format!("{}_query", name),
202        };
203
204        let info = SinkInfo {
205            name: name.clone(),
206            from,
207            options: sink.with_options.clone(),
208        };
209
210        // Register the sink
211        self.sinks.insert(name, info.clone());
212
213        Ok(StreamingPlan::RegisterSink(info))
214    }
215
216    /// Plans a CREATE CONTINUOUS QUERY statement.
217    #[allow(clippy::unused_self)] // Will use planner state for query registration
218    fn plan_continuous_query(
219        &mut self,
220        name: &ObjectName,
221        query: &StreamingStatement,
222        emit_clause: Option<&EmitClause>,
223    ) -> Result<StreamingPlan, PlanningError> {
224        // The query inside should be a standard SELECT
225        let stmt = match query {
226            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
227            _ => {
228                return Err(PlanningError::InvalidQuery(
229                    "Continuous query must contain a SELECT statement".to_string(),
230                ))
231            }
232        };
233
234        // Analyze the query for streaming features
235        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
236
237        Ok(StreamingPlan::Query(QueryPlan {
238            name: Some(object_name_to_string(name)),
239            window_config: query_plan.window_config,
240            join_config: query_plan.join_config,
241            order_config: query_plan.order_config,
242            analytic_config: query_plan.analytic_config,
243            having_config: query_plan.having_config,
244            frame_config: query_plan.frame_config,
245            emit_clause: emit_clause.cloned(),
246            statement: Box::new(stmt),
247        }))
248    }
249
250    /// Plans a standard SQL statement.
251    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
252    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
253        // Check if it's a query that might have streaming features
254        if let Statement::Query(query) = stmt {
255            if let SetExpr::Select(select) = query.body.as_ref() {
256                // Check for window functions in GROUP BY
257                let window_function = Self::extract_window_from_select(select);
258
259                // Check for joins (multi-way)
260                let join_analysis = analyze_joins(select).map_err(|e| {
261                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
262                })?;
263
264                // Check for ORDER BY
265                let order_analysis = analyze_order_by(stmt);
266                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
267                    .map_err(PlanningError::InvalidQuery)?;
268
269                // Check for analytic functions (LAG/LEAD/etc.)
270                let analytic_analysis = analyze_analytic_functions(stmt);
271                let analytic_config =
272                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
273
274                // Check for HAVING clause
275                let agg_analysis = analyze_aggregates(stmt);
276                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
277
278                // Check for window frame functions (ROWS BETWEEN / RANGE BETWEEN)
279                let frame_analysis = analyze_window_frames(stmt);
280                let frame_config = frame_analysis
281                    .as_ref()
282                    .map(WindowFrameConfig::from_analysis);
283
284                // Validate: reject UNBOUNDED FOLLOWING (streaming can't buffer infinite future)
285                if let Some(fa) = &frame_analysis {
286                    for f in &fa.functions {
287                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
288                            return Err(PlanningError::InvalidQuery(
289                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
290                                    .to_string(),
291                            ));
292                        }
293                    }
294                }
295
296                let has_streaming_features = window_function.is_some()
297                    || join_analysis.is_some()
298                    || order_config.is_some()
299                    || analytic_config.is_some()
300                    || having_config.is_some()
301                    || frame_config.is_some();
302
303                if has_streaming_features {
304                    let window_config = match window_function {
305                        Some(w) => Some(
306                            WindowOperatorConfig::from_window_function(&w)
307                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
308                        ),
309                        None => None,
310                    };
311
312                    let join_config =
313                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));
314
315                    return Ok(StreamingPlan::Query(QueryPlan {
316                        name: None,
317                        window_config,
318                        join_config,
319                        order_config,
320                        analytic_config,
321                        having_config,
322                        frame_config,
323                        emit_clause: None,
324                        statement: Box::new(stmt.clone()),
325                    }));
326                }
327            }
328        }
329
330        // Pass through standard SQL
331        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
332    }
333
334    /// Analyzes a query for streaming features.
335    fn analyze_query(
336        stmt: &Statement,
337        emit_clause: Option<&EmitClause>,
338    ) -> Result<QueryAnalysis, PlanningError> {
339        let mut analysis = QueryAnalysis::default();
340
341        if let Statement::Query(query) = stmt {
342            if let SetExpr::Select(select) = query.body.as_ref() {
343                // Extract window function
344                if let Some(window) = Self::extract_window_from_select(select) {
345                    let mut config = WindowOperatorConfig::from_window_function(&window)
346                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
347
348                    // Apply emit clause if present
349                    if let Some(emit) = emit_clause {
350                        config = config
351                            .with_emit_clause(emit)
352                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
353                    }
354
355                    analysis.window_config = Some(config);
356                }
357
358                // Extract join info (multi-way)
359                if let Some(multi) = analyze_joins(select).map_err(|e| {
360                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
361                })? {
362                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
363                }
364            }
365        }
366
367        // Extract ORDER BY info
368        let order_analysis = analyze_order_by(stmt);
369        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
370            .map_err(PlanningError::InvalidQuery)?;
371
372        // Extract analytic function info (LAG/LEAD/etc.)
373        if let Some(analytic) = analyze_analytic_functions(stmt) {
374            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
375        }
376
377        // Extract HAVING clause
378        let agg_analysis = analyze_aggregates(stmt);
379        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
380
381        // Extract window frame functions (ROWS BETWEEN / RANGE BETWEEN)
382        if let Some(frame_analysis) = analyze_window_frames(stmt) {
383            // Validate: reject UNBOUNDED FOLLOWING
384            for f in &frame_analysis.functions {
385                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
386                    return Err(PlanningError::InvalidQuery(
387                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
388                            .to_string(),
389                    ));
390                }
391            }
392            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
393        }
394
395        Ok(analysis)
396    }
397
398    /// Extracts window function from a SELECT.
399    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
400        // Check GROUP BY for window functions
401        use sqlparser::ast::GroupByExpr;
402        match &select.group_by {
403            GroupByExpr::Expressions(exprs, _modifiers) => {
404                for group_by_expr in exprs {
405                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
406                    {
407                        return Some(window);
408                    }
409                }
410            }
411            GroupByExpr::All(_) => {}
412        }
413        None
414    }
415
416    /// Gets a registered source by name.
417    #[must_use]
418    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
419        self.sources.get(name)
420    }
421
422    /// Gets a registered sink by name.
423    #[must_use]
424    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
425        self.sinks.get(name)
426    }
427
428    /// Lists all registered sources.
429    #[must_use]
430    pub fn list_sources(&self) -> Vec<&SourceInfo> {
431        self.sources.values().collect()
432    }
433
434    /// Lists all registered sinks.
435    #[must_use]
436    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
437        self.sinks.values().collect()
438    }
439
440    /// Creates a `DataFusion` logical plan from a query plan.
441    ///
442    /// Converts the query plan's SQL statement into a `DataFusion`
443    /// `LogicalPlan` using the session context's state. Window UDFs
444    /// (TUMBLE, HOP, SESSION) must be registered on the context via
445    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
446    /// for windowed queries to resolve correctly.
447    ///
448    /// # Arguments
449    ///
450    /// * `plan` - The streaming query plan containing the SQL statement
451    /// * `ctx` - `DataFusion` session context with registered UDFs
452    ///
453    /// # Errors
454    ///
455    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
456    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
457    pub async fn to_logical_plan(
458        &self,
459        plan: &QueryPlan,
460        ctx: &SessionContext,
461    ) -> Result<LogicalPlan, PlanningError> {
462        // Convert the AST statement back to SQL and let DataFusion re-parse
463        // it with its own sqlparser version. This avoids version mismatches
464        // between our sqlparser (0.60) and DataFusion's (0.59).
465        let sql = plan.statement.to_string();
466        ctx.state()
467            .create_logical_plan(&sql)
468            .await
469            .map_err(PlanningError::DataFusion)
470    }
471}
472
473impl Default for StreamingPlanner {
474    fn default() -> Self {
475        Self::new()
476    }
477}
478
479/// Intermediate query analysis result
480#[derive(Debug, Default)]
481#[allow(clippy::struct_field_names)]
482struct QueryAnalysis {
483    window_config: Option<WindowOperatorConfig>,
484    join_config: Option<Vec<JoinOperatorConfig>>,
485    order_config: Option<OrderOperatorConfig>,
486    analytic_config: Option<AnalyticWindowConfig>,
487    having_config: Option<HavingFilterConfig>,
488    frame_config: Option<WindowFrameConfig>,
489}
490
491/// Helper to convert `ObjectName` to String
492fn object_name_to_string(name: &ObjectName) -> String {
493    name.to_string()
494}
495
496/// Planning errors
497#[derive(Debug, thiserror::Error)]
498pub enum PlanningError {
499    /// Unsupported SQL feature
500    #[error("Unsupported SQL: {0}")]
501    UnsupportedSql(String),
502
503    /// Invalid query
504    #[error("Invalid query: {0}")]
505    InvalidQuery(String),
506
507    /// Source not found
508    #[error("Source not found: {0}")]
509    SourceNotFound(String),
510
511    /// Sink not found
512    #[error("Sink not found: {0}")]
513    SinkNotFound(String),
514
515    /// `DataFusion` error during logical plan creation
516    #[error("DataFusion error: {0}")]
517    DataFusion(#[from] datafusion_common::DataFusionError),
518}
519
520#[cfg(test)]
521mod tests {
522    use super::*;
523    use crate::parser::StreamingParser;
524
525    #[test]
526    fn test_plan_create_source() {
527        let mut planner = StreamingPlanner::new();
528        let statements =
529            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
530
531        let plan = planner.plan(&statements[0]).unwrap();
532        match plan {
533            StreamingPlan::RegisterSource(info) => {
534                assert_eq!(info.name, "events");
535            }
536            _ => panic!("Expected RegisterSource plan"),
537        }
538    }
539
540    #[test]
541    fn test_plan_create_sink() {
542        let mut planner = StreamingPlanner::new();
543        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
544
545        let plan = planner.plan(&statements[0]).unwrap();
546        match plan {
547            StreamingPlan::RegisterSink(info) => {
548                assert_eq!(info.name, "output");
549                assert_eq!(info.from, "events");
550            }
551            _ => panic!("Expected RegisterSink plan"),
552        }
553    }
554
555    #[test]
556    fn test_plan_duplicate_source() {
557        let mut planner = StreamingPlanner::new();
558
559        // First source
560        let statements =
561            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
562        planner.plan(&statements[0]).unwrap();
563
564        // Duplicate should fail
565        let result = planner.plan(&statements[0]);
566        assert!(result.is_err());
567    }
568
569    #[test]
570    fn test_plan_source_if_not_exists() {
571        let mut planner = StreamingPlanner::new();
572
573        // First source
574        let statements =
575            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
576        planner.plan(&statements[0]).unwrap();
577
578        // IF NOT EXISTS should succeed
579        let statements =
580            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
581                .unwrap();
582        let result = planner.plan(&statements[0]);
583        assert!(result.is_ok());
584    }
585
586    #[test]
587    fn test_plan_source_or_replace() {
588        let mut planner = StreamingPlanner::new();
589
590        // First source
591        let statements =
592            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
593        planner.plan(&statements[0]).unwrap();
594
595        // OR REPLACE should succeed
596        let statements =
597            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
598                .unwrap();
599        let result = planner.plan(&statements[0]);
600        assert!(result.is_ok());
601    }
602
603    #[test]
604    fn test_plan_source_with_watermark() {
605        let mut planner = StreamingPlanner::new();
606        let statements = StreamingParser::parse_sql(
607            "CREATE SOURCE events (
608                id INT,
609                ts TIMESTAMP,
610                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
611            )",
612        )
613        .unwrap();
614
615        let plan = planner.plan(&statements[0]).unwrap();
616        match plan {
617            StreamingPlan::RegisterSource(info) => {
618                assert_eq!(info.name, "events");
619                assert_eq!(info.watermark_column, Some("ts".to_string()));
620            }
621            _ => panic!("Expected RegisterSource plan"),
622        }
623    }
624
625    #[test]
626    fn test_plan_standard_select() {
627        let mut planner = StreamingPlanner::new();
628        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
629
630        let plan = planner.plan(&statements[0]).unwrap();
631        match plan {
632            StreamingPlan::Standard(_) => {}
633            _ => panic!("Expected Standard plan for simple SELECT"),
634        }
635    }
636
637    #[test]
638    fn test_list_sources_and_sinks() {
639        let mut planner = StreamingPlanner::new();
640
641        // Create sources
642        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
643        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
644        planner.plan(&s1[0]).unwrap();
645        planner.plan(&s2[0]).unwrap();
646
647        // Create sinks
648        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
649        planner.plan(&k1[0]).unwrap();
650
651        assert_eq!(planner.list_sources().len(), 2);
652        assert_eq!(planner.list_sinks().len(), 1);
653        assert!(planner.get_source("src1").is_some());
654        assert!(planner.get_sink("sink1").is_some());
655    }
656
657    #[test]
658    fn test_plan_query_with_window() {
659        let mut planner = StreamingPlanner::new();
660        let statements = StreamingParser::parse_sql(
661            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
662        )
663        .unwrap();
664
665        let plan = planner.plan(&statements[0]).unwrap();
666        match plan {
667            StreamingPlan::Query(query_plan) => {
668                assert!(query_plan.window_config.is_some());
669                let config = query_plan.window_config.unwrap();
670                assert_eq!(config.time_column, "event_time");
671                assert_eq!(config.size.as_secs(), 300);
672            }
673            _ => panic!("Expected Query plan"),
674        }
675    }
676
677    #[test]
678    fn test_plan_query_with_join() {
679        let mut planner = StreamingPlanner::new();
680        let statements = StreamingParser::parse_sql(
681            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
682        )
683        .unwrap();
684
685        let plan = planner.plan(&statements[0]).unwrap();
686        match plan {
687            StreamingPlan::Query(query_plan) => {
688                assert!(query_plan.join_config.is_some());
689                let configs = query_plan.join_config.unwrap();
690                assert_eq!(configs.len(), 1);
691                assert_eq!(configs[0].left_key(), "order_id");
692                assert_eq!(configs[0].right_key(), "order_id");
693            }
694            _ => panic!("Expected Query plan"),
695        }
696    }
697
698    #[test]
699    fn test_plan_query_with_lag() {
700        let mut planner = StreamingPlanner::new();
701        let statements = StreamingParser::parse_sql(
702            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
703        )
704        .unwrap();
705
706        let plan = planner.plan(&statements[0]).unwrap();
707        match plan {
708            StreamingPlan::Query(query_plan) => {
709                assert!(query_plan.analytic_config.is_some());
710                let config = query_plan.analytic_config.unwrap();
711                assert_eq!(config.functions.len(), 1);
712                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
713            }
714            _ => panic!("Expected Query plan with analytic config"),
715        }
716    }
717
718    #[test]
719    fn test_plan_query_with_having() {
720        let mut planner = StreamingPlanner::new();
721        let statements = StreamingParser::parse_sql(
722            "SELECT symbol, COUNT(*) AS cnt FROM trades \
723             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
724             HAVING COUNT(*) > 10",
725        )
726        .unwrap();
727
728        let plan = planner.plan(&statements[0]).unwrap();
729        match plan {
730            StreamingPlan::Query(query_plan) => {
731                assert!(query_plan.window_config.is_some());
732                assert!(query_plan.having_config.is_some());
733                let config = query_plan.having_config.unwrap();
734                assert!(
735                    config.predicate().contains("COUNT(*)"),
736                    "predicate was: {}",
737                    config.predicate()
738                );
739            }
740            _ => panic!("Expected Query plan with having config"),
741        }
742    }
743
744    #[test]
745    fn test_plan_query_without_having() {
746        let mut planner = StreamingPlanner::new();
747        let statements = StreamingParser::parse_sql(
748            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
749        )
750        .unwrap();
751
752        let plan = planner.plan(&statements[0]).unwrap();
753        match plan {
754            StreamingPlan::Query(query_plan) => {
755                assert!(query_plan.having_config.is_none());
756            }
757            _ => panic!("Expected Query plan"),
758        }
759    }
760
761    #[test]
762    fn test_plan_having_only_produces_query_plan() {
763        // HAVING without window function still produces a Query plan
764        let mut planner = StreamingPlanner::new();
765        let statements = StreamingParser::parse_sql(
766            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
767        )
768        .unwrap();
769
770        let plan = planner.plan(&statements[0]).unwrap();
771        match plan {
772            StreamingPlan::Query(query_plan) => {
773                assert!(query_plan.having_config.is_some());
774                assert!(query_plan.window_config.is_none());
775            }
776            _ => panic!("Expected Query plan for HAVING-only query"),
777        }
778    }
779
780    #[test]
781    fn test_plan_having_compound_predicate() {
782        let mut planner = StreamingPlanner::new();
783        let statements = StreamingParser::parse_sql(
784            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
785             FROM trades GROUP BY symbol \
786             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
787        )
788        .unwrap();
789
790        let plan = planner.plan(&statements[0]).unwrap();
791        match plan {
792            StreamingPlan::Query(query_plan) => {
793                let config = query_plan.having_config.unwrap();
794                let pred = config.predicate();
795                assert!(pred.contains("AND"), "predicate was: {pred}");
796            }
797            _ => panic!("Expected Query plan"),
798        }
799    }
800
801    #[test]
802    fn test_plan_query_with_lead() {
803        let mut planner = StreamingPlanner::new();
804        let statements = StreamingParser::parse_sql(
805            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
806        )
807        .unwrap();
808
809        let plan = planner.plan(&statements[0]).unwrap();
810        match plan {
811            StreamingPlan::Query(query_plan) => {
812                assert!(query_plan.analytic_config.is_some());
813                let config = query_plan.analytic_config.unwrap();
814                assert!(config.has_lookahead());
815                assert_eq!(config.functions[0].offset, 2);
816            }
817            _ => panic!("Expected Query plan with analytic config"),
818        }
819    }
820
821    // -- Multi-way join planner tests (F-SQL-005) --
822
823    #[test]
824    fn test_plan_single_join_produces_vec_of_one() {
825        let mut planner = StreamingPlanner::new();
826        let statements =
827            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
828
829        let plan = planner.plan(&statements[0]).unwrap();
830        match plan {
831            StreamingPlan::Query(qp) => {
832                let configs = qp.join_config.unwrap();
833                assert_eq!(configs.len(), 1);
834            }
835            _ => panic!("Expected Query plan"),
836        }
837    }
838
839    #[test]
840    fn test_plan_two_way_join() {
841        let mut planner = StreamingPlanner::new();
842        let statements = StreamingParser::parse_sql(
843            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
844        )
845        .unwrap();
846
847        let plan = planner.plan(&statements[0]).unwrap();
848        match plan {
849            StreamingPlan::Query(qp) => {
850                let configs = qp.join_config.unwrap();
851                assert_eq!(configs.len(), 2);
852                assert_eq!(configs[0].left_key(), "id");
853                assert_eq!(configs[0].right_key(), "a_id");
854                assert_eq!(configs[1].left_key(), "id");
855                assert_eq!(configs[1].right_key(), "b_id");
856            }
857            _ => panic!("Expected Query plan"),
858        }
859    }
860
861    #[test]
862    fn test_plan_mixed_join_types() {
863        let mut planner = StreamingPlanner::new();
864        let statements = StreamingParser::parse_sql(
865            "SELECT * FROM orders o \
866             JOIN payments p ON o.id = p.order_id \
867                 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
868             JOIN customers c ON p.cust_id = c.id",
869        )
870        .unwrap();
871
872        let plan = planner.plan(&statements[0]).unwrap();
873        match plan {
874            StreamingPlan::Query(qp) => {
875                let configs = qp.join_config.unwrap();
876                assert_eq!(configs.len(), 2);
877                assert!(configs[0].is_stream_stream());
878                assert!(configs[1].is_lookup());
879            }
880            _ => panic!("Expected Query plan"),
881        }
882    }
883
884    #[test]
885    fn test_plan_backward_compat_no_join() {
886        let mut planner = StreamingPlanner::new();
887        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
888
889        let plan = planner.plan(&statements[0]).unwrap();
890        match plan {
891            StreamingPlan::Standard(_) => {} // No join → pass-through
892            _ => panic!("Expected Standard plan for simple SELECT"),
893        }
894    }
895
896    // -- Window Frame planner tests (F-SQL-006) --
897
898    #[test]
899    fn test_plan_query_with_rows_frame() {
900        let mut planner = StreamingPlanner::new();
901        let statements = StreamingParser::parse_sql(
902            "SELECT AVG(price) OVER (ORDER BY ts \
903             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
904        )
905        .unwrap();
906
907        let plan = planner.plan(&statements[0]).unwrap();
908        match plan {
909            StreamingPlan::Query(qp) => {
910                assert!(qp.frame_config.is_some());
911                let fc = qp.frame_config.unwrap();
912                assert_eq!(fc.functions.len(), 1);
913                assert_eq!(fc.functions[0].source_column, "price");
914            }
915            _ => panic!("Expected Query plan with frame_config"),
916        }
917    }
918
919    #[test]
920    fn test_plan_frame_with_partition() {
921        let mut planner = StreamingPlanner::new();
922        let statements = StreamingParser::parse_sql(
923            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
924             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
925        )
926        .unwrap();
927
928        let plan = planner.plan(&statements[0]).unwrap();
929        match plan {
930            StreamingPlan::Query(qp) => {
931                let fc = qp.frame_config.unwrap();
932                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
933                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
934            }
935            _ => panic!("Expected Query plan with frame_config"),
936        }
937    }
938
939    #[test]
940    fn test_plan_no_frame_is_standard() {
941        let mut planner = StreamingPlanner::new();
942        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
943
944        let plan = planner.plan(&statements[0]).unwrap();
945        match plan {
946            StreamingPlan::Standard(_) => {} // No frame → pass-through
947            _ => panic!("Expected Standard plan for simple SELECT"),
948        }
949    }
950
951    #[test]
952    fn test_plan_unbounded_following_rejected() {
953        let mut planner = StreamingPlanner::new();
954        let statements = StreamingParser::parse_sql(
955            "SELECT SUM(amount) OVER (ORDER BY id \
956             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
957             FROM orders",
958        )
959        .unwrap();
960
961        let result = planner.plan(&statements[0]);
962        assert!(result.is_err());
963        let err = result.unwrap_err().to_string();
964        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
965    }
966}