1pub mod channel_derivation;
8
9use std::collections::HashMap;
10
11use datafusion::logical_expr::LogicalPlan;
12use datafusion::prelude::SessionContext;
13use sqlparser::ast::{ObjectName, SetExpr, Statement};
14
15use crate::parser::aggregation_parser::analyze_aggregates;
16use crate::parser::analytic_parser::{
17 analyze_analytic_functions, analyze_window_frames, FrameBound,
18};
19use crate::parser::join_parser::analyze_joins;
20use crate::parser::order_analyzer::analyze_order_by;
21use crate::parser::{
22 CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom, StreamingStatement,
23 WindowFunction, WindowRewriter,
24};
25use crate::translator::{
26 AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
27 OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
28};
29
30pub struct StreamingPlanner {
32 sources: HashMap<String, SourceInfo>,
34 sinks: HashMap<String, SinkInfo>,
36}
37
38#[derive(Debug, Clone)]
40pub struct SourceInfo {
41 pub name: String,
43 pub watermark_column: Option<String>,
45 pub options: HashMap<String, String>,
47}
48
49#[derive(Debug, Clone)]
51pub struct SinkInfo {
52 pub name: String,
54 pub from: String,
56 pub options: HashMap<String, String>,
58}
59
60#[derive(Debug)]
62#[allow(clippy::large_enum_variant)]
63pub enum StreamingPlan {
64 RegisterSource(SourceInfo),
66
67 RegisterSink(SinkInfo),
69
70 Query(QueryPlan),
72
73 Standard(Box<Statement>),
75
76 DagExplain(DagExplainOutput),
78}
79
80#[derive(Debug)]
82pub struct QueryPlan {
83 pub name: Option<String>,
85 pub window_config: Option<WindowOperatorConfig>,
87 pub join_config: Option<Vec<JoinOperatorConfig>>,
89 pub order_config: Option<OrderOperatorConfig>,
91 pub analytic_config: Option<AnalyticWindowConfig>,
93 pub having_config: Option<HavingFilterConfig>,
95 pub frame_config: Option<WindowFrameConfig>,
97 pub emit_clause: Option<EmitClause>,
99 pub statement: Box<Statement>,
101}
102
103impl StreamingPlanner {
104 #[must_use]
106 pub fn new() -> Self {
107 Self {
108 sources: HashMap::new(),
109 sinks: HashMap::new(),
110 }
111 }
112
113 pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
119 match statement {
120 StreamingStatement::CreateSource(source) => self.plan_create_source(source),
121 StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
122 StreamingStatement::CreateContinuousQuery {
123 name,
124 query,
125 emit_clause,
126 }
127 | StreamingStatement::CreateStream {
128 name,
129 query,
130 emit_clause,
131 ..
132 } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
133 StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
134 StreamingStatement::DropSource { .. }
135 | StreamingStatement::DropSink { .. }
136 | StreamingStatement::DropStream { .. }
137 | StreamingStatement::DropMaterializedView { .. }
138 | StreamingStatement::Show(_)
139 | StreamingStatement::Describe { .. }
140 | StreamingStatement::Explain { .. }
141 | StreamingStatement::CreateMaterializedView { .. }
142 | StreamingStatement::InsertInto { .. } => {
143 Err(PlanningError::UnsupportedSql(format!(
146 "Statement type {:?} is handled by the database layer, not the planner",
147 std::mem::discriminant(statement)
148 )))
149 }
150 }
151 }
152
153 fn plan_create_source(
155 &mut self,
156 source: &CreateSourceStatement,
157 ) -> Result<StreamingPlan, PlanningError> {
158 let name = object_name_to_string(&source.name);
159
160 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
162 return Err(PlanningError::InvalidQuery(format!(
163 "Source '{}' already exists",
164 name
165 )));
166 }
167
168 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
170
171 let info = SourceInfo {
172 name: name.clone(),
173 watermark_column,
174 options: source.with_options.clone(),
175 };
176
177 self.sources.insert(name, info.clone());
179
180 Ok(StreamingPlan::RegisterSource(info))
181 }
182
183 fn plan_create_sink(
185 &mut self,
186 sink: &CreateSinkStatement,
187 ) -> Result<StreamingPlan, PlanningError> {
188 let name = object_name_to_string(&sink.name);
189
190 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
192 return Err(PlanningError::InvalidQuery(format!(
193 "Sink '{}' already exists",
194 name
195 )));
196 }
197
198 let from = match &sink.from {
200 SinkFrom::Table(table) => object_name_to_string(table),
201 SinkFrom::Query(_) => format!("{}_query", name),
202 };
203
204 let info = SinkInfo {
205 name: name.clone(),
206 from,
207 options: sink.with_options.clone(),
208 };
209
210 self.sinks.insert(name, info.clone());
212
213 Ok(StreamingPlan::RegisterSink(info))
214 }
215
216 #[allow(clippy::unused_self)] fn plan_continuous_query(
219 &mut self,
220 name: &ObjectName,
221 query: &StreamingStatement,
222 emit_clause: Option<&EmitClause>,
223 ) -> Result<StreamingPlan, PlanningError> {
224 let stmt = match query {
226 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
227 _ => {
228 return Err(PlanningError::InvalidQuery(
229 "Continuous query must contain a SELECT statement".to_string(),
230 ))
231 }
232 };
233
234 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
236
237 Ok(StreamingPlan::Query(QueryPlan {
238 name: Some(object_name_to_string(name)),
239 window_config: query_plan.window_config,
240 join_config: query_plan.join_config,
241 order_config: query_plan.order_config,
242 analytic_config: query_plan.analytic_config,
243 having_config: query_plan.having_config,
244 frame_config: query_plan.frame_config,
245 emit_clause: emit_clause.cloned(),
246 statement: Box::new(stmt),
247 }))
248 }
249
250 #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
253 if let Statement::Query(query) = stmt {
255 if let SetExpr::Select(select) = query.body.as_ref() {
256 let window_function = Self::extract_window_from_select(select);
258
259 let join_analysis = analyze_joins(select).map_err(|e| {
261 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
262 })?;
263
264 let order_analysis = analyze_order_by(stmt);
266 let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
267 .map_err(PlanningError::InvalidQuery)?;
268
269 let analytic_analysis = analyze_analytic_functions(stmt);
271 let analytic_config =
272 analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
273
274 let agg_analysis = analyze_aggregates(stmt);
276 let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
277
278 let frame_analysis = analyze_window_frames(stmt);
280 let frame_config = frame_analysis
281 .as_ref()
282 .map(WindowFrameConfig::from_analysis);
283
284 if let Some(fa) = &frame_analysis {
286 for f in &fa.functions {
287 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
288 return Err(PlanningError::InvalidQuery(
289 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
290 .to_string(),
291 ));
292 }
293 }
294 }
295
296 let has_streaming_features = window_function.is_some()
297 || join_analysis.is_some()
298 || order_config.is_some()
299 || analytic_config.is_some()
300 || having_config.is_some()
301 || frame_config.is_some();
302
303 if has_streaming_features {
304 let window_config = match window_function {
305 Some(w) => Some(
306 WindowOperatorConfig::from_window_function(&w)
307 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
308 ),
309 None => None,
310 };
311
312 let join_config =
313 join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));
314
315 return Ok(StreamingPlan::Query(QueryPlan {
316 name: None,
317 window_config,
318 join_config,
319 order_config,
320 analytic_config,
321 having_config,
322 frame_config,
323 emit_clause: None,
324 statement: Box::new(stmt.clone()),
325 }));
326 }
327 }
328 }
329
330 Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
332 }
333
334 fn analyze_query(
336 stmt: &Statement,
337 emit_clause: Option<&EmitClause>,
338 ) -> Result<QueryAnalysis, PlanningError> {
339 let mut analysis = QueryAnalysis::default();
340
341 if let Statement::Query(query) = stmt {
342 if let SetExpr::Select(select) = query.body.as_ref() {
343 if let Some(window) = Self::extract_window_from_select(select) {
345 let mut config = WindowOperatorConfig::from_window_function(&window)
346 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
347
348 if let Some(emit) = emit_clause {
350 config = config
351 .with_emit_clause(emit)
352 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
353 }
354
355 analysis.window_config = Some(config);
356 }
357
358 if let Some(multi) = analyze_joins(select).map_err(|e| {
360 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
361 })? {
362 analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
363 }
364 }
365 }
366
367 let order_analysis = analyze_order_by(stmt);
369 analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
370 .map_err(PlanningError::InvalidQuery)?;
371
372 if let Some(analytic) = analyze_analytic_functions(stmt) {
374 analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
375 }
376
377 let agg_analysis = analyze_aggregates(stmt);
379 analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
380
381 if let Some(frame_analysis) = analyze_window_frames(stmt) {
383 for f in &frame_analysis.functions {
385 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
386 return Err(PlanningError::InvalidQuery(
387 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
388 .to_string(),
389 ));
390 }
391 }
392 analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
393 }
394
395 Ok(analysis)
396 }
397
398 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
400 use sqlparser::ast::GroupByExpr;
402 match &select.group_by {
403 GroupByExpr::Expressions(exprs, _modifiers) => {
404 for group_by_expr in exprs {
405 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
406 {
407 return Some(window);
408 }
409 }
410 }
411 GroupByExpr::All(_) => {}
412 }
413 None
414 }
415
416 #[must_use]
418 pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
419 self.sources.get(name)
420 }
421
422 #[must_use]
424 pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
425 self.sinks.get(name)
426 }
427
428 #[must_use]
430 pub fn list_sources(&self) -> Vec<&SourceInfo> {
431 self.sources.values().collect()
432 }
433
434 #[must_use]
436 pub fn list_sinks(&self) -> Vec<&SinkInfo> {
437 self.sinks.values().collect()
438 }
439
440 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
458 &self,
459 plan: &QueryPlan,
460 ctx: &SessionContext,
461 ) -> Result<LogicalPlan, PlanningError> {
462 let sql = plan.statement.to_string();
466 ctx.state()
467 .create_logical_plan(&sql)
468 .await
469 .map_err(PlanningError::DataFusion)
470 }
471}
472
473impl Default for StreamingPlanner {
474 fn default() -> Self {
475 Self::new()
476 }
477}
478
479#[derive(Debug, Default)]
481#[allow(clippy::struct_field_names)]
482struct QueryAnalysis {
483 window_config: Option<WindowOperatorConfig>,
484 join_config: Option<Vec<JoinOperatorConfig>>,
485 order_config: Option<OrderOperatorConfig>,
486 analytic_config: Option<AnalyticWindowConfig>,
487 having_config: Option<HavingFilterConfig>,
488 frame_config: Option<WindowFrameConfig>,
489}
490
491fn object_name_to_string(name: &ObjectName) -> String {
493 name.to_string()
494}
495
496#[derive(Debug, thiserror::Error)]
498pub enum PlanningError {
499 #[error("Unsupported SQL: {0}")]
501 UnsupportedSql(String),
502
503 #[error("Invalid query: {0}")]
505 InvalidQuery(String),
506
507 #[error("Source not found: {0}")]
509 SourceNotFound(String),
510
511 #[error("Sink not found: {0}")]
513 SinkNotFound(String),
514
515 #[error("DataFusion error: {0}")]
517 DataFusion(#[from] datafusion_common::DataFusionError),
518}
519
520#[cfg(test)]
521mod tests {
522 use super::*;
523 use crate::parser::StreamingParser;
524
525 #[test]
526 fn test_plan_create_source() {
527 let mut planner = StreamingPlanner::new();
528 let statements =
529 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
530
531 let plan = planner.plan(&statements[0]).unwrap();
532 match plan {
533 StreamingPlan::RegisterSource(info) => {
534 assert_eq!(info.name, "events");
535 }
536 _ => panic!("Expected RegisterSource plan"),
537 }
538 }
539
540 #[test]
541 fn test_plan_create_sink() {
542 let mut planner = StreamingPlanner::new();
543 let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();
544
545 let plan = planner.plan(&statements[0]).unwrap();
546 match plan {
547 StreamingPlan::RegisterSink(info) => {
548 assert_eq!(info.name, "output");
549 assert_eq!(info.from, "events");
550 }
551 _ => panic!("Expected RegisterSink plan"),
552 }
553 }
554
555 #[test]
556 fn test_plan_duplicate_source() {
557 let mut planner = StreamingPlanner::new();
558
559 let statements =
561 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
562 planner.plan(&statements[0]).unwrap();
563
564 let result = planner.plan(&statements[0]);
566 assert!(result.is_err());
567 }
568
569 #[test]
570 fn test_plan_source_if_not_exists() {
571 let mut planner = StreamingPlanner::new();
572
573 let statements =
575 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
576 planner.plan(&statements[0]).unwrap();
577
578 let statements =
580 StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
581 .unwrap();
582 let result = planner.plan(&statements[0]);
583 assert!(result.is_ok());
584 }
585
586 #[test]
587 fn test_plan_source_or_replace() {
588 let mut planner = StreamingPlanner::new();
589
590 let statements =
592 StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
593 planner.plan(&statements[0]).unwrap();
594
595 let statements =
597 StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
598 .unwrap();
599 let result = planner.plan(&statements[0]);
600 assert!(result.is_ok());
601 }
602
603 #[test]
604 fn test_plan_source_with_watermark() {
605 let mut planner = StreamingPlanner::new();
606 let statements = StreamingParser::parse_sql(
607 "CREATE SOURCE events (
608 id INT,
609 ts TIMESTAMP,
610 WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
611 )",
612 )
613 .unwrap();
614
615 let plan = planner.plan(&statements[0]).unwrap();
616 match plan {
617 StreamingPlan::RegisterSource(info) => {
618 assert_eq!(info.name, "events");
619 assert_eq!(info.watermark_column, Some("ts".to_string()));
620 }
621 _ => panic!("Expected RegisterSource plan"),
622 }
623 }
624
625 #[test]
626 fn test_plan_standard_select() {
627 let mut planner = StreamingPlanner::new();
628 let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();
629
630 let plan = planner.plan(&statements[0]).unwrap();
631 match plan {
632 StreamingPlan::Standard(_) => {}
633 _ => panic!("Expected Standard plan for simple SELECT"),
634 }
635 }
636
637 #[test]
638 fn test_list_sources_and_sinks() {
639 let mut planner = StreamingPlanner::new();
640
641 let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
643 let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
644 planner.plan(&s1[0]).unwrap();
645 planner.plan(&s2[0]).unwrap();
646
647 let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
649 planner.plan(&k1[0]).unwrap();
650
651 assert_eq!(planner.list_sources().len(), 2);
652 assert_eq!(planner.list_sinks().len(), 1);
653 assert!(planner.get_source("src1").is_some());
654 assert!(planner.get_sink("sink1").is_some());
655 }
656
657 #[test]
658 fn test_plan_query_with_window() {
659 let mut planner = StreamingPlanner::new();
660 let statements = StreamingParser::parse_sql(
661 "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
662 )
663 .unwrap();
664
665 let plan = planner.plan(&statements[0]).unwrap();
666 match plan {
667 StreamingPlan::Query(query_plan) => {
668 assert!(query_plan.window_config.is_some());
669 let config = query_plan.window_config.unwrap();
670 assert_eq!(config.time_column, "event_time");
671 assert_eq!(config.size.as_secs(), 300);
672 }
673 _ => panic!("Expected Query plan"),
674 }
675 }
676
677 #[test]
678 fn test_plan_query_with_join() {
679 let mut planner = StreamingPlanner::new();
680 let statements = StreamingParser::parse_sql(
681 "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
682 )
683 .unwrap();
684
685 let plan = planner.plan(&statements[0]).unwrap();
686 match plan {
687 StreamingPlan::Query(query_plan) => {
688 assert!(query_plan.join_config.is_some());
689 let configs = query_plan.join_config.unwrap();
690 assert_eq!(configs.len(), 1);
691 assert_eq!(configs[0].left_key(), "order_id");
692 assert_eq!(configs[0].right_key(), "order_id");
693 }
694 _ => panic!("Expected Query plan"),
695 }
696 }
697
698 #[test]
699 fn test_plan_query_with_lag() {
700 let mut planner = StreamingPlanner::new();
701 let statements = StreamingParser::parse_sql(
702 "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
703 )
704 .unwrap();
705
706 let plan = planner.plan(&statements[0]).unwrap();
707 match plan {
708 StreamingPlan::Query(query_plan) => {
709 assert!(query_plan.analytic_config.is_some());
710 let config = query_plan.analytic_config.unwrap();
711 assert_eq!(config.functions.len(), 1);
712 assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
713 }
714 _ => panic!("Expected Query plan with analytic config"),
715 }
716 }
717
718 #[test]
719 fn test_plan_query_with_having() {
720 let mut planner = StreamingPlanner::new();
721 let statements = StreamingParser::parse_sql(
722 "SELECT symbol, COUNT(*) AS cnt FROM trades \
723 GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
724 HAVING COUNT(*) > 10",
725 )
726 .unwrap();
727
728 let plan = planner.plan(&statements[0]).unwrap();
729 match plan {
730 StreamingPlan::Query(query_plan) => {
731 assert!(query_plan.window_config.is_some());
732 assert!(query_plan.having_config.is_some());
733 let config = query_plan.having_config.unwrap();
734 assert!(
735 config.predicate().contains("COUNT(*)"),
736 "predicate was: {}",
737 config.predicate()
738 );
739 }
740 _ => panic!("Expected Query plan with having config"),
741 }
742 }
743
744 #[test]
745 fn test_plan_query_without_having() {
746 let mut planner = StreamingPlanner::new();
747 let statements = StreamingParser::parse_sql(
748 "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
749 )
750 .unwrap();
751
752 let plan = planner.plan(&statements[0]).unwrap();
753 match plan {
754 StreamingPlan::Query(query_plan) => {
755 assert!(query_plan.having_config.is_none());
756 }
757 _ => panic!("Expected Query plan"),
758 }
759 }
760
761 #[test]
762 fn test_plan_having_only_produces_query_plan() {
763 let mut planner = StreamingPlanner::new();
765 let statements = StreamingParser::parse_sql(
766 "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
767 )
768 .unwrap();
769
770 let plan = planner.plan(&statements[0]).unwrap();
771 match plan {
772 StreamingPlan::Query(query_plan) => {
773 assert!(query_plan.having_config.is_some());
774 assert!(query_plan.window_config.is_none());
775 }
776 _ => panic!("Expected Query plan for HAVING-only query"),
777 }
778 }
779
780 #[test]
781 fn test_plan_having_compound_predicate() {
782 let mut planner = StreamingPlanner::new();
783 let statements = StreamingParser::parse_sql(
784 "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
785 FROM trades GROUP BY symbol \
786 HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
787 )
788 .unwrap();
789
790 let plan = planner.plan(&statements[0]).unwrap();
791 match plan {
792 StreamingPlan::Query(query_plan) => {
793 let config = query_plan.having_config.unwrap();
794 let pred = config.predicate();
795 assert!(pred.contains("AND"), "predicate was: {pred}");
796 }
797 _ => panic!("Expected Query plan"),
798 }
799 }
800
801 #[test]
802 fn test_plan_query_with_lead() {
803 let mut planner = StreamingPlanner::new();
804 let statements = StreamingParser::parse_sql(
805 "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
806 )
807 .unwrap();
808
809 let plan = planner.plan(&statements[0]).unwrap();
810 match plan {
811 StreamingPlan::Query(query_plan) => {
812 assert!(query_plan.analytic_config.is_some());
813 let config = query_plan.analytic_config.unwrap();
814 assert!(config.has_lookahead());
815 assert_eq!(config.functions[0].offset, 2);
816 }
817 _ => panic!("Expected Query plan with analytic config"),
818 }
819 }
820
821 #[test]
824 fn test_plan_single_join_produces_vec_of_one() {
825 let mut planner = StreamingPlanner::new();
826 let statements =
827 StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();
828
829 let plan = planner.plan(&statements[0]).unwrap();
830 match plan {
831 StreamingPlan::Query(qp) => {
832 let configs = qp.join_config.unwrap();
833 assert_eq!(configs.len(), 1);
834 }
835 _ => panic!("Expected Query plan"),
836 }
837 }
838
839 #[test]
840 fn test_plan_two_way_join() {
841 let mut planner = StreamingPlanner::new();
842 let statements = StreamingParser::parse_sql(
843 "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
844 )
845 .unwrap();
846
847 let plan = planner.plan(&statements[0]).unwrap();
848 match plan {
849 StreamingPlan::Query(qp) => {
850 let configs = qp.join_config.unwrap();
851 assert_eq!(configs.len(), 2);
852 assert_eq!(configs[0].left_key(), "id");
853 assert_eq!(configs[0].right_key(), "a_id");
854 assert_eq!(configs[1].left_key(), "id");
855 assert_eq!(configs[1].right_key(), "b_id");
856 }
857 _ => panic!("Expected Query plan"),
858 }
859 }
860
861 #[test]
862 fn test_plan_mixed_join_types() {
863 let mut planner = StreamingPlanner::new();
864 let statements = StreamingParser::parse_sql(
865 "SELECT * FROM orders o \
866 JOIN payments p ON o.id = p.order_id \
867 AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
868 JOIN customers c ON p.cust_id = c.id",
869 )
870 .unwrap();
871
872 let plan = planner.plan(&statements[0]).unwrap();
873 match plan {
874 StreamingPlan::Query(qp) => {
875 let configs = qp.join_config.unwrap();
876 assert_eq!(configs.len(), 2);
877 assert!(configs[0].is_stream_stream());
878 assert!(configs[1].is_lookup());
879 }
880 _ => panic!("Expected Query plan"),
881 }
882 }
883
884 #[test]
885 fn test_plan_backward_compat_no_join() {
886 let mut planner = StreamingPlanner::new();
887 let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();
888
889 let plan = planner.plan(&statements[0]).unwrap();
890 match plan {
891 StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
893 }
894 }
895
896 #[test]
899 fn test_plan_query_with_rows_frame() {
900 let mut planner = StreamingPlanner::new();
901 let statements = StreamingParser::parse_sql(
902 "SELECT AVG(price) OVER (ORDER BY ts \
903 ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
904 )
905 .unwrap();
906
907 let plan = planner.plan(&statements[0]).unwrap();
908 match plan {
909 StreamingPlan::Query(qp) => {
910 assert!(qp.frame_config.is_some());
911 let fc = qp.frame_config.unwrap();
912 assert_eq!(fc.functions.len(), 1);
913 assert_eq!(fc.functions[0].source_column, "price");
914 }
915 _ => panic!("Expected Query plan with frame_config"),
916 }
917 }
918
919 #[test]
920 fn test_plan_frame_with_partition() {
921 let mut planner = StreamingPlanner::new();
922 let statements = StreamingParser::parse_sql(
923 "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
924 ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
925 )
926 .unwrap();
927
928 let plan = planner.plan(&statements[0]).unwrap();
929 match plan {
930 StreamingPlan::Query(qp) => {
931 let fc = qp.frame_config.unwrap();
932 assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
933 assert_eq!(fc.order_columns, vec!["ts".to_string()]);
934 }
935 _ => panic!("Expected Query plan with frame_config"),
936 }
937 }
938
939 #[test]
940 fn test_plan_no_frame_is_standard() {
941 let mut planner = StreamingPlanner::new();
942 let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();
943
944 let plan = planner.plan(&statements[0]).unwrap();
945 match plan {
946 StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
948 }
949 }
950
951 #[test]
952 fn test_plan_unbounded_following_rejected() {
953 let mut planner = StreamingPlanner::new();
954 let statements = StreamingParser::parse_sql(
955 "SELECT SUM(amount) OVER (ORDER BY id \
956 ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
957 FROM orders",
958 )
959 .unwrap();
960
961 let result = planner.plan(&statements[0]);
962 assert!(result.is_err());
963 let err = result.unwrap_err().to_string();
964 assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
965 }
966}