Skip to main content

nodedb_sql/planner/
window.rs

1// SPDX-License-Identifier: Apache-2.0
2
3//! Window function extraction from SELECT projection.
4
5use sqlparser::ast;
6
7use crate::error::{Result, SqlError};
8use crate::functions::registry::{FunctionCategory, FunctionRegistry};
9use crate::parser::normalize::{SCHEMA_QUALIFIED_MSG, normalize_ident};
10use crate::resolver::expr::convert_expr;
11use crate::types::{SortKey, WindowSpec};
12use nodedb_query::{FrameBound, WindowFrame};
13
14/// Extract window function specifications from SELECT items.
15///
16/// Validates each `<func>() OVER (...)` against the function registry.
17/// Names that are neither registered window functions nor aggregates
18/// (PostgreSQL allows aggregates as windows) are rejected here so the
19/// Data-Plane evaluator never receives an unrecognised verb.
20pub fn extract_window_functions(
21    items: &[ast::SelectItem],
22    functions: &FunctionRegistry,
23) -> Result<Vec<WindowSpec>> {
24    let mut specs = Vec::new();
25    for item in items {
26        let (expr, alias) = match item {
27            ast::SelectItem::UnnamedExpr(e) => (e, format!("{e}")),
28            ast::SelectItem::ExprWithAlias { expr, alias } => (expr, normalize_ident(alias)),
29            _ => continue,
30        };
31        if let ast::Expr::Function(func) = expr
32            && func.over.is_some()
33        {
34            specs.push(convert_window_spec(func, &alias, functions)?);
35        }
36    }
37    Ok(specs)
38}
39
40fn convert_window_spec(
41    func: &ast::Function,
42    alias: &str,
43    functions: &FunctionRegistry,
44) -> Result<WindowSpec> {
45    if func.name.0.len() > 1 {
46        let qualified: String = func
47            .name
48            .0
49            .iter()
50            .map(|p| match p {
51                ast::ObjectNamePart::Identifier(ident) => ident.value.clone(),
52                _ => String::new(),
53            })
54            .collect::<Vec<_>>()
55            .join(".");
56        return Err(SqlError::Unsupported {
57            detail: format!(
58                "schema-qualified window function name '{qualified}': {SCHEMA_QUALIFIED_MSG}"
59            ),
60        });
61    }
62    let name = func
63        .name
64        .0
65        .iter()
66        .map(|p| match p {
67            ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident),
68            _ => String::new(),
69        })
70        .collect::<Vec<_>>()
71        .join(".");
72
73    // Reject unknown names at plan time. PostgreSQL permits aggregates as
74    // windows, so accept either Window or Aggregate categories.
75    match functions.lookup(&name).map(|m| m.category) {
76        Some(FunctionCategory::Window) | Some(FunctionCategory::Aggregate) => {}
77        Some(FunctionCategory::Scalar) => {
78            return Err(SqlError::InvalidFunction {
79                detail: format!(
80                    "function '{name}() OVER ()' does not exist as a window function \
81                     (it is a scalar function)"
82                ),
83            });
84        }
85        None => {
86            return Err(SqlError::InvalidFunction {
87                detail: format!("function '{name}() OVER ()' does not exist"),
88            });
89        }
90    }
91
92    let args = match &func.args {
93        ast::FunctionArguments::List(args) => args
94            .args
95            .iter()
96            .filter_map(|a| match a {
97                ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) => convert_expr(e).ok(),
98                _ => None,
99            })
100            .collect(),
101        _ => Vec::new(),
102    };
103
104    let (partition_by, order_by, frame) = match &func.over {
105        Some(ast::WindowType::WindowSpec(spec)) => {
106            let pb = spec
107                .partition_by
108                .iter()
109                .map(convert_expr)
110                .collect::<Result<Vec<_>>>()?;
111            let ob = spec
112                .order_by
113                .iter()
114                .map(|o| {
115                    Ok(SortKey {
116                        expr: convert_expr(&o.expr)?,
117                        ascending: o.options.asc.unwrap_or(true),
118                        nulls_first: o.options.nulls_first.unwrap_or(false),
119                    })
120                })
121                .collect::<Result<Vec<_>>>()?;
122            let frame = match &spec.window_frame {
123                Some(f) => convert_window_frame(f, &ob)?,
124                // PostgreSQL default: when ORDER BY is present, RANGE UNBOUNDED
125                // PRECEDING TO CURRENT ROW; when no ORDER BY, the window covers
126                // the whole partition (RANGE UNBOUNDED PRECEDING TO UNBOUNDED
127                // FOLLOWING).
128                None => {
129                    if ob.is_empty() {
130                        WindowFrame {
131                            mode: "range".into(),
132                            start: FrameBound::UnboundedPreceding,
133                            end: FrameBound::UnboundedFollowing,
134                        }
135                    } else {
136                        WindowFrame::default()
137                    }
138                }
139            };
140            (pb, ob, frame)
141        }
142        _ => (
143            Vec::new(),
144            Vec::new(),
145            WindowFrame {
146                mode: "range".into(),
147                start: FrameBound::UnboundedPreceding,
148                end: FrameBound::UnboundedFollowing,
149            },
150        ),
151    };
152
153    Ok(WindowSpec {
154        function: name,
155        args,
156        partition_by,
157        order_by,
158        alias: alias.into(),
159        frame,
160    })
161}
162
163/// Convert a sqlparser `WindowFrame` to the executor's `WindowFrame`.
164///
165/// `order_by` is needed for semantic validation:
166/// - GROUPS without ORDER BY is invalid (PostgreSQL parity).
167/// - RANGE with numeric offsets (Preceding(N)/Following(N)) requires a single
168///   numeric ORDER BY column; without one the semantics are undefined and we
169///   reject at plan time.
170fn convert_window_frame(
171    frame: &ast::WindowFrame,
172    order_by: &[crate::types::SortKey],
173) -> Result<WindowFrame> {
174    let mode = match frame.units {
175        ast::WindowFrameUnits::Rows => "rows",
176        ast::WindowFrameUnits::Range => "range",
177        ast::WindowFrameUnits::Groups => {
178            if order_by.is_empty() {
179                return Err(SqlError::InvalidWindowFrame {
180                    detail: "GROUPS mode requires an ORDER BY clause in the window specification"
181                        .into(),
182                });
183            }
184            "groups"
185        }
186    };
187
188    let start = convert_window_frame_bound(&frame.start_bound)?;
189    let end = match &frame.end_bound {
190        Some(b) => convert_window_frame_bound(b)?,
191        None => FrameBound::CurrentRow,
192    };
193
194    // RANGE with numeric offsets requires a single-column ORDER BY so we can
195    // compare values. Reject if ORDER BY is absent or has more than one key
196    // (multi-key RANGE offsets are undefined in SQL standards).
197    if mode == "range" {
198        let needs_order = matches!(start, FrameBound::Preceding(n) if n > 0)
199            || matches!(start, FrameBound::Following(n) if n > 0)
200            || matches!(end, FrameBound::Preceding(n) if n > 0)
201            || matches!(end, FrameBound::Following(n) if n > 0);
202        if needs_order && order_by.len() != 1 {
203            return Err(SqlError::InvalidWindowFrame {
204                detail: "RANGE with numeric PRECEDING/FOLLOWING offset requires exactly one ORDER BY column".into(),
205            });
206        }
207    }
208
209    Ok(WindowFrame {
210        mode: mode.into(),
211        start,
212        end,
213    })
214}
215
216fn convert_window_frame_bound(bound: &ast::WindowFrameBound) -> Result<FrameBound> {
217    match bound {
218        ast::WindowFrameBound::CurrentRow => Ok(FrameBound::CurrentRow),
219        ast::WindowFrameBound::Preceding(None) => Ok(FrameBound::UnboundedPreceding),
220        ast::WindowFrameBound::Following(None) => Ok(FrameBound::UnboundedFollowing),
221        ast::WindowFrameBound::Preceding(Some(expr)) => {
222            Ok(FrameBound::Preceding(extract_frame_offset(expr)?))
223        }
224        ast::WindowFrameBound::Following(Some(expr)) => {
225            Ok(FrameBound::Following(extract_frame_offset(expr)?))
226        }
227    }
228}
229
230fn extract_frame_offset(expr: &ast::Expr) -> Result<u64> {
231    if let ast::Expr::Value(v) = expr
232        && let ast::Value::Number(n, _) = &v.value
233        && let Ok(parsed) = n.parse::<u64>()
234    {
235        return Ok(parsed);
236    }
237    Err(SqlError::Unsupported {
238        detail: format!("window frame offset must be a non-negative integer literal, got {expr}"),
239    })
240}