nodedb_sql/planner/
window.rs1use sqlparser::ast;
6
7use crate::error::{Result, SqlError};
8use crate::functions::registry::{FunctionCategory, FunctionRegistry};
9use crate::parser::normalize::{SCHEMA_QUALIFIED_MSG, normalize_ident};
10use crate::resolver::expr::convert_expr;
11use crate::types::{SortKey, WindowSpec};
12use nodedb_query::{FrameBound, WindowFrame};
13
14pub fn extract_window_functions(
21 items: &[ast::SelectItem],
22 functions: &FunctionRegistry,
23) -> Result<Vec<WindowSpec>> {
24 let mut specs = Vec::new();
25 for item in items {
26 let (expr, alias) = match item {
27 ast::SelectItem::UnnamedExpr(e) => (e, format!("{e}")),
28 ast::SelectItem::ExprWithAlias { expr, alias } => (expr, normalize_ident(alias)),
29 _ => continue,
30 };
31 if let ast::Expr::Function(func) = expr
32 && func.over.is_some()
33 {
34 specs.push(convert_window_spec(func, &alias, functions)?);
35 }
36 }
37 Ok(specs)
38}
39
40fn convert_window_spec(
41 func: &ast::Function,
42 alias: &str,
43 functions: &FunctionRegistry,
44) -> Result<WindowSpec> {
45 if func.name.0.len() > 1 {
46 let qualified: String = func
47 .name
48 .0
49 .iter()
50 .map(|p| match p {
51 ast::ObjectNamePart::Identifier(ident) => ident.value.clone(),
52 _ => String::new(),
53 })
54 .collect::<Vec<_>>()
55 .join(".");
56 return Err(SqlError::Unsupported {
57 detail: format!(
58 "schema-qualified window function name '{qualified}': {SCHEMA_QUALIFIED_MSG}"
59 ),
60 });
61 }
62 let name = func
63 .name
64 .0
65 .iter()
66 .map(|p| match p {
67 ast::ObjectNamePart::Identifier(ident) => normalize_ident(ident),
68 _ => String::new(),
69 })
70 .collect::<Vec<_>>()
71 .join(".");
72
73 match functions.lookup(&name).map(|m| m.category) {
76 Some(FunctionCategory::Window) | Some(FunctionCategory::Aggregate) => {}
77 Some(FunctionCategory::Scalar) => {
78 return Err(SqlError::InvalidFunction {
79 detail: format!(
80 "function '{name}() OVER ()' does not exist as a window function \
81 (it is a scalar function)"
82 ),
83 });
84 }
85 None => {
86 return Err(SqlError::InvalidFunction {
87 detail: format!("function '{name}() OVER ()' does not exist"),
88 });
89 }
90 }
91
92 let args = match &func.args {
93 ast::FunctionArguments::List(args) => args
94 .args
95 .iter()
96 .filter_map(|a| match a {
97 ast::FunctionArg::Unnamed(ast::FunctionArgExpr::Expr(e)) => convert_expr(e).ok(),
98 _ => None,
99 })
100 .collect(),
101 _ => Vec::new(),
102 };
103
104 let (partition_by, order_by, frame) = match &func.over {
105 Some(ast::WindowType::WindowSpec(spec)) => {
106 let pb = spec
107 .partition_by
108 .iter()
109 .map(convert_expr)
110 .collect::<Result<Vec<_>>>()?;
111 let ob = spec
112 .order_by
113 .iter()
114 .map(|o| {
115 Ok(SortKey {
116 expr: convert_expr(&o.expr)?,
117 ascending: o.options.asc.unwrap_or(true),
118 nulls_first: o.options.nulls_first.unwrap_or(false),
119 })
120 })
121 .collect::<Result<Vec<_>>>()?;
122 let frame = match &spec.window_frame {
123 Some(f) => convert_window_frame(f, &ob)?,
124 None => {
129 if ob.is_empty() {
130 WindowFrame {
131 mode: "range".into(),
132 start: FrameBound::UnboundedPreceding,
133 end: FrameBound::UnboundedFollowing,
134 }
135 } else {
136 WindowFrame::default()
137 }
138 }
139 };
140 (pb, ob, frame)
141 }
142 _ => (
143 Vec::new(),
144 Vec::new(),
145 WindowFrame {
146 mode: "range".into(),
147 start: FrameBound::UnboundedPreceding,
148 end: FrameBound::UnboundedFollowing,
149 },
150 ),
151 };
152
153 Ok(WindowSpec {
154 function: name,
155 args,
156 partition_by,
157 order_by,
158 alias: alias.into(),
159 frame,
160 })
161}
162
163fn convert_window_frame(
171 frame: &ast::WindowFrame,
172 order_by: &[crate::types::SortKey],
173) -> Result<WindowFrame> {
174 let mode = match frame.units {
175 ast::WindowFrameUnits::Rows => "rows",
176 ast::WindowFrameUnits::Range => "range",
177 ast::WindowFrameUnits::Groups => {
178 if order_by.is_empty() {
179 return Err(SqlError::InvalidWindowFrame {
180 detail: "GROUPS mode requires an ORDER BY clause in the window specification"
181 .into(),
182 });
183 }
184 "groups"
185 }
186 };
187
188 let start = convert_window_frame_bound(&frame.start_bound)?;
189 let end = match &frame.end_bound {
190 Some(b) => convert_window_frame_bound(b)?,
191 None => FrameBound::CurrentRow,
192 };
193
194 if mode == "range" {
198 let needs_order = matches!(start, FrameBound::Preceding(n) if n > 0)
199 || matches!(start, FrameBound::Following(n) if n > 0)
200 || matches!(end, FrameBound::Preceding(n) if n > 0)
201 || matches!(end, FrameBound::Following(n) if n > 0);
202 if needs_order && order_by.len() != 1 {
203 return Err(SqlError::InvalidWindowFrame {
204 detail: "RANGE with numeric PRECEDING/FOLLOWING offset requires exactly one ORDER BY column".into(),
205 });
206 }
207 }
208
209 Ok(WindowFrame {
210 mode: mode.into(),
211 start,
212 end,
213 })
214}
215
216fn convert_window_frame_bound(bound: &ast::WindowFrameBound) -> Result<FrameBound> {
217 match bound {
218 ast::WindowFrameBound::CurrentRow => Ok(FrameBound::CurrentRow),
219 ast::WindowFrameBound::Preceding(None) => Ok(FrameBound::UnboundedPreceding),
220 ast::WindowFrameBound::Following(None) => Ok(FrameBound::UnboundedFollowing),
221 ast::WindowFrameBound::Preceding(Some(expr)) => {
222 Ok(FrameBound::Preceding(extract_frame_offset(expr)?))
223 }
224 ast::WindowFrameBound::Following(Some(expr)) => {
225 Ok(FrameBound::Following(extract_frame_offset(expr)?))
226 }
227 }
228}
229
230fn extract_frame_offset(expr: &ast::Expr) -> Result<u64> {
231 if let ast::Expr::Value(v) = expr
232 && let ast::Value::Number(n, _) = &v.value
233 && let Ok(parsed) = n.parse::<u64>()
234 {
235 return Ok(parsed);
236 }
237 Err(SqlError::Unsupported {
238 detail: format!("window frame offset must be a non-negative integer literal, got {expr}"),
239 })
240}