1pub mod channel_derivation;
8
9use std::collections::HashMap;
10
11use datafusion::logical_expr::LogicalPlan;
12use datafusion::prelude::SessionContext;
13use sqlparser::ast::{ObjectName, SetExpr, Statement};
14
15use crate::parser::analytic_parser::analyze_analytic_functions;
16use crate::parser::join_parser::analyze_join;
17use crate::parser::order_analyzer::analyze_order_by;
18use crate::parser::{
19 CreateSinkStatement, CreateSourceStatement, EmitClause, SinkFrom, StreamingStatement,
20 WindowFunction, WindowRewriter,
21};
22use crate::translator::{
23 AnalyticWindowConfig, DagExplainOutput, JoinOperatorConfig, OrderOperatorConfig,
24 WindowOperatorConfig,
25};
26
27pub struct StreamingPlanner {
29 sources: HashMap<String, SourceInfo>,
31 sinks: HashMap<String, SinkInfo>,
33}
34
/// Catalog entry describing a source registered with `CREATE SOURCE`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SourceInfo {
    /// Source name, rendered from the statement's object name.
    pub name: String,
    /// Column named in the source's watermark declaration, if one was given.
    pub watermark_column: Option<String>,
    /// Options copied verbatim from the statement's `with_options`.
    pub options: HashMap<String, String>,
}
45
/// Catalog entry describing a sink registered with `CREATE SINK`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SinkInfo {
    /// Sink name, rendered from the statement's object name.
    pub name: String,
    /// What feeds the sink: the upstream table name for `FROM <table>`, or the
    /// synthetic name `<sink>_query` when the sink is fed by an inline query.
    pub from: String,
    /// Options copied verbatim from the statement's `with_options`.
    pub options: HashMap<String, String>,
}
56
/// The outcome of planning a single `StreamingStatement`.
#[derive(Debug)]
// `QueryPlan` is large relative to the other variants (hence the lint); it is
// presumably left unboxed so callers can build and match `Query(QueryPlan { .. })`
// directly — boxing it would change this public enum's shape.
#[allow(clippy::large_enum_variant)]
pub enum StreamingPlan {
    /// A `CREATE SOURCE` that has been recorded in the planner's catalog.
    RegisterSource(SourceInfo),

    /// A `CREATE SINK` that has been recorded in the planner's catalog.
    RegisterSink(SinkInfo),

    /// A continuous query / stream definition, or a plain `SELECT` that uses
    /// streaming features (window, join, ORDER BY, or analytic functions).
    Query(QueryPlan),

    /// A statement with no detected streaming features, passed through unchanged.
    Standard(Box<Statement>),

    /// Output of a DAG explain.
    // NOTE(review): not produced by `StreamingPlanner::plan` in this module —
    // presumably constructed by another layer; confirm.
    DagExplain(DagExplainOutput),
}
76
/// A statement analyzed for streaming execution, together with the operator
/// configurations detected in it.
#[derive(Debug)]
pub struct QueryPlan {
    /// Name of the continuous query or stream definition; `None` for ad-hoc queries.
    pub name: Option<String>,
    /// Window operator settings, present when a `GROUP BY` expression is a window
    /// function such as `TUMBLE(...)`.
    pub window_config: Option<WindowOperatorConfig>,
    /// Join operator settings, present when the `SELECT` contains a join.
    pub join_config: Option<JoinOperatorConfig>,
    /// Settings derived from the statement's `ORDER BY`, if any.
    pub order_config: Option<OrderOperatorConfig>,
    /// Settings for analytic functions such as `LAG` / `LEAD`, if any.
    pub analytic_config: Option<AnalyticWindowConfig>,
    /// The emit clause of a continuous query; always `None` for ad-hoc queries.
    pub emit_clause: Option<EmitClause>,
    /// The SQL statement that was analyzed.
    pub statement: Box<Statement>,
}
95
96impl StreamingPlanner {
97 #[must_use]
99 pub fn new() -> Self {
100 Self {
101 sources: HashMap::new(),
102 sinks: HashMap::new(),
103 }
104 }
105
106 pub fn plan(&mut self, statement: &StreamingStatement) -> Result<StreamingPlan, PlanningError> {
112 match statement {
113 StreamingStatement::CreateSource(source) => self.plan_create_source(source),
114 StreamingStatement::CreateSink(sink) => self.plan_create_sink(sink),
115 StreamingStatement::CreateContinuousQuery {
116 name,
117 query,
118 emit_clause,
119 }
120 | StreamingStatement::CreateStream {
121 name,
122 query,
123 emit_clause,
124 ..
125 } => self.plan_continuous_query(name, query, emit_clause.as_ref()),
126 StreamingStatement::Standard(stmt) => self.plan_standard_statement(stmt),
127 StreamingStatement::DropSource { .. }
128 | StreamingStatement::DropSink { .. }
129 | StreamingStatement::DropStream { .. }
130 | StreamingStatement::DropMaterializedView { .. }
131 | StreamingStatement::Show(_)
132 | StreamingStatement::Describe { .. }
133 | StreamingStatement::Explain { .. }
134 | StreamingStatement::CreateMaterializedView { .. }
135 | StreamingStatement::InsertInto { .. } => {
136 Err(PlanningError::UnsupportedSql(format!(
139 "Statement type {:?} is handled by the database layer, not the planner",
140 std::mem::discriminant(statement)
141 )))
142 }
143 }
144 }
145
146 fn plan_create_source(
148 &mut self,
149 source: &CreateSourceStatement,
150 ) -> Result<StreamingPlan, PlanningError> {
151 let name = object_name_to_string(&source.name);
152
153 if !source.or_replace && !source.if_not_exists && self.sources.contains_key(&name) {
155 return Err(PlanningError::InvalidQuery(format!(
156 "Source '{}' already exists",
157 name
158 )));
159 }
160
161 let watermark_column = source.watermark.as_ref().map(|w| w.column.value.clone());
163
164 let info = SourceInfo {
165 name: name.clone(),
166 watermark_column,
167 options: source.with_options.clone(),
168 };
169
170 self.sources.insert(name, info.clone());
172
173 Ok(StreamingPlan::RegisterSource(info))
174 }
175
176 fn plan_create_sink(
178 &mut self,
179 sink: &CreateSinkStatement,
180 ) -> Result<StreamingPlan, PlanningError> {
181 let name = object_name_to_string(&sink.name);
182
183 if !sink.or_replace && !sink.if_not_exists && self.sinks.contains_key(&name) {
185 return Err(PlanningError::InvalidQuery(format!(
186 "Sink '{}' already exists",
187 name
188 )));
189 }
190
191 let from = match &sink.from {
193 SinkFrom::Table(table) => object_name_to_string(table),
194 SinkFrom::Query(_) => format!("{}_query", name),
195 };
196
197 let info = SinkInfo {
198 name: name.clone(),
199 from,
200 options: sink.with_options.clone(),
201 };
202
203 self.sinks.insert(name, info.clone());
205
206 Ok(StreamingPlan::RegisterSink(info))
207 }
208
209 #[allow(clippy::unused_self)] fn plan_continuous_query(
212 &mut self,
213 name: &ObjectName,
214 query: &StreamingStatement,
215 emit_clause: Option<&EmitClause>,
216 ) -> Result<StreamingPlan, PlanningError> {
217 let stmt = match query {
219 StreamingStatement::Standard(stmt) => stmt.as_ref().clone(),
220 _ => {
221 return Err(PlanningError::InvalidQuery(
222 "Continuous query must contain a SELECT statement".to_string(),
223 ))
224 }
225 };
226
227 let query_plan = Self::analyze_query(&stmt, emit_clause)?;
229
230 Ok(StreamingPlan::Query(QueryPlan {
231 name: Some(object_name_to_string(name)),
232 window_config: query_plan.window_config,
233 join_config: query_plan.join_config,
234 order_config: query_plan.order_config,
235 analytic_config: query_plan.analytic_config,
236 emit_clause: emit_clause.cloned(),
237 statement: Box::new(stmt),
238 }))
239 }
240
241 #[allow(clippy::unused_self)] fn plan_standard_statement(&self, stmt: &Statement) -> Result<StreamingPlan, PlanningError> {
244 if let Statement::Query(query) = stmt {
246 if let SetExpr::Select(select) = query.body.as_ref() {
247 let window_function = Self::extract_window_from_select(select);
249
250 let join_analysis = analyze_join(select).map_err(|e| {
252 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
253 })?;
254
255 let order_analysis = analyze_order_by(stmt);
257 let order_config = OrderOperatorConfig::from_analysis(&order_analysis)
258 .map_err(PlanningError::InvalidQuery)?;
259
260 let analytic_analysis = analyze_analytic_functions(stmt);
262 let analytic_config =
263 analytic_analysis.map(|a| AnalyticWindowConfig::from_analysis(&a));
264
265 let has_streaming_features = window_function.is_some()
266 || join_analysis.is_some()
267 || order_config.is_some()
268 || analytic_config.is_some();
269
270 if has_streaming_features {
271 let window_config = match window_function {
272 Some(w) => Some(
273 WindowOperatorConfig::from_window_function(&w)
274 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?,
275 ),
276 None => None,
277 };
278
279 let join_config = join_analysis.map(|j| JoinOperatorConfig::from_analysis(&j));
280
281 return Ok(StreamingPlan::Query(QueryPlan {
282 name: None,
283 window_config,
284 join_config,
285 order_config,
286 analytic_config,
287 emit_clause: None,
288 statement: Box::new(stmt.clone()),
289 }));
290 }
291 }
292 }
293
294 Ok(StreamingPlan::Standard(Box::new(stmt.clone())))
296 }
297
298 fn analyze_query(
300 stmt: &Statement,
301 emit_clause: Option<&EmitClause>,
302 ) -> Result<QueryAnalysis, PlanningError> {
303 let mut analysis = QueryAnalysis::default();
304
305 if let Statement::Query(query) = stmt {
306 if let SetExpr::Select(select) = query.body.as_ref() {
307 if let Some(window) = Self::extract_window_from_select(select) {
309 let mut config = WindowOperatorConfig::from_window_function(&window)
310 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
311
312 if let Some(emit) = emit_clause {
314 config = config
315 .with_emit_clause(emit)
316 .map_err(|e| PlanningError::InvalidQuery(e.to_string()))?;
317 }
318
319 analysis.window_config = Some(config);
320 }
321
322 if let Some(join) = analyze_join(select).map_err(|e| {
324 PlanningError::InvalidQuery(format!("Join analysis failed: {e}"))
325 })? {
326 analysis.join_config = Some(JoinOperatorConfig::from_analysis(&join));
327 }
328 }
329 }
330
331 let order_analysis = analyze_order_by(stmt);
333 analysis.order_config = OrderOperatorConfig::from_analysis(&order_analysis)
334 .map_err(PlanningError::InvalidQuery)?;
335
336 if let Some(analytic) = analyze_analytic_functions(stmt) {
338 analysis.analytic_config = Some(AnalyticWindowConfig::from_analysis(&analytic));
339 }
340
341 Ok(analysis)
342 }
343
344 fn extract_window_from_select(select: &sqlparser::ast::Select) -> Option<WindowFunction> {
346 use sqlparser::ast::GroupByExpr;
348 match &select.group_by {
349 GroupByExpr::Expressions(exprs, _modifiers) => {
350 for group_by_expr in exprs {
351 if let Ok(Some(window)) = WindowRewriter::extract_window_function(group_by_expr)
352 {
353 return Some(window);
354 }
355 }
356 }
357 GroupByExpr::All(_) => {}
358 }
359 None
360 }
361
362 #[must_use]
364 pub fn get_source(&self, name: &str) -> Option<&SourceInfo> {
365 self.sources.get(name)
366 }
367
368 #[must_use]
370 pub fn get_sink(&self, name: &str) -> Option<&SinkInfo> {
371 self.sinks.get(name)
372 }
373
374 #[must_use]
376 pub fn list_sources(&self) -> Vec<&SourceInfo> {
377 self.sources.values().collect()
378 }
379
380 #[must_use]
382 pub fn list_sinks(&self) -> Vec<&SinkInfo> {
383 self.sinks.values().collect()
384 }
385
386 #[allow(clippy::unused_self)] pub async fn to_logical_plan(
404 &self,
405 plan: &QueryPlan,
406 ctx: &SessionContext,
407 ) -> Result<LogicalPlan, PlanningError> {
408 let sql = plan.statement.to_string();
412 ctx.state()
413 .create_logical_plan(&sql)
414 .await
415 .map_err(PlanningError::DataFusion)
416 }
417}
418
419impl Default for StreamingPlanner {
420 fn default() -> Self {
421 Self::new()
422 }
423}
424
/// Intermediate result of `StreamingPlanner::analyze_query`: every operator
/// configuration detected in one statement (each `None` when absent).
#[derive(Debug, Default)]
// The shared `_config` suffix is deliberate: the fields mirror `QueryPlan`'s
// same-named fields one-to-one.
#[allow(clippy::struct_field_names)]
struct QueryAnalysis {
    /// Window operator settings, with any emit clause already applied.
    window_config: Option<WindowOperatorConfig>,
    /// Join operator settings.
    join_config: Option<JoinOperatorConfig>,
    /// Settings derived from `ORDER BY`.
    order_config: Option<OrderOperatorConfig>,
    /// Settings for analytic functions such as `LAG` / `LEAD`.
    analytic_config: Option<AnalyticWindowConfig>,
}
434
435fn object_name_to_string(name: &ObjectName) -> String {
437 name.to_string()
438}
439
/// Errors produced while planning streaming SQL statements.
#[derive(Debug, thiserror::Error)]
pub enum PlanningError {
    /// The statement kind is not planned here (for example, it is executed directly
    /// by the database layer).
    #[error("Unsupported SQL: {0}")]
    UnsupportedSql(String),

    /// The statement is malformed or conflicts with the catalog: duplicate names,
    /// a continuous query without a SELECT, or failed window/join/ORDER BY analysis.
    #[error("Invalid query: {0}")]
    InvalidQuery(String),

    /// A referenced source is not registered.
    // NOTE(review): not constructed by the planner code in this module — confirm
    // whether other modules rely on it.
    #[error("Source not found: {0}")]
    SourceNotFound(String),

    /// A referenced sink is not registered.
    // NOTE(review): not constructed by the planner code in this module — confirm
    // whether other modules rely on it.
    #[error("Sink not found: {0}")]
    SinkNotFound(String),

    /// DataFusion failed to build a logical plan; `#[from]` lets `?` convert a
    /// `DataFusionError` into this variant.
    #[error("DataFusion error: {0}")]
    DataFusion(#[from] datafusion_common::DataFusionError),
}
463
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parser::StreamingParser;

    /// Parses `sql` and hands its first statement to `planner`.
    ///
    /// Panics if the SQL does not parse; the planning result is returned as-is so
    /// callers can assert on success or failure.
    fn plan_sql(
        planner: &mut StreamingPlanner,
        sql: &str,
    ) -> Result<StreamingPlan, PlanningError> {
        let parsed = StreamingParser::parse_sql(sql).unwrap();
        planner.plan(&parsed[0])
    }

    /// Plans `sql` with a brand-new planner and unwraps the result.
    fn plan_fresh(sql: &str) -> StreamingPlan {
        plan_sql(&mut StreamingPlanner::new(), sql).unwrap()
    }

    /// Extracts the `QueryPlan` from `plan`, panicking with `failure` otherwise.
    fn into_query_plan(plan: StreamingPlan, failure: &str) -> QueryPlan {
        match plan {
            StreamingPlan::Query(query_plan) => query_plan,
            _ => panic!("{}", failure),
        }
    }

    #[test]
    fn test_plan_create_source() {
        match plan_fresh("CREATE SOURCE events (id INT, name VARCHAR)") {
            StreamingPlan::RegisterSource(info) => assert_eq!(info.name, "events"),
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_create_sink() {
        match plan_fresh("CREATE SINK output FROM events") {
            StreamingPlan::RegisterSink(info) => {
                assert_eq!(info.name, "output");
                assert_eq!(info.from, "events");
            }
            _ => panic!("Expected RegisterSink plan"),
        }
    }

    #[test]
    fn test_plan_duplicate_source() {
        const DDL: &str = "CREATE SOURCE events (id INT, name VARCHAR)";
        let mut planner = StreamingPlanner::new();

        plan_sql(&mut planner, DDL).unwrap();
        // Re-registering the same name without OR REPLACE / IF NOT EXISTS must fail.
        assert!(plan_sql(&mut planner, DDL).is_err());
    }

    #[test]
    fn test_plan_source_if_not_exists() {
        let mut planner = StreamingPlanner::new();
        plan_sql(&mut planner, "CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let again = plan_sql(
            &mut planner,
            "CREATE SOURCE IF NOT EXISTS events (id INT, name VARCHAR)",
        );
        assert!(again.is_ok());
    }

    #[test]
    fn test_plan_source_or_replace() {
        let mut planner = StreamingPlanner::new();
        plan_sql(&mut planner, "CREATE SOURCE events (id INT, name VARCHAR)").unwrap();

        let replaced = plan_sql(
            &mut planner,
            "CREATE OR REPLACE SOURCE events (id INT, name VARCHAR)",
        );
        assert!(replaced.is_ok());
    }

    #[test]
    fn test_plan_source_with_watermark() {
        let sql = "CREATE SOURCE events (
            id INT,
            ts TIMESTAMP,
            WATERMARK FOR ts AS ts - INTERVAL '5' SECOND
        )";

        match plan_fresh(sql) {
            StreamingPlan::RegisterSource(info) => {
                assert_eq!(info.name, "events");
                assert_eq!(info.watermark_column.as_deref(), Some("ts"));
            }
            _ => panic!("Expected RegisterSource plan"),
        }
    }

    #[test]
    fn test_plan_standard_select() {
        assert!(
            matches!(plan_fresh("SELECT * FROM events"), StreamingPlan::Standard(_)),
            "Expected Standard plan for simple SELECT"
        );
    }

    #[test]
    fn test_list_sources_and_sinks() {
        let mut planner = StreamingPlanner::new();

        for ddl in [
            "CREATE SOURCE src1 (id INT)",
            "CREATE SOURCE src2 (id INT)",
            "CREATE SINK sink1 FROM src1",
        ] {
            plan_sql(&mut planner, ddl).unwrap();
        }

        assert_eq!(planner.list_sources().len(), 2);
        assert_eq!(planner.list_sinks().len(), 1);
        assert!(planner.get_source("src1").is_some());
        assert!(planner.get_sink("sink1").is_some());
    }

    #[test]
    fn test_plan_query_with_window() {
        let query_plan = into_query_plan(
            plan_fresh(
                "SELECT COUNT(*) FROM events GROUP BY TUMBLE(event_time, INTERVAL '5' MINUTE)",
            ),
            "Expected Query plan",
        );

        let config = query_plan
            .window_config
            .expect("TUMBLE in GROUP BY should yield a window config");
        assert_eq!(config.time_column, "event_time");
        assert_eq!(config.size.as_secs(), 300);
    }

    #[test]
    fn test_plan_query_with_join() {
        let query_plan = into_query_plan(
            plan_fresh("SELECT * FROM orders o JOIN payments p ON o.order_id = p.order_id"),
            "Expected Query plan",
        );

        let config = query_plan
            .join_config
            .expect("JOIN should yield a join config");
        assert_eq!(config.left_key(), "order_id");
        assert_eq!(config.right_key(), "order_id");
    }

    #[test]
    fn test_plan_query_with_lag() {
        let query_plan = into_query_plan(
            plan_fresh(
                "SELECT price, LAG(price) OVER (PARTITION BY symbol ORDER BY ts) AS prev FROM trades",
            ),
            "Expected Query plan with analytic config",
        );

        let config = query_plan
            .analytic_config
            .expect("LAG should yield an analytic config");
        assert_eq!(config.functions.len(), 1);
        assert_eq!(config.partition_columns, vec!["symbol".to_string()]);
    }

    #[test]
    fn test_plan_query_with_lead() {
        let query_plan = into_query_plan(
            plan_fresh("SELECT LEAD(price, 2) OVER (ORDER BY ts) AS next2 FROM trades"),
            "Expected Query plan with analytic config",
        );

        let config = query_plan
            .analytic_config
            .expect("LEAD should yield an analytic config");
        assert!(config.has_lookahead());
        assert_eq!(config.functions[0].offset, 2);
    }
}
680}