1pub mod channel_derivation;
8pub mod lookup_join;
10pub mod predicate_split;
12pub mod streaming_optimizer;
14
15#[allow(clippy::disallowed_types)] use std::collections::HashMap;
17use std::sync::Arc;
18
19use arrow::datatypes::{Field, Schema, SchemaRef};
20use datafusion::logical_expr::LogicalPlan;
21use datafusion::prelude::SessionContext;
22use sqlparser::ast::{ObjectName, SetExpr, Statement};
23
24use crate::parser::aggregation_parser::analyze_aggregates;
25use crate::parser::analytic_parser::{
26 analyze_analytic_functions, analyze_window_frames, FrameBound,
27};
28use crate::parser::join_parser::analyze_joins;
29use crate::parser::lookup_table::{validate_properties, LookupTableProperties};
30use crate::parser::order_analyzer::analyze_order_by;
31use crate::parser::{
32 CreateLookupTableStatement, CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom,
33 StreamingStatement, WindowFunction, WindowRewriter,
34};
35use crate::translator::{
36 AnalyticWindowConfig, HavingFilterConfig, JoinOperatorConfig, OrderOperatorConfig,
37 WindowFrameConfig, WindowOperatorConfig,
38};
39
/// Metadata captured from a `CREATE LOOKUP TABLE` statement, handed to the
/// execution layer when the table is registered.
#[derive(Debug, Clone)]
pub struct LookupTableInfo {
    /// Table name as written in the DDL.
    pub name: String,
    /// Column definitions as `(name, SQL type string)` pairs, in order.
    pub columns: Vec<(String, String)>,
    /// Primary-key column names, in declaration order.
    pub primary_key: Vec<String>,
    /// Connector properties validated from the WITH clause.
    pub properties: LookupTableProperties,
    /// Arrow schema derived from the column definitions.
    pub arrow_schema: SchemaRef,
    /// Raw, unvalidated WITH options exactly as supplied by the user.
    pub raw_options: HashMap<String, String>,
}
56
/// Plans streaming SQL statements into [`StreamingPlan`]s while maintaining a
/// catalog of registered sources, sinks, and lookup tables.
pub struct StreamingPlanner {
    /// Registered sources, keyed by name.
    sources: HashMap<String, SourceInfo>,
    /// Registered sinks, keyed by name.
    sinks: HashMap<String, SinkInfo>,
    /// Registered lookup tables, keyed by name.
    lookup_tables: HashMap<String, LookupTableInfo>,
}
66
/// Catalog entry for a registered streaming source.
#[derive(Debug, Clone)]
pub struct SourceInfo {
    /// Source name as written in the DDL.
    pub name: String,
    /// Column named in a WATERMARK clause, if one was given.
    pub watermark_column: Option<String>,
    /// Connector options from the WITH clause.
    pub options: HashMap<String, String>,
}
77
/// Catalog entry for a registered sink.
#[derive(Debug, Clone)]
pub struct SinkInfo {
    /// Sink name as written in the DDL.
    pub name: String,
    /// Name of the table/stream feeding this sink; sinks defined over an
    /// inline query use the synthetic name `"<sink>_query"`.
    pub from: String,
    /// Connector options from the WITH clause.
    pub options: HashMap<String, String>,
}
88
/// The planner's output: the action the execution layer should take for one
/// parsed statement.
#[derive(Debug)]
// The Query variant dwarfs the others; boxing it is not worth the churn here.
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// Register a new streaming source in the engine.
    RegisterSource(SourceInfo),

    /// Register a new sink in the engine.
    RegisterSink(SinkInfo),

    /// Execute a (possibly named) streaming query.
    Query(QueryPlan),

    /// Pass the statement through to standard (non-streaming) execution.
    Standard(Box<Statement>),

    /// Register a new lookup table in the engine.
    RegisterLookupTable(LookupTableInfo),

    /// Remove a lookup table from the catalog.
    DropLookupTable {
        /// Name of the lookup table to drop.
        name: String,
    },
}
114
/// A planned streaming query together with the operator configurations
/// extracted from its SQL; a `None` field means that feature is unused.
#[derive(Debug)]
pub struct QueryPlan {
    /// Query name for named continuous queries/streams; `None` for ad-hoc queries.
    pub name: Option<String>,
    /// Windowed aggregation config, from a GROUP BY window function.
    pub window_config: Option<WindowOperatorConfig>,
    /// One entry per join in the FROM clause, in join order.
    pub join_config: Option<Vec<JoinOperatorConfig>>,
    /// Streaming ORDER BY handling.
    pub order_config: Option<OrderOperatorConfig>,
    /// Analytic (e.g. LAG/LEAD) OVER-clause functions.
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// HAVING predicate applied after aggregation.
    pub having_config: Option<HavingFilterConfig>,
    /// ROWS/RANGE window frames from OVER clauses.
    pub frame_config: Option<WindowFrameConfig>,
    /// EMIT clause controlling when results are emitted, if present.
    pub emit_clause: Option<EmitClause>,
    /// The underlying SQL statement to execute.
    pub statement: Box<Statement>,
}
137
138impl StreamingPlanner {
139 #[must_use]
141 pub fn new() -> Self {
142 Self {
143 sources: HashMap::new(),
144 sinks: HashMap::new(),
145 lookup_tables: HashMap::new(),
146 }
147 }
148
    /// Dispatches a parsed [`StreamingStatement`] to the matching planning
    /// routine and returns the resulting [`StreamingPlan`].
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::UnsupportedSql`] for statement kinds that the
    /// database layer executes directly, or propagates any error raised by the
    /// specific planning routine.
    pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
        match statement {
            StreamingStatement::CreateSource(source) => self.plan_create_source(source),
            StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
            // CREATE STREAM is planned exactly like a continuous query; both
            // carry a name, an inner query, and an optional EMIT clause.
            StreamingStatement::CreateContinuousQuery {
                name,
                query,
                emit_clause,
            }
            | StreamingStatement::CreateStream {
                name,
                query,
                emit_clause,
                ..
            } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
            StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
            StreamingStatement::CreateLookupTable(lt) => self.plan_create_lookup_table(lt),
            StreamingStatement::DropLookupTable { name, if_exists } => {
                self.plan_drop_lookup_table(name, *if_exists)
            }
            // Everything below is catalog/administrative work handled by the
            // database layer; the planner deliberately rejects it.
            StreamingStatement::DropSource { .. }
            | StreamingStatement::DropSink { .. }
            | StreamingStatement::DropStream { .. }
            | StreamingStatement::DropMaterializedView { .. }
            | StreamingStatement::Show(_)
            | StreamingStatement::Describe { .. }
            | StreamingStatement::Explain { .. }
            | StreamingStatement::CreateMaterializedView { .. }
            | StreamingStatement::InsertInto { .. }
            | StreamingStatement::AlterSource { .. }
            | StreamingStatement::Checkpoint
            | StreamingStatement::RestoreCheckpoint { .. } => {
                Err(PlanningError::UnsupportedSql(format!(
                    "Statement type {:?} is handled by the database layer, not the planner",
                    std::mem::discriminant(statement)
                )))
            }
        }
    }
195
196 fn plan_create_source(
198 &mut self,
199 source: &CreateSourceStatement,
200 ) -> Result<StreamingPlan, PlanningError> {
201 let name = object_name_to_string(&source.name);
202
203 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
205 return Err(PlanningError::InvalidQuery(format!(
206 "Source '{}' already exists",
207 name
208 )));
209 }
210
211 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
213
214 let info = SourceInfo {
215 name: name.clone(),
216 watermark_column,
217 options: source.with_options.clone(),
218 };
219
220 self.sources.insert(name, info.clone());
222
223 Ok(StreamingPlan::RegisterSource(info))
224 }
225
226 fn plan_create_sink(
228 &mut self,
229 sink: &CreateSinkStatement,
230 ) -> Result<StreamingPlan, PlanningError> {
231 let name = object_name_to_string(&sink.name);
232
233 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
235 return Err(PlanningError::InvalidQuery(format!(
236 "Sink '{}' already exists",
237 name
238 )));
239 }
240
241 let from = match &sink.from {
243 SinkFrom::Table(table) => object_name_to_string(table),
244 SinkFrom::Query(_) => format!("{}_query", name),
245 };
246
247 let info = SinkInfo {
248 name: name.clone(),
249 from,
250 options: sink.with_options.clone(),
251 };
252
253 self.sinks.insert(name, info.clone());
255
256 Ok(StreamingPlan::RegisterSink(info))
257 }
258
259 #[allow(clippy::unused_self)] fn plan_continuous_query(
262 &mut self,
263 name: &ObjectName,
264 query: &StreamingStatement,
265 emit_clause: Option<&EmitClause>,
266 ) -> Result<StreamingPlan, PlanningError> {
267 let stmt = match query {
269 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
270 _ => {
271 return Err(PlanningError::InvalidQuery(
272 "Continuous query must contain a SELECT statement".to_string(),
273 ))
274 }
275 };
276
277 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
279
280 Ok(StreamingPlan::Query(QueryPlan {
281 name: Some(object_name_to_string(name)),
282 window_config: query_plan.window_config,
283 join_config: query_plan.join_config,
284 order_config: query_plan.order_config,
285 analytic_config: query_plan.analytic_config,
286 having_config: query_plan.having_config,
287 frame_config: query_plan.frame_config,
288 emit_clause: emit_clause.cloned(),
289 statement: Box::new(stmt),
290 }))
291 }
292
    /// Inspects a standard SQL statement for streaming features (windows,
    /// joins, ORDER BY, analytic functions, HAVING, window frames). Statements
    /// using any such feature become a [`StreamingPlan::Query`]; everything
    /// else passes through as [`StreamingPlan::Standard`].
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when any analysis fails or when
    /// a window frame uses the unsupported `UNBOUNDED FOLLOWING` end bound.
    #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                let window_function = Self::extract_window_from_select(select);

                let join_analysis = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })?;

                let order_analysis = analyze_order_by(stmt);
                let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
                    .map_err(PlanningError::InvalidQuery)?;

                let analytic_analysis = analyze_analytic_functions(stmt);
                let analytic_config =
                    analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));

                let agg_analysis = analyze_aggregates(stmt);
                let having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

                let frame_analysis = analyze_window_frames(stmt);
                let frame_config = frame_analysis
                    .as_ref()
                    .map(WindowFrameConfig::from_analysis);

                // A stream can never wait for an unbounded future range, so
                // reject such frames before building a plan.
                if let Some(fa) = &frame_analysis {
                    for f in &fa.functions {
                        if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                            return Err(PlanningError::InvalidQuery(
                                "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                                    .to_string(),
                            ));
                        }
                    }
                }

                // Any one streaming feature is enough to route the statement
                // through the streaming query path.
                let has_streaming_features = window_function.is_some()
                    || join_analysis.is_some()
                    || order_config.is_some()
                    || analytic_config.is_some()
                    || having_config.is_some()
                    || frame_config.is_some();

                if has_streaming_features {
                    let window_config = match window_function {
                        Some(w) => Some(
                            WindowOperatorConfig::from_window_function(&w)
                                .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
                        ),
                        None => None,
                    };

                    let join_config =
                        join_analysis.map(|m| JoinOperatorConfig::from_multi_analysis(&m));

                    return Ok(StreamingPlan::Query(QueryPlan {
                        name: None,
                        window_config,
                        join_config,
                        order_config,
                        analytic_config,
                        having_config,
                        frame_config,
                        emit_clause: None,
                        statement: Box::new(stmt.clone()),
                    }));
                }
            }
        }

        // No streaming features: defer to standard execution unchanged.
        Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
    }
376
    /// Runs every streaming analysis pass (window, join, ORDER BY, analytic
    /// functions, HAVING, window frames) over a SELECT statement and collects
    /// the resulting operator configurations into a [`QueryAnalysis`].
    ///
    /// # Errors
    ///
    /// Returns [`PlanningError::InvalidQuery`] when any analysis fails or when
    /// a window frame uses the unsupported `UNBOUNDED FOLLOWING` end bound.
    fn analyze_query(
        stmt: &Statement,
        emit_clause: Option<&EmitClause>,
    ) -> Result<QueryAnalysis, PlanningError> {
        let mut analysis = QueryAnalysis::default();

        if let Statement::Query(query) = stmt {
            if let SetExpr::Select(select) = query.body.as_ref() {
                // A GROUP BY window function becomes the window operator; the
                // EMIT clause (if any) refines its trigger behavior.
                if let Some(window) = Self::extract_window_from_select(select) {
                    let mut config = WindowOperatorConfig::from_window_function(&window)
                        .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;

                    if let Some(emit) = emit_clause {
                        config = config
                            .with_emit_clause(emit)
                            .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
                    }

                    analysis.window_config = Some(config);
                }

                if let Some(multi) = analyze_joins(select).map_err(|e| {
                    PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
                })? {
                    analysis.join_config = Some(JoinOperatorConfig::from_multi_analysis(&multi));
                }
            }
        }

        let order_analysis = analyze_order_by(stmt);
        analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
            .map_err(PlanningError::InvalidQuery)?;

        if let Some(analytic) = analyze_analytic_functions(stmt) {
            analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
        }

        let agg_analysis = analyze_aggregates(stmt);
        analysis.having_config = agg_analysis.having_expr.map(HavingFilterConfig::new);

        if let Some(frame_analysis) = analyze_window_frames(stmt) {
            // A stream can never wait for an unbounded future range.
            for f in &frame_analysis.functions {
                if matches!(f.end_bound, FrameBound::UnboundedFollowing) {
                    return Err(PlanningError::InvalidQuery(
                        "UNBOUNDED FOLLOWING is not supported in streaming window frames"
                            .to_string(),
                    ));
                }
            }
            analysis.frame_config = Some(WindowFrameConfig::from_analysis(&frame_analysis));
        }

        Ok(analysis)
    }
440
441 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
443 use sqlparser::ast::GroupByExpr;
445 match &select.group_by {
446 GroupByExpr::Expressions(exprs, _modifiers) => {
447 for group_by_expr in exprs {
448 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
449 {
450 return Some(window);
451 }
452 }
453 }
454 GroupByExpr::All(_) => {}
455 }
456 None
457 }
458
459 fn plan_create_lookup_table(
461 &mut self,
462 lt: &CreateLookupTableStatement,
463 ) -> Result<StreamingPlan, PlanningError> {
464 let name = object_name_to_string(<.name);
465
466 if !lt.or_replace && !lt.if_not_exists && self.lookup_tables.contains_key(&name) {
467 return Err(PlanningError::InvalidQuery(format!(
468 "Lookup table '{}' already exists",
469 name
470 )));
471 }
472
473 let columns: Vec<(String, String)> = lt
474 .columns
475 .iter()
476 .map(|c| (c.name.value.clone(), c.data_type.to_string()))
477 .collect();
478
479 let properties = validate_properties(<.with_options).map_err(|e| {
480 PlanningError::InvalidQuery(format!("Invalid lookup table properties: {e}"))
481 })?;
482
483 let arrow_fields: Vec<Field> = lt
485 .columns
486 .iter()
487 .map(|c| {
488 let dt = crate::translator::streaming_ddl::sql_type_to_arrow(&c.data_type)
489 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
490 let nullable = !c
491 .options
492 .iter()
493 .any(|opt| matches!(opt.option, sqlparser::ast::ColumnOption::NotNull));
494 Ok(Field::new(&c.name.value, dt, nullable))
495 })
496 .collect::<Result<_, PlanningError>>()?;
497 let arrow_schema = Arc::new(Schema::new(arrow_fields));
498
499 let info = LookupTableInfo {
500 name: name.clone(),
501 columns,
502 primary_key: lt.primary_key.clone(),
503 properties,
504 arrow_schema,
505 raw_options: lt.with_options.clone(),
506 };
507
508 self.lookup_tables.insert(name, info.clone());
509
510 Ok(StreamingPlan::RegisterLookupTable(info))
511 }
512
513 fn plan_drop_lookup_table(
515 &mut self,
516 name: &ObjectName,
517 if_exists: bool,
518 ) -> Result<StreamingPlan, PlanningError> {
519 let name_str = object_name_to_string(name);
520
521 if !if_exists && !self.lookup_tables.contains_key(&name_str) {
522 return Err(PlanningError::InvalidQuery(format!(
523 "Lookup table '{}' does not exist",
524 name_str
525 )));
526 }
527
528 self.lookup_tables.remove(&name_str);
529
530 Ok(StreamingPlan::DropLookupTable { name: name_str })
531 }
532
    /// Returns the registered source with the given name, if any.
    #[must_use]
    pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
        self.sources.get(name)
    }

    /// Returns the registered sink with the given name, if any.
    #[must_use]
    pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
        self.sinks.get(name)
    }

    /// Lists all registered sources (iteration order is unspecified).
    #[must_use]
    pub fn list_sources(&self) -> Vec<&SourceInfo> {
        self.sources.values().collect()
    }

    /// Lists all registered sinks (iteration order is unspecified).
    #[must_use]
    pub fn list_sinks(&self) -> Vec<&SinkInfo> {
        self.sinks.values().collect()
    }

    /// Returns the registered lookup table with the given name, if any.
    #[must_use]
    pub fn get_lookup_table(&self, name: &str) -> Option<&LookupTableInfo> {
        self.lookup_tables.get(name)
    }

    /// Lists all registered lookup tables (iteration order is unspecified).
    #[must_use]
    pub fn list_lookup_tables(&self) -> Vec<&LookupTableInfo> {
        self.lookup_tables.values().collect()
    }

    /// Returns an owned snapshot of the lookup-table catalog.
    #[must_use]
    pub fn lookup_tables_cloned(&self) -> HashMap<String, LookupTableInfo> {
        self.lookup_tables.clone()
    }
574
575 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
586 &self,
587 plan: &QueryPlan,
588 ctx: &SessionContext,
589 ) -> Result<LogicalPlan, PlanningError> {
590 let sql = plan.statement.to_string();
594 ctx.state()
595 .create_logical_plan(&sql)
596 .await
597 .map_err(PlanningError::DataFusion)
598 }
599}
600
/// `Default` delegates to [`StreamingPlanner::new`], yielding an empty catalog.
impl Default for StreamingPlanner {
    fn default() -> Self {
        Self::new()
    }
}
606
/// Intermediate bundle of per-feature operator configs produced by query
/// analysis; a `None` field means the feature is absent from the query.
#[derive(Debug, Default)]
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    // Each field mirrors the corresponding Option on QueryPlan.
    window_config: Option<WindowOperatorConfig>,
    join_config: Option<Vec<JoinOperatorConfig>>,
    order_config: Option<OrderOperatorConfig>,
    analytic_config: Option<AnalyticWindowConfig>,
    having_config: Option<HavingFilterConfig>,
    frame_config: Option<WindowFrameConfig>,
}
618
/// Renders a (possibly dotted) SQL object name to its display string.
fn object_name_to_string(name: &ObjectName) -> String {
    name.to_string()
}
623
/// Errors produced while planning streaming statements.
///
/// NOTE(review): `Display` is implemented by hand below (so DataFusion errors
/// can go through the crate's error translator); the thiserror derive is used
/// only for `std::error::Error` and the `#[from]` conversion.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// Valid SQL that the planner does not handle (database-layer statements).
    UnsupportedSql(String),

    /// The statement is malformed or violates a planning rule.
    InvalidQuery(String),

    /// A referenced source is not registered.
    SourceNotFound(String),

    /// A referenced sink is not registered.
    SinkNotFound(String),

    /// An error bubbled up from DataFusion during logical planning.
    DataFusion(#[from] datafusion_common::DataFusionError),
}
642
643impl std::fmt::Display for PlanningError {
644 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
645 match self {
646 Self::UnsupportedSql(msg) => write!(f, "Unsupported SQL: {msg}"),
647 Self::InvalidQuery(msg) => write!(f, "Invalid query: {msg}"),
648 Self::SourceNotFound(name) => write!(f, "Source not found: {name}"),
649 Self::SinkNotFound(name) => write!(f, "Sink not found: {name}"),
650 Self::DataFusion(e) => {
651 let translated = crate::error::translate_datafusion_error(&e.to_string());
652 write!(f, "{translated}")
653 }
654 }
655 }
656}
657
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    // CREATE SOURCE produces a RegisterSource plan carrying the source name.
    #[test]
    fn test_plan_create_source() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    // CREATE SINK records both the sink name and its upstream table.
    #[test]
    fn test_plan_create_sink() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("CREATE SINK output FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    // Re-planning the same CREATE SOURCE without modifiers is rejected.
    #[test]
    fn test_plan_duplicate_source() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
    }

    // IF NOT EXISTS silences the duplicate-name error.
    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    // OR REPLACE also permits redefining an existing source.
    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();

        let statements =
            StreamingParser::parse_sql("CREATE SOURCE events (id INT, name VARCHAR)").unwrap();
        planner.plan(&statements[0]).unwrap();

        let statements =
            StreamingParser::parse_sql("CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)")
                .unwrap();
        let result = planner.plan(&statements[0]);
        assert!(result.is_ok());
    }

    // The WATERMARK column is surfaced on the SourceInfo.
    #[test]
    fn test_plan_source_with_watermark() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "CREATE SOURCE events (
                id INT,
                ts TIMESTAMP,
                WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
            )",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column, Some("ts".to_string()));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    // A plain SELECT with no streaming features passes through as Standard.
    #[test]
    fn test_plan_standard_select() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM events").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {}
            _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // Catalog accessors reflect the registered sources and sinks.
    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        let s1 = StreamingParser::parse_sql("CREATE SOURCE src1 (id INT)").unwrap();
        let s2 = StreamingParser::parse_sql("CREATE SOURCE src2 (id INT)").unwrap();
        planner.plan(&s1[0]).unwrap();
        planner.plan(&s2[0]).unwrap();

        let k1 = StreamingParser::parse_sql("CREATE SINK sink1 FROM src1").unwrap();
        planner.plan(&k1[0]).unwrap();

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    // GROUP BY TUMBLE(...) yields a Query plan with a window config.
    #[test]
    fn test_plan_query_with_window() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                let config = query_plan.window_config.unwrap();
                assert_eq!(config.time_column, "event_time");
                assert_eq!(config.size.as_secs(), 300);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // An equi-join produces a join config with both key columns extracted.
    #[test]
    fn test_plan_query_with_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.join_config.is_some());
                let configs = query_plan.join_config.unwrap();
                assert_eq!(configs.len(), 1);
                assert_eq!(configs[0].left_key(), "order_id");
                assert_eq!(configs[0].right_key(), "order_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // LAG with PARTITION BY produces an analytic config.
    #[test]
    fn test_plan_query_with_lag() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert_eq!(config.functions.len(), 1);
                assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // HAVING alongside a window yields both configs on the same plan.
    #[test]
    fn test_plan_query_with_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt FROM trades \
             GROUP BY symbol, TUMBLE(ts, INTERVAL '5' MINUTE) \
             HAVING COUNT(*) > 10",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.window_config.is_some());
                assert!(query_plan.having_config.is_some());
                let config = query_plan.having_config.unwrap();
                assert!(
                    config.predicate().contains("COUNT(*)"),
                    "predicate was: {}",
                    config.predicate()
                );
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // No HAVING clause means no having config.
    #[test]
    fn test_plan_query_without_having() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_none());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // HAVING by itself (no window) is enough to route to the query path.
    #[test]
    fn test_plan_having_only_produces_query_plan() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT category, SUM(amount) FROM orders GROUP BY category HAVING SUM(amount) > 1000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.having_config.is_some());
                assert!(query_plan.window_config.is_none());
            }
            _ => panic!("Expected Query plan for HAVING-only query"),
        }
    }

    // Compound HAVING predicates survive planning intact.
    #[test]
    fn test_plan_having_compound_predicate() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT symbol, COUNT(*) AS cnt, SUM(vol) AS total \
             FROM trades GROUP BY symbol \
             HAVING COUNT(*) >= 5 AND SUM(vol) > 10000",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                let config = query_plan.having_config.unwrap();
                let pred = config.predicate();
                assert!(pred.contains("AND"), "predicate was: {pred}");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // LEAD with an explicit offset marks the config as lookahead.
    #[test]
    fn test_plan_query_with_lead() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(query_plan) => {
                assert!(query_plan.analytic_config.is_some());
                let config = query_plan.analytic_config.unwrap();
                assert!(config.has_lookahead());
                assert_eq!(config.functions[0].offset, 2);
            }
            _ => panic!("Expected Query plan with analytic config"),
        }
    }

    // A single join still produces a Vec of join configs (length 1).
    #[test]
    fn test_plan_single_join_produces_vec_of_one() {
        let mut planner = StreamingPlanner::new();
        let statements =
            StreamingParser::parse_sql("SELECT * FROM a JOIN b ON a.id = b.a_id").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 1);
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // Chained joins yield one config per join, in order.
    #[test]
    fn test_plan_two_way_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM a JOIN b ON a.id = b.a_id JOIN c ON b.id = c.b_id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert_eq!(configs[0].left_key(), "id");
                assert_eq!(configs[0].right_key(), "a_id");
                assert_eq!(configs[1].left_key(), "id");
                assert_eq!(configs[1].right_key(), "b_id");
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // A time-bounded join is stream-stream; a plain equi-join is a lookup.
    #[test]
    fn test_plan_mixed_join_types() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT * FROM orders o \
             JOIN payments p ON o.id = p.order_id \
             AND p.ts BETWEEN o.ts AND o.ts + INTERVAL '1' HOUR \
             JOIN customers c ON p.cust_id = c.id",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let configs = qp.join_config.unwrap();
                assert_eq!(configs.len(), 2);
                assert!(configs[0].is_stream_stream());
                assert!(configs[1].is_lookup());
            }
            _ => panic!("Expected Query plan"),
        }
    }

    // Backward compatibility: no join keeps the plain Standard path.
    #[test]
    fn test_plan_backward_compat_no_join() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM orders").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // A ROWS frame produces a frame config with the aggregated column.
    #[test]
    fn test_plan_query_with_rows_frame() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (ORDER BY ts \
             ROWS BETWEEN 9 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                assert!(qp.frame_config.is_some());
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.functions.len(), 1);
                assert_eq!(fc.functions[0].source_column, "price");
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    // PARTITION BY / ORDER BY columns are carried through to the frame config.
    #[test]
    fn test_plan_frame_with_partition() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT AVG(price) OVER (PARTITION BY symbol ORDER BY ts \
             ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) AS ma FROM trades",
        )
        .unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Query(qp) => {
                let fc = qp.frame_config.unwrap();
                assert_eq!(fc.partition_columns, vec!["symbol".to_string()]);
                assert_eq!(fc.order_columns, vec!["ts".to_string()]);
            }
            _ => panic!("Expected Query plan with frame_config"),
        }
    }

    // No frame (and no other streaming feature) stays Standard.
    #[test]
    fn test_plan_no_frame_is_standard() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql("SELECT * FROM trades").unwrap();

        let plan = planner.plan(&statements[0]).unwrap();
        match plan {
            StreamingPlan::Standard(_) => {} _ => panic!("Expected Standard plan for simple SELECT"),
        }
    }

    // UNBOUNDED FOLLOWING frames are rejected with a descriptive error.
    #[test]
    fn test_plan_unbounded_following_rejected() {
        let mut planner = StreamingPlanner::new();
        let statements = StreamingParser::parse_sql(
            "SELECT SUM(amount) OVER (ORDER BY id \
             ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING) AS rest \
             FROM orders",
        )
        .unwrap();

        let result = planner.plan(&statements[0]);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(err.contains("UNBOUNDED FOLLOWING"), "error was: {err}");
    }
}