Skip to main content

nodedb_sql/planner/
cte.rs

1//! CTE (WITH clause) and WITH RECURSIVE planning.
2
3use sqlparser::ast::{self, Query, SetExpr};
4
5use crate::error::{Result, SqlError};
6use crate::functions::registry::FunctionRegistry;
7use crate::types::*;
8
9/// Plan a WITH RECURSIVE query.
10pub fn plan_recursive_cte(
11    query: &Query,
12    catalog: &dyn SqlCatalog,
13    functions: &FunctionRegistry,
14) -> Result<SqlPlan> {
15    let with = query.with.as_ref().ok_or_else(|| SqlError::Parse {
16        detail: "expected WITH clause".into(),
17    })?;
18
19    // Get the CTE definition.
20    let cte = with.cte_tables.first().ok_or_else(|| SqlError::Parse {
21        detail: "empty WITH clause".into(),
22    })?;
23
24    let cte_query = &cte.query;
25
26    // The CTE body should be a UNION of base case and recursive case.
27    match &*cte_query.body {
28        SetExpr::SetOperation {
29            op: ast::SetOperator::Union,
30            left,
31            right,
32            ..
33        } => {
34            let base = plan_cte_branch(left, catalog, functions)?;
35            let recursive = plan_cte_branch(right, catalog, functions)?;
36
37            // Extract collection and filters from base/recursive plans.
38            let collection = extract_collection(&base)
39                .or_else(|| extract_collection(&recursive))
40                .unwrap_or_default();
41
42            Ok(SqlPlan::RecursiveScan {
43                collection,
44                base_filters: extract_filters(&base),
45                recursive_filters: extract_filters(&recursive),
46                max_iterations: 100,
47                distinct: true,
48                limit: 10000,
49            })
50        }
51        _ => Err(SqlError::Unsupported {
52            detail: "WITH RECURSIVE requires UNION in CTE body".into(),
53        }),
54    }
55}
56
57fn plan_cte_branch(
58    expr: &SetExpr,
59    catalog: &dyn SqlCatalog,
60    functions: &FunctionRegistry,
61) -> Result<SqlPlan> {
62    match expr {
63        SetExpr::Select(select) => {
64            let query = Query {
65                with: None,
66                body: Box::new(SetExpr::Select(select.clone())),
67                order_by: None,
68                limit_clause: None,
69                fetch: None,
70                locks: Vec::new(),
71                for_clause: None,
72                settings: None,
73                format_clause: None,
74                pipe_operators: Vec::new(),
75            };
76            super::select::plan_query(&query, catalog, functions)
77        }
78        _ => Err(SqlError::Unsupported {
79            detail: "CTE branch must be SELECT".into(),
80        }),
81    }
82}
83
84fn extract_collection(plan: &SqlPlan) -> Option<String> {
85    match plan {
86        SqlPlan::Scan { collection, .. } => Some(collection.clone()),
87        _ => None,
88    }
89}
90
91fn extract_filters(plan: &SqlPlan) -> Vec<Filter> {
92    match plan {
93        SqlPlan::Scan { filters, .. } => filters.clone(),
94        _ => Vec::new(),
95    }
96}