1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12
13use std::collections::HashMap;
14
15use datafusion::logical_expr::LogicalPlan;
16use datafusion::prelude::SessionContext;
17use sqlparser::ast::{ObjectName, SetExpr, Statement};
18
19use crate::parser::aggregation_parser::analyze_aggregates;
20use crate::parser::analytic_parser::{
21 analyze_analytic_functions, analyze_window_frames, FrameBound,
22};
23use crate::parser::join_parser::analyze_joins;
24use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
25use crate::parser::order_analyzer::analyze_order_by;
26use crate::parser::{
27 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
28 StreamingStatement, WindowFunction, WindowRewriter,
29};
30use crate::translator::{
31 AnalyticWindowConfig, DagExplainOutput, HavingFilterConfig, JoinOperatorConfig,
32 OrderOperatorConfig, WindowFrameConfig, WindowOperatorConfig,
33};
34
/// Metadata captured from a `CREATE LOOKUP TABLE` statement.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name as written in the statement.
    pub name: String,
    /// Column definitions as `(name, data type)` pairs; the type is the
    /// parser's rendered string form.
    pub columns: Vec<(String, String)>,
    /// Primary-key column names.
    pub primary_key: Vec<String>,
    /// Validated `WITH (...)` properties.
    pub properties: LookupTableProperties,
}
47
/// Plans streaming SQL statements, tracking registered sources, sinks and
/// lookup tables so duplicate registrations can be detected.
pub struct StreamingPlanner {
    /// Registered sources, keyed by source name.
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by sink name.
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by table name.
    lookup_tables: HashMap<String, LookupTableInfo>,
}
57
/// Metadata captured from a `CREATE SOURCE` statement.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name as written in the statement.
    pub name: String,
    /// Column named in the `WATERMARK FOR ...` clause, if one was given.
    pub watermark_column: Option<String>,
    /// Raw `WITH (...)` options.
    pub options: HashMap<String, String>,
}
68
/// Metadata captured from a `CREATE SINK` statement.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name as written in the statement.
    pub name: String,
    /// Upstream the sink reads from: a table name, or a synthesized
    /// `"<name>_query"` identifier when the sink was defined over a query.
    pub from: String,
    /// Raw `WITH (...)` options.
    pub options: HashMap<String, String>,
}
79
/// Output of planning a single streaming statement.
#[derive(Debug)]
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Register a new source in the catalog.
    RegisterSource(SourceInfo),

    /// Register a new sink in the catalog.
    RegisterSink(SinkInfo),

    /// Execute a (possibly named) streaming query with operator configs.
    Query(QueryPlan),

    /// Pass-through for statements with no streaming features.
    Standard(Box<Statement>),

    /// Rendered DAG explanation output.
    DagExplain(DagExplainOutput),

    /// Register a new lookup table in the catalog.
    RegisterLookupTable(LookupTableInfo),

    /// Remove a lookup table from the catalog.
    DropLookupTable {
        /// Name of the table to drop.
        name: String,
    },
}
108
/// A planned streaming query: the original statement plus every streaming
/// operator configuration extracted from it. Each config is `None` when the
/// query does not use the corresponding feature.
#[derive(Debug)]
pub struct QueryPlan {
    /// Query name for named continuous queries / streams; `None` for ad-hoc queries.
    pub name: Option<String>,
    /// GROUP BY window function config (e.g. TUMBLE), if present.
    pub window_config: Option<WindowOperatorConfig>,
    /// One config per join in the FROM clause, if any.
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY config, if present.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic function (LAG/LEAD-style) config, if present.
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING-clause filter config, if present.
    pub having_config: Option<HavingFilterConfig>,
    /// OVER (... ROWS BETWEEN ...) frame config, if present.
    pub frame_config: Option<WindowFrameConfig>,
    /// EMIT clause attached to the statement, if any.
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement.
    pub statement: Box<Statement>,
}
131
132impl StreamingPlanner {
133 #[must_use]
135 pub fn new() -> Self {
136 Self {
137 sources: HashMap::new(),
138 sinks: HashMap::new(),
139 lookup_tables: HashMap::new(),
140 }
141 }
142
    /// Converts a parsed [`StreamingStatement`] into a [`StreamingPlan`],
    /// dispatching to the specialized planning routine for each statement kind.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError`] when a specialized routine rejects the
    /// statement, or [`PlanningError::UnsupportedSql`] for statement kinds
    /// that are executed by the database layer rather than planned here.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE CONTINUOUS QUERY and CREATE STREAM share one planning path.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            // Catalog/DDL maintenance statements are handled outside the planner.
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. } => {
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
187
188 fn plan_create_source(
190 &mut self,
191 source: &CreateSourceStatement,
192 ) -> Result<StreamingPlan, PlanningError> {
193 let name = object_name_to_string(&source.name);
194
195 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
197 return Err(PlanningError::InvalidQuery(format!(
198 "Source '{}' already exists",
199 name
200 )));
201 }
202
203 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
205
206 let info = SourceInfo {
207 name: name.clone(),
208 watermark_column,
209 options: source.with_options.clone(),
210 };
211
212 self.sources.insert(name, info.clone());
214
215 Ok(StreamingPlan::RegisterSource(info))
216 }
217
218 fn plan_create_sink(
220 &mut self,
221 sink: &CreateSinkStatement,
222 ) -> Result<StreamingPlan, PlanningError> {
223 let name = object_name_to_string(&sink.name);
224
225 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
227 return Err(PlanningError::InvalidQuery(format!(
228 "Sink '{}' already exists",
229 name
230 )));
231 }
232
233 let from = match &sink.from {
235 SinkFrom::Table(table) => object_name_to_string(table),
236 SinkFrom::Query(_) => format!("{}_query", name),
237 };
238
239 let info = SinkInfo {
240 name: name.clone(),
241 from,
242 options: sink.with_options.clone(),
243 };
244
245 self.sinks.insert(name, info.clone());
247
248 Ok(StreamingPlan::RegisterSink(info))
249 }
250
251 #[allow(clippy::unused_self)] fn plan_continuous_query(
254 &mut self,
255 name: &ObjectName,
256 query: &StreamingStatement,
257 emit_clause: Option<&EmitClause>,
258 ) -> Result<StreamingPlan, PlanningError> {
259 let stmt = match query {
261 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
262 _ => {
263 return Err(PlanningError::InvalidQuery(
264 "Continuous query must contain a SELECT statement".to_string(),
265 ))
266 }
267 };
268
269 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
271
272 Ok(StreamingPlan::Query(QueryPlan {
273 name: Some(object_name_to_string(name)),
274 window_config: query_plan.window_config,
275 join_config: query_plan.join_config,
276 order_config: query_plan.order_config,
277 analytic_config: query_plan.analytic_config,
278 having_config: query_plan.having_config,
279 frame_config: query_plan.frame_config,
280 emit_clause: emit_clause.cloned(),
281 statement: Box::new(stmt),
282 }))
283 }
284
    /// Plans a standard SQL statement: if the SELECT uses any streaming
    /// feature (windows, joins, ORDER BY, analytics, HAVING, window frames)
    /// it becomes a [`StreamingPlan::Query`]; otherwise it passes through as
    /// [`StreamingPlan::Standard`].
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when any sub-analysis fails or
    /// a frame uses `UNBOUNDED FOLLOWING`.
    #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // Look for a window function (e.g. TUMBLE) in the GROUP BY.
                let window_function = Self::extract_window_from_select(select);

                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // UNBOUNDED FOLLOWING would require buffering the entire
                // stream, so reject it eagerly.
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // Any one streaming feature upgrades the statement to a Query plan.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // No streaming features detected: hand the statement through unchanged.
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
368
    /// Extracts all streaming operator configs from a statement for use by
    /// continuous-query planning; mirrors `plan_standard_statement` but also
    /// applies an optional EMIT clause to the window config.
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when any sub-analysis fails or
    /// a frame uses `UNBOUNDED FOLLOWING`.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        // Window and join extraction only applies to plain SELECT bodies.
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    // An EMIT clause refines the window's emission behavior.
                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // UNBOUNDED FOLLOWING would require buffering the entire stream.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
432
433 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
435 use sqlparser::ast::GroupByExpr;
437 match &select.group_by {
438 GroupByExpr::Expressions(exprs, _modifiers) => {
439 for group_by_expr in exprs {
440 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
441 {
442 return Some(window);
443 }
444 }
445 }
446 GroupByExpr::All(_) => {}
447 }
448 None
449 }
450
451 fn plan_create_lookup_table(
453 &mut self,
454 lt: &CreateLookupTableStatement,
455 ) -> Result<StreamingPlan, PlanningError> {
456 let name = object_name_to_string(<.name);
457
458 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
459 return Err(PlanningError::InvalidQuery(format!(
460 "Lookup table '{}' already exists",
461 name
462 )));
463 }
464
465 let columns: Vec<(String, String)> = lt
466 .columns
467 .iter()
468 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
469 .collect();
470
471 let properties = validate_properties(<.with_options).map_err(|e| {
472 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
473 })?;
474
475 let info = LookupTableInfo {
476 name: name.clone(),
477 columns,
478 primary_key: lt.primary_key.clone(),
479 properties,
480 };
481
482 self.lookup_tables.insert(name, info.clone());
483
484 Ok(StreamingPlan::RegisterLookupTable(info))
485 }
486
487 fn plan_drop_lookup_table(
489 &mut self,
490 name: &ObjectName,
491 if_exists: bool,
492 ) -> Result<StreamingPlan, PlanningError> {
493 let name_str = object_name_to_string(name);
494
495 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
496 return Err(PlanningError::InvalidQuery(format!(
497 "Lookup table '{}' does not exist",
498 name_str
499 )));
500 }
501
502 self.lookup_tables.remove(&name_str);
503
504 Ok(StreamingPlan::DropLookupTable { name: name_str })
505 }
506
    /// Returns the registered source with the given name, if any.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Returns the registered sink with the given name, if any.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Returns all registered sources (iteration order is unspecified).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Returns all registered sinks (iteration order is unspecified).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Returns the registered lookup table with the given name, if any.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Returns all registered lookup tables (iteration order is unspecified).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }
542
543 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
561 &self,
562 plan: &QueryPlan,
563 ctx: &SessionContext,
564 ) -> Result<LogicalPlan, PlanningError> {
565 let sql = plan.statement.to_string();
569 ctx.state()
570 .create_logical_plan(&sql)
571 .await
572 .map_err(PlanningError::DataFusion)
573 }
574}
575
/// `Default` delegates to [`StreamingPlanner::new`]: an empty planner.
impl Default for StreamingPlanner {
    fn default() -> Self {
        Self::new()
    }
}
581
/// Intermediate bundle of per-operator configs extracted from one query;
/// each field is `None` when the query lacks the corresponding feature.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// GROUP BY window function config (e.g. TUMBLE), if present.
    window_config: Option<WindowOperatorConfig>,
    /// One config per join in the FROM clause, if any.
    join_config: Option<Vec<JoinOperatorConfig>>,
    /// ORDER BY config, if present.
    order_config: Option<OrderOperatorConfig>,
    /// Analytic function (LAG/LEAD-style) config, if present.
    analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING-clause filter config, if present.
    having_config: Option<HavingFilterConfig>,
    /// OVER (... ROWS BETWEEN ...) frame config, if present.
    frame_config: Option<WindowFrameConfig>,
}
593
/// Renders a parsed [`ObjectName`] (possibly multi-part) via its `Display` impl.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
598
/// Errors produced while planning streaming statements.
///
/// `Display` is implemented by hand below rather than via `#[error(...)]`
/// attributes, so the derive only supplies the `Error` and `From` impls.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// SQL the planner does not handle (routed to the database layer).
    UnsupportedSql(String),

    /// Structurally valid SQL that violates a planning rule.
    InvalidQuery(String),

    /// A referenced source is not registered.
    SourceNotFound(String),

    /// A referenced sink is not registered.
    SinkNotFound(String),

    /// Error propagated from DataFusion (converted via `From`).
    DataFusion(#[from] datafusion_common::DataFusionError),
}
617
/// Manual `Display`: short prefixed messages, with DataFusion errors routed
/// through the crate's message translator.
impl std::fmt::Display for PlanningError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
            Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
            Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
            Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
            Self::DataFusion(e) => {
                // Rewrite DataFusion's raw message via the crate's translator.
                let translated = crate::error::translate_datafusion_error(&e.to_string());
                write!(f, "{translated}")
            }
        }
    }
}
632
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    // All tests drive the planner through parsed SQL text, mirroring how
    // statements arrive at runtime.

    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // Re-planning the identical CREATE SOURCE must be rejected.
        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // IF NOT EXISTS suppresses the duplicate error.
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        // OR REPLACE also suppresses the duplicate error.
        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                // Only the column name survives into SourceInfo.
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                // 5 minutes = 300 seconds.
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let configs = query_plan.join_config.unwrap();
                assert_eq!(configs.len(), 1);
                assert_eq!(configs[0].left_key(), "order_id");
                assert_eq!(configs[0].right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    #[test]
    fn test_plan_query_with_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt FROM trades \
             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
             HAVING COUNT(*) > 10",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                // Window and HAVING configs coexist on the same plan.
                assert!(query_plan.window_config.is_some());
                assert!(query_plan.having_config.is_some());
                let config = query_plan.having_config.unwrap();
                assert!(
                    config.predicate().contains("COUNT(*)"),
                    "predicate was: {}",
                    config.predicate()
                );
            }
            _ => panic!("Expected Query plan with having config"),
        }
    }

    #[test]
    fn test_plan_query_without_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_none());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_having_only_produces_query_plan() {
        let mut planner = StreamingPlanner::new();
        // HAVING alone (no window) is enough to upgrade to a Query plan.
        let statements = StreamingParser::parse_sql(
            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_some());
                assert!(query_plan.window_config.is_none());
            }
            _ => panic!("Expected Query plan for HAVING-only query"),
        }
    }

    #[test]
    fn test_plan_having_compound_predicate() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
             FROM trades GROUP BY symbol \
             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                let config = query_plan.having_config.unwrap();
                let pred = config.predicate();
                assert!(pred.contains("AND"), "predicate was: {pred}");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_query_with_lead() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert!(config.has_lookahead());
                assert_eq!(config.functions[0].offset, 2);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    #[test]
    fn test_plan_single_join_produces_vec_of_one() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 1);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_two_way_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                // Three tables yield two join configs, in FROM-clause order.
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert_eq!(configs[0].left_key(), "id");
                assert_eq!(configs[0].right_key(), "a_id");
                assert_eq!(configs[1].left_key(), "id");
                assert_eq!(configs[1].right_key(), "b_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_mixed_join_types() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o \
             JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
             JOIN customers c ON p.cust_id = c.id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                // The time-bounded join is stream-stream; the other is lookup.
                assert!(configs[0].is_stream_stream());
                assert!(configs[1].is_lookup());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    #[test]
    fn test_plan_backward_compat_no_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    #[test]
    fn test_plan_query_with_rows_frame() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (ORDER BY ts \
             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                assert!(qp.frame_config.is_some());
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.functions.len(), 1);
                assert_eq!(fc.functions[0].source_column, "price");
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    #[test]
    fn test_plan_frame_with_partition() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    #[test]
    fn test_plan_no_frame_is_standard() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    #[test]
    fn test_plan_unbounded_following_rejected() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT SUM(amount) OVER (ORDER BY id \
             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
             FROM orders",
        )
        .unwrap();

        // Unbounded lookahead cannot be evaluated over a stream.
        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
    }
}