1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12pub mod streaming_optimizer;
14
15use std::collections::HashMap;
16
17use datafusion::logical_expr::LogicalPlan;
18use datafusion::prelude::SessionContext;
19use sqlparser::ast::{ObjectName, SetExpr, Statement};
20
21use crate::parser::aggregation_parser::analyze_aggregates;
22use crate::parser::analytic_parser::{
23 analyze_analytic_functions, analyze_window_frames, FrameBound,
24};
25use crate::parser::join_parser::analyze_joins;
26use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
27use crate::parser::order_analyzer::analyze_order_by;
28use crate::parser::{
29 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
30 StreamingStatement, WindowFunction, WindowRewriter,
31};
32use crate::translator::{
33 AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
34 OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
35};
36
37#[derive(Debug, Clone)]
39pub struct LookupTableInfo {
40 pub name: String,
42 pub columns: Vec<(String, String)>,
44 pub primary_key: Vec<String>,
46 pub properties: LookupTableProperties,
48}
49
50pub struct StreamingPlanner {
52 sources: HashMap<String, SourceInfo>,
54 sinks: HashMap<String, SinkInfo>,
56 lookup_tables: HashMap<String, LookupTableInfo>,
58}
59
/// Registry entry for a source created with `CREATE SOURCE`.
#[derive(Clone, Debug)]
pub struct SourceInfo {
    /// Source name, rendered from the SQL object name.
    pub name: String,
    /// Column named by the source's `WATERMARK FOR` clause, when one was declared.
    pub watermark_column: Option<String>,
    /// Options copied verbatim from the statement's `with_options`.
    pub options: HashMap<String, String>,
}
70
/// Registry entry for a sink created with `CREATE SINK`.
#[derive(Clone, Debug)]
pub struct SinkInfo {
    /// Sink name, rendered from the SQL object name.
    pub name: String,
    /// Upstream object the sink reads from. For a sink defined by an inline
    /// query this is the synthetic name `<sink name>_query`.
    pub from: String,
    /// Options copied verbatim from the statement's `with_options`.
    pub options: HashMap<String, String>,
}
81
82#[derive(Debug)]
84#[allow(clippy::large_enum_variant)]
85pub enum StreamingPlan {
86 RegisterSource(SourceInfo),
88
89 RegisterSink(SinkInfo),
91
92 Query(QueryPlan),
94
95 Standard(Box<Statement>),
97
98 DagExplain(DagExplainOutput),
100
101 RegisterLookupTable(LookupTableInfo),
103
104 DropLookupTable {
106 name: String,
108 },
109}
110
111#[derive(Debug)]
113pub struct QueryPlan {
114 pub name: Option<String>,
116 pub window_config: Option<WindowOperatorConfig>,
118 pub join_config: Option<Vec<JoinOperatorConfig>>,
120 pub order_config: Option<OrderOperatorConfig>,
122 pub analytic_config: Option<AnalyticWindowConfig>,
124 pub having_config: Option<HavingFilterConfig>,
126 pub frame_config: Option<WindowFrameConfig>,
128 pub emit_clause: Option<EmitClause>,
130 pub statement: Box<Statement>,
132}
133
134impl StreamingPlanner {
135 #[must_use]
137 pub fn new() -> Self {
138 Self {
139 sources: HashMap::new(),
140 sinks: HashMap::new(),
141 lookup_tables: HashMap::new(),
142 }
143 }
144
145 pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
151 match statement {
152 StreamingStatement::CreateSource(source) => self.plan_create_source(source),
153 StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
154 StreamingStatement::CreateContinuousQuery {
155 name,
156 query,
157 emit_clause,
158 }
159 | StreamingStatement::CreateStream {
160 name,
161 query,
162 emit_clause,
163 ..
164 } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
165 StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
166 StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
167 StreamingStatement::DropLookupTable { name, if_exists } => {
168 self.plan_drop_lookup_table(name, *if_exists)
169 }
170 StreamingStatement::DropSource { .. }
171 | StreamingStatement::DropSink { .. }
172 | StreamingStatement::DropStream { .. }
173 | StreamingStatement::DropMaterializedView { .. }
174 | StreamingStatement::Show(_)
175 | StreamingStatement::Describe { .. }
176 | StreamingStatement::Explain { .. }
177 | StreamingStatement::CreateMaterializedView { .. }
178 | StreamingStatement::InsertInto { .. }
179 | StreamingStatement::AlterSource { .. }
180 | StreamingStatement::Checkpoint
181 | StreamingStatement::RestoreCheckpoint { .. } => {
182 Err(PlanningError::UnsupportedSql(format!(
185 "Statement type {:?} is handled by the database layer, not the planner",
186 std::mem::discriminant(statement)
187 )))
188 }
189 }
190 }
191
192 fn plan_create_source(
194 &mut self,
195 source: &CreateSourceStatement,
196 ) -> Result<StreamingPlan, PlanningError> {
197 let name = object_name_to_string(&source.name);
198
199 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
201 return Err(PlanningError::InvalidQuery(format!(
202 "Source '{}' already exists",
203 name
204 )));
205 }
206
207 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
209
210 let info = SourceInfo {
211 name: name.clone(),
212 watermark_column,
213 options: source.with_options.clone(),
214 };
215
216 self.sources.insert(name, info.clone());
218
219 Ok(StreamingPlan::RegisterSource(info))
220 }
221
222 fn plan_create_sink(
224 &mut self,
225 sink: &CreateSinkStatement,
226 ) -> Result<StreamingPlan, PlanningError> {
227 let name = object_name_to_string(&sink.name);
228
229 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
231 return Err(PlanningError::InvalidQuery(format!(
232 "Sink '{}' already exists",
233 name
234 )));
235 }
236
237 let from = match &sink.from {
239 SinkFrom::Table(table) => object_name_to_string(table),
240 SinkFrom::Query(_) => format!("{}_query", name),
241 };
242
243 let info = SinkInfo {
244 name: name.clone(),
245 from,
246 options: sink.with_options.clone(),
247 };
248
249 self.sinks.insert(name, info.clone());
251
252 Ok(StreamingPlan::RegisterSink(info))
253 }
254
255 #[allow(clippy::unused_self)] fn plan_continuous_query(
258 &mut self,
259 name: &ObjectName,
260 query: &StreamingStatement,
261 emit_clause: Option<&EmitClause>,
262 ) -> Result<StreamingPlan, PlanningError> {
263 let stmt = match query {
265 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
266 _ => {
267 return Err(PlanningError::InvalidQuery(
268 "Continuous query must contain a SELECT statement".to_string(),
269 ))
270 }
271 };
272
273 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
275
276 Ok(StreamingPlan::Query(QueryPlan {
277 name: Some(object_name_to_string(name)),
278 window_config: query_plan.window_config,
279 join_config: query_plan.join_config,
280 order_config: query_plan.order_config,
281 analytic_config: query_plan.analytic_config,
282 having_config: query_plan.having_config,
283 frame_config: query_plan.frame_config,
284 emit_clause: emit_clause.cloned(),
285 statement: Box::new(stmt),
286 }))
287 }
288
289 #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
292 if let Statement::Query(query) = stmt {
294 if let SetExpr::Select(select) = query.body.as_ref() {
295 let window_function = Self::extract_window_from_select(select);
297
298 let join_analysis = analyze_joins(select).map_err(|e| {
300 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
301 })?;
302
303 let order_analysis = analyze_order_by(stmt);
305 let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
306 .map_err(PlanningError::InvalidQuery)?;
307
308 let analytic_analysis = analyze_analytic_functions(stmt);
310 let analytic_config =
311 analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
312
313 let agg_analysis = analyze_aggregates(stmt);
315 let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
316
317 let frame_analysis = analyze_window_frames(stmt);
319 let frame_config = frame_analysis
320 .as_ref()
321 .map(WindowFrameConfig::from_analysis);
322
323 if let Some(fa) = &frame_analysis {
325 for f in &fa.functions {
326 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
327 return Err(PlanningError::InvalidQuery(
328 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
329 .to_string(),
330 ));
331 }
332 }
333 }
334
335 let has_streaming_features = window_function.is_some()
336 || join_analysis.is_some()
337 || order_config.is_some()
338 || analytic_config.is_some()
339 || having_config.is_some()
340 || frame_config.is_some();
341
342 if has_streaming_features {
343 let window_config = match window_function {
344 Some(w) => Some(
345 WindowOperatorConfig::from_window_function(&w)
346 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
347 ),
348 None => None,
349 };
350
351 let join_config =
352 join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));
353
354 return Ok(StreamingPlan::Query(QueryPlan {
355 name: None,
356 window_config,
357 join_config,
358 order_config,
359 analytic_config,
360 having_config,
361 frame_config,
362 emit_clause: None,
363 statement: Box::new(stmt.clone()),
364 }));
365 }
366 }
367 }
368
369 Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
371 }
372
373 fn analyze_query(
375 stmt: &Statement,
376 emit_clause: Option<&EmitClause>,
377 ) -> Result<QueryAnalysis, PlanningError> {
378 let mut analysis = QueryAnalysis::default();
379
380 if let Statement::Query(query) = stmt {
381 if let SetExpr::Select(select) = query.body.as_ref() {
382 if let Some(window) = Self::extract_window_from_select(select) {
384 let mut config = WindowOperatorConfig::from_window_function(&window)
385 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
386
387 if let Some(emit) = emit_clause {
389 config = config
390 .with_emit_clause(emit)
391 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
392 }
393
394 analysis.window_config = Some(config);
395 }
396
397 if let Some(multi) = analyze_joins(select).map_err(|e| {
399 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
400 })? {
401 analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
402 }
403 }
404 }
405
406 let order_analysis = analyze_order_by(stmt);
408 analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
409 .map_err(PlanningError::InvalidQuery)?;
410
411 if let Some(analytic) = analyze_analytic_functions(stmt) {
413 analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
414 }
415
416 let agg_analysis = analyze_aggregates(stmt);
418 analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);
419
420 if let Some(frame_analysis) = analyze_window_frames(stmt) {
422 for f in &frame_analysis.functions {
424 if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
425 return Err(PlanningError::InvalidQuery(
426 "UNBOUNDED FOLLOWING is not supported in streaming window frames"
427 .to_string(),
428 ));
429 }
430 }
431 analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
432 }
433
434 Ok(analysis)
435 }
436
437 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
439 use sqlparser::ast::GroupByExpr;
441 match &select.group_by {
442 GroupByExpr::Expressions(exprs, _modifiers) => {
443 for group_by_expr in exprs {
444 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
445 {
446 return Some(window);
447 }
448 }
449 }
450 GroupByExpr::All(_) => {}
451 }
452 None
453 }
454
455 fn plan_create_lookup_table(
457 &mut self,
458 lt: &CreateLookupTableStatement,
459 ) -> Result<StreamingPlan, PlanningError> {
460 let name = object_name_to_string(<.name);
461
462 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
463 return Err(PlanningError::InvalidQuery(format!(
464 "Lookup table '{}' already exists",
465 name
466 )));
467 }
468
469 let columns: Vec<(String, String)> = lt
470 .columns
471 .iter()
472 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
473 .collect();
474
475 let properties = validate_properties(<.with_options).map_err(|e| {
476 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
477 })?;
478
479 let info = LookupTableInfo {
480 name: name.clone(),
481 columns,
482 primary_key: lt.primary_key.clone(),
483 properties,
484 };
485
486 self.lookup_tables.insert(name, info.clone());
487
488 Ok(StreamingPlan::RegisterLookupTable(info))
489 }
490
491 fn plan_drop_lookup_table(
493 &mut self,
494 name: &ObjectName,
495 if_exists: bool,
496 ) -> Result<StreamingPlan, PlanningError> {
497 let name_str = object_name_to_string(name);
498
499 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
500 return Err(PlanningError::InvalidQuery(format!(
501 "Lookup table '{}' does not exist",
502 name_str
503 )));
504 }
505
506 self.lookup_tables.remove(&name_str);
507
508 Ok(StreamingPlan::DropLookupTable { name: name_str })
509 }
510
511 #[must_use]
513 pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
514 self.sources.get(name)
515 }
516
517 #[must_use]
519 pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
520 self.sinks.get(name)
521 }
522
523 #[must_use]
525 pub fn list_sources(&self) -> Vec<&SourceInfo> {
526 self.sources.values().collect()
527 }
528
529 #[must_use]
531 pub fn list_sinks(&self) -> Vec<&SinkInfo> {
532 self.sinks.values().collect()
533 }
534
535 #[must_use]
537 pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
538 self.lookup_tables.get(name)
539 }
540
541 #[must_use]
543 pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
544 self.lookup_tables.values().collect()
545 }
546
547 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
565 &self,
566 plan: &QueryPlan,
567 ctx: &SessionContext,
568 ) -> Result<LogicalPlan, PlanningError> {
569 let sql = plan.statement.to_string();
573 ctx.state()
574 .create_logical_plan(&sql)
575 .await
576 .map_err(PlanningError::DataFusion)
577 }
578}
579
580impl Default for StreamingPlanner {
581 fn default() -> Self {
582 Self::new()
583 }
584}
585
586#[derive(Debug, Default)]
588#[allow(clippy::struct_field_names)]
589struct QueryAnalysis {
590 window_config: Option<WindowOperatorConfig>,
591 join_config: Option<Vec<JoinOperatorConfig>>,
592 order_config: Option<OrderOperatorConfig>,
593 analytic_config: Option<AnalyticWindowConfig>,
594 having_config: Option<HavingFilterConfig>,
595 frame_config: Option<WindowFrameConfig>,
596}
597
598fn object_name_to_string(name: &ObjectName) -> String {
600 name.to_string()
601}
602
603#[derive(Debug, thiserror::Error)]
605pub enum PlanningError {
606 UnsupportedSql(String),
608
609 InvalidQuery(String),
611
612 SourceNotFound(String),
614
615 SinkNotFound(String),
617
618 DataFusion(#[from] datafusion_common::DataFusionError),
620}
621
622impl std::fmt::Display for PlanningError {
623 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
624 match self {
625 Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
626 Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
627 Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
628 Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
629 Self::DataFusion(e) => {
630 let translated = crate::error::translate_datafusion_error(&e.to_string());
631 write!(f, "{translated}")
632 }
633 }
634 }
635}
636
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    /// Parses `sql` and plans its first statement with `planner`.
    fn plan_sql(
        planner: &mut StreamingPlanner,
        sql: &str,
    ) -> Result<StreamingPlan, PlanningError> {
        let statements = StreamingParser::parse_sql(sql).expect("test SQL should parse");
        planner.plan(&statements[0])
    }

    /// Plans `sql` with a fresh planner and returns the query plan, panicking
    /// with `expectation` if planning produced a different plan kind.
    fn plan_query(sql: &str, expectation: &str) -> QueryPlan {
        let mut planner = StreamingPlanner::new();
        match plan_sql(&mut planner, sql).expect("planning should succeed") {
            StreamingPlan::Query(query_plan) => query_plan,
            other => panic!("{expectation}, got {other:?}"),
        }
    }

    /// Asserts that `sql` plans to a pass-through `Standard` plan.
    fn assert_standard(sql: &str) {
        let mut planner = StreamingPlanner::new();
        let plan = plan_sql(&mut planner, sql).expect("planning should succeed");
        assert!(
            matches!(plan, StreamingPlan::Standard(_)),
            "Expected Standard plan for simple SELECT, got {plan:?}"
        );
    }

    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let plan = plan_sql(&mut planner, "CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let StreamingPlan::RegisterSource(info) = plan else {
            panic!("Expected RegisterSource plan");
        };
        assert_eq!(info.name, "events");
    }

    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let plan = plan_sql(&mut planner, "CREATE SINK output FROM events").unwrap();

        let StreamingPlan::RegisterSink(info) = plan else {
            panic!("Expected RegisterSink plan");
        };
        assert_eq!(info.name, "output");
        assert_eq!(info.from, "events");
    }

    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();
        let sql = "CREATE SOURCE events (id INT, name VARCHAR)";
        plan_sql(&mut planner, sql).unwrap();

        let err = plan_sql(&mut planner, sql).expect_err("duplicate CREATE SOURCE must fail");
        assert!(err.to_string().contains("already exists"), "error was: {err}");
    }

    #[test]
    fn test_plan_duplicate_sink() {
        let mut planner = StreamingPlanner::new();
        let sql = "CREATE SINK output FROM events";
        plan_sql(&mut planner, sql).unwrap();

        let err = plan_sql(&mut planner, sql).expect_err("duplicate CREATE SINK must fail");
        assert!(err.to_string().contains("already exists"), "error was: {err}");
    }

    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();
        plan_sql(&mut planner, "CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let result = plan_sql(
            &mut planner,
            "CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)",
        );
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();
        plan_sql(&mut planner, "CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let result = plan_sql(
            &mut planner,
            "CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)",
        );
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let plan = plan_sql(
            &mut planner,
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let StreamingPlan::RegisterSource(info) = plan else {
            panic!("Expected RegisterSource plan");
        };
        assert_eq!(info.name, "events");
        assert_eq!(info.watermark_column, Some("ts".to_string()));
    }

    #[test]
    fn test_plan_standard_select() {
        assert_standard("SELECT * FROM events");
    }

    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();
        for sql in [
            "CREATE SOURCE src1 (id INT)",
            "CREATE SOURCE src2 (id INT)",
            "CREATE SINK sink1 FROM src1",
        ] {
            plan_sql(&mut planner, sql).unwrap();
        }

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    #[test]
    fn test_plan_query_with_window() {
        let query_plan = plan_query(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
            "Expected Query plan",
        );

        let config = query_plan
            .window_config
            .expect("window config should be present");
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size.as_secs(), 300);
    }

    #[test]
    fn test_plan_query_with_join() {
        let configs = plan_query(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
            "Expected Query plan",
        )
        .join_config
        .expect("join config should be present");

        assert_eq!(configs.len(), 1);
        assert_eq!(configs[0].left_key(), "order_id");
        assert_eq!(configs[0].right_key(), "order_id");
    }

    #[test]
    fn test_plan_query_with_lag() {
        let config = plan_query(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
            "Expected Query plan with analytic config",
        )
        .analytic_config
        .expect("analytic config should be present");

        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
    }

    #[test]
    fn test_plan_query_with_having() {
        let query_plan = plan_query(
            "SELECT symbol, COUNT(*) AS cnt FROM trades \
             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
             HAVING COUNT(*) > 10",
            "Expected Query plan with having config",
        );

        assert!(query_plan.window_config.is_some());
        let config = query_plan
            .having_config
            .expect("having config should be present");
        let predicate = config.predicate();
        assert!(predicate.contains("COUNT(*)"), "predicate was: {predicate}");
    }

    #[test]
    fn test_plan_query_without_having() {
        let query_plan = plan_query(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
            "Expected Query plan",
        );
        assert!(query_plan.having_config.is_none());
    }

    #[test]
    fn test_plan_having_only_produces_query_plan() {
        let query_plan = plan_query(
            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
            "Expected Query plan for HAVING-only query",
        );
        assert!(query_plan.having_config.is_some());
        assert!(query_plan.window_config.is_none());
    }

    #[test]
    fn test_plan_having_compound_predicate() {
        let config = plan_query(
            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
             FROM trades GROUP BY symbol \
             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
            "Expected Query plan",
        )
        .having_config
        .expect("having config should be present");

        let pred = config.predicate();
        assert!(pred.contains("AND"), "predicate was: {pred}");
    }

    #[test]
    fn test_plan_query_with_lead() {
        let config = plan_query(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
            "Expected Query plan with analytic config",
        )
        .analytic_config
        .expect("analytic config should be present");

        assert!(config.has_lookahead());
        assert_eq!(config.functions[0].offset, 2);
    }

    #[test]
    fn test_plan_single_join_produces_vec_of_one() {
        let configs = plan_query("SELECT * FROM a JOIN b ON a.id = b.a_id", "Expected Query plan")
            .join_config
            .expect("join config should be present");
        assert_eq!(configs.len(), 1);
    }

    #[test]
    fn test_plan_two_way_join() {
        let configs = plan_query(
            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
            "Expected Query plan",
        )
        .join_config
        .expect("join config should be present");

        assert_eq!(configs.len(), 2);
        assert_eq!(configs[0].left_key(), "id");
        assert_eq!(configs[0].right_key(), "a_id");
        assert_eq!(configs[1].left_key(), "id");
        assert_eq!(configs[1].right_key(), "b_id");
    }

    #[test]
    fn test_plan_mixed_join_types() {
        let configs = plan_query(
            "SELECT * FROM orders o \
             JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
             JOIN customers c ON p.cust_id = c.id",
            "Expected Query plan",
        )
        .join_config
        .expect("join config should be present");

        assert_eq!(configs.len(), 2);
        assert!(configs[0].is_stream_stream());
        assert!(configs[1].is_lookup());
    }

    #[test]
    fn test_plan_backward_compat_no_join() {
        assert_standard("SELECT * FROM orders");
    }

    #[test]
    fn test_plan_query_with_rows_frame() {
        let frame_config = plan_query(
            "SELECT AVG(price) OVER (ORDER BY ts \
             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
            "Expected Query plan with frame_config",
        )
        .frame_config
        .expect("frame config should be present");

        assert_eq!(frame_config.functions.len(), 1);
        assert_eq!(frame_config.functions[0].source_column, "price");
    }

    #[test]
    fn test_plan_frame_with_partition() {
        let frame_config = plan_query(
            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
            "Expected Query plan with frame_config",
        )
        .frame_config
        .expect("frame config should be present");

        assert_eq!(frame_config.partition_columns, vec!["symbol".to_string()]);
        assert_eq!(frame_config.order_columns, vec!["ts".to_string()]);
    }

    #[test]
    fn test_plan_no_frame_is_standard() {
        assert_standard("SELECT * FROM trades");
    }

    #[test]
    fn test_plan_unbounded_following_rejected() {
        let mut planner = StreamingPlanner::new();
        let err = plan_sql(
            &mut planner,
            "SELECT SUM(amount) OVER (ORDER BY id \
             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
             FROM orders",
        )
        .expect_err("UNBOUNDED FOLLOWING must be rejected")
        .to_string();

        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
    }
}