// laminar_sql/planner/mod.rs

//! Query planner for streaming SQL
//!
//! This module translates parsed streaming SQL statements into execution plans.
//! It integrates with the parser and translator modules to produce complete
//! operator configurations for Ring 0 execution.

pub mod channel_derivation;

use std::collections::HashMap;

use datafusion::logical_expr::LogicalPlan;
use datafusion::prelude::SessionContext;
use sqlparser::ast::{ObjectName, SetExpr, Statement};

use crate::parser::analytic_parser::analyze_analytic_functions;
use crate::parser::join_parser::analyze_join;
use crate::parser::order_analyzer::analyze_order_by;
use crate::parser::{
    CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom, StreamingStatement,
    WindowFunction, WindowRewriter,
};
use crate::translator::{
    AnalyticWindowConfig, DagExplainOutput, JoinOperatorConfig, OrderOperatorConfig,
    WindowOperatorConfig,
};

27/// Streaming query planner
28pub struct StreamingPlanner {
29    /// Registered sources
30    sources: HashMap<String, SourceInfo>,
31    /// Registered sinks
32    sinks: HashMap<String, SinkInfo>,
33}
34
/// Information about a registered source
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name
    pub name: String,
    /// Watermark column (if configured)
    pub watermark_column: Option<String>,
    /// Connector options
    pub options: HashMap<String, String>,
}

/// Information about a registered sink
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name
    pub name: String,
    /// Source table or query name
    pub from: String,
    /// Connector options
    pub options: HashMap<String, String>,
}

57/// Result of planning a streaming statement
58#[derive(Debug)]
59#[allow(clippy::large_enum_variant)]
60pub enum StreamingPlan {
61    /// Source registration (DDL)
62    RegisterSource(SourceInfo),
63
64    /// Sink registration (DDL)
65    RegisterSink(SinkInfo),
66
67    /// Query plan with streaming configurations
68    Query(QueryPlan),
69
70    /// Standard SQL statement (pass-through to DataFusion)
71    Standard(Box<Statement>),
72
73    /// DAG topology explanation (from EXPLAIN DAG)
74    DagExplain(DagExplainOutput),
75}
76
77/// A query plan with streaming operator configurations
78#[derive(Debug)]
79pub struct QueryPlan {
80    /// Optional name for the continuous query
81    pub name: Option<String>,
82    /// Window configuration if the query has windowed aggregation
83    pub window_config: Option<WindowOperatorConfig>,
84    /// Join configuration if the query has joins
85    pub join_config: Option<JoinOperatorConfig>,
86    /// ORDER BY configuration if the query has ordering
87    pub order_config: Option<OrderOperatorConfig>,
88    /// Analytic window function configuration (LAG/LEAD/etc.)
89    pub analytic_config: Option<AnalyticWindowConfig>,
90    /// Emit strategy
91    pub emit_clause: Option<EmitClause>,
92    /// The underlying SQL statement
93    pub statement: Box<Statement>,
94}
95
96impl StreamingPlanner {
97    /// Creates a new streaming planner
98    #[must_use]
99    pub fn new() -> Self {
100        Self {
101            sources: HashMap::new(),
102            sinks: HashMap::new(),
103        }
104    }
105
106    /// Plans a streaming statement.
107    ///
108    /// # Errors
109    ///
110    /// Returns `PlanningError` if the statement cannot be planned.
111    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
112        match statement {
113            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
114            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
115            StreamingStatement::CreateContinuousQuery {
116                name,
117                query,
118                emit_clause,
119            }
120            | StreamingStatement::CreateStream {
121                name,
122                query,
123                emit_clause,
124                ..
125            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
126            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
127            StreamingStatement::DropSource { .. }
128            | StreamingStatement::DropSink { .. }
129            | StreamingStatement::DropStream { .. }
130            | StreamingStatement::DropMaterializedView { .. }
131            | StreamingStatement::Show(_)
132            | StreamingStatement::Describe { .. }
133            | StreamingStatement::Explain { .. }
134            | StreamingStatement::CreateMaterializedView { .. }
135            | StreamingStatement::InsertInto { .. } => {
136                // These statements are handled directly by the database facade
137                // and don't need query planning. Return as Standard pass-through.
138                Err(PlanningError::UnsupportedSql(format!(
139                    "Statement type {:?} is handled by the database layer, not the planner",
140                    std::mem::discriminant(statement)
141                )))
142            }
143        }
144    }
145
146    /// Plans a CREATE SOURCE statement.
147    fn plan_create_source(
148        &mut self,
149        source: &CreateSourceStatement,
150    ) -> Result<StreamingPlan, PlanningError> {
151        let name = object_name_to_string(&source.name);
152
153        // Check for existing source
154        if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
155            return Err(PlanningError::InvalidQuery(format!(
156                "Source '{}' already exists",
157                name
158            )));
159        }
160
161        // Extract watermark column
162        let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
163
164        let info = SourceInfo {
165            name: name.clone(),
166            watermark_column,
167            options: source.with_options.clone(),
168        };
169
170        // Register the source
171        self.sources.insert(name, info.clone());
172
173        Ok(StreamingPlan::RegisterSource(info))
174    }
175
176    /// Plans a CREATE SINK statement.
177    fn plan_create_sink(
178        &mut self,
179        sink: &CreateSinkStatement,
180    ) -> Result<StreamingPlan, PlanningError> {
181        let name = object_name_to_string(&sink.name);
182
183        // Check for existing sink
184        if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
185            return Err(PlanningError::InvalidQuery(format!(
186                "Sink '{}' already exists",
187                name
188            )));
189        }
190
191        // Determine the source
192        let from = match &sink.from {
193            SinkFrom::Table(table) => object_name_to_string(table),
194            SinkFrom::Query(_) => format!("{}_query", name),
195        };
196
197        let info = SinkInfo {
198            name: name.clone(),
199            from,
200            options: sink.with_options.clone(),
201        };
202
203        // Register the sink
204        self.sinks.insert(name, info.clone());
205
206        Ok(StreamingPlan::RegisterSink(info))
207    }
208
209    /// Plans a CREATE CONTINUOUS QUERY statement.
210    #[allow(clippy::unused_self)] // Will use planner state for query registration
211    fn plan_continuous_query(
212        &mut self,
213        name: &ObjectName,
214        query: &StreamingStatement,
215        emit_clause: Option<&EmitClause>,
216    ) -> Result<StreamingPlan, PlanningError> {
217        // The query inside should be a standard SELECT
218        let stmt = match query {
219            StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
220            _ => {
221                return Err(PlanningError::InvalidQuery(
222                    "Continuous query must contain a SELECT statement".to_string(),
223                ))
224            }
225        };
226
227        // Analyze the query for streaming features
228        let query_plan = Self::analyze_query(&stmt, emit_clause)?;
229
230        Ok(StreamingPlan::Query(QueryPlan {
231            name: Some(object_name_to_string(name)),
232            window_config: query_plan.window_config,
233            join_config: query_plan.join_config,
234            order_config: query_plan.order_config,
235            analytic_config: query_plan.analytic_config,
236            emit_clause: emit_clause.cloned(),
237            statement: Box::new(stmt),
238        }))
239    }
240
241    /// Plans a standard SQL statement.
242    #[allow(clippy::unused_self)] // Will use planner state for plan optimization
243    fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
244        // Check if it's a query that might have streaming features
245        if let Statement::Query(query) = stmt {
246            if let SetExpr::Select(select) = query.body.as_ref() {
247                // Check for window functions in GROUP BY
248                let window_function = Self::extract_window_from_select(select);
249
250                // Check for joins
251                let join_analysis = analyze_join(select).map_err(|e| {
252                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
253                })?;
254
255                // Check for ORDER BY
256                let order_analysis = analyze_order_by(stmt);
257                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
258                    .map_err(PlanningError::InvalidQuery)?;
259
260                // Check for analytic functions (LAG/LEAD/etc.)
261                let analytic_analysis = analyze_analytic_functions(stmt);
262                let analytic_config =
263                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
264
265                let has_streaming_features = window_function.is_some()
266                    || join_analysis.is_some()
267                    || order_config.is_some()
268                    || analytic_config.is_some();
269
270                if has_streaming_features {
271                    let window_config = match window_function {
272                        Some(w) => Some(
273                            WindowOperatorConfig::from_window_function(&w)
274                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
275                        ),
276                        None => None,
277                    };
278
279                    let join_config = join_analysis.map(|j| JoinOperatorConfig::from_analysis(&j));
280
281                    return Ok(StreamingPlan::Query(QueryPlan {
282                        name: None,
283                        window_config,
284                        join_config,
285                        order_config,
286                        analytic_config,
287                        emit_clause: None,
288                        statement: Box::new(stmt.clone()),
289                    }));
290                }
291            }
292        }
293
294        // Pass through standard SQL
295        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
296    }
297
298    /// Analyzes a query for streaming features.
299    fn analyze_query(
300        stmt: &Statement,
301        emit_clause: Option<&EmitClause>,
302    ) -> Result<QueryAnalysis, PlanningError> {
303        let mut analysis = QueryAnalysis::default();
304
305        if let Statement::Query(query) = stmt {
306            if let SetExpr::Select(select) = query.body.as_ref() {
307                // Extract window function
308                if let Some(window) = Self::extract_window_from_select(select) {
309                    let mut config = WindowOperatorConfig::from_window_function(&window)
310                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
311
312                    // Apply emit clause if present
313                    if let Some(emit) = emit_clause {
314                        config = config
315                            .with_emit_clause(emit)
316                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
317                    }
318
319                    analysis.window_config = Some(config);
320                }
321
322                // Extract join info
323                if let Some(join) = analyze_join(select).map_err(|e| {
324                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
325                })? {
326                    analysis.join_config = Some(JoinOperatorConfig::from_analysis(&join));
327                }
328            }
329        }
330
331        // Extract ORDER BY info
332        let order_analysis = analyze_order_by(stmt);
333        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
334            .map_err(PlanningError::InvalidQuery)?;
335
336        // Extract analytic function info (LAG/LEAD/etc.)
337        if let Some(analytic) = analyze_analytic_functions(stmt) {
338            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
339        }
340
341        Ok(analysis)
342    }
343
344    /// Extracts window function from a SELECT.
345    fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
346        // Check GROUP BY for window functions
347        use sqlparser::ast::GroupByExpr;
348        match &select.group_by {
349            GroupByExpr::Expressions(exprs, _modifiers) => {
350                for group_by_expr in exprs {
351                    if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
352                    {
353                        return Some(window);
354                    }
355                }
356            }
357            GroupByExpr::All(_) => {}
358        }
359        None
360    }
361
362    /// Gets a registered source by name.
363    #[must_use]
364    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
365        self.sources.get(name)
366    }
367
368    /// Gets a registered sink by name.
369    #[must_use]
370    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
371        self.sinks.get(name)
372    }
373
374    /// Lists all registered sources.
375    #[must_use]
376    pub fn list_sources(&self) -> Vec<&SourceInfo> {
377        self.sources.values().collect()
378    }
379
380    /// Lists all registered sinks.
381    #[must_use]
382    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
383        self.sinks.values().collect()
384    }
385
386    /// Creates a `DataFusion` logical plan from a query plan.
387    ///
388    /// Converts the query plan's SQL statement into a `DataFusion`
389    /// `LogicalPlan` using the session context's state. Window UDFs
390    /// (TUMBLE, HOP, SESSION) must be registered on the context via
391    /// [`register_streaming_functions`](crate::datafusion::register_streaming_functions)
392    /// for windowed queries to resolve correctly.
393    ///
394    /// # Arguments
395    ///
396    /// * `plan` - The streaming query plan containing the SQL statement
397    /// * `ctx` - `DataFusion` session context with registered UDFs
398    ///
399    /// # Errors
400    ///
401    /// Returns `PlanningError` if `DataFusion` cannot create the logical plan.
402    #[allow(clippy::unused_self)] // Method will use planner state for plan optimization
403    pub async fn to_logical_plan(
404        &self,
405        plan: &QueryPlan,
406        ctx: &SessionContext,
407    ) -> Result<LogicalPlan, PlanningError> {
408        // Convert the AST statement back to SQL and let DataFusion re-parse
409        // it with its own sqlparser version. This avoids version mismatches
410        // between our sqlparser (0.60) and DataFusion's (0.59).
411        let sql = plan.statement.to_string();
412        ctx.state()
413            .create_logical_plan(&sql)
414            .await
415            .map_err(PlanningError::DataFusion)
416    }
417}
418
419impl Default for StreamingPlanner {
420    fn default() -> Self {
421        Self::new()
422    }
423}
424
425/// Intermediate query analysis result
426#[derive(Debug, Default)]
427#[allow(clippy::struct_field_names)]
428struct QueryAnalysis {
429    window_config: Option<WindowOperatorConfig>,
430    join_config: Option<JoinOperatorConfig>,
431    order_config: Option<OrderOperatorConfig>,
432    analytic_config: Option<AnalyticWindowConfig>,
433}
434
435/// Helper to convert `ObjectName` to String
436fn object_name_to_string(name: &ObjectName) -> String {
437    name.to_string()
438}
439
440/// Planning errors
441#[derive(Debug, thiserror::Error)]
442pub enum PlanningError {
443    /// Unsupported SQL feature
444    #[error("Unsupported SQL: {0}")]
445    UnsupportedSql(String),
446
447    /// Invalid query
448    #[error("Invalid query: {0}")]
449    InvalidQuery(String),
450
451    /// Source not found
452    #[error("Source not found: {0}")]
453    SourceNotFound(String),
454
455    /// Sink not found
456    #[error("Sink not found: {0}")]
457    SinkNotFound(String),
458
459    /// `DataFusion` error during logical plan creation
460    #[error("DataFusion error: {0}")]
461    DataFusion(#[from] datafusion_common::DataFusionError),
462}
463
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // Duplicate should fail
        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // IF NOT EXISTS should succeed
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        // First source
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // OR REPLACE should succeed
        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        // Create sources
        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        // Create sinks
        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let config = query_plan.join_config.unwrap();
                assert_eq!(config.left_key(), "order_id");
                assert_eq!(config.right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    #[test]
    fn test_plan_query_with_lead() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert!(config.has_lookahead());
                assert_eq!(config.functions[0].offset, 2);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }
}